Os fluxos são funções encapsuladas com algumas características adicionais em relação às chamadas diretas: são fortemente tipados, passíveis de fluxo, podem ser chamados local e remotamente e totalmente observáveis. O Firebase Genkit fornece ferramentas de CLI e interface do desenvolvedor para executar e depurar fluxos.
Como definir fluxos
Na forma mais simples, um fluxo envolve apenas uma função:
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
Isso permite executar a função a partir da CLI do Genkit e da IU do desenvolvedor e é um requisito para muitos dos recursos do Genkit, incluindo implantação e observabilidade.
Uma vantagem importante que os fluxos do Genkit têm em comparação com a chamada direta a uma API de modelo é
segurança de tipo de entradas e saídas.
Os tipos de argumento e resultado de um fluxo podem ser valores simples ou estruturados.
O Genkit produzirá esquemas JSON para esses valores usando
invopop/jsonschema
.
O fluxo a seguir usa um string
como entrada e gera um struct
:
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
Fluxos em execução
Para executar um fluxo no seu código:
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
Também é possível usar a CLI para executar fluxos:
genkit flow:run menuSuggestionFlow '"French"'
Transmitido
Confira um exemplo simples de fluxo que pode transmitir valores:
// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
O retorno de chamada de streaming pode ser indefinido. Só é definido se o cliente invocador estiver solicitando uma resposta transmitida.
Para invocar um fluxo no modo de streaming:
menuSuggestionFlow.Stream(
context.Background(),
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if err != nil {
// handle err
return false
}
if !sfv.Done {
fmt.Print(sfv.Stream)
return true
} else {
fmt.Print(sfv.Output)
return false
}
})
Se o fluxo não implementar o streaming, o StreamFlow()
vai se comportar de forma idêntica a
RunFlow()
.
Também é possível usar a CLI para transmitir fluxos:
genkit flow:run menuSuggestionFlow '"French"' -s
Como implantar fluxos
Se quiser acessar seu fluxo por HTTP, você precisará implantá-lo primeiro.
Para implantar fluxos usando o Cloud Run e serviços semelhantes, defina seus fluxos e,
em seguida, chame Init()
:
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
if err := genkit.Init(context.Background(), nil); err != nil {
log.Fatal(err)
}
}
Init
inicia um servidor net/http
que expõe seus fluxos como endpoints HTTP
(por exemplo, http://localhost:3400/menuSuggestionFlow
).
O segundo parâmetro é um Options
opcional que especifica o seguinte:
FlowAddr
: endereço e porta a serem detectados. Se não for especificado, o servidor escuta na porta especificada pela variável de ambiente PORT. Se estiver vazio, ele usa o padrão da porta 3400.Flows
: quais fluxos serão exibidos. Se não for especificado,Init
exibirá todo os fluxos definidos.
Para disponibilizar fluxos no mesmo host e porta que outros endpoints, você
pode definir FlowAddr
como -
e chamar NewFlowServeMux()
para receber um gerenciador
para seus fluxos do Genkit, que você pode multiplexar com seus outros gerenciadores de rota:
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
Observabilidade do fluxo
Às vezes, ao usar SDKs de terceiros que não são instrumentados para observabilidade,
convém vê-los como uma etapa de rastreamento separada na interface do desenvolvedor. Você só precisa
unir o código na função run
.
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})
// ...
return themes, err
})