Fluxos

Os fluxos são funções encapsuladas com algumas características adicionais chamadas: elas são fortemente tipadas, transmissíveis, chamáveis local e remotamente e totalmente observáveis. O Firebase Genkit fornece ferramentas de CLI e interface do desenvolvedor para executar e depurar fluxos.

Como definir fluxos

Na forma mais simples, um fluxo envolve apenas uma função:

Go

menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (string, error) {
      suggestion := makeMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  })

Isso permite executar a função a partir da CLI do Genkit e da IU do desenvolvedor e é um requisito para muitos dos recursos do Genkit, incluindo implantação e observabilidade.

Uma vantagem importante que os fluxos do Genkit têm em comparação com a chamada direta a uma API de modelo é segurança de tipo de entradas e saídas:

Go

Os tipos de argumento e resultado de um fluxo podem ser valores simples ou estruturados. O Genkit produzirá esquemas JSON para esses valores usando invopop/jsonschema.

O fluxo a seguir usa um string como entrada e gera um struct:

type MenuSuggestion struct {
  ItemName    string `json:"item_name"`
  Description string `json:"description"`
  Calories    int    `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
      suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  },
)

Fluxos em execução

Para executar um fluxo no seu código:

Go

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

Também é possível usar a CLI para executar fluxos:

genkit flow:run menuSuggestionFlow '"French"'

Transmitido

Confira um exemplo simples de fluxo que pode transmitir valores:

Go

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
  "menuSuggestionFlow",
  func(
      ctx context.Context,
      restaurantTheme InputType,
      callback func(context.Context, StreamType) error,
  ) (OutputType, error) {
      var menu strings.Builder
      menuChunks := make(chan StreamType)
      go makeFullMenuSuggestion(restaurantTheme, menuChunks)
      for {
          chunk, ok := <-menuChunks
          if !ok {
              break
          }
          if callback != nil {
              callback(context.Background(), chunk)
          }
          menu.WriteString(string(chunk))
      }
      return OutputType(menu.String()), nil
  },
)

O callback de streaming pode ser indefinido. Ela só é definida se o Invocando cliente está solicitando uma resposta de stream.

Para invocar um fluxo no modo de streaming:

Go

menuSuggestionFlow.Stream(
  context.Background(),
  "French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
  if !sfv.Done {
      fmt.Print(sfv.Output)
      return true
  } else {
      return false
  }
})

Se o fluxo não implementar o streaming, o StreamFlow() vai se comportar de forma idêntica a RunFlow().

Também é possível usar a CLI para transmitir fluxos:

genkit flow:run menuSuggestionFlow '"French"' -s

Como implantar fluxos

Implante seu fluxo por HTTP se quiser acessá-lo. primeiro.

Go

Para implantar fluxos usando o Cloud Run e serviços semelhantes, defina seus fluxos e Em seguida, chame Init():

func main() {
  genkit.DefineFlow(
      "menuSuggestionFlow",
      func(ctx context.Context, restaurantTheme string) (string, error) {
          // ...
          return "", nil
      },
  )
  if err := genkit.Init(context.Background(), nil); err != nil {
      log.Fatal(err)
  }
}

Init inicia um servidor net/http que expõe seus fluxos como HTTP endpoints (por exemplo, http://localhost:3400/menuSuggestionFlow).

O segundo parâmetro é um Options opcional que especifica o seguinte:

  • FlowAddr: endereço e porta a serem detectados. Se não for especificado, o servidor detecta na porta especificada pela variável de ambiente PORT; Se ela estiver vazia, será usado o padrão da porta 3400.
  • Flows: quais fluxos serão exibidos. Se não for especificado, Init exibirá todo o os fluxos definidos.

Para disponibilizar fluxos no mesmo host e porta que outros endpoints, pode definir FlowAddr como - e chamar NewFlowServeMux() para receber um gerenciador. para seus fluxos do Genkit, que você pode multiplexar com seus outros gerenciadores de rota:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Observabilidade do fluxo

Às vezes, ao usar SDKs de terceiros que não são instrumentados para observabilidade, convém vê-los como uma etapa de rastreamento separada na interface do desenvolvedor. Você só precisa é preciso unir o código na função run.

genkit.DefineFlow(
    "menuSuggestionFlow",
    func(ctx context.Context, restaurantTheme string) (string, error) {
        themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
            // ...
            return "", nil
        })

        // ...
        return themes, err
    })