フロー

フローとは、直接の呼び出しをラップし、いくつかの特性を追加した関数です。この関数には、厳密な型付け、ストリーミング可能、ローカルとリモートで呼び出し可能、完全なオブザーバビリティという特性があります。Firebase Genkit には、フローを実行してデバッグするための CLI ツールとデベロッパー UI ツールが用意されています。

フローを定義する

最も単純な形式のフローは、関数をラップしただけのものです。

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		suggestion := makeMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	})

これにより、Genkit CLI とデベロッパー UI からこの関数を実行できます。またこれは、デプロイとオブザーバビリティなど、Genkit の多くの機能の要件でもあります。

Genkit フローがモデル API を直接呼び出すよりも優れている点は、入力と出力の両方の型安全性が確保される点です。フローの引数と結果の型には、単純な値または構造化された値が許されます。Genkit は、invopop/jsonschema を使用してこれらの値の JSON スキーマを生成します。

次のフローは、string を入力として受け取り、struct を出力します。

type MenuSuggestion struct {
	ItemName    string `json:"item_name"`
	Description string `json:"description"`
	Calories    int    `json:"calories"`
}

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
		suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	},
)

フローを実行する

コードでフローを実行するには:

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

CLI を使用してフローも実行できます。

genkit flow:run menuSuggestionFlow '"French"'

ストリーミング

値をストリーミングできるフローのサンプルを次に示します。

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string

menuSuggestionFlow := genkit.DefineStreamingFlow(
	"menuSuggestionFlow",
	func(
		ctx context.Context,
		restaurantTheme InputType,
		callback func(context.Context, StreamType) error,
	) (OutputType, error) {
		var menu strings.Builder
		menuChunks := make(chan StreamType)
		go makeFullMenuSuggestion(restaurantTheme, menuChunks)
		for {
			chunk, ok := <-menuChunks
			if !ok {
				break
			}
			if callback != nil {
				callback(context.Background(), chunk)
			}
			menu.WriteString(string(chunk))
		}
		return OutputType(menu.String()), nil
	},
)

ストリーミング コールバックは未定義にできることに注意してください。呼び出し元のクライアントがストリーミング レスポンスをリクエストしている場合にのみ定義されます。

ストリーミング モードでフローを呼び出すには:

menuSuggestionFlow.Stream(
	context.Background(),
	"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
	if err != nil {
		// handle err
		return false
	}
	if !sfv.Done {
		fmt.Print(sfv.Stream)
		return true
	} else {
		fmt.Print(sfv.Output)
		return false
	}
})

フローがストリーミングを実装していない場合、StreamFlow()RunFlow() と同じ動作をします。

CLI を使用してフローをストリーミングすることもできます。

genkit flow:run menuSuggestionFlow '"French"' -s

フローをデプロイする

HTTP 経由でフローへのアクセスを可能にするには、まずフローをデプロイする必要があります。Cloud Run などのサービスを使用してフローをデプロイするには、フローを定義してから Init() を呼び出します。

func main() {
	genkit.DefineFlow(
		"menuSuggestionFlow",
		func(ctx context.Context, restaurantTheme string) (string, error) {
			// ...
			return "", nil
		},
	)
	if err := genkit.Init(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}

Init は、フローを HTTP エンドポイント(http://localhost:3400/menuSuggestionFlow など)として公開する net/http サーバーを起動します。

2 番目のパラメータは省略可能な Options で、次のことを指定します。

  • FlowAddr: リッスンするアドレスとポート。指定しない場合、サーバーは PORT 環境変数で指定されたポートをリッスンします。空の場合は、デフォルトのポート 3400 を使用します。
  • Flows: 提供するフロー。指定しない場合、Init は定義済みのすべてのフローを提供します。

他のエンドポイントと同じホストとポートでフローを提供する場合は、FlowAddr- に設定し、代わりに NewFlowServeMux() を呼び出して Genkit フローのハンドラを取得します。このハンドラは、他のルートハンドラと多重化できます。

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

フローのオブザーバビリティ

オブザーバビリティ用にインストルメント化されていないサードパーティ SDK を使用する場合、デベロッパー UI で個別のトレースステップとして表示することをおすすめします。このためには、コードを run 関数でラップするだけです。

genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
			// ...
			return "", nil
		})

		// ...
		return themes, err
	})