Como definir fluxos de trabalho de IA

O núcleo dos recursos de IA do seu app são solicitações de modelo generativo, mas é raro que você possa simplesmente receber a entrada do usuário, transmiti-la ao modelo e mostrar a saída do modelo de volta ao usuário. Geralmente, há etapas de pré- e pós-processamento que precisam acompanhar a chamada do modelo. Exemplo:

  • Como recuperar informações contextuais para enviar com a chamada de modelo
  • Recuperar o histórico da sessão atual do usuário, por exemplo, em um app de chat
  • Usar um modelo para reformatar a entrada do usuário de uma maneira adequada para transmitir a outro modelo
  • Avaliar a "segurança" da saída de um modelo antes de apresentá-la ao usuário
  • Como combinar a saída de vários modelos

Todas as etapas desse fluxo de trabalho precisam funcionar juntas para que qualquer tarefa relacionada à IA tenha sucesso.

No Genkit, você representa essa lógica intimamente vinculada usando uma construção chamada fluxo. Os fluxos são escritos como funções, usando o código TypeScript comum, mas adicionam recursos adicionais para facilitar o desenvolvimento de recursos de IA:

  • Segurança de tipo: esquemas de entrada e saída definidos usando o Zod, que fornece verificação de tipo estática e de tempo de execução.
  • Integração com a interface do desenvolvedor: depure fluxos de forma independente do código do aplicativo usando a interface do desenvolvedor. Na interface do desenvolvedor, é possível executar fluxos e conferir os rastros de cada etapa.
  • Implantação simplificada: implante fluxos diretamente como endpoints da API da Web usando o Cloud Functions para Firebase ou qualquer plataforma que possa hospedar um app da Web.

Ao contrário de recursos semelhantes em outros frameworks, os fluxos do Genkit são leves e não intrusivos, e não forçam o app a se conformar com nenhuma abstração específica. Toda a lógica do fluxo é escrita em TypeScript padrão, e o código dentro de um fluxo não precisa estar ciente do fluxo.

Como definir e chamar fluxos

Na forma mais simples, um fluxo envolve apenas uma função. O exemplo a seguir envolve uma função que chama generate():

export const menuSuggestionFlow = ai.defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const { text } = await ai.generate({
      model: gemini15Flash,
      prompt: `Invent a menu item for a ${restaurantTheme} themed restaurant.`,
    });
    return text;
  }
);

Ao agrupar as chamadas generate() dessa forma, você adiciona algumas funcionalidades: isso permite executar o fluxo da CLI do Genkit e da interface do desenvolvedor, e é um requisito para vários recursos do Genkit, incluindo implantação e observabilidade (seções posteriores discutem esses tópicos).

Esquemas de entrada e saída

Uma das vantagens mais importantes que os fluxos do Genkit têm em comparação com a chamada direta a uma API de modelo é a segurança de tipo de entradas e saídas. Ao definir fluxos, você pode definir esquemas para eles usando o Zod, da mesma forma que define o esquema de saída de uma chamada generate(). No entanto, ao contrário de generate(), você também pode especificar um esquema de entrada.

Confira um refinamento do último exemplo, que define um fluxo que recebe uma string como entrada e gera um objeto:

const MenuItemSchema = z.object({
  dishname: z.string(),
  description: z.string(),
});

export const menuSuggestionFlowWithSchema = ai.defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: MenuItemSchema,
  },
  async (restaurantTheme) => {
    const { output } = await ai.generate({
      model: gemini15Flash,
      prompt: `Invent a menu item for a ${restaurantTheme} themed restaurant.`,
      output: { schema: MenuItemSchema },
    });
    if (output == null) {
      throw new Error("Response doesn't satisfy schema.");
    }
    return output;
  }
);

O esquema de um fluxo não precisa necessariamente estar alinhado com o esquema das chamadas generate() dentro do fluxo. Na verdade, um fluxo pode nem conter chamadas generate(). Confira uma variação do exemplo que transmite um esquema para generate(), mas usa a saída estruturada para formatar uma string simples, que o fluxo retorna.

export const menuSuggestionFlowMarkdown = ai.defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const { output } = await ai.generate({
      model: gemini15Flash,
      prompt: `Invent a menu item for a ${restaurantTheme} themed restaurant.`,
      output: { schema: MenuItemSchema },
    });
    if (output == null) {
      throw new Error("Response doesn't satisfy schema.");
    }
    return `**${output.dishname}**: ${output.description}`;
  }
);

Chamar fluxos

Depois de definir um fluxo, é possível chamá-lo no código do Node.js:

const { text } = await menuSuggestionFlow('bistro');

O argumento para o fluxo precisa estar em conformidade com o esquema de entrada, se você tiver definido um.

Se você tiver definido um esquema de saída, a resposta do fluxo vai se conformar a ele. Por exemplo, se você definir o esquema de saída como MenuItemSchema, a saída do fluxo vai conter as propriedades dele:

const { dishname, description } =
  await menuSuggestionFlowWithSchema('bistro');

Fluxos de streaming

Os fluxos oferecem suporte a streaming usando uma interface semelhante à interface de streaming do generate(). O streaming é útil quando o fluxo gera uma grande quantidade de saída, porque você pode apresentar a saída ao usuário conforme ela é gerada, o que melhora a capacidade de resposta percebida do app. Como exemplo conhecido, as interfaces de LLM baseadas em chat geralmente transmitem as respostas ao usuário conforme elas são geradas.

Confira um exemplo de fluxo compatível com streaming:

export const menuSuggestionStreamingFlow = ai.defineStreamingFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    streamSchema: z.string(),
    outputSchema: z.object({ theme: z.string(), menuItem: z.string() }),
  },
  async (restaurantTheme, streamingCallback) => {
    const response = await ai.generateStream({
      model: gemini15Flash,
      prompt: `Invent a menu item for a ${restaurantTheme} themed restaurant.`,
    });

    if (streamingCallback) {
      for await (const chunk of response.stream) {
        // Here, you could process the chunk in some way before sending it to
        // the output stream via streamingCallback(). In this example, we output
        // the text of the chunk, unmodified.
        streamingCallback(chunk.text);
      }
    }

    return {
      theme: restaurantTheme,
      menuItem: (await response.response).text,
    };
  }
);
  • A opção streamSchema especifica o tipo de valores que o fluxo transmite. Ele não precisa necessariamente ser do mesmo tipo que o outputSchema, que é o tipo de saída completa do fluxo.
  • streamingCallback é uma função de callback que usa um único parâmetro do tipo especificado por streamSchema. Sempre que os dados estiverem disponíveis no fluxo, chame essa função para enviá-los ao fluxo de saída. O streamingCallback só é definido se o autor da chamada do seu fluxo solicitou a saída de streaming. Portanto, verifique se ele está definido antes de fazer a chamada.

No exemplo acima, os valores transmitidos pelo fluxo são acoplados diretamente aos valores transmitidos pela chamada generate() dentro do fluxo. Embora isso seja comum, não precisa ser assim: você pode enviar valores para o stream usando o callback com a frequência que for útil para seu fluxo.

Como chamar fluxos de streaming

Os fluxos de streaming também são acionáveis, mas retornam imediatamente um objeto de resposta em vez de uma promessa:

const response = menuSuggestionStreamingFlow('Danube');

O objeto de resposta tem uma propriedade de stream, que pode ser usada para iterar a saída de streaming do fluxo conforme ele é gerado:

for await (const chunk of response.stream) {
  console.log('chunk', chunk);
}

Também é possível receber a saída completa do fluxo, como em um fluxo não de streaming:

const output = await response.output;

A saída de streaming de um fluxo pode não ser do mesmo tipo que a saída completa. A saída de streaming está em conformidade com streamSchema, enquanto a saída completa está em conformidade com outputSchema.

Como executar fluxos na linha de comando

É possível executar fluxos na linha de comando usando a ferramenta CLI do Genkit:

genkit flow:run menuSuggestionFlow '"French"'

Para fluxos de streaming, é possível imprimir a saída de streaming no console adicionando a flag -s:

genkit flow:run menuSuggestionFlow '"French"' -s

Executar um fluxo na linha de comando é útil para testar um fluxo ou executar fluxos que realizam tarefas necessárias de forma ad hoc. Por exemplo, para executar um fluxo que ingere um documento no banco de dados vetorial.

Fluxos de depuração

Uma das vantagens de encapsular a lógica de IA em um fluxo é que você pode testar e depurar o fluxo independentemente do app usando a interface do desenvolvedor do Genkit.

Para iniciar a interface para desenvolvedores, execute os seguintes comandos no diretório do seu projeto:

genkit start -- tsx --watch src/your-code.ts

Na guia Run da interface do desenvolvedor, é possível executar qualquer um dos fluxos definidos no seu projeto:

Captura de tela do Flow runner

Depois de executar um fluxo, é possível inspecionar um rastro da invocação do fluxo clicando em View trace ou na guia Inspect.

No visualizador de rastros, você pode conferir detalhes sobre a execução de todo o fluxo, bem como detalhes de cada etapa individual no fluxo. Por exemplo, considere o fluxo a seguir, que contém várias solicitações de geração:

const PrixFixeMenuSchema = z.object({
  starter: z.string(),
  soup: z.string(),
  main: z.string(),
  dessert: z.string(),
});

export const complexMenuSuggestionFlow = ai.defineFlow(
  {
    name: 'complexMenuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: PrixFixeMenuSchema,
  },
  async (theme: string): Promise<z.infer<typeof PrixFixeMenuSchema>> => {
    const chat = ai.chat({ model: gemini15Flash });
    await chat.send('What makes a good prix fixe menu?');
    await chat.send(
      'What are some ingredients, seasonings, and cooking techniques that ' +
        `would work for a ${theme} themed menu?`
    );
    const { output } = await chat.send({
      prompt:
        `Based on our discussion, invent a prix fixe menu for a ${theme} ` +
        'themed restaurant.',
      output: {
        schema: PrixFixeMenuSchema,
      },
    });
    if (!output) {
      throw new Error('No data generated.');
    }
    return output;
  }
);

Ao executar esse fluxo, o Visualizador de rastros mostra detalhes sobre cada solicitação de geração, incluindo a saída:

Captura de tela do Inspetor de trace

Etapas do fluxo

No último exemplo, você viu que cada chamada generate() apareceu como uma etapa separada no Visualizador de rastros. Cada uma das ações fundamentais do Genkit aparece como etapas separadas de um fluxo:

  • generate()
  • Chat.send()
  • embed()
  • index()
  • retrieve()

Se você quiser incluir um código diferente do acima nos seus rastros, faça isso envolvendo o código em uma chamada run(). Isso pode ser feito para chamadas de bibliotecas de terceiros que não são compatíveis com o Genkit ou para qualquer seção crítica de código.

Por exemplo, aqui está um fluxo com duas etapas: a primeira recupera um menu usando um método não especificado, e a segunda inclui o menu como contexto para uma chamada generate().

export const menuQuestionFlow = ai.defineFlow(
  {
    name: 'menuQuestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (input: string): Promise<string> => {
    const menu = await run('retrieve-daily-menu', async (): Promise<string> => {
      // Retrieve today's menu. (This could be a database access or simply
      // fetching the menu from your website.)

      // ...

      return menu;
    });
    const { text } = await ai.generate({
      model: gemini15Flash,
      system: "Help the user answer questions about today's menu.",
      prompt: input,
      docs: [{ content: [{ text: menu }] }],
    });
    return text;
  }
);

Como a etapa de recuperação é embrulhada em uma chamada run(), ela é incluída como uma etapa no Visualizador de rastros:

Captura de tela de uma etapa definida explicitamente no Trace Inspector

Como implantar fluxos

É possível implantar seus fluxos diretamente como endpoints da API da Web, prontos para serem chamados pelos clientes do app. A implantação é discutida em detalhes em várias outras páginas, mas esta seção oferece uma visão geral das opções de implantação.

Cloud Functions para Firebase

Para implantar fluxos com o Cloud Functions para Firebase, use o plug-in firebase. Nas definições de fluxo, substitua defineFlow por onFlow e inclua um authPolicy.

import { firebaseAuth } from '@genkit-ai/firebase/auth';
import { onFlow } from '@genkit-ai/firebase/functions';

export const menuSuggestion = onFlow(
  ai,
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error('Verified email required to run flow');
      }
    }),
  },
  async (restaurantTheme) => {
    // ...
  }
);

Para mais informações, consulte as seguintes páginas:

Express.js

Para implantar fluxos usando qualquer plataforma de hospedagem Node.js, como o Cloud Run, defina seus fluxos usando defineFlow() e chame startFlowServer():

export const menuSuggestionFlow = ai.defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ...
  }
);

ai.startFlowServer({
  flows: [menuSuggestionFlow],
});

Por padrão, startFlowServer vai atender todos os fluxos definidos na base de código como endpoints HTTP (por exemplo, http://localhost:3400/menuSuggestionFlow). É possível chamar um fluxo com uma solicitação POST da seguinte maneira:

curl -X POST "http://localhost:3400/menuSuggestionFlow" \
  -H "Content-Type: application/json"  -d '{"data": "banana"}'

Se necessário, você pode personalizar o servidor de fluxos para exibir uma lista específica de fluxos, conforme mostrado abaixo. Também é possível especificar uma porta personalizada (ela vai usar a variável de ambiente PORT se definida) ou especificar configurações do CORS.

export const flowA = ai.defineFlow({ name: 'flowA' }, async (subject) => {
  // ...
});

export const flowB = ai.defineFlow({ name: 'flowB' }, async (subject) => {
  // ...
});

ai.startFlowServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

Para informações sobre a implantação em plataformas específicas, consulte Implantar com o Cloud Run e Implantar fluxos em qualquer plataforma Node.js.