Flujos

Los flujos son funciones unidas con algunas características adicionales sobre las llamadas: se escriben de forma sólida, se pueden transmitir, se pueden llamar de manera local y remota, y sea totalmente observable. Firebase Genkit proporciona herramientas de CLI y de IU para desarrolladores para ejecutar y depurar flujos.

Cómo definir flujos

En su forma más sencilla, un flujo solo une una función:

Go

menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (string, error) {
      suggestion := makeMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  })

Hacerlo te permite ejecutar la función desde la CLI de Genkit y la IU para desarrolladores, y un requisito para muchas de las funciones de Genkit, incluidas las funciones de observabilidad.

Una ventaja importante que tienen los flujos de Genkit en comparación con llamar directamente a una API de modelo es de tipo de entrada y salida:

Go

Los tipos de argumento y resultado de un flujo pueden ser valores simples o estructurados. Genkit producirá esquemas JSON para estos valores con invopop/jsonschema.

El siguiente flujo toma un string como entrada y genera un struct como resultado:

type MenuSuggestion struct {
  ItemName    string `json:"item_name"`
  Description string `json:"description"`
  Calories    int    `json:"calories"`
}
menuSuggestionFlow := genkit.DefineFlow(
  "menuSuggestionFlow",
  func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
      suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
      return suggestion, nil
  },
)

Flujos en ejecución

Para ejecutar un flujo en tu código, haz lo siguiente:

Go

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

También puedes usar la CLI para ejecutar flujos:

genkit flow:run menuSuggestionFlow '"French"'

Contenido transmitido

El siguiente es un ejemplo sencillo de un flujo que puede transmitir valores:

Go

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string
menuSuggestionFlow := genkit.DefineStreamingFlow(
  "menuSuggestionFlow",
  func(
      ctx context.Context,
      restaurantTheme InputType,
      callback func(context.Context, StreamType) error,
  ) (OutputType, error) {
      var menu strings.Builder
      menuChunks := make(chan StreamType)
      go makeFullMenuSuggestion(restaurantTheme, menuChunks)
      for {
          chunk, ok := <-menuChunks
          if !ok {
              break
          }
          if callback != nil {
              callback(context.Background(), chunk)
          }
          menu.WriteString(string(chunk))
      }
      return OutputType(menu.String()), nil
  },
)

Ten en cuenta que la devolución de llamada de transmisión puede ser indefinida. Solo se define si los que invoca el cliente solicita una respuesta transmitida.

Para invocar un flujo en modo de transmisión, sigue estos pasos:

Go

menuSuggestionFlow.Stream(
  context.Background(),
  "French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
  if !sfv.Done {
      fmt.Print(sfv.Output)
      return true
  } else {
      return false
  }
})

Si el flujo no implementa la transmisión, StreamFlow() se comporta de manera idéntica a RunFlow()

También puedes usar la CLI para transmitir flujos:

genkit flow:run menuSuggestionFlow '"French"' -s

Implementa flujos

Si quieres acceder a tu flujo a través de HTTP, deberás implementarlo antes de empezar.

Go

Para implementar flujos con Cloud Run y servicios similares, define tus flujos. Luego, llama a Init():

func main() {
  genkit.DefineFlow(
      "menuSuggestionFlow",
      func(ctx context.Context, restaurantTheme string) (string, error) {
          // ...
          return "", nil
      },
  )
  if err := genkit.Init(context.Background(), nil); err != nil {
      log.Fatal(err)
  }
}

Init inicia un servidor net/http que expone tus flujos como HTTP. extremos (por ejemplo, http://localhost:3400/menuSuggestionFlow).

El segundo parámetro es un Options opcional que especifica lo siguiente:

  • FlowAddr: Es la dirección y el puerto que se escucharán. Si no se especifica, el servidor escucha en el puerto especificado por la variable de entorno PORT; si está vacío, usa el puerto predeterminado 3400.
  • Flows: Qué flujos se deben entregar. Si no se especifica, Init publica todos los flujos definidos.

Si quieres entregar flujos en el mismo host y puerto que otros extremos, puedes Puedes establecer FlowAddr en - y, en su lugar, llamar a NewFlowServeMux() para obtener un controlador. para tus flujos de Genkit, que puedes multiplexar con tus otros controladores de rutas:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Observabilidad del flujo

A veces, cuando se usan SDKs de terceros que no están instrumentados para la observabilidad, es posible que quieras verlos como un paso de registro separado en la IU para desarrolladores. Todo lo que que debes hacer es unir el código en la función run.

genkit.DefineFlow(
    "menuSuggestionFlow",
    func(ctx context.Context, restaurantTheme string) (string, error) {
        themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
            // ...
            return "", nil
        })

        // ...
        return themes, err
    })