@@ -16,33 +16,48 @@ package exporter
1616
1717import (
1818 "context"
19+ "os"
1920
2021 v2 "github.com/cloudevents/sdk-go/v2"
2122 "github.com/google/uuid"
22- vanussdk "github.com/vanus-labs/sdk/golang"
23+ "github.com/vanus-labs/vanus/client"
24+ "github.com/vanus-labs/vanus/client/pkg/api"
2325 "github.com/vanus-labs/vanus/observability/log"
26+ "github.com/vanus-labs/vanus/pkg/cluster"
27+ "github.com/vanus-labs/vanus/proto/pkg/cloudevents"
28+ "github.com/vanus-labs/vanus/proto/pkg/codec"
29+ "google.golang.org/grpc/credentials/insecure"
2430
2531 "go.opentelemetry.io/otel/attribute"
2632 tracesdk "go.opentelemetry.io/otel/sdk/trace"
2733)
2834
35+ func GetExporter (endpoints []string , eventbus string ) tracesdk.SpanExporter {
36+ spanExporter , err := New (context .Background (), WithEndpoints (endpoints ), WithEventbus (eventbus ))
37+ if err != nil {
38+ log .Error ().Err (err ).Msg ("new span exporter failed" )
39+ os .Exit (- 1 )
40+ }
41+ return spanExporter
42+ }
43+
2944type Option func (* Options )
3045
3146type Options struct {
32- Endpoints string
47+ Endpoints [] string
3348 Eventbus string
3449}
3550
3651func defaultOptions () * Options {
3752 return & Options {
38- Endpoints : "127.0.0.1:8080" ,
53+ Endpoints : [] string {} ,
3954 Eventbus : "event-tracing" ,
4055 }
4156}
4257
43- func WithEndpoint ( endpoint string ) Option {
58+ func WithEndpoints ( endpoints [] string ) Option {
4459 return func (options * Options ) {
45- options .Endpoints = endpoint
60+ options .Endpoints = endpoints
4661 }
4762}
4863
@@ -54,9 +69,8 @@ func WithEventbus(eventbus string) Option {
5469
5570// Exporter exports trace data in the OTLP wire format.
5671type Exporter struct {
57- endpoints string
58- client vanussdk.Client
59- publisher vanussdk.Publisher
72+ endpoints []string
73+ writer api.BusWriter
6074}
6175
6276var _ tracesdk.SpanExporter = (* Exporter )(nil )
@@ -67,47 +81,52 @@ func New(ctx context.Context, opts ...Option) (*Exporter, error) {
6781 apply (defaultOpts )
6882 }
6983
70- clientOpts := & vanussdk.ClientOptions {
71- Endpoint : defaultOpts .Endpoints ,
72- Token : "admin" ,
84+ ctrl := cluster .NewClusterController (defaultOpts .Endpoints , insecure .NewCredentials ())
85+ if err := ctrl .WaitForControllerReady (true ); err != nil {
86+ log .Error (ctx ).Err (err ).Msg ("wait for controller ready timeout" )
87+ return nil , err
7388 }
74-
75- c , err := vanussdk .Connect (clientOpts )
89+ eventbus , err := ctrl .EventbusService ().GetEventbusByName (ctx , "default" , defaultOpts .Eventbus )
7690 if err != nil {
77- panic ("failed to connect to Vanus cluster, error: " + err .Error ())
91+ log .Error (ctx ).Err (err ).Str ("eventbus" , defaultOpts .Eventbus ).Msg ("failed to get eventbus" )
92+ return nil , err
7893 }
7994
80- ebOpt := vanussdk .WithEventbus ("default" , defaultOpts .Eventbus )
95+ c := client .Connect (defaultOpts .Endpoints )
96+ bus := c .Eventbus (ctx , api .WithName (defaultOpts .Eventbus ), api .WithID (eventbus .Id ))
8197 exporter := & Exporter {
8298 endpoints : defaultOpts .Endpoints ,
83- client : c ,
84- publisher : c .Publisher (ebOpt ),
85- }
86- _ , err = c .Controller ().Eventbus ().Get (ctx , ebOpt )
87- if err != nil {
88- panic ("failed to get tracing eventbus, error: " + err .Error ())
99+ writer : bus .Writer (),
89100 }
90101 return exporter , nil
91102}
92103
93104// ExportSpans exports a batch of spans.
94105func (e * Exporter ) ExportSpans (ctx context.Context , ss []tracesdk.ReadOnlySpan ) error {
95- es := make ([]* v2. Event , 0 )
106+ ces := make ([]* cloudevents. CloudEvent , 0 )
96107 for _ , span := range ss {
97- if span .Name () != "EventTracing" {
108+ event := newEvent (span )
109+ if event .Type () != "event-tracing" {
98110 continue
99111 }
100- event := newEvent (span )
101- es = append (es , & event )
112+ eventpb , err := codec .ToProto (& event )
113+ if err != nil {
114+ log .Error (ctx ).Err (err ).Any ("event" , event ).Msg ("failed to proto event" )
115+ return nil
116+ }
117+ ces = append (ces , eventpb )
102118 }
103119
104- if len (es ) == 0 {
120+ if len (ces ) == 0 {
105121 return nil
106122 }
107123
108- err := e .publisher .Publish (ctx , es ... )
124+ ceBatch := & cloudevents.CloudEventBatch {
125+ Events : ces ,
126+ }
127+ _ , err := e .writer .Append (ctx , ceBatch )
109128 if err != nil {
110- log .Error (ctx ).Err (err ).Msg ("failed to publish events to tracing eventbus" )
129+ log .Error (ctx ).Err (err ).Msg ("failed to append events to tracing eventbus" )
111130 return nil
112131 }
113132 return nil
@@ -122,7 +141,6 @@ func newEvent(span tracesdk.ReadOnlySpan) v2.Event {
122141 event := v2 .NewEvent ()
123142 event .SetID (uuid .New ().String ())
124143 event .SetSource (span .Name ())
125- event .SetType (span .SpanKind ().String ())
126144 data := make (map [string ]interface {})
127145 data ["name" ] = span .Name ()
128146 data ["trace_id" ] = span .SpanContext ().TraceID ().String ()
@@ -136,6 +154,9 @@ func newEvent(span tracesdk.ReadOnlySpan) v2.Event {
136154 data [string (attr .Key )] = attr .Value .AsInt64 ()
137155 } else if attr .Value .Type () == attribute .STRING {
138156 data [string (attr .Key )] = attr .Value .AsString ()
157+ if string (attr .Key ) == "type" && attr .Value .AsString () == "event-tracing" {
158+ event .SetType ("event-tracing" )
159+ }
139160 }
140161 }
141162 data ["events" ] = span .Events ()
0 commit comments