Akışlar

Akışlar, doğrudan çağrılara kıyasla bazı ek özelliklere sahip sarmalanmış işlevlerdir: güçlü bir şekilde yazılmış, akışa uygun, yerel ve uzaktan çağrılabilir ve tamamen gözlemlenebilirler. Firebase Genkit, akışları çalıştırmak ve hata ayıklamak için CLI ve geliştirici kullanıcı arayüzü araçları sağlar.

Akışları tanımlama

Akış, en basit haliyle bir fonksiyonu sarmalar:

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		suggestion := makeMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	})

Bu işlem, işlevi Genkit CLI'den ve geliştirici kullanıcı arayüzünden çalıştırmanıza olanak tanır ve dağıtım ile gözlemlenebilirlik dahil olmak üzere Genkit'in birçok özelliği için gereklidir.

Genkit akışlarının doğrudan model API'ye çağrıya kıyasla önemli bir avantajı, hem giriş hem de çıkışların tür güvenliğidir. Bir akışın bağımsız değişkeni ve sonuç türleri basit veya yapılandırılmış değerler olabilir. Genkit, invopop/jsonschema kullanarak bu değerler için JSON şemaları oluşturur.

Aşağıdaki akışta giriş olarak string alınır ve struct çıkışı yapılır:

type MenuSuggestion struct {
	ItemName    string `json:"item_name"`
	Description string `json:"description"`
	Calories    int    `json:"calories"`
}

menuSuggestionFlow := genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
		suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
		return suggestion, nil
	},
)

Devam eden akışlar

Kodunuzda bir akış çalıştırmak için:

suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")

Akışları çalıştırmak için KSA'yı da kullanabilirsiniz:

genkit flow:run menuSuggestionFlow '"French"'

Yayınlandı

Değerleri aktarabilen bir akış örneğini aşağıda bulabilirsiniz:

// Types for illustrative purposes.
type InputType string
type OutputType string
type StreamType string

menuSuggestionFlow := genkit.DefineStreamingFlow(
	"menuSuggestionFlow",
	func(
		ctx context.Context,
		restaurantTheme InputType,
		callback func(context.Context, StreamType) error,
	) (OutputType, error) {
		var menu strings.Builder
		menuChunks := make(chan StreamType)
		go makeFullMenuSuggestion(restaurantTheme, menuChunks)
		for {
			chunk, ok := <-menuChunks
			if !ok {
				break
			}
			if callback != nil {
				callback(context.Background(), chunk)
			}
			menu.WriteString(string(chunk))
		}
		return OutputType(menu.String()), nil
	},
)

Akış geri çağırmasının tanımsız olabileceğini unutmayın. Yalnızca çağıran istemci akışlı yanıt istiyorsa tanımlanır.

Akış modunda bir akışı çağırmak için:

menuSuggestionFlow.Stream(
	context.Background(),
	"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
	if err != nil {
		// handle err
		return false
	}
	if !sfv.Done {
		fmt.Print(sfv.Stream)
		return true
	} else {
		fmt.Print(sfv.Output)
		return false
	}
})

Akış akışları uygulanmıyorsa StreamFlow(), RunFlow() ile aynı şekilde davranır.

Akışları aktarmak için KSA'yı da kullanabilirsiniz:

genkit flow:run menuSuggestionFlow '"French"' -s

Akışları dağıtma

Akışınıza HTTP üzerinden erişebilmek için önce dağıtmanız gerekir. Akışları Cloud Run ve benzer hizmetleri kullanarak dağıtmak için akışlarınızı tanımlayıp Init() işlevini çağırın:

func main() {
	genkit.DefineFlow(
		"menuSuggestionFlow",
		func(ctx context.Context, restaurantTheme string) (string, error) {
			// ...
			return "", nil
		},
	)
	if err := genkit.Init(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}

Init, akışlarınızı HTTP uç noktaları (örneğin, http://localhost:3400/menuSuggestionFlow) olarak gösteren bir net/http sunucusu başlatır.

İkinci parametre, aşağıdakileri belirten isteğe bağlı bir Options parametresidir:

  • FlowAddr: Dinlenecek adres ve bağlantı noktası. Belirtilmemişse sunucu, PORT ortam değişkeni tarafından belirtilen bağlantı noktasında dinler. Bu değişken boşsa varsayılan olarak 3400 bağlantı noktasını kullanır.
  • Flows: Yayınlanacak akışlar. Belirtilmezse Init, tanımlanan tüm akışlarınızı sunar.

Akışları diğer uç noktalarla aynı ana makine ve bağlantı noktasında yayınlamak istiyorsanız FlowAddr değerini - olarak ayarlayabilir ve bunun yerine Genkit akışlarınız için bir işleyici almak üzere NewFlowServeMux() işlevini çağırabilirsiniz. Bu işleyiciyi diğer rota işleyicilerinizle çoklu yayın yapabilirsiniz:

mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))

Akış gözlemlenebilirliği

Bazen, gözlemlenebilirlik için enstrümante edilmemiş üçüncü taraf SDK'ları kullanırken bunları Geliştirici kullanıcı arayüzünde ayrı bir izleme adımı olarak görmek isteyebilirsiniz. Tek yapmanız gereken kodu run işlevine sarmalamaktır.

genkit.DefineFlow(
	"menuSuggestionFlow",
	func(ctx context.Context, restaurantTheme string) (string, error) {
		themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
			// ...
			return "", nil
		})

		// ...
		return themes, err
	})