Os fluxos são funções encapsuladas com algumas características adicionais chamadas: elas são fortemente tipadas, transmissíveis, chamáveis local e remotamente e totalmente observáveis. O Firebase Genkit fornece ferramentas de CLI e interface do desenvolvedor para executar e depurar fluxos.
Como definir fluxos
Na forma mais simples, um fluxo envolve apenas uma função:
Go
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
Isso permite executar a função a partir da CLI do Genkit e da IU do desenvolvedor e é um requisito para muitos dos recursos do Genkit, incluindo implantação e observabilidade.
Uma vantagem importante que os fluxos do Genkit têm em comparação com a chamada direta a uma API de modelo é segurança de tipo de entradas e saídas:
Go
Os tipos de argumento e resultado de um fluxo podem ser valores simples ou estruturados.
O Genkit produzirá esquemas JSON para esses valores usando
invopop/jsonschema
.
O fluxo a seguir usa um string
como entrada e gera um struct
:
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
Fluxos em execução
Para executar um fluxo no seu código:
Go
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
Também é possível usar a CLI para executar fluxos:
genkit flow:run menuSuggestionFlow '"French"'
Transmitido
Confira um exemplo simples de fluxo que pode transmitir valores:
Go
// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
O callback de streaming pode ser indefinido. Ela só é definida se o Invocando cliente está solicitando uma resposta de stream.
Para invocar um fluxo no modo de streaming:
Go
menuSuggestionFlow.Stream(
context.Background(),
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if !sfv.Done {
fmt.Print(sfv.Output)
return true
} else {
return false
}
})
Se o fluxo não implementar o streaming, o StreamFlow()
vai se comportar de forma idêntica a
RunFlow()
.
Também é possível usar a CLI para transmitir fluxos:
genkit flow:run menuSuggestionFlow '"French"' -s
Como implantar fluxos
Implante seu fluxo por HTTP se quiser acessá-lo. primeiro.
Go
Para implantar fluxos usando o Cloud Run e serviços semelhantes, defina seus fluxos e
Em seguida, chame Init()
:
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
if err := genkit.Init(context.Background(), nil); err != nil {
log.Fatal(err)
}
}
Init
inicia um servidor net/http
que expõe seus fluxos como HTTP
endpoints (por exemplo, http://localhost:3400/menuSuggestionFlow
).
O segundo parâmetro é um Options
opcional que especifica o seguinte:
FlowAddr
: endereço e porta a serem detectados. Se não for especificado, o servidor detecta na porta especificada pela variável de ambiente PORT; Se ela estiver vazia, será usado o padrão da porta 3400.Flows
: quais fluxos serão exibidos. Se não for especificado,Init
exibirá todo o os fluxos definidos.
Para disponibilizar fluxos no mesmo host e porta que outros endpoints,
pode definir FlowAddr
como -
e chamar NewFlowServeMux()
para receber um gerenciador.
para seus fluxos do Genkit, que você pode multiplexar com seus outros gerenciadores de rota:
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
Observabilidade do fluxo
Às vezes, ao usar SDKs de terceiros que não são instrumentados para observabilidade,
convém vê-los como uma etapa de rastreamento separada na interface do desenvolvedor. Você só precisa
é preciso unir o código na função run
.
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})
// ...
return themes, err
})