Skip to content

Commit b6a6b0e

Browse files
committed
Properly streaming response and support backward compatibility as well
Signed-off-by: Somil Jain <[email protected]>
1 parent a37de9d commit b6a6b0e

File tree

5 files changed

+167
-111
lines changed

5 files changed

+167
-111
lines changed

cmd/query/app/apiv3/gateway_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func (gw *testGateway) execRequest(t *testing.T, url string) ([]byte, int) {
6565

6666
func (*testGateway) verifySnapshot(t *testing.T, body []byte) []byte {
6767
// reformat JSON body with indentation, to make diffing easier
68+
// Note: body may contain multiple newline-separated JSON objects for streaming responses
6869
var data any
6970
require.NoError(t, json.Unmarshal(body, &data), "response: %s", string(body))
7071
body, err := json.MarshalIndent(data, "", " ")
@@ -166,12 +167,8 @@ func (gw *testGateway) getTracesAndVerify(t *testing.T, url string, expectedTrac
166167
require.Equal(t, http.StatusOK, statusCode, "response=%s", string(body))
167168
body = gw.verifySnapshot(t, body)
168169

169-
var jsonArray []json.RawMessage
170-
require.NoError(t, json.Unmarshal(body, &jsonArray), "response should be valid JSON array")
171-
require.NotEmpty(t, jsonArray, "response array should not be empty")
172-
173170
var response api_v3.GRPCGatewayWrapper
174-
parseResponse(t, jsonArray[0], &response)
171+
parseResponse(t, body, &response)
175172

176173
td := response.Result.ToTraces()
177174
assert.Equal(t, 1, td.SpanCount())

cmd/query/app/apiv3/http_gateway.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ func (h *HTTPGateway) streamTraces(tracesIter func(yield func([]ptrace.Traces, e
159159

160160
w.Header().Set("Content-Type", "application/json")
161161
w.Header().Set("X-Content-Type-Options", "nosniff")
162+
w.Header().Set("Transfer-Encoding", "chunked")
162163

163164
tracesFound := false
164165
hasError := false
@@ -182,7 +183,6 @@ func (h *HTTPGateway) streamTraces(tracesIter func(yield func([]ptrace.Traces, e
182183
tracesFound = true
183184

184185
for _, td := range traces {
185-
// Combine all resource spans into a single trace response
186186
combinedTrace := ptrace.NewTraces()
187187
resources := td.ResourceSpans()
188188
for i := 0; i < resources.Len(); i++ {
@@ -197,16 +197,10 @@ func (h *HTTPGateway) streamTraces(tracesIter func(yield func([]ptrace.Traces, e
197197

198198
if !headerWritten {
199199
w.WriteHeader(http.StatusOK)
200-
// Write opening bracket for JSON array
201-
if _, err := w.Write([]byte("[")); err != nil {
202-
h.Logger.Error("Failed to write opening bracket", zap.Error(err))
203-
return false
204-
}
205200
headerWritten = true
206201
} else {
207-
// Write comma separator between array elements
208-
if _, err := w.Write([]byte(",")); err != nil {
209-
h.Logger.Error("Failed to write comma separator", zap.Error(err))
202+
if _, err := w.Write([]byte("\n")); err != nil {
203+
h.Logger.Error("Failed to write newline separator", zap.Error(err))
210204
return false
211205
}
212206
}
@@ -234,14 +228,6 @@ func (h *HTTPGateway) streamTraces(tracesIter func(yield func([]ptrace.Traces, e
234228
http.Error(w, string(resp), http.StatusNotFound)
235229
return
236230
}
237-
238-
if headerWritten {
239-
// Write closing bracket for JSON array
240-
if _, err := w.Write([]byte("]")); err != nil {
241-
h.Logger.Error("Failed to write closing bracket", zap.Error(err))
242-
}
243-
flusher.Flush()
244-
}
245231
}
246232

247233
func (*HTTPGateway) marshalResponse(response proto.Message, w http.ResponseWriter) {

cmd/query/app/apiv3/http_gateway_test.go

Lines changed: 114 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package apiv3
55

66
import (
7+
"bytes"
78
"encoding/json"
89
"errors"
910
"fmt"
@@ -392,7 +393,7 @@ func TestHTTPGatewayGetOperationsErrors(t *testing.T) {
392393
assert.Contains(t, w.Body.String(), assert.AnError.Error())
393394
}
394395

395-
// TestHTTPGatewayStreamingResponse verifies that streaming produces valid JSON array
396+
// TestHTTPGatewayStreamingResponse verifies that streaming produces valid NDJSON
396397
func TestHTTPGatewayStreamingResponse(t *testing.T) {
397398
gw := setupHTTPGatewayNoServer(t, "")
398399

@@ -428,13 +429,21 @@ func TestHTTPGatewayStreamingResponse(t *testing.T) {
428429

429430
body := w.Body.String()
430431

431-
// Verify response is valid JSON array
432-
var jsonArray []map[string]any
433-
err = json.Unmarshal([]byte(body), &jsonArray)
434-
require.NoError(t, err, "Response should be valid JSON array")
432+
// Verify response contains newline-separated JSON objects (NDJSON)
433+
lines := bytes.Split([]byte(body), []byte("\n"))
434+
nonEmptyLines := 0
435+
for _, line := range lines {
436+
if len(bytes.TrimSpace(line)) > 0 {
437+
nonEmptyLines++
438+
// Each line should be valid JSON
439+
var obj map[string]any
440+
err := json.Unmarshal(line, &obj)
441+
require.NoError(t, err, "Each line should be valid JSON")
442+
}
443+
}
435444

436-
// We should have individual trace objects in the array
437-
assert.GreaterOrEqual(t, len(jsonArray), 1, "Should have at least 1 trace")
445+
// We should have multiple trace objects
446+
assert.GreaterOrEqual(t, nonEmptyLines, 1, "Should have at least 1 trace")
438447
assert.Contains(t, body, "foobar") // First trace span name
439448
assert.Contains(t, body, "second-span") // Second trace span name
440449
}
@@ -473,10 +482,18 @@ func TestHTTPGatewayStreamingMultipleBatches(t *testing.T) {
473482

474483
body := w.Body.String()
475484

476-
// Verify response is valid JSON
477-
var jsonArray []map[string]any
478-
err = json.Unmarshal([]byte(body), &jsonArray)
479-
require.NoError(t, err, "Response should be valid JSON array")
485+
// Verify response contains valid NDJSON
486+
lines := bytes.Split([]byte(body), []byte("\n"))
487+
nonEmptyLines := 0
488+
for _, line := range lines {
489+
if len(bytes.TrimSpace(line)) > 0 {
490+
nonEmptyLines++
491+
var obj map[string]any
492+
err := json.Unmarshal(line, &obj)
493+
require.NoError(t, err, "Each line should be valid JSON")
494+
}
495+
}
496+
assert.GreaterOrEqual(t, nonEmptyLines, 1, "Should have at least 1 trace")
480497

481498
assert.Contains(t, body, "foobar")
482499
assert.Contains(t, body, "batch2-span")
@@ -587,10 +604,18 @@ func TestHTTPGatewayStreamingWithEmptyBatches(t *testing.T) {
587604
assert.Equal(t, http.StatusOK, w.Code)
588605

589606
body := w.Body.String()
590-
// Verify response is valid JSON
591-
var jsonArray []map[string]any
592-
err = json.Unmarshal([]byte(body), &jsonArray)
593-
require.NoError(t, err, "Response should be valid JSON array")
607+
// Verify response contains valid NDJSON
608+
lines := bytes.Split([]byte(body), []byte("\n"))
609+
nonEmptyLines := 0
610+
for _, line := range lines {
611+
if len(bytes.TrimSpace(line)) > 0 {
612+
nonEmptyLines++
613+
var obj map[string]any
614+
err := json.Unmarshal(line, &obj)
615+
require.NoError(t, err, "Each line should be valid JSON")
616+
}
617+
}
618+
assert.GreaterOrEqual(t, nonEmptyLines, 1, "Should have at least 1 trace")
594619

595620
assert.Contains(t, body, "foobar")
596621
}
@@ -635,10 +660,10 @@ func TestHTTPGatewayFindTracesStreaming(t *testing.T) {
635660
assert.Equal(t, http.StatusOK, w.Code)
636661

637662
body := w.Body.String()
638-
// Verify response is valid JSON
639-
var jsonArray []map[string]any
640-
err = json.Unmarshal([]byte(body), &jsonArray)
641-
require.NoError(t, err, "Response should be valid JSON array")
663+
// Verify response contains valid JSON
664+
var obj map[string]any
665+
err = json.Unmarshal([]byte(body), &obj)
666+
require.NoError(t, err, "Response should be valid JSON")
642667

643668
assert.Contains(t, body, "foobar")
644669
}
@@ -679,8 +704,8 @@ func TestHTTPGatewayStreamingMarshalError(t *testing.T) {
679704
}
680705
gw.router.ServeHTTP(w, r)
681706

682-
// Should log error for failing to write opening bracket (first write operation)
683-
assert.Contains(t, log.String(), "Failed to write opening bracket")
707+
// Should log error for failing to marshal trace chunk (first write operation)
708+
assert.Contains(t, log.String(), "Failed to marshal trace chunk")
684709
}
685710

686711
// failingWriter is a ResponseWriter that simulates write failures
@@ -736,10 +761,18 @@ func TestHTTPGatewayStreamingFirstChunkWrite(t *testing.T) {
736761
assert.Equal(t, http.StatusOK, w.Code)
737762

738763
body := w.Body.String()
739-
// Verify response is valid JSON
740-
var jsonArray []map[string]any
741-
err = json.Unmarshal([]byte(body), &jsonArray)
742-
require.NoError(t, err, "Response should be valid JSON array")
764+
// Verify response contains valid NDJSON
765+
lines := bytes.Split([]byte(body), []byte("\n"))
766+
nonEmptyLines := 0
767+
for _, line := range lines {
768+
if len(bytes.TrimSpace(line)) > 0 {
769+
nonEmptyLines++
770+
var obj map[string]any
771+
err := json.Unmarshal(line, &obj)
772+
require.NoError(t, err, "Each line should be valid JSON")
773+
}
774+
}
775+
assert.GreaterOrEqual(t, nonEmptyLines, 1, "Should have at least 1 trace")
743776

744777
assert.Contains(t, body, "foobar")
745778
assert.Contains(t, body, "span2")
@@ -817,7 +850,7 @@ func TestHTTPGatewayStreamingFallbackNoTraces(t *testing.T) {
817850
}
818851

819852
// TestHTTPGatewayStreamingClientSideParsing verifies that the streaming response
820-
// is valid JSON that clients can parse normally
853+
// is valid NDJSON that clients can parse
821854
func TestHTTPGatewayStreamingClientSideParsing(t *testing.T) {
822855
gw := setupHTTPGateway(t, "")
823856

@@ -872,21 +905,27 @@ func TestHTTPGatewayStreamingClientSideParsing(t *testing.T) {
872905

873906
bodyStr := string(body)
874907

875-
// The response MUST be valid JSON that can be parsed as a whole
876-
var jsonArray []map[string]any
877-
err = json.Unmarshal(body, &jsonArray)
878-
require.NoError(t, err, "Response should be valid JSON array that can be parsed")
879-
880-
// Verify we got multiple trace results
881-
assert.GreaterOrEqual(t, len(jsonArray), 3, "Should have at least 3 trace objects in array")
908+
// The response should be NDJSON - newline-separated JSON objects
909+
lines := bytes.Split(body, []byte("\n"))
910+
validObjects := 0
911+
for _, line := range lines {
912+
if len(bytes.TrimSpace(line)) == 0 {
913+
continue
914+
}
915+
var obj map[string]any
916+
err := json.Unmarshal(line, &obj)
917+
require.NoError(t, err, "Each line should be valid JSON")
882918

883-
// Each element should have a "result" field
884-
for i, obj := range jsonArray {
919+
// Each object should have a "result" field
885920
result, hasResult := obj["result"]
886-
assert.True(t, hasResult, "Element %d should have a 'result' field", i)
887-
assert.NotNil(t, result, "Element %d result should not be nil", i)
921+
assert.True(t, hasResult, "Object should have a 'result' field")
922+
assert.NotNil(t, result, "Object result should not be nil")
923+
validObjects++
888924
}
889925

926+
// Verify we got multiple trace results
927+
assert.GreaterOrEqual(t, validObjects, 3, "Should have at least 3 trace objects")
928+
890929
// Verify all traces are present in the response
891930
assert.Contains(t, bodyStr, "foobar", "Should contain first trace")
892931
assert.Contains(t, bodyStr, "client-test-span", "Should contain second trace")
@@ -968,3 +1007,41 @@ func TestHTTPGatewayStreamingEmptyTracesVsNoTraces(t *testing.T) {
9681007
assert.Contains(t, w.Body.String(), "No traces found")
9691008
})
9701009
}
1010+
1011+
// TestHTTPGatewayStreamingSingleTraceValidJSON verifies that a single trace
1012+
// with streaming support returns valid JSON (not NDJSON)
1013+
func TestHTTPGatewayStreamingSingleTraceValidJSON(t *testing.T) {
1014+
gw := setupHTTPGatewayNoServer(t, "")
1015+
1016+
trace1 := makeTestTrace()
1017+
gw.reader.
1018+
On("GetTraces", matchContext, mock.AnythingOfType("[]tracestore.GetTraceParams")).
1019+
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
1020+
yield([]ptrace.Traces{trace1}, nil)
1021+
})).Once()
1022+
1023+
r, err := http.NewRequest(http.MethodGet, "/api/v3/traces/1", http.NoBody)
1024+
require.NoError(t, err)
1025+
w := httptest.NewRecorder()
1026+
gw.router.ServeHTTP(w, r)
1027+
1028+
assert.Equal(t, http.StatusOK, w.Code)
1029+
1030+
body := w.Body.String()
1031+
1032+
// Critical: Single trace should be parseable as standard JSON
1033+
var response map[string]any
1034+
err = json.Unmarshal([]byte(body), &response)
1035+
require.NoError(t, err, "Single trace response should be valid JSON")
1036+
1037+
// Should have result field
1038+
result, hasResult := response["result"]
1039+
assert.True(t, hasResult, "Response should have 'result' field")
1040+
assert.NotNil(t, result, "Result should not be nil")
1041+
1042+
// Should NOT contain newlines (not NDJSON)
1043+
assert.NotContains(t, body, "}\n{", "Single trace should not have NDJSON format")
1044+
1045+
// Verify content
1046+
assert.Contains(t, body, "foobar")
1047+
}
Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,25 @@
1-
[
2-
{
3-
"result": {
4-
"resourceSpans": [
5-
{
6-
"resource": {},
7-
"scopeSpans": [
8-
{
9-
"scope": {},
10-
"spans": [
11-
{
12-
"kind": 2,
13-
"name": "foobar",
14-
"spanId": "0000000000000002",
15-
"status": {
16-
"code": 2
17-
},
18-
"traceId": "00000000000000000000000000000001"
19-
}
20-
]
21-
}
22-
]
23-
}
24-
]
25-
}
1+
{
2+
"result": {
3+
"resourceSpans": [
4+
{
5+
"resource": {},
6+
"scopeSpans": [
7+
{
8+
"scope": {},
9+
"spans": [
10+
{
11+
"kind": 2,
12+
"name": "foobar",
13+
"spanId": "0000000000000002",
14+
"status": {
15+
"code": 2
16+
},
17+
"traceId": "00000000000000000000000000000001"
18+
}
19+
]
20+
}
21+
]
22+
}
23+
]
2624
}
27-
]
25+
}

0 commit comments

Comments
 (0)