Потоки — это обернутые функции с некоторыми дополнительными характеристиками по сравнению с прямыми вызовами: они строго типизированы, потокоемки, локально и удаленно вызываются и полностью наблюдаемы. Firebase Genkit предоставляет инструменты CLI и пользовательского интерфейса разработчика для запуска и отладки потоков.
Определение потоков
В своей простейшей форме поток просто оборачивает функцию:
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
Это позволит вам запускать функцию из интерфейса командной строки Genkit и пользовательского интерфейса разработчика, а также является обязательным требованием для многих функций Genkit, включая развертывание и возможность наблюдения.
Важным преимуществом потоков Genkit перед прямым вызовом API модели является безопасность типов как входных, так и выходных данных. Типы аргументов и результатов потока могут быть простыми или структурированными значениями. Genkit создаст схемы JSON для этих значений, используя invopop/jsonschema
.
Следующий поток принимает string
в качестве входных данных и выводит struct
:
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
Запуск потоков
Чтобы запустить поток в вашем коде:
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
Вы также можете использовать CLI для запуска потоков:
genkit flow:run menuSuggestionFlow '"French"'
Потоковое
Вот простой пример потока, который может передавать значения:
// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
Обратите внимание, что обратный вызов потоковой передачи может быть неопределенным. Он определяется только в том случае, если вызывающий клиент запрашивает потоковый ответ.
Чтобы вызвать поток в потоковом режиме:
menuSuggestionFlow.Stream(
context.Background(),
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if err != nil {
// handle err
return false
}
if !sfv.Done {
fmt.Print(sfv.Stream)
return true
} else {
fmt.Print(sfv.Output)
return false
}
})
Если поток не реализует потоковую передачу, StreamFlow()
ведет себя идентично RunFlow()
.
Вы также можете использовать CLI для потоковой передачи потоков:
genkit flow:run menuSuggestionFlow '"French"' -s
Развертывание потоков
Если вы хотите иметь доступ к своему потоку через HTTP, вам необходимо сначала его развернуть. Чтобы развернуть потоки с помощью Cloud Run и подобных сервисов, определите свои потоки, а затем вызовите Init()
:
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
if err := genkit.Init(context.Background(), nil); err != nil {
log.Fatal(err)
}
}
Init
запускает сервер net/http
, который предоставляет ваши потоки как конечные точки HTTP (например, http://localhost:3400/menuSuggestionFlow
).
Второй параметр — это необязательные Options
, которые определяют следующее:
-
FlowAddr
: адрес и порт для прослушивания. Если не указано, сервер прослушивает порт, указанный в переменной среды PORT; если он пуст, используется порт 3400 по умолчанию. -
Flows
: какие потоки обслуживать. Если не указано,Init
обслуживает все определенные вами потоки.
Если вы хотите обслуживать потоки на том же хосте и порту, что и другие конечные точки, вы можете установить для FlowAddr
значение -
и вместо этого вызвать NewFlowServeMux()
, чтобы получить обработчик для ваших потоков Genkit, который вы можете мультиплексировать с другими обработчиками маршрутов:
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
Наблюдаемость потока
Иногда при использовании сторонних SDK, которые не поддерживают возможность наблюдения, вам может потребоваться увидеть их как отдельный этап трассировки в пользовательском интерфейсе разработчика. Все, что вам нужно сделать, это обернуть код в функцию run
.
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})
// ...
return themes, err
})