diff --git a/otelconf/config_common.go b/otelconf/config_common.go index f3b135aee73..40c2890d18e 100644 --- a/otelconf/config_common.go +++ b/otelconf/config_common.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/otel/baggage" sdklog "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) @@ -22,6 +23,7 @@ const ( type configOptions struct { ctx context.Context opentelemetryConfig OpenTelemetryConfiguration + meterProviderOptions []sdkmetric.Option loggerProviderOptions []sdklog.LoggerProviderOption tracerProviderOptions []sdktrace.TracerProviderOption } diff --git a/otelconf/metric.go b/otelconf/metric.go new file mode 100644 index 00000000000..b6e12114401 --- /dev/null +++ b/otelconf/metric.go @@ -0,0 +1,591 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelconf // import "go.opentelemetry.io/contrib/otelconf" + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math" + "net" + "net/http" + "net/url" + "os" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/instrumentation" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" + "google.golang.org/grpc/credentials" + + "go.opentelemetry.io/contrib/otelconf/internal/tls" +) + +var zeroScope instrumentation.Scope + +const instrumentKindUndefined = sdkmetric.InstrumentKind(0) + +func meterProvider(cfg configOptions, res *resource.Resource) (metric.MeterProvider, shutdownFunc, error) { + if cfg.opentelemetryConfig.MeterProvider == nil { + return noop.NewMeterProvider(), noopShutdown, nil + } + provider, ok := cfg.opentelemetryConfig.MeterProvider.(*MeterProviderJson) + if !ok { + return noop.NewMeterProvider(), noopShutdown, newErrInvalid("meter_provider") + } + opts := append(cfg.meterProviderOptions, sdkmetric.WithResource(res)) + + var errs []error + for _, reader := range provider.Readers { + r, err := metricReader(cfg.ctx, reader) + if err == nil { + opts = append(opts, sdkmetric.WithReader(r)) + } else { + errs = append(errs, err) + } + } + for _, vw := range provider.Views { + v, err := view(vw) + if err == nil { + opts = append(opts, sdkmetric.WithView(v)) + } else { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return noop.NewMeterProvider(), noopShutdown, errors.Join(errs...) + } + + mp := sdkmetric.NewMeterProvider(opts...) 
+ return mp, mp.Shutdown, nil +} + +func metricReader(ctx context.Context, r MetricReader) (sdkmetric.Reader, error) { + if r.Periodic != nil && r.Pull != nil { + return nil, newErrInvalid("must not specify multiple metric reader type") + } + + if r.Periodic != nil { + var opts []sdkmetric.PeriodicReaderOption + if r.Periodic.Interval != nil { + opts = append(opts, sdkmetric.WithInterval(time.Duration(*r.Periodic.Interval)*time.Millisecond)) + } + + if r.Periodic.Timeout != nil { + opts = append(opts, sdkmetric.WithTimeout(time.Duration(*r.Periodic.Timeout)*time.Millisecond)) + } + return periodicExporter(ctx, r.Periodic.Exporter, opts...) + } + + if r.Pull != nil { + return pullReader(ctx, r.Pull.Exporter) + } + return nil, newErrInvalid("no valid metric reader") +} + +func pullReader(ctx context.Context, exporter PullMetricExporter) (sdkmetric.Reader, error) { + if exporter.PrometheusDevelopment != nil { + return prometheusReader(ctx, exporter.PrometheusDevelopment) + } + return nil, newErrInvalid("no valid metric exporter") +} + +func periodicExporter(ctx context.Context, exporter PushMetricExporter, opts ...sdkmetric.PeriodicReaderOption) (sdkmetric.Reader, error) { + exportersConfigured := 0 + var exportFunc func() (sdkmetric.Reader, error) + + if exporter.Console != nil { + exportersConfigured++ + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + + exp, err := stdoutmetric.New( + stdoutmetric.WithEncoder(enc), + ) + if err != nil { + return nil, err + } + exportFunc = func() (sdkmetric.Reader, error) { + return sdkmetric.NewPeriodicReader(exp, opts...), nil + } + } + if exporter.OTLPHttp != nil { + exportersConfigured++ + exp, err := otlpHTTPMetricExporter(ctx, exporter.OTLPHttp) + if err != nil { + return nil, err + } + exportFunc = func() (sdkmetric.Reader, error) { + return sdkmetric.NewPeriodicReader(exp, opts...), nil + } + } + if exporter.OTLPGrpc != nil { + exportersConfigured++ + exp, err := otlpGRPCMetricExporter(ctx, exporter.OTLPGrpc) + if err != nil { + return nil, err + } + exportFunc = func() (sdkmetric.Reader, error) { + return sdkmetric.NewPeriodicReader(exp, opts...), nil + } + } + if exporter.OTLPFileDevelopment != nil { + // TODO: implement file exporter https://github.com/open-telemetry/opentelemetry-go/issues/5408 + return nil, newErrInvalid("otlp_file/development") + } + + if exportersConfigured > 1 { + return nil, newErrInvalid("must not specify multiple exporters") + } + + if exportFunc != nil { + return exportFunc() + } + + return nil, newErrInvalid("no valid metric exporter") +} + +func otlpHTTPMetricExporter(ctx context.Context, otlpConfig *OTLPHttpMetricExporter) (sdkmetric.Exporter, error) { + opts := []otlpmetrichttp.Option{} + + if otlpConfig.Endpoint != nil { + u, err := url.ParseRequestURI(*otlpConfig.Endpoint) + if err != nil { + return nil, errors.Join(newErrInvalid("endpoint parsing failed"), err) + } + opts = append(opts, otlpmetrichttp.WithEndpoint(u.Host)) + + if u.Scheme == "http" { + opts = append(opts, otlpmetrichttp.WithInsecure()) + } + if u.Path != "" { + opts = append(opts, otlpmetrichttp.WithURLPath(u.Path)) + } + } + if otlpConfig.Compression != nil { + switch *otlpConfig.Compression { + case compressionGzip: + opts = append(opts, otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression)) + case compressionNone: + opts = append(opts, otlpmetrichttp.WithCompression(otlpmetrichttp.NoCompression)) + default: + return nil, newErrInvalid(fmt.Sprintf("unsupported compression %q", *otlpConfig.Compression)) + } + } + if 
otlpConfig.Timeout != nil { + opts = append(opts, otlpmetrichttp.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) + } + headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList) + if err != nil { + return nil, err + } + if len(headersConfig) > 0 { + opts = append(opts, otlpmetrichttp.WithHeaders(headersConfig)) + } + if otlpConfig.TemporalityPreference != nil { + switch *otlpConfig.TemporalityPreference { + case "delta": + opts = append(opts, otlpmetrichttp.WithTemporalitySelector(deltaTemporality)) + case "cumulative": + opts = append(opts, otlpmetrichttp.WithTemporalitySelector(cumulativeTemporality)) + case "low_memory": + opts = append(opts, otlpmetrichttp.WithTemporalitySelector(lowMemory)) + default: + return nil, newErrInvalid(fmt.Sprintf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference)) + } + } + + tlsConfig, err := tls.CreateConfig(otlpConfig.CertificateFile, otlpConfig.ClientCertificateFile, otlpConfig.ClientKeyFile) + if err != nil { + return nil, errors.Join(newErrInvalid("tls configuration"), err) + } + opts = append(opts, otlpmetrichttp.WithTLSClientConfig(tlsConfig)) + + return otlpmetrichttp.New(ctx, opts...) +} + +func otlpGRPCMetricExporter(ctx context.Context, otlpConfig *OTLPGrpcMetricExporter) (sdkmetric.Exporter, error) { + var opts []otlpmetricgrpc.Option + + if otlpConfig.Endpoint != nil { + u, err := url.ParseRequestURI(*otlpConfig.Endpoint) + if err != nil { + return nil, errors.Join(newErrInvalid("endpoint parsing failed"), err) + } + // ParseRequestURI leaves the Host field empty when no + // scheme is specified (i.e. localhost:4317). This check is + // here to support the case where a user may not specify a + // scheme. The code does its best effort here by using + // otlpConfig.Endpoint as-is in that case + if u.Host != "" { + opts = append(opts, otlpmetricgrpc.WithEndpoint(u.Host)) + } else { + opts = append(opts, otlpmetricgrpc.WithEndpoint(*otlpConfig.Endpoint)) + } + if u.Scheme == "http" || (u.Scheme != "https" && otlpConfig.Insecure != nil && *otlpConfig.Insecure) { + opts = append(opts, otlpmetricgrpc.WithInsecure()) + } + } + + if otlpConfig.Compression != nil { + switch *otlpConfig.Compression { + case compressionGzip: + opts = append(opts, otlpmetricgrpc.WithCompressor(*otlpConfig.Compression)) + case compressionNone: + // none requires no options + default: + return nil, newErrInvalid(fmt.Sprintf("unsupported compression %q", *otlpConfig.Compression)) + } + } + if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 { + opts = append(opts, otlpmetricgrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) + } + headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList) + if err != nil { + return nil, err + } + if len(headersConfig) > 0 { + opts = append(opts, otlpmetricgrpc.WithHeaders(headersConfig)) + } + if otlpConfig.TemporalityPreference != nil { + switch *otlpConfig.TemporalityPreference { + case "delta": + opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(deltaTemporality)) + case "cumulative": + opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(cumulativeTemporality)) + case "low_memory": + opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(lowMemory)) + default: + return nil, newErrInvalid(fmt.Sprintf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference)) + } + } + + if otlpConfig.CertificateFile != nil || otlpConfig.ClientCertificateFile != nil || otlpConfig.ClientKeyFile != nil { 
+ tlsConfig, err := tls.CreateConfig(otlpConfig.CertificateFile, otlpConfig.ClientCertificateFile, otlpConfig.ClientKeyFile) + if err != nil { + return nil, errors.Join(newErrInvalid("tls configuration"), err) + } + opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig))) + } + + return otlpmetricgrpc.New(ctx, opts...) +} + +func cumulativeTemporality(sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality +} + +func deltaTemporality(ik sdkmetric.InstrumentKind) metricdata.Temporality { + switch ik { + case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindHistogram, sdkmetric.InstrumentKindObservableCounter: + return metricdata.DeltaTemporality + default: + return metricdata.CumulativeTemporality + } +} + +func lowMemory(ik sdkmetric.InstrumentKind) metricdata.Temporality { + switch ik { + case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindHistogram: + return metricdata.DeltaTemporality + default: + return metricdata.CumulativeTemporality + } +} + +// newIncludeExcludeFilter returns a Filter that includes attributes +// in the include list and excludes attributes in the excludes list. +// It returns an error if an attribute is in both lists +// +// If IncludeExclude is empty an include-all filter is returned. +func newIncludeExcludeFilter(lists *IncludeExclude) (attribute.Filter, error) { + if lists == nil { + return func(attribute.KeyValue) bool { return true }, nil + } + + included := make(map[attribute.Key]struct{}) + for _, k := range lists.Included { + included[attribute.Key(k)] = struct{}{} + } + excluded := make(map[attribute.Key]struct{}) + for _, k := range lists.Excluded { + if _, ok := included[attribute.Key(k)]; ok { + return nil, fmt.Errorf("attribute cannot be in both include and exclude list: %s", k) + } + excluded[attribute.Key(k)] = struct{}{} + } + return func(kv attribute.KeyValue) bool { + // check if a value is excluded first + if _, ok := excluded[kv.Key]; ok { + return false + } + + if len(included) == 0 { + return true + } + + _, ok := included[kv.Key] + return ok + }, nil +} + +func prometheusReader(ctx context.Context, prometheusConfig *ExperimentalPrometheusMetricExporter) (sdkmetric.Reader, error) { + if prometheusConfig.Host == nil { + return nil, newErrInvalid("host must be specified") + } + if prometheusConfig.Port == nil { + return nil, newErrInvalid("port must be specified") + } + + opts, err := prometheusReaderOpts(prometheusConfig) + if err != nil { + return nil, err + } + + reg := prometheus.NewRegistry() + opts = append(opts, otelprom.WithRegisterer(reg)) + + reader, err := otelprom.New(opts...) + if err != nil { + return nil, fmt.Errorf("error creating otel prometheus exporter: %w", err) + } + + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) + server := http.Server{ + // Timeouts are necessary to make a server resilient to attacks. + // We use values from this example: https://blog.cloudflare.com/exposing-go-on-the-internet/#:~:text=There%20are%20three%20main%20timeouts + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 120 * time.Second, + Handler: mux, + } + + // Remove surrounding "[]" from the host definition to allow users to define the host as "[::1]" or "::1". 
+ host := *prometheusConfig.Host + if len(host) > 2 && host[0] == '[' && host[len(host)-1] == ']' { + host = host[1 : len(host)-1] + } + + addr := net.JoinHostPort(host, strconv.Itoa(*prometheusConfig.Port)) + lis, err := net.Listen("tcp", addr) + if err != nil { + return nil, errors.Join( + fmt.Errorf("binding address %s for Prometheus exporter: %w", addr, err), + reader.Shutdown(ctx), + ) + } + + // Only for testing reasons, add the address to the http Server, will not be used. + server.Addr = lis.Addr().String() + + go func() { + if err := server.Serve(lis); err != nil && !errors.Is(err, http.ErrServerClosed) { + otel.Handle(fmt.Errorf("the Prometheus HTTP server exited unexpectedly: %w", err)) + } + }() + + return readerWithServer{reader, &server}, nil +} + +func prometheusReaderOpts(prometheusConfig *ExperimentalPrometheusMetricExporter) ([]otelprom.Option, error) { + var opts []otelprom.Option + if prometheusConfig.WithoutScopeInfo != nil && *prometheusConfig.WithoutScopeInfo { + opts = append(opts, otelprom.WithoutScopeInfo()) + } + // TODO: fix the following to use with translation strategy + // if prometheusConfig.WithoutTypeSuffix != nil && *prometheusConfig.WithoutTypeSuffix { + // opts = append(opts, otelprom.WithoutCounterSuffixes()) //nolint:staticcheck // WithouTypeSuffix is deprecated, but we still need it for backwards compatibility. + // } + // if prometheusConfig.WithoutUnits != nil && *prometheusConfig.WithoutUnits { + // opts = append(opts, otelprom.WithoutUnits()) //nolint:staticcheck // WithouTypeSuffix is deprecated, but we still need it for backwards compatibility. + // } + if prometheusConfig.WithResourceConstantLabels != nil { + f, err := newIncludeExcludeFilter(prometheusConfig.WithResourceConstantLabels) + if err != nil { + return nil, err + } + opts = append(opts, otelprom.WithResourceAsConstantLabels(f)) + } + + return opts, nil +} + +type readerWithServer struct { + sdkmetric.Reader + server *http.Server +} + +func (rws readerWithServer) Shutdown(ctx context.Context) error { + return errors.Join( + rws.Reader.Shutdown(ctx), + rws.server.Shutdown(ctx), + ) +} + +func view(v View) (sdkmetric.View, error) { + if v.Selector == nil { + return nil, errors.New("view: no selector provided") + } + + inst, err := instrument(*v.Selector) + if err != nil { + return nil, err + } + + s, err := stream(v.Stream) + if err != nil { + return nil, err + } + return sdkmetric.NewView(inst, s), nil +} + +func instrument(vs ViewSelector) (sdkmetric.Instrument, error) { + kind, err := instrumentKind(vs.InstrumentType) + if err != nil { + return sdkmetric.Instrument{}, fmt.Errorf("view_selector: %w", err) + } + inst := sdkmetric.Instrument{ + Name: strOrEmpty(vs.InstrumentName), + Unit: strOrEmpty(vs.Unit), + Kind: kind, + Scope: instrumentation.Scope{ + Name: strOrEmpty(vs.MeterName), + Version: strOrEmpty(vs.MeterVersion), + SchemaURL: strOrEmpty(vs.MeterSchemaUrl), + }, + } + + if instrumentIsEmpty(inst) { + return sdkmetric.Instrument{}, errors.New("view_selector: empty selector not supporter") + } + return inst, nil +} + +func stream(vs *ViewStream) (sdkmetric.Stream, error) { + if vs == nil { + return sdkmetric.Stream{}, nil + } + + f, err := newIncludeExcludeFilter(vs.AttributeKeys) + if err != nil { + return sdkmetric.Stream{}, err + } + return sdkmetric.Stream{ + Name: strOrEmpty(vs.Name), + Description: strOrEmpty(vs.Description), + Aggregation: aggregation(vs.Aggregation), + AttributeFilter: f, + }, nil +} + +func aggregation(aggr *Aggregation) sdkmetric.Aggregation { + if 
aggr == nil { + return nil + } + + if aggr.Base2ExponentialBucketHistogram != nil { + return sdkmetric.AggregationBase2ExponentialHistogram{ + MaxSize: int32OrZero(aggr.Base2ExponentialBucketHistogram.MaxSize), + MaxScale: int32OrZero(aggr.Base2ExponentialBucketHistogram.MaxScale), + // Need to negate because config has the positive action RecordMinMax. + NoMinMax: !boolOrFalse(aggr.Base2ExponentialBucketHistogram.RecordMinMax), + } + } + if aggr.Default != nil { + // TODO: Understand what to set here. + return nil + } + if aggr.Drop != nil { + return sdkmetric.AggregationDrop{} + } + if aggr.ExplicitBucketHistogram != nil { + return sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: aggr.ExplicitBucketHistogram.Boundaries, + // Need to negate because config has the positive action RecordMinMax. + NoMinMax: !boolOrFalse(aggr.ExplicitBucketHistogram.RecordMinMax), + } + } + if aggr.LastValue != nil { + return sdkmetric.AggregationLastValue{} + } + if aggr.Sum != nil { + return sdkmetric.AggregationSum{} + } + return nil +} + +func instrumentKind(vsit *InstrumentType) (sdkmetric.InstrumentKind, error) { + if vsit == nil { + // Equivalent to instrumentKindUndefined. + return instrumentKindUndefined, nil + } + + switch *vsit { + case InstrumentTypeCounter: + return sdkmetric.InstrumentKindCounter, nil + case InstrumentTypeUpDownCounter: + return sdkmetric.InstrumentKindUpDownCounter, nil + case InstrumentTypeHistogram: + return sdkmetric.InstrumentKindHistogram, nil + case InstrumentTypeObservableCounter: + return sdkmetric.InstrumentKindObservableCounter, nil + case InstrumentTypeObservableUpDownCounter: + return sdkmetric.InstrumentKindObservableUpDownCounter, nil + case InstrumentTypeObservableGauge: + return sdkmetric.InstrumentKindObservableGauge, nil + } + + return instrumentKindUndefined, errors.New("instrument_type: invalid value") +} + +func instrumentIsEmpty(i sdkmetric.Instrument) bool { + return i.Name == "" && + i.Description == "" && + i.Kind == instrumentKindUndefined && + i.Unit == "" && + i.Scope == zeroScope +} + +func boolOrFalse(pBool *bool) bool { + if pBool == nil { + return false + } + return *pBool +} + +func int32OrZero(pInt *int) int32 { + if pInt == nil { + return 0 + } + i := *pInt + if i > math.MaxInt32 { + return math.MaxInt32 + } + if i < math.MinInt32 { + return math.MinInt32 + } + return int32(i) +} + +func strOrEmpty(pStr *string) string { + if pStr == nil { + return "" + } + return *pStr +} diff --git a/otelconf/metric_test.go b/otelconf/metric_test.go new file mode 100644 index 00000000000..0a4b623a2ab --- /dev/null +++ b/otelconf/metric_test.go @@ -0,0 +1,1609 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otelconf + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "reflect" + "runtime" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/instrumentation" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + 
"go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" + v1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +func TestMeterProvider(t *testing.T) { + tests := []struct { + name string + cfg configOptions + wantProvider metric.MeterProvider + wantErr error + }{ + { + name: "no-meter-provider-configured", + wantProvider: noop.NewMeterProvider(), + }, + { + name: "invalid-provider", + cfg: configOptions{ + opentelemetryConfig: OpenTelemetryConfiguration{ + MeterProvider: &LoggerProviderJson{}, + }, + }, + wantProvider: noop.NewMeterProvider(), + wantErr: newErrInvalid("meter_provider"), + }, + { + name: "error-in-config", + cfg: configOptions{ + opentelemetryConfig: OpenTelemetryConfiguration{ + MeterProvider: &MeterProviderJson{ + Readers: []MetricReader{ + { + Periodic: &PeriodicMetricReader{}, + Pull: &PullMetricReader{}, + }, + }, + }, + }, + }, + wantProvider: noop.NewMeterProvider(), + wantErr: newErrInvalid("must not specify multiple metric reader type"), + }, + { + name: "multiple-errors-in-config", + cfg: configOptions{ + opentelemetryConfig: OpenTelemetryConfiguration{ + MeterProvider: &MeterProviderJson{ + Readers: []MetricReader{ + { + Periodic: &PeriodicMetricReader{}, + Pull: &PullMetricReader{}, + }, + { + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + Console: ConsoleExporter{}, + OTLPGrpc: &OTLPGrpcMetricExporter{}, + }, + }, + }, + }, + }, + }, + }, + wantProvider: noop.NewMeterProvider(), + wantErr: newErrInvalid("must not specify multiple metric reader type"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mp, shutdown, err := meterProvider(tt.cfg, resource.Default()) + require.Equal(t, tt.wantProvider, mp) + assert.ErrorIs(t, err, tt.wantErr) + require.NoError(t, shutdown(t.Context())) + }) + } +} + +func TestMeterProviderOptions(t *testing.T) { + var calls int + srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + calls++ + })) + defer srv.Close() + + cfg := OpenTelemetryConfiguration{ + MeterProvider: &MeterProviderJson{ + Readers: []MetricReader{{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr(srv.URL), + }, + }, + }, + }}, + }, + } + + var buf bytes.Buffer + stdoutmetricExporter, err := stdoutmetric.New(stdoutmetric.WithWriter(&buf)) + require.NoError(t, err) + + res := resource.NewSchemaless(attribute.String("foo", "bar")) + // TODO: re-enable this once NewSDK is added + // sdk, err := NewSDK( + // WithOpenTelemetryConfiguration(cfg), + // WithMeterProviderOptions(sdkmetric.WithReader(sdkmetric.NewPeriodicReader(stdoutmetricExporter))), + // WithMeterProviderOptions(sdkmetric.WithResource(res)), + // ) + mp, shutdown, err := meterProvider(configOptions{ + opentelemetryConfig: cfg, + meterProviderOptions: []sdkmetric.Option{sdkmetric.WithReader(sdkmetric.NewPeriodicReader(stdoutmetricExporter))}, + }, res) + require.NoError(t, err) + defer func() { + assert.NoError(t, shutdown(t.Context())) + // The exporter, which we passed in as an extra option to NewSDK, + // should be wired up to the provider in addition to the + // configuration-based OTLP exporter. + assert.NotZero(t, buf) + assert.Equal(t, 1, calls) // flushed on shutdown + + // Options provided by WithMeterProviderOptions may be overridden + // by configuration, e.g. the resource is always defined via + // configuration. 
+ // TODO: re-enable this once NewSDK is added + // assert.NotContains(t, buf.String(), "foo") + }() + + counter, _ := mp.Meter("test").Int64Counter("counter") + counter.Add(t.Context(), 1) +} + +func TestReader(t *testing.T) { + consoleExporter, err := stdoutmetric.New( + stdoutmetric.WithPrettyPrint(), + ) + require.NoError(t, err) + ctx := t.Context() + otlpGRPCExporter, err := otlpmetricgrpc.New(ctx) + require.NoError(t, err) + otlpHTTPExporter, err := otlpmetrichttp.New(ctx) + require.NoError(t, err) + promExporter, err := otelprom.New() + require.NoError(t, err) + testCases := []struct { + name string + reader MetricReader + args any + wantErrT error + wantReader sdkmetric.Reader + }{ + { + name: "no reader", + wantErrT: newErrInvalid("no valid metric reader"), + }, + { + name: "pull/no-exporter", + reader: MetricReader{ + Pull: &PullMetricReader{}, + }, + wantErrT: newErrInvalid("no valid metric exporter"), + }, + { + name: "pull/prometheus-no-host", + reader: MetricReader{ + Pull: &PullMetricReader{ + Exporter: PullMetricExporter{ + PrometheusDevelopment: &ExperimentalPrometheusMetricExporter{}, + }, + }, + }, + wantErrT: newErrInvalid("host must be specified"), + }, + { + name: "pull/prometheus-no-port", + reader: MetricReader{ + Pull: &PullMetricReader{ + Exporter: PullMetricExporter{ + PrometheusDevelopment: &ExperimentalPrometheusMetricExporter{ + Host: ptr("localhost"), + }, + }, + }, + }, + wantErrT: newErrInvalid("port must be specified"), + }, + { + name: "pull/prometheus", + reader: MetricReader{ + Pull: &PullMetricReader{ + Exporter: PullMetricExporter{ + PrometheusDevelopment: &ExperimentalPrometheusMetricExporter{ + Host: ptr("localhost"), + Port: ptr(0), + WithoutScopeInfo: ptr(true), + // WithoutUnits: ptr(true), + // WithoutTypeSuffix: ptr(true), + WithResourceConstantLabels: &IncludeExclude{ + Included: []string{"include"}, + Excluded: []string{"exclude"}, + }, + }, + }, + }, + }, + wantReader: readerWithServer{promExporter, nil}, + }, + { + name: "periodic/otlp-grpc-exporter", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("http://localhost:4318"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-exporter-with-path", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("http://localhost:4318/path/123"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-good-ca-certificate", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("https://localhost:4317"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + CertificateFile: ptr(filepath.Join("testdata", "ca.crt")), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-bad-ca-certificate", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("https://localhost:4317"), + Compression: ptr("gzip"), + Timeout: 
ptr(1000), + CertificateFile: ptr(filepath.Join("testdata", "bad_cert.crt")), + }, + }, + }, + }, + wantErrT: newErrInvalid("tls configuration"), + }, + { + name: "periodic/otlp-grpc-bad-client-certificate", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("localhost:4317"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + ClientCertificateFile: ptr(filepath.Join("testdata", "bad_cert.crt")), + ClientKeyFile: ptr(filepath.Join("testdata", "bad_cert.crt")), + }, + }, + }, + }, + wantErrT: newErrInvalid("tls configuration"), + }, + { + name: "periodic/otlp-grpc-bad-headerslist", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("localhost:4317"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + HeadersList: ptr("==="), + }, + }, + }, + }, + wantErrT: newErrInvalid("invalid headers_list"), + }, + { + name: "periodic/otlp-grpc-exporter-no-endpoint", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-exporter-socket-endpoint", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("unix:collector.sock"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-exporter-no-scheme", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-invalid-endpoint", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr(" "), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantErrT: newErrInvalid("endpoint parsing failed"), + }, + { + name: "periodic/otlp-grpc-none-compression", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-delta-temporality", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + TemporalityPreference: ptr(ExporterTemporalityPreferenceDelta), + }, + }, + }, + }, + wantReader: 
sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-cumulative-temporality", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + TemporalityPreference: ptr(ExporterTemporalityPreferenceCumulative), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-lowmemory-temporality", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + TemporalityPreference: ptr(ExporterTemporalityPreferenceLowMemory), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpGRPCExporter), + }, + { + name: "periodic/otlp-grpc-invalid-temporality", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + TemporalityPreference: (*ExporterTemporalityPreference)(ptr("invalid")), + }, + }, + }, + }, + wantErrT: newErrInvalid("unsupported temporality preference \"invalid\""), + }, + { + name: "periodic/otlp-grpc-invalid-compression", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPGrpc: &OTLPGrpcMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("invalid"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantErrT: newErrInvalid("unsupported compression \"invalid\""), + }, + { + name: "periodic/otlp-http-exporter", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("http://localhost:4318"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-good-ca-certificate", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("https://localhost:4317"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + CertificateFile: ptr(filepath.Join("testdata", "ca.crt")), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-bad-ca-certificate", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("https://localhost:4317"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + CertificateFile: ptr(filepath.Join("testdata", "bad_cert.crt")), + }, + }, + }, + }, + wantErrT: newErrInvalid("tls configuration"), + }, + { + name: "periodic/otlp-http-bad-client-certificate", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("localhost:4317"), + Compression: ptr("gzip"), + Timeout: ptr(1000), 
+ ClientCertificateFile: ptr(filepath.Join("testdata", "bad_cert.crt")), + ClientKeyFile: ptr(filepath.Join("testdata", "bad_cert.crt")), + }, + }, + }, + }, + wantErrT: newErrInvalid("tls configuration"), + }, + { + name: "periodic/otlp-http-bad-headerslist", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("localhost:4317"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + HeadersList: ptr("==="), + }, + }, + }, + }, + wantErrT: newErrInvalid("invalid headers_list"), + }, + { + name: "periodic/otlp-http-exporter-with-path", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("http://localhost:4318/path/123"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-exporter-no-endpoint", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-exporter-no-scheme", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-invalid-endpoint", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr(" "), + Compression: ptr("gzip"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantErrT: newErrInvalid("endpoint parsing failed"), + }, + { + name: "periodic/otlp-http-none-compression", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-cumulative-temporality", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + TemporalityPreference: ptr(ExporterTemporalityPreferenceCumulative), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-lowmemory-temporality", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + 
TemporalityPreference: ptr(ExporterTemporalityPreferenceLowMemory), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-delta-temporality", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + TemporalityPreference: ptr(ExporterTemporalityPreferenceDelta), + }, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(otlpHTTPExporter), + }, + { + name: "periodic/otlp-http-invalid-temporality", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("none"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + TemporalityPreference: (*ExporterTemporalityPreference)(ptr("invalid")), + }, + }, + }, + }, + wantErrT: newErrInvalid("unsupported temporality preference \"invalid\""), + }, + { + name: "periodic/otlp-http-invalid-compression", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPHttp: &OTLPHttpMetricExporter{ + Endpoint: ptr("localhost:4318"), + Compression: ptr("invalid"), + Timeout: ptr(1000), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + }, + }, + wantErrT: newErrInvalid("unsupported compression \"invalid\""), + }, + { + name: "periodic/no-exporter", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{}, + }, + }, + wantErrT: newErrInvalid("no valid metric exporter"), + }, + { + name: "periodic/console-exporter", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + Console: ConsoleExporter{}, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader(consoleExporter), + }, + { + name: "periodic/console-exporter-with-extra-options", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Interval: ptr(30_000), + Timeout: ptr(5_000), + Exporter: PushMetricExporter{ + Console: ConsoleExporter{}, + }, + }, + }, + wantReader: sdkmetric.NewPeriodicReader( + consoleExporter, + sdkmetric.WithInterval(30_000*time.Millisecond), + sdkmetric.WithTimeout(5_000*time.Millisecond), + ), + }, + { + name: "periodic/otlp_file", + reader: MetricReader{ + Periodic: &PeriodicMetricReader{ + Exporter: PushMetricExporter{ + OTLPFileDevelopment: &ExperimentalOTLPFileMetricExporter{}, + }, + }, + }, + wantErrT: newErrInvalid("otlp_file/development"), + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + got, err := metricReader(t.Context(), tt.reader) + require.ErrorIs(t, err, tt.wantErrT) + if tt.wantReader == nil { + require.Nil(t, got) + } else { + require.Equal(t, reflect.TypeOf(tt.wantReader), reflect.TypeOf(got)) + var fieldName string + switch reflect.TypeOf(tt.wantReader).String() { + case "*metric.PeriodicReader": + fieldName = "exporter" + case "otelconf.readerWithServer": + fieldName = "Reader" + default: + fieldName = "e" + } + wantExporterType := reflect.Indirect(reflect.ValueOf(tt.wantReader)).FieldByName(fieldName).Elem().Type() + gotExporterType := reflect.Indirect(reflect.ValueOf(got)).FieldByName(fieldName).Elem().Type() + require.Equal(t, wantExporterType.String(), gotExporterType.String()) + require.NoError(t, 
got.Shutdown(t.Context())) + } + }) + } +} + +func TestView(t *testing.T) { + testCases := []struct { + name string + view View + args any + wantErr string + matchInstrument *sdkmetric.Instrument + wantStream sdkmetric.Stream + wantResult bool + }{ + { + name: "no selector", + wantErr: "view: no selector provided", + }, + { + name: "selector/invalid_type", + view: View{ + Selector: &ViewSelector{ + InstrumentType: (*InstrumentType)(ptr("invalid_type")), + }, + }, + wantErr: "view_selector: instrument_type: invalid value", + }, + { + name: "selector/invalid_type", + view: View{ + Selector: &ViewSelector{}, + }, + wantErr: "view_selector: empty selector not supporter", + }, + { + name: "all selectors match", + view: View{ + Selector: &ViewSelector{ + InstrumentName: ptr("test_name"), + InstrumentType: ptr(InstrumentTypeCounter), + Unit: ptr("test_unit"), + MeterName: ptr("test_meter_name"), + MeterVersion: ptr("test_meter_version"), + MeterSchemaUrl: ptr("test_schema_url"), + }, + }, + matchInstrument: &sdkmetric.Instrument{ + Name: "test_name", + Unit: "test_unit", + Kind: sdkmetric.InstrumentKindCounter, + Scope: instrumentation.Scope{ + Name: "test_meter_name", + Version: "test_meter_version", + SchemaURL: "test_schema_url", + }, + }, + wantStream: sdkmetric.Stream{Name: "test_name", Unit: "test_unit"}, + wantResult: true, + }, + { + name: "all selectors no match name", + view: View{ + Selector: &ViewSelector{ + InstrumentName: ptr("test_name"), + InstrumentType: ptr(InstrumentTypeCounter), + Unit: ptr("test_unit"), + MeterName: ptr("test_meter_name"), + MeterVersion: ptr("test_meter_version"), + MeterSchemaUrl: ptr("test_schema_url"), + }, + }, + matchInstrument: &sdkmetric.Instrument{ + Name: "not_match", + Unit: "test_unit", + Kind: sdkmetric.InstrumentKindCounter, + Scope: instrumentation.Scope{ + Name: "test_meter_name", + Version: "test_meter_version", + SchemaURL: "test_schema_url", + }, + }, + wantStream: sdkmetric.Stream{}, + wantResult: false, + }, + { + name: "all selectors no match unit", + view: View{ + Selector: &ViewSelector{ + InstrumentName: ptr("test_name"), + InstrumentType: ptr(InstrumentTypeCounter), + Unit: ptr("test_unit"), + MeterName: ptr("test_meter_name"), + MeterVersion: ptr("test_meter_version"), + MeterSchemaUrl: ptr("test_schema_url"), + }, + }, + matchInstrument: &sdkmetric.Instrument{ + Name: "test_name", + Unit: "not_match", + Kind: sdkmetric.InstrumentKindCounter, + Scope: instrumentation.Scope{ + Name: "test_meter_name", + Version: "test_meter_version", + SchemaURL: "test_schema_url", + }, + }, + wantStream: sdkmetric.Stream{}, + wantResult: false, + }, + { + name: "all selectors no match kind", + view: View{ + Selector: &ViewSelector{ + InstrumentName: ptr("test_name"), + InstrumentType: (*InstrumentType)(ptr("histogram")), + Unit: ptr("test_unit"), + MeterName: ptr("test_meter_name"), + MeterVersion: ptr("test_meter_version"), + MeterSchemaUrl: ptr("test_schema_url"), + }, + }, + matchInstrument: &sdkmetric.Instrument{ + Name: "test_name", + Unit: "test_unit", + Kind: sdkmetric.InstrumentKindCounter, + Scope: instrumentation.Scope{ + Name: "test_meter_name", + Version: "test_meter_version", + SchemaURL: "test_schema_url", + }, + }, + wantStream: sdkmetric.Stream{}, + wantResult: false, + }, + { + name: "all selectors no match meter name", + view: View{ + Selector: &ViewSelector{ + InstrumentName: ptr("test_name"), + InstrumentType: ptr(InstrumentTypeCounter), + Unit: ptr("test_unit"), + MeterName: ptr("test_meter_name"), + MeterVersion: 
ptr("test_meter_version"), + MeterSchemaUrl: ptr("test_schema_url"), + }, + }, + matchInstrument: &sdkmetric.Instrument{ + Name: "test_name", + Unit: "test_unit", + Kind: sdkmetric.InstrumentKindCounter, + Scope: instrumentation.Scope{ + Name: "not_match", + Version: "test_meter_version", + SchemaURL: "test_schema_url", + }, + }, + wantStream: sdkmetric.Stream{}, + wantResult: false, + }, + { + name: "all selectors no match meter version", + view: View{ + Selector: &ViewSelector{ + InstrumentName: ptr("test_name"), + InstrumentType: ptr(InstrumentTypeCounter), + Unit: ptr("test_unit"), + MeterName: ptr("test_meter_name"), + MeterVersion: ptr("test_meter_version"), + MeterSchemaUrl: ptr("test_schema_url"), + }, + }, + matchInstrument: &sdkmetric.Instrument{ + Name: "test_name", + Unit: "test_unit", + Kind: sdkmetric.InstrumentKindCounter, + Scope: instrumentation.Scope{ + Name: "test_meter_name", + Version: "not_match", + SchemaURL: "test_schema_url", + }, + }, + wantStream: sdkmetric.Stream{}, + wantResult: false, + }, + { + name: "all selectors no match meter schema url", + view: View{ + Selector: &ViewSelector{ + InstrumentName: ptr("test_name"), + InstrumentType: ptr(InstrumentTypeCounter), + Unit: ptr("test_unit"), + MeterName: ptr("test_meter_name"), + MeterVersion: ptr("test_meter_version"), + MeterSchemaUrl: ptr("test_schema_url"), + }, + }, + matchInstrument: &sdkmetric.Instrument{ + Name: "test_name", + Unit: "test_unit", + Kind: sdkmetric.InstrumentKindCounter, + Scope: instrumentation.Scope{ + Name: "test_meter_name", + Version: "test_meter_version", + SchemaURL: "not_match", + }, + }, + wantStream: sdkmetric.Stream{}, + wantResult: false, + }, + { + name: "with stream", + view: View{ + Selector: &ViewSelector{ + InstrumentName: ptr("test_name"), + Unit: ptr("test_unit"), + }, + Stream: &ViewStream{ + Name: ptr("new_name"), + Description: ptr("new_description"), + AttributeKeys: ptr(IncludeExclude{Included: []string{"foo", "bar"}}), + Aggregation: &Aggregation{Sum: make(SumAggregation)}, + }, + }, + matchInstrument: &sdkmetric.Instrument{ + Name: "test_name", + Description: "test_description", + Unit: "test_unit", + }, + wantStream: sdkmetric.Stream{ + Name: "new_name", + Description: "new_description", + Unit: "test_unit", + Aggregation: sdkmetric.AggregationSum{}, + }, + wantResult: true, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + got, err := view(tt.view) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + require.Nil(t, got) + } else { + require.NoError(t, err) + gotStream, gotResult := got(*tt.matchInstrument) + // Remove filter, since it cannot be compared + gotStream.AttributeFilter = nil + require.Equal(t, tt.wantStream, gotStream) + require.Equal(t, tt.wantResult, gotResult) + } + }) + } +} + +func TestInstrumentType(t *testing.T) { + testCases := []struct { + name string + instType *InstrumentType + wantErr error + wantKind sdkmetric.InstrumentKind + }{ + { + name: "nil", + wantKind: sdkmetric.InstrumentKind(0), + }, + { + name: "counter", + instType: ptr(InstrumentTypeCounter), + wantKind: sdkmetric.InstrumentKindCounter, + }, + { + name: "up_down_counter", + instType: ptr(InstrumentTypeUpDownCounter), + wantKind: sdkmetric.InstrumentKindUpDownCounter, + }, + { + name: "histogram", + instType: ptr(InstrumentTypeHistogram), + wantKind: sdkmetric.InstrumentKindHistogram, + }, + { + name: "observable_counter", + instType: ptr(InstrumentTypeObservableCounter), + wantKind: sdkmetric.InstrumentKindObservableCounter, + }, 
+ { + name: "observable_up_down_counter", + instType: ptr(InstrumentTypeObservableUpDownCounter), + wantKind: sdkmetric.InstrumentKindObservableUpDownCounter, + }, + { + name: "observable_gauge", + instType: ptr(InstrumentTypeObservableGauge), + wantKind: sdkmetric.InstrumentKindObservableGauge, + }, + { + name: "invalid", + instType: (*InstrumentType)(ptr("invalid")), + wantErr: errors.New("instrument_type: invalid value"), + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + got, err := instrumentKind(tt.instType) + if tt.wantErr != nil { + require.Equal(t, tt.wantErr, err) + require.Zero(t, got) + } else { + require.NoError(t, err) + require.Equal(t, tt.wantKind, got) + } + }) + } +} + +func TestAggregation(t *testing.T) { + testCases := []struct { + name string + aggregation *Aggregation + wantAggregation sdkmetric.Aggregation + }{ + { + name: "nil", + wantAggregation: nil, + }, + { + name: "empty", + aggregation: &Aggregation{}, + wantAggregation: nil, + }, + { + name: "Base2ExponentialBucketHistogram empty", + aggregation: &Aggregation{ + Base2ExponentialBucketHistogram: &Base2ExponentialBucketHistogramAggregation{}, + }, + wantAggregation: sdkmetric.AggregationBase2ExponentialHistogram{ + MaxSize: 0, + MaxScale: 0, + NoMinMax: true, + }, + }, + { + name: "Base2ExponentialBucketHistogram", + aggregation: &Aggregation{ + Base2ExponentialBucketHistogram: &Base2ExponentialBucketHistogramAggregation{ + MaxSize: ptr(2), + MaxScale: ptr(3), + RecordMinMax: ptr(true), + }, + }, + wantAggregation: sdkmetric.AggregationBase2ExponentialHistogram{ + MaxSize: 2, + MaxScale: 3, + NoMinMax: false, + }, + }, + { + name: "Default", + aggregation: &Aggregation{ + Default: make(DefaultAggregation), + }, + wantAggregation: nil, + }, + { + name: "Drop", + aggregation: &Aggregation{ + Drop: make(DropAggregation), + }, + wantAggregation: sdkmetric.AggregationDrop{}, + }, + { + name: "ExplicitBucketHistogram empty", + aggregation: &Aggregation{ + ExplicitBucketHistogram: &ExplicitBucketHistogramAggregation{}, + }, + wantAggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: nil, + NoMinMax: true, + }, + }, + { + name: "ExplicitBucketHistogram", + aggregation: &Aggregation{ + ExplicitBucketHistogram: &ExplicitBucketHistogramAggregation{ + Boundaries: []float64{1, 2, 3}, + RecordMinMax: ptr(true), + }, + }, + wantAggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: []float64{1, 2, 3}, + NoMinMax: false, + }, + }, + { + name: "LastValue", + aggregation: &Aggregation{ + LastValue: make(LastValueAggregation), + }, + wantAggregation: sdkmetric.AggregationLastValue{}, + }, + { + name: "Sum", + aggregation: &Aggregation{ + Sum: make(SumAggregation), + }, + wantAggregation: sdkmetric.AggregationSum{}, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + got := aggregation(tt.aggregation) + require.Equal(t, tt.wantAggregation, got) + }) + } +} + +func TestNewIncludeExcludeFilter(t *testing.T) { + testCases := []struct { + name string + attributeKeys *IncludeExclude + wantPass []string + wantFail []string + }{ + { + name: "empty", + attributeKeys: nil, + wantPass: []string{"foo", "bar"}, + wantFail: nil, + }, + { + name: "filter-with-include", + attributeKeys: ptr(IncludeExclude{ + Included: []string{"foo"}, + }), + wantPass: []string{"foo"}, + wantFail: []string{"bar"}, + }, + { + name: "filter-with-exclude", + attributeKeys: ptr(IncludeExclude{ + Excluded: []string{"foo"}, + }), + wantPass: []string{"bar"}, + wantFail: 
[]string{"foo"}, + }, + { + name: "filter-with-include-and-exclude", + attributeKeys: ptr(IncludeExclude{ + Included: []string{"bar"}, + Excluded: []string{"foo"}, + }), + wantPass: []string{"bar"}, + wantFail: []string{"foo"}, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + got, err := newIncludeExcludeFilter(tt.attributeKeys) + require.NoError(t, err) + for _, pass := range tt.wantPass { + require.True(t, got(attribute.KeyValue{Key: attribute.Key(pass), Value: attribute.StringValue("")})) + } + for _, fail := range tt.wantFail { + require.False(t, got(attribute.KeyValue{Key: attribute.Key(fail), Value: attribute.StringValue("")})) + } + }) + } +} + +func TestNewIncludeExcludeFilterError(t *testing.T) { + _, err := newIncludeExcludeFilter(ptr(IncludeExclude{ + Included: []string{"foo"}, + Excluded: []string{"foo"}, + })) + require.Equal(t, fmt.Errorf("attribute cannot be in both include and exclude list: foo"), err) +} + +func TestPrometheusReaderOpts(t *testing.T) { + testCases := []struct { + name string + cfg ExperimentalPrometheusMetricExporter + wantOptions int + }{ + { + name: "no options", + cfg: ExperimentalPrometheusMetricExporter{}, + wantOptions: 0, + }, + { + name: "all set", + cfg: ExperimentalPrometheusMetricExporter{ + WithoutScopeInfo: ptr(true), + // WithoutTypeSuffix: ptr(true), + // WithoutUnits: ptr(true), + WithResourceConstantLabels: &IncludeExclude{}, + }, + wantOptions: 2, + }, + { + name: "all set false", + cfg: ExperimentalPrometheusMetricExporter{ + WithoutScopeInfo: ptr(false), + // WithoutTypeSuffix: ptr(false), + // WithoutUnits: ptr(false), + WithResourceConstantLabels: &IncludeExclude{}, + }, + wantOptions: 1, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + opts, err := prometheusReaderOpts(&tt.cfg) + require.NoError(t, err) + require.Len(t, opts, tt.wantOptions) + }) + } +} + +func TestPrometheusIPv6(t *testing.T) { + tests := []struct { + name string + host string + }{ + { + name: "IPv6", + host: "::1", + }, + { + name: "[IPv6]", + host: "[::1]", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + port := 0 + cfg := ExperimentalPrometheusMetricExporter{ + Host: &tt.host, + Port: &port, + WithoutScopeInfo: ptr(true), + // WithoutTypeSuffix: ptr(true), + // WithoutUnits: ptr(true), + WithResourceConstantLabels: &IncludeExclude{}, + } + + rs, err := prometheusReader(t.Context(), &cfg) + t.Cleanup(func() { + //nolint:usetesting // required to avoid getting a canceled context at cleanup. + require.NoError(t, rs.Shutdown(context.Background())) + }) + require.NoError(t, err) + + hServ := rs.(readerWithServer).server + assert.True(t, strings.HasPrefix(hServ.Addr, "[::1]:")) + + resp, err := http.DefaultClient.Get("http://" + hServ.Addr + "/metrics") + t.Cleanup(func() { + require.NoError(t, resp.Body.Close()) + }) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + }) + } +} + +func Test_otlpGRPCMetricExporter(t *testing.T) { + if runtime.GOOS == "windows" { + // TODO (#7446): Fix the flakiness on Windows. 
+ t.Skip("Test is flaky on Windows.") + } + type args struct { + ctx context.Context + otlpConfig *OTLPGrpcMetricExporter + } + tests := []struct { + name string + args args + grpcServerOpts func() ([]grpc.ServerOption, error) + }{ + { + name: "no TLS config", + args: args{ + ctx: t.Context(), + otlpConfig: &OTLPGrpcMetricExporter{ + Compression: ptr("gzip"), + Timeout: ptr(5000), + Insecure: ptr(true), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + grpcServerOpts: func() ([]grpc.ServerOption, error) { + return []grpc.ServerOption{}, nil + }, + }, + { + name: "with TLS config", + args: args{ + ctx: t.Context(), + otlpConfig: &OTLPGrpcMetricExporter{ + Compression: ptr("gzip"), + Timeout: ptr(5000), + CertificateFile: ptr("testdata/server-certs/server.crt"), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + grpcServerOpts: func() ([]grpc.ServerOption, error) { + opts := []grpc.ServerOption{} + tlsCreds, err := credentials.NewServerTLSFromFile("testdata/server-certs/server.crt", "testdata/server-certs/server.key") + if err != nil { + return nil, err + } + opts = append(opts, grpc.Creds(tlsCreds)) + return opts, nil + }, + }, + { + name: "with TLS config and client key", + args: args{ + ctx: t.Context(), + otlpConfig: &OTLPGrpcMetricExporter{ + Compression: ptr("gzip"), + Timeout: ptr(5000), + CertificateFile: ptr("testdata/server-certs/server.crt"), + ClientKeyFile: ptr("testdata/client-certs/client.key"), + ClientCertificateFile: ptr("testdata/client-certs/client.crt"), + Headers: []NameStringValuePair{ + {Name: "test", Value: ptr("test1")}, + }, + }, + }, + grpcServerOpts: func() ([]grpc.ServerOption, error) { + opts := []grpc.ServerOption{} + cert, err := tls.LoadX509KeyPair("testdata/server-certs/server.crt", "testdata/server-certs/server.key") + if err != nil { + return nil, err + } + caCert, err := os.ReadFile("testdata/ca.crt") + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsCreds := credentials.NewTLS(&tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientCAs: caCertPool, + ClientAuth: tls.RequireAndVerifyClientCert, + }) + opts = append(opts, grpc.Creds(tlsCreds)) + return opts, nil + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + n, err := net.Listen("tcp4", "localhost:0") + require.NoError(t, err) + + // We need to manually construct the endpoint using the port on which the server is listening. + // + // n.Addr() always returns 127.0.0.1 instead of localhost. + // But our certificate is created with CN as 'localhost', not '127.0.0.1'. + // So we have to manually form the endpoint as "localhost:". 
+ _, port, err := net.SplitHostPort(n.Addr().String()) + require.NoError(t, err) + tt.args.otlpConfig.Endpoint = ptr("localhost:" + port) + + serverOpts, err := tt.grpcServerOpts() + require.NoError(t, err) + + startGRPCMetricCollector(t, n, serverOpts) + + exporter, err := otlpGRPCMetricExporter(tt.args.ctx, tt.args.otlpConfig) + require.NoError(t, err) + + res, err := resource.New(t.Context()) + require.NoError(t, err) + + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + assert.NoError(collect, exporter.Export(t.Context(), &metricdata.ResourceMetrics{ + Resource: res, + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + { + Name: "test-metric", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + }, + }, + }, + })) + }, 10*time.Second, 1*time.Second) + }) + } +} + +// grpcMetricCollector is an OTLP gRPC server that collects all requests it receives. +type grpcMetricCollector struct { + v1.UnimplementedMetricsServiceServer +} + +var _ v1.MetricsServiceServer = (*grpcMetricCollector)(nil) + +// startGRPCMetricCollector returns a *grpcMetricCollector that is listening at the provided +// endpoint. +// +// If endpoint is an empty string, the returned collector will be listening on +// the localhost interface at an OS chosen port. +func startGRPCMetricCollector(t *testing.T, listener net.Listener, serverOptions []grpc.ServerOption) { + srv := grpc.NewServer(serverOptions...) + c := &grpcMetricCollector{} + + v1.RegisterMetricsServiceServer(srv, c) + + errCh := make(chan error, 1) + go func() { errCh <- srv.Serve(listener) }() + + t.Cleanup(func() { + srv.GracefulStop() + if err := <-errCh; err != nil && !errors.Is(err, grpc.ErrServerStopped) { + assert.NoError(t, err) + } + }) +} + +// Export handles the export req. +func (*grpcMetricCollector) Export( + _ context.Context, + _ *v1.ExportMetricsServiceRequest, +) (*v1.ExportMetricsServiceResponse, error) { + return &v1.ExportMetricsServiceResponse{}, nil +}
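Reviewer note: below is a minimal, in-package sketch (not part of the change) showing how the new meter provider wiring can be exercised, mirroring metric_test.go, until the NewSDK / WithMeterProviderOptions entry points referenced in the TODOs land. The function name is illustrative only.

package otelconf

import (
	"context"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/resource"
)

// exampleMeterProviderUsage is an illustrative sketch only. It calls the
// unexported meterProvider helper directly, as the tests do, because the
// public NewSDK entry point is still a TODO in this change.
func exampleMeterProviderUsage() error {
	cfg := OpenTelemetryConfiguration{
		MeterProvider: &MeterProviderJson{
			Readers: []MetricReader{{
				Periodic: &PeriodicMetricReader{
					// A console exporter keeps the sketch self-contained;
					// OTLPHttp and OTLPGrpc exporters are configured the same way.
					Exporter: PushMetricExporter{Console: ConsoleExporter{}},
				},
			}},
		},
	}

	mp, shutdown, err := meterProvider(configOptions{
		ctx:                 context.Background(),
		opentelemetryConfig: cfg,
		// Caller-supplied SDK options are appended before the
		// configuration-derived readers and views.
		meterProviderOptions: []sdkmetric.Option{},
	}, resource.Default())
	if err != nil {
		return err
	}
	defer func() { _ = shutdown(context.Background()) }()

	counter, err := mp.Meter("example").Int64Counter("requests")
	if err != nil {
		return err
	}
	counter.Add(context.Background(), 1)
	return nil
}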
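Continuing the same illustrative sketch: how a declarative View maps onto an SDK view through the new view helper. The instrument name and bucket boundaries are made-up examples, and ptr is assumed to be the same pointer helper used throughout metric_test.go.

// exampleViewMapping shows, for illustration only, a declarative View being
// converted by the view helper; the result would be wired into the provider
// with sdkmetric.WithView.
func exampleViewMapping() (sdkmetric.View, error) {
	return view(View{
		Selector: &ViewSelector{
			InstrumentName: ptr("http.server.duration"),
			InstrumentType: ptr(InstrumentTypeHistogram),
		},
		Stream: &ViewStream{
			Name: ptr("http.server.duration.custom"),
			Aggregation: &Aggregation{
				ExplicitBucketHistogram: &ExplicitBucketHistogramAggregation{
					Boundaries:   []float64{0.1, 0.5, 1, 5},
					RecordMinMax: ptr(true),
				},
			},
		},
	})
}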
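And a sketch of the pull path: an experimental Prometheus exporter configuration turned into a reader that also serves /metrics. Host and port values are examples only.

// examplePrometheusPull shows, for illustration, the pull-based path: the
// returned reader wraps an otelprom exporter plus an HTTP server exposing
// /metrics, and both are stopped by the reader's Shutdown.
func examplePrometheusPull(ctx context.Context) (sdkmetric.Reader, error) {
	return prometheusReader(ctx, &ExperimentalPrometheusMetricExporter{
		Host: ptr("localhost"),
		Port: ptr(9464),
	})
}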