@@ -4,10 +4,8 @@ import (
44 "context"
55 "encoding/json"
66 "fmt"
7+ "io"
78 "os"
8- "path/filepath"
9- "sort"
10- "strings"
119 "time"
1210
1311 log "github.com/sirupsen/logrus"
@@ -16,117 +14,81 @@ import (
1614 "github.com/open-telemetry/opentelemetry-ebpf-profiler/reporter"
1715)
1816
19- type fileInfo struct {
20- name string
21- timestamp int64
22- id uint64
23- funcName string
24- }
25-
26- // Replay replays the stored data from benchDataDir.
17+ // Replay replays the stored data from replayInputsFrom.
2718// The argument r is the reporter that will receive the replayed data.
28- func Replay (ctx context.Context , benchDataDir string , rep reporter.Reporter ) error {
29- files , err := os .ReadDir ( benchDataDir )
19+ func Replay (ctx context.Context , replayInputsFrom string , rep reporter.Reporter ) error {
20+ stream , err := os .Open ( replayInputsFrom )
3021 if err != nil {
31- return fmt .Errorf ("failed to read directory %s: %v" , benchDataDir , err )
22+ return fmt .Errorf ("failed to open file %s: %v" , replayInputsFrom , err )
3223 }
24+ decoder := json .NewDecoder (stream )
3325
34- fileInfos := make ([]fileInfo , 0 , len (files ))
26+ var m metaInfo
27+ var curTS int64
3528
36- for _ , f := range files {
37- if ! strings .HasSuffix (f .Name (), ".json" ) {
38- continue
29+ for {
30+ if err = decoder .Decode (& m ); err != nil {
31+ // EOF is returned at the end of the stream.
32+ if err != io .EOF {
33+ return err
34+ }
35+ break
3936 }
4037
41- name := f .Name ()
42- // scan name for timestamp, counter and function name
43- var timestamp int64
44- var id uint64
45- var funcName string
46- if _ , err = fmt .Sscanf (name , "%d_%x_%s" , & timestamp , & id , & funcName ); err != nil {
47- log .Errorf ("Failed to parse file name %s: %v" , name , err )
48- continue
38+ if curTS != 0 {
39+ time .Sleep (time .Duration (m .TS - curTS ) * time .Nanosecond )
4940 }
50- funcName = strings .TrimSuffix (funcName , ".json" )
51-
52- fileInfos = append (fileInfos , fileInfo {
53- name : name ,
54- timestamp : timestamp ,
55- id : id ,
56- funcName : funcName ,
57- })
58- }
59-
60- if len (fileInfos ) == 0 {
61- return nil
62- }
63-
64- // Sort fileInfos ascending by ID.
65- sort .Slice (fileInfos , func (i , j int ) bool {
66- return fileInfos [i ].id < fileInfos [j ].id
67- })
41+ curTS = m .TS
6842
69- if fileInfos [0 ].funcName != "Start" {
70- return fmt .Errorf ("first function name must be \" Start\" , instead it is \" %s\" " ,
71- fileInfos [0 ].funcName )
72- }
73-
74- curTS := fileInfos [0 ].timestamp
75-
76- // Replay the stored data
77- for _ , fi := range fileInfos [1 :] {
78- time .Sleep (time .Duration (fi .timestamp - curTS ) * time .Nanosecond )
79- curTS = fi .timestamp
80-
81- switch fi .funcName {
43+ switch m .Name {
8244 case "TraceEvent" :
8345 var v traceEvent
84- if err = dataFromFileInfo ( benchDataDir , fi , & v ); err == nil {
46+ if err = decodeTo ( decoder , & v ); err == nil {
8547 rep .ReportTraceEvent (v .Trace , v .Meta )
8648 }
8749 case "CountForTrace" :
8850 var v countForTrace
89- if err = dataFromFileInfo ( benchDataDir , fi , & v ); err == nil {
51+ if err = decodeTo ( decoder , & v ); err == nil {
9052 rep .ReportCountForTrace (v .TraceHash , v .Count , v .Meta )
9153 }
9254 case "FramesForTrace" :
9355 var v libpf.Trace
94- if err = dataFromFileInfo [libpf.Trace ](benchDataDir , fi , & v ); err == nil {
56+ if err = decodeTo [libpf.Trace ](decoder , & v ); err == nil {
9557 rep .ReportFramesForTrace (& v )
9658 }
9759 case "FallbackSymbol" :
9860 var v fallbackSymbol
99- if err = dataFromFileInfo ( benchDataDir , fi , & v ); err == nil {
100- rep .ReportFallbackSymbol (v . FrameID , v .Symbol )
61+ if err = decodeTo ( decoder , & v ); err == nil {
62+ rep .ReportFallbackSymbol (libpf . NewFrameID ( v . FileID , v . AddressOrLine ) , v .Symbol )
10163 }
102- case "ExectableMetadata " :
64+ case "ExecutableMetadata " :
10365 var v executableMetadata
104- if err = dataFromFileInfo ( benchDataDir , fi , & v ); err == nil {
66+ if err = decodeTo ( decoder , & v ); err == nil {
10567 rep .ExecutableMetadata (context .Background (), v .FileID , v .FileName , v .BuildID ,
10668 v .Interp , nil )
10769 }
10870 case "FrameMetadata" :
10971 var v frameMetadata
110- if err = dataFromFileInfo ( benchDataDir , fi , & v ); err == nil {
72+ if err = decodeTo ( decoder , & v ); err == nil {
11173 rep .FrameMetadata (v .FileID , v .AddressOrLine , v .LineNumber , v .FunctionOffset ,
11274 v .FunctionName , v .FilePath )
11375 }
11476 case "HostMetadata" :
11577 var v hostMetadata
116- if err = dataFromFileInfo ( benchDataDir , fi , & v ); err == nil {
78+ if err = decodeTo ( decoder , & v ); err == nil {
11779 rep .ReportHostMetadata (v .Metadata )
11880 }
11981 case "Metrics" :
12082 var v metrics
121- if err = dataFromFileInfo [metrics ](benchDataDir , fi , & v ); err == nil {
83+ if err = decodeTo [metrics ](decoder , & v ); err == nil {
12284 rep .ReportMetrics (v .Timestamp , v .IDs , v .Values )
12385 }
12486 default :
125- err = fmt .Errorf ("unsupported function name in file %s: %s" , fi . name , fi . funcName )
87+ err = fmt .Errorf ("unsupported function name in file %s: %s" , replayInputsFrom , m . Name )
12688 }
12789
12890 if err != nil {
129- log .Errorf ("Failed to replay data from file %s: %v" , fi . name , err )
91+ log .Errorf ("Failed to replay data from file %s: %v" , m . Name , err )
13092 }
13193
13294 if err = ctx .Err (); err != nil {
@@ -137,16 +99,9 @@ func Replay(ctx context.Context, benchDataDir string, rep reporter.Reporter) err
13799 return nil
138100}
139101
140- func dataFromFileInfo [T any ](dir string , fi fileInfo , data * T ) error {
141- pathName := filepath .Join (dir , fi .name )
142- f , err := os .Open (pathName )
143- if err != nil {
144- return fmt .Errorf ("failed to open file %s: %v" , pathName , err )
145- }
146- defer f .Close ()
147-
148- if err = json .NewDecoder (f ).Decode (data ); err != nil {
149- return fmt .Errorf ("failed to decode JSON from file %s: %v" , pathName , err )
102+ func decodeTo [T any ](decoder * json.Decoder , data * T ) error {
103+ if err := decoder .Decode (data ); err != nil {
104+ return fmt .Errorf ("failed to decode JSON: %v" , err )
150105 }
151106
152107 return nil
0 commit comments