Fluxos

Os fluxos são funções encapsuladas com algumas características adicionais em relação às chamadas diretas: são fortemente tipados, passíveis de fluxo, podem ser chamados local e remotamente e totalmente observáveis. O Firebase Genkit fornece ferramentas de CLI e interface do desenvolvedor para executar e depurar fluxos.

Como definir fluxos

Na forma mais simples, um fluxo envolve apenas uma função:

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		suggestion := makeMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	})

Isso permite executar a função a partir da CLI do Genkit e da IU do desenvolvedor e é um requisito para muitos dos recursos do Genkit, incluindo implantação e observabilidade.

Uma vantagem importante que os fluxos do Genkit têm em comparação com a chamada direta a uma API de modelo é segurança de tipo de entradas e saídas. Os tipos de argumento e resultado de um fluxo podem ser valores simples ou estruturados. O Genkit produzirá esquemas JSON para esses valores usando invopop/jsonschema.

O fluxo a seguir usa um string como entrada e gera um struct:

type MenuSuggestion struct {
	ItemName    string `json:"item_name"`
	Description string `json:"description"`
	Calories    int    `json:"calories"`
}

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
		suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	},
)

Fluxos em execução

Para executar um fluxo no seu código:

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

Também é possível usar a CLI para executar fluxos:

genkit flow:run menuSuggestionFlow '"French"'

Transmitido

Confira um exemplo simples de fluxo que pode transmitir valores:

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string

menuSuggestionFlow := genkit.DefineStreamingFlow(
	"menuSuggestionFlow",
	func(
		ctx context.Context,
		restaurantTheme InputType,
		callback func(context.Context, StreamType) error,
	) (OutputType, error) {
		var menu strings.Builder
		menuChunks := make(chan StreamType)
		go makeFullMenuSuggestion(restaurantTheme, menuChunks)
		for {
			chunk, ok := <-menuChunks
			if !ok {
				break
			}
			if callback != nil {
				callback(context.Background(), chunk)
			}
			menu.WriteString(string(chunk))
		}
		return OutputType(menu.String()), nil
	},
)

O retorno de chamada de streaming pode ser indefinido. Só é definido se o cliente invocador estiver solicitando uma resposta transmitida.

Para invocar um fluxo no modo de streaming:

menuSuggestionFlow.Stream(
	context.Background(),
	"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
	if err != nil {
		// handle err
		return false
	}
	if !sfv.Done {
		fmt.Print(sfv.Stream)
		return true
	} else {
		fmt.Print(sfv.Output)
		return false
	}
})

Se o fluxo não implementar o streaming, o StreamFlow() vai se comportar de forma idêntica a RunFlow().

Também é possível usar a CLI para transmitir fluxos:

genkit flow:run menuSuggestionFlow '"French"' -s

Como implantar fluxos

Se quiser acessar seu fluxo por HTTP, você precisará implantá-lo primeiro. Para implantar fluxos usando o Cloud Run e serviços semelhantes, defina seus fluxos e, em seguida, chame Init():

func main() {
	genkit.DefineFlow(
		"menuSuggestionFlow",
		func(ctx context.Context, restaurantTheme string) (string, error) {
			// ...
			return "", nil
		},
	)
	if err := genkit.Init(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}

Init inicia um servidor net/http que expõe seus fluxos como endpoints HTTP (por exemplo, http://localhost:3400/menuSuggestionFlow).

O segundo parâmetro é um Options opcional que especifica o seguinte:

  • FlowAddr: endereço e porta a serem detectados. Se não for especificado, o servidor escuta na porta especificada pela variável de ambiente PORT. Se estiver vazio, ele usa o padrão da porta 3400.
  • Flows: quais fluxos serão exibidos. Se não for especificado, Init exibirá todo os fluxos definidos.

Para disponibilizar fluxos no mesmo host e porta que outros endpoints, você pode definir FlowAddr como - e chamar NewFlowServeMux() para receber um gerenciador para seus fluxos do Genkit, que você pode multiplexar com seus outros gerenciadores de rota:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Observabilidade do fluxo

Às vezes, ao usar SDKs de terceiros que não são instrumentados para observabilidade, convém vê-los como uma etapa de rastreamento separada na interface do desenvolvedor. Você só precisa unir o código na função run.

genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
			// ...
			return "", nil
		})

		// ...
		return themes, err
	})