フローとは、直接の呼び出しをラップし、いくつかの特性を追加した関数です。この関数には、厳密な型付け、ストリーミング可能、ローカルとリモートで呼び出し可能、完全なオブザーバビリティという特性があります。Firebase Genkit には、フローを実行してデバッグするための CLI ツールとデベロッパー UI ツールが用意されています。
フローを定義する
最も単純な形式のフローは、関数をラップしただけのものです。
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
これにより、Genkit CLI とデベロッパー UI からこの関数を実行できます。またこれは、デプロイとオブザーバビリティなど、Genkit の多くの機能の要件でもあります。
Genkit フローがモデル API を直接呼び出すよりも優れている点は、入力と出力の両方の型安全性が確保される点です。フローの引数と結果の型には、単純な値または構造化された値が許されます。Genkit は、invopop/jsonschema
を使用してこれらの値の JSON スキーマを生成します。
次のフローは、string
を入力として受け取り、struct
を出力します。
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
フローを実行する
コードでフローを実行するには:
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
CLI を使用してフローも実行できます。
genkit flow:run menuSuggestionFlow '"French"'
ストリーミング
値をストリーミングできるフローのサンプルを次に示します。
// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
ストリーミング コールバックは未定義にできることに注意してください。呼び出し元のクライアントがストリーミング レスポンスをリクエストしている場合にのみ定義されます。
ストリーミング モードでフローを呼び出すには:
menuSuggestionFlow.Stream(
context.Background(),
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if err != nil {
// handle err
return false
}
if !sfv.Done {
fmt.Print(sfv.Stream)
return true
} else {
fmt.Print(sfv.Output)
return false
}
})
フローがストリーミングを実装していない場合、StreamFlow()
は RunFlow()
と同じ動作をします。
CLI を使用してフローをストリーミングすることもできます。
genkit flow:run menuSuggestionFlow '"French"' -s
フローをデプロイする
HTTP 経由でフローへのアクセスを可能にするには、まずフローをデプロイする必要があります。Cloud Run などのサービスを使用してフローをデプロイするには、フローを定義してから Init()
を呼び出します。
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
if err := genkit.Init(context.Background(), nil); err != nil {
log.Fatal(err)
}
}
Init
は、フローを HTTP エンドポイント(http://localhost:3400/menuSuggestionFlow
など)として公開する net/http
サーバーを起動します。
2 番目のパラメータは省略可能な Options
で、次のことを指定します。
FlowAddr
: リッスンするアドレスとポート。指定しない場合、サーバーは PORT 環境変数で指定されたポートをリッスンします。空の場合は、デフォルトのポート 3400 を使用します。Flows
: 提供するフロー。指定しない場合、Init
は定義済みのすべてのフローを提供します。
他のエンドポイントと同じホストとポートでフローを提供する場合は、FlowAddr
を -
に設定し、代わりに NewFlowServeMux()
を呼び出して Genkit フローのハンドラを取得します。このハンドラは、他のルートハンドラと多重化できます。
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
フローのオブザーバビリティ
オブザーバビリティ用にインストルメント化されていないサードパーティ SDK を使用する場合、デベロッパー UI で個別のトレースステップとして表示することをおすすめします。このためには、コードを run
関数でラップするだけです。
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})
// ...
return themes, err
})