编写 Genkit 遥测插件

Firebase Genkit 库使用 OpenTelemetry 进行插桩,从而支持收集跟踪记录、指标和日志。Genkit 用户可以通过安装一个插件来将此遥测数据导出到监控和可视化工具,该插件会将 OpenTelemetry Go SDK 配置为导出到支持 OpenTelemetry 的特定系统。

Genkit 包含一个插件,用于配置 OpenTelemetry 以将数据导出到 Google Cloud Monitoring 和 Cloud Logging。如需支持其他监控系统,您可以通过编写遥测插件来扩展 Genkit,如本页面中所述。

准备工作

如需了解如何编写任何类型的 Genkit 插件(包括遥测插件),请参阅编写 Genkit 插件。特别要注意,每个插件都必须导出一个 Init 函数,用户应在使用此插件之前调用该函数。

导出器和日志记录器

如上文所述,遥测插件的主要工作是配置 OpenTelemetry(Genkit 已使用其进行了插桩),以将数据导出到特定服务。为此,您需要以下各项:

  • OpenTelemetry SpanExporter 接口的实现,用于将数据导出到您选择的服务。
  • OpenTelemetry metric.Exporter 接口的实现,用于将数据导出到您选择的服务。
  • slog.Loggerslog.Handler 接口的实现,用于将日志导出到您选择的服务。

根据您要导出到的服务,这可能需要进行相对较少的工作,也可能需要进行大量工作。

由于 OpenTelemetry 是一个业界标准,因此许多监控服务已经拥有实现这些接口的库。例如,Genkit 的 googlecloud 插件使用由 Google Cloud 团队维护的 opentelemetry-operations-go 库。同样,许多监控服务都提供实现标准 slog 接口的库。

另一方面,如果您的服务没有可用的此类库,实现必要的接口可能是一个重大的项目。

请查看 OpenTelemetry 注册表或监控服务的文档,了解是否已提供集成。

如果您需要自行构建这些集成,请查看官方 OpenTelemetry 导出器的源代码以及 slog 处理程序编写指南页面。

构建插件

依赖项

每个遥测插件都需要导入 Genkit 核心库和几个 OpenTelemetry 库:

import {
	// Import the Genkit core library.

	"github.com/firebase/genkit/go/genkit"

	// Import the OpenTelemetry libraries.
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/trace"
}

如果您要围绕现有的 OpenTelemetry 或 slog 集成构建插件,则还需要导入它们。

Config

遥测插件至少应支持以下配置选项:

type Config struct {
	// Export even in the dev environment.
	ForceExport bool

	// The interval for exporting metric data.
	// The default is 60 seconds.
	MetricInterval time.Duration

	// The minimum level at which logs will be written.
	// Defaults to [slog.LevelInfo].
	LogLevel slog.Leveler
}

以下示例假定您已提供这些选项,并将就如何处理这些选项提供一些指导。

大多数插件还包含所导出到的服务的配置设置(API 密钥、项目名称等)。

Init()

遥测插件的 Init() 函数应执行以下所有操作:

  • 如果 Genkit 在开发环境中运行(例如使用 genkit start 运行时)且 Config.ForceExport 选项未设置,则提前返回:

    shouldExport := cfg.ForceExport || os.Getenv("GENKIT_ENV") != "dev"
    if !shouldExport {
    	return nil
    }
    
  • 初始化跟踪记录 span 导出器并将其注册到 Genkit:

    spanProcessor := trace.NewBatchSpanProcessor(YourCustomSpanExporter{})
    genkit.RegisterSpanProcessor(g, spanProcessor)
    
  • 初始化指标导出器并将其注册到 OpenTelemetry 库中:

    r := metric.NewPeriodicReader(
    	YourCustomMetricExporter{},
    	metric.WithInterval(cfg.MetricInterval),
    )
    mp := metric.NewMeterProvider(metric.WithReader(r))
    otel.SetMeterProvider(mp)
    

    在初始化 PeriodicReader 时,使用用户配置的收集时间间隔 (Config.MetricInterval)。

  • slog 处理程序注册为默认日志记录器:

    logger := slog.New(YourCustomHandler{
    	Options: &slog.HandlerOptions{Level: cfg.LogLevel},
    })
    slog.SetDefault(logger)
    

    您应将处理程序配置为遵循用户指定的最小日志级别 (Config.LogLevel)。

个人身份信息隐去

由于大多数生成式 AI flow 都以某种形式的用户输入开始,因此某些 flow 跟踪记录很可能包含个人身份信息 (PII)。为了保护用户信息,您应在导出跟踪记录之前从中隐去 PII。

如果您要构建自己的 span 导出器,则可以在其中构建此功能。

如果您要围绕现有的 OpenTelemetry 集成构建插件,则可以使用执行此任务的自定义导出器封装所提供的 span 导出器。例如,googlecloud 插件会移除每个 span 中的 genkit:inputgenkit:output 属性,然后使用如下所示的封装容器导出这些 span:

type redactingSpanExporter struct {
	trace.SpanExporter
}

func (e *redactingSpanExporter) ExportSpans(ctx context.Context, spanData []trace.ReadOnlySpan) error {
	var redacted []trace.ReadOnlySpan
	for _, s := range spanData {
		redacted = append(redacted, redactedSpan{s})
	}
	return e.SpanExporter.ExportSpans(ctx, redacted)
}

func (e *redactingSpanExporter) Shutdown(ctx context.Context) error {
	return e.SpanExporter.Shutdown(ctx)
}

type redactedSpan struct {
	trace.ReadOnlySpan
}

func (s redactedSpan) Attributes() []attribute.KeyValue {
	// Omit input and output, which may contain PII.
	var ts []attribute.KeyValue
	for _, a := range s.ReadOnlySpan.Attributes() {
		if a.Key == "genkit:input" || a.Key == "genkit:output" {
			continue
		}
		ts = append(ts, a)
	}
	return ts
}

问题排查

如果您在让数据显示在预期位置时遇到问题,可以使用 OpenTelemetry 提供的实用诊断工具来帮助您找到问题根源。

代码优先框架,用于编排、部署和监控生成式 AI 工作流。

更新于 Feb 25, 2025

代码优先框架,用于编排、部署和监控生成式 AI 工作流。

更新于 Feb 25, 2025

Firebase gives you the tools and infrastructure you need to build better mobile and web apps, improve app quality, and grow your business.

更新于 Apr 28, 2021