Przepływy

Przepływy to funkcje z pewnymi dodatkowymi cechami: mają silnie typowy typ, mogą być strumieniowane, wywoływane lokalnie i zdalnie oraz w pełni obserwowalne. Firebase Genkit udostępnia interfejs wiersza poleceń i narzędzia interfejsu programisty do pracy z przepływami (uruchamianie, debugowanie itp.).

Definiowanie przepływów

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

Schematy danych wejściowych i wyjściowych dla przepływów można definiować za pomocą właściwości zod.

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

Gdy określisz schemat, Genkit sprawdzi go pod kątem danych wejściowych i wyjściowych.

Działające przepływy

Aby uruchomić przepływ, użyj funkcji runFlow:

const response = await runFlow(menuSuggestionFlow, 'French');

Za pomocą interfejsu wiersza poleceń możesz też uruchamiać przepływy:

genkit flow:run menuSuggestionFlow '"French"'

transmisja zakończona

Oto prosty przykład przepływu, który może przesyłać strumieniowo wartości z przepływu:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

Pamiętaj, że pole streamingCallback może być nieokreślone. Jest ona definiowana tylko wtedy, gdy klient, który wywołuje, żąda odpowiedzi strumieniowanej.

Aby wywołać przepływ w trybie strumieniowania, użyj funkcji streamFlow:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

Jeśli przepływ nie obejmuje strumieniowego przesyłania danych, streamFlow będzie działać tak samo jak runFlow.

Interfejsu wiersza poleceń możesz też używać do strumieniowania przepływów:

genkit flow:run menuSuggestionFlow '"French"' -s

Wdrażanie przepływów

Jeśli chcesz mieć dostęp do przepływu przez HTTP, musisz go najpierw wdrożyć. Genkit zapewnia integrację z hostami Cloud Functions dla Firebase i Express.js, takimi jak Cloud Run.

Wdrożone przepływy obsługują te same funkcje co przepływy lokalne (np. strumieniowanie i dostrzegalność).

Funkcja w Cloud Functions dla Firebase

Aby używać przepływów w Cloud Functions dla Firebase, użyj wtyczki firebase, zamień defineFlow na onFlow i dołącz authPolicy.

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Express.js

Aby wdrożyć przepływy za pomocą Cloud Run i podobnych usług, zdefiniuj przepływy za pomocą defineFlow, a następnie wywołaj startFlowsServer():

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

Domyślnie startFlowsServer będzie obsługiwać wszystkie przepływy zdefiniowane w bazie kodu jako punkty końcowe HTTP (np. http://localhost:3400/menuSuggestionFlow).

Możesz wybrać, które przepływy mają być udostępniane przez serwer przepływów. Możesz podać port niestandardowy (będzie on używał zmiennej środowiskowej PORT, jeśli jest ustawiona). Możesz też skonfigurować ustawienia CORS.

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

Dostrzegalność przepływu

Czasami, gdy korzystasz z pakietów SDK innych firm, które nie są przystosowane do dostrzegalności, warto potraktować je jako oddzielny etap śledzenia w interfejsie dewelopera. Wystarczy, że umieścisz kod w funkcji run.

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);