-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
🔥 feat: Add StreamResponseBody support for the Client #3711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Thanks for opening this pull request! 🎉 Please check out our contributing guidelines. If you need help or want to chat with us, join us on Discord https://gofiber.io/discord |
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds end-to-end response-body streaming: client-level getters/setters, transport interface & implementations, Response.BodyStream/IsStreaming accessors, Save() streaming paths, and core propagation of request body streams and safe RawResponse swapping. Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant Client
participant Transport
participant Core
participant fasthttp as fasthttp.Client
participant Response
Caller->>Client: SetStreamResponseBody(true)
Client->>Transport: SetStreamResponseBody(true)
Transport->>fasthttp: set StreamResponseBody flag
Caller->>Client: Do(request)
Client->>Core: executeRequest()
Core->>fasthttp: Do()/DoDeadline()
fasthttp-->>Core: *fasthttp.Response
rect rgb(230, 245, 255)
Note over Core: Propagate request BodyStream to fasthttp request (if present)
Core->>fasthttp: SetBodyStream(...)
end
Core-->>Client: constructed Response (with RawResponse)
Caller->>Response: IsStreaming()?
Response-->>Caller: true/false
alt Streaming enabled
Caller->>Response: BodyStream()
Response-->>Caller: io.Reader (stream)
Caller->>Response: Save(path or writer)
Response->>Response: io.Copy(BodyStream -> dest)
else Non-streaming
Caller->>Response: Body()
Response-->>Caller: []byte
Caller->>Response: Save(path or writer)
Response->>Response: io.Copy(bytes.Reader -> dest)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly related issues
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (8)
🚧 Files skipped from review as they are similar to previous changes (4)
🧰 Additional context used📓 Path-based instructions (1)**/*.go📄 CodeRabbit inference engine (AGENTS.md)
Files:
🧠 Learnings (4)📚 Learning: 2025-10-16T07:19:52.418ZApplied to files:
📚 Learning: 2024-11-29T12:37:27.581ZApplied to files:
📚 Learning: 2025-09-28T17:39:26.644ZApplied to files:
📚 Learning: 2025-10-16T07:15:26.529ZApplied to files:
🧬 Code graph analysis (2)client/response_test.go (2)
client/core.go (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @Abhirup-99, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a significant enhancement to the client by adding support for streaming HTTP response bodies. This allows consumers to process large responses or server-sent events incrementally, preventing excessive memory consumption. The changes include new configuration options at both the client and individual request levels, along with the necessary logic to facilitate this streaming behavior and comprehensive test coverage.
Highlights
- Client-level Streaming Configuration: Introduced a streamResponseBody field and corresponding SetStreamResponseBody and StreamResponseBody methods to the Client struct, enabling global control over response body streaming.
- Request-level Streaming Override: Added a streamResponseBody field and methods to the Request struct, allowing per-request configuration that can override the client's default streaming setting.
- Dynamic Streaming Execution: Modified the core request execution logic to dynamically set and restore the underlying HTTP client's streaming option based on the active request or client configuration.
- Streamed Body Access: Implemented a BodyStream() method on the Response struct, providing an io.Reader interface for efficient, stream-based consumption of response bodies.
- Comprehensive Test Coverage: Added extensive unit tests to validate the new streaming functionality, covering scenarios like server-sent events, large responses, default settings, overrides, and method chaining.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
|
I will update the documentation if the approach seems sound enough. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This pull request adds support for streaming HTTP response bodies in the client library, enabling responses to be read as streams instead of being fully loaded into memory. This is particularly useful for handling large responses or server-sent events.
- Added streaming configuration at both client and request levels with override capability
- Implemented
BodyStream()method on Response struct to provide streaming access - Added comprehensive test coverage for various streaming scenarios
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| client/client.go | Adds streamResponseBody field and configuration methods to Client struct |
| client/request.go | Adds per-request streaming configuration with client-level fallback |
| client/core.go | Updates request execution to handle streaming option setting and restoration |
| client/response.go | Implements BodyStream method for streaming response body access |
| client/client_test.go | Comprehensive tests for client-level streaming configuration and behavior |
| client/response_test.go | Tests for response streaming functionality including large data handling |
client/core.go
Outdated
| // Set StreamResponseBody option from request setting (falls back to client setting) | ||
| originalStreamResponseBody := c.client.fasthttp.StreamResponseBody | ||
| c.client.fasthttp.StreamResponseBody = c.req.StreamResponseBody() | ||
|
|
||
| var err error | ||
| go func() { | ||
| respv := fasthttp.AcquireResponse() | ||
| defer func() { | ||
| // Restore original StreamResponseBody setting | ||
| c.client.fasthttp.StreamResponseBody = originalStreamResponseBody |
Copilot
AI
Aug 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The StreamResponseBody setting is being modified on the shared client instance without proper synchronization. In a multi-threaded environment, concurrent requests could interfere with each other's streaming configuration. Consider using per-request client instances or adding proper locking mechanisms.
| // Set StreamResponseBody option from request setting (falls back to client setting) | |
| originalStreamResponseBody := c.client.fasthttp.StreamResponseBody | |
| c.client.fasthttp.StreamResponseBody = c.req.StreamResponseBody() | |
| var err error | |
| go func() { | |
| respv := fasthttp.AcquireResponse() | |
| defer func() { | |
| // Restore original StreamResponseBody setting | |
| c.client.fasthttp.StreamResponseBody = originalStreamResponseBody | |
| // Create a shallow copy of the fasthttp.Client to avoid data races on StreamResponseBody | |
| clientCopy := *c.client.fasthttp | |
| clientCopy.StreamResponseBody = c.req.StreamResponseBody() | |
| var err error | |
| go func() { | |
| respv := fasthttp.AcquireResponse() | |
| defer func() { |
client/core.go
Outdated
| // Set StreamResponseBody option from request setting (falls back to client setting) | ||
| originalStreamResponseBody := c.client.fasthttp.StreamResponseBody | ||
| c.client.fasthttp.StreamResponseBody = c.req.StreamResponseBody() | ||
|
|
||
| var err error | ||
| go func() { | ||
| respv := fasthttp.AcquireResponse() | ||
| defer func() { | ||
| // Restore original StreamResponseBody setting | ||
| c.client.fasthttp.StreamResponseBody = originalStreamResponseBody |
Copilot
AI
Aug 25, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The restoration of the original StreamResponseBody setting in the defer function may not execute in case of panics or early returns, potentially leaving the client in an inconsistent state. Additionally, this restoration happens in a goroutine which could create race conditions with other concurrent requests.
| // Set StreamResponseBody option from request setting (falls back to client setting) | |
| originalStreamResponseBody := c.client.fasthttp.StreamResponseBody | |
| c.client.fasthttp.StreamResponseBody = c.req.StreamResponseBody() | |
| var err error | |
| go func() { | |
| respv := fasthttp.AcquireResponse() | |
| defer func() { | |
| // Restore original StreamResponseBody setting | |
| c.client.fasthttp.StreamResponseBody = originalStreamResponseBody | |
| // Use a per-request copy of the fasthttp.Client to avoid data races on StreamResponseBody | |
| clientCopy := *c.client.fasthttp | |
| clientCopy.StreamResponseBody = c.req.StreamResponseBody() | |
| var err error | |
| go func() { | |
| respv := fasthttp.AcquireResponse() | |
| defer func() { |
|
Warning Gemini encountered an error creating the review. You can try again by commenting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🧹 Nitpick comments (4)
client/request.go (1)
594-605: Effective value resolution is correct and robustFalls back to the client setting when unset; default false if no client is attached yet. Consider documenting this precedence in the method comment for discoverability.
client/client_test.go (3)
1750-1767: Optional: make SSE handler actually stream (flush between events) for stronger signalRight now the handler writes three messages and returns. This can still pass even if the server buffers and sends a single chunk. Consider using SetBodyStreamWriter and flushing between writes to exercise true streaming behavior. Example:
app.Get("/sse", func(c fiber.Ctx) error { c.Set("Content-Type", "text/event-stream") c.Set("Cache-Control", "no-cache") c.Set("Connection", "keep-alive") c.Context().SetBodyStreamWriter(func(w *bufio.Writer) { msgs := []string{"data: message 1\n\n", "data: message 2\n\n", "data: message 3\n\n"} for _, m := range msgs { _, _ = w.WriteString(m) _ = w.Flush() time.Sleep(10 * time.Millisecond) } }) return nil })This reduces false positives where streaming isn’t actually occurring.
1836-1841: Strengthen fallback contract when streaming is disabled by reading BodyStreamYou assert BodyStream() is non-nil, but don’t verify it yields the same content as Body() in the fallback path. Read it and compare.
Apply this diff:
body := resp.Body() require.Equal(t, "Hello, World!", string(body)) bodyStream := resp.BodyStream() require.NotNil(t, bodyStream) + streamed, err := io.ReadAll(bodyStream) + require.NoError(t, err) + require.Equal(t, body, streamed)
1854-1883: Please add a follow-up request and Reset() behavior test to guard against per-request overrides leakingYou’ve already verified the getter/inheritance logic for StreamResponseBody, but we’re still missing two important checks in client/client_test.go around lines 1854–1883:
- A second request on the same client after an override, to ensure the per-request override doesn’t persist.
- Exercising
Request.Reset()to confirm it clears any per-request streaming flag and falls back to the client default.Suggested diff to insert after the existing assertions:
req2 := client.R().SetStreamResponseBody(false) require.False(t, req2.StreamResponseBody()) + // Follow-up call: ensure the override didn’t leak into actual behavior. + resp2b, err := req2.Get("http://" + addr + "/test") + require.NoError(t, err) + defer resp2b.Close() + require.Equal(t, "Hello, World!", string(resp2b.Body())) clientWithStreaming := New().SetStreamResponseBody(true) req3 := clientWithStreaming.R() require.True(t, req3.StreamResponseBody()) // Should inherit from client req4 := client.R(). SetStreamResponseBody(true). SetTimeout(time.Second * 5). SetStreamResponseBody(false) require.False(t, req4.StreamResponseBody()) + + // After Reset, per-request override should be cleared and revert to client setting (false). + reqReset := client.R().SetStreamResponseBody(true) + reqReset.Reset() + require.False(t, reqReset.StreamResponseBody())This will catch regressions where the underlying fasthttp flag isn’t properly restored and confirm Reset() clears the override.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
client/client.go(2 hunks)client/client_test.go(1 hunks)client/core.go(1 hunks)client/request.go(3 hunks)client/response.go(1 hunks)client/response_test.go(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
client/response_test.go (1)
client/client.go (2)
Get(706-708)New(647-652)
client/client_test.go (1)
client/client.go (2)
New(647-652)Get(706-708)
🔇 Additional comments (6)
client/request.go (3)
70-72: Good choice: tri-state per-request overrideUsing a pointer to bool for streamResponseBody cleanly distinguishes unset from explicit true/false. LGTM.
606-612: Chainable setter is fineNo issues.
679-679: Reset clears the override as expectedResetting streamResponseBody to nil ensures subsequent requests inherit the client default. LGTM.
client/client_test.go (3)
1737-1745: LGTM: basic client-level toggle coverage is solidCovers default false, enable, disable. Clear, fast, and deterministic.
1793-1819: LGTM: large-response streaming vs non-streaming parityGood end-to-end validation that BodyStream() delivers identical data to Body() for large payloads, with streaming enabled/disabled. Using 1 MiB is a reasonable balance for CI.
1846-1852: LGTM: chainable method precedenceConfirms last-set-wins across chained setters with unrelated setters in between. Clear and sufficient.
client/client_test.go
Outdated
| buffer := make([]byte, 1024) | ||
| n, err := bodyStream.Read(buffer) | ||
| require.NoError(t, err) | ||
| require.Greater(t, n, 0) | ||
|
|
||
| content := string(buffer[:n]) | ||
| require.Contains(t, content, "data: message 1") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Prevent flaky read assertion: allow io.EOF when n > 0
io.Reader is allowed to return (n > 0, err == io.EOF). Requiring NoError can intermittently fail. Accept EOF if bytes were read.
Apply this diff:
- buffer := make([]byte, 1024)
- n, err := bodyStream.Read(buffer)
- require.NoError(t, err)
- require.Greater(t, n, 0)
+ buffer := make([]byte, 1024)
+ n, err := bodyStream.Read(buffer)
+ require.Greater(t, n, 0)
+ if err != nil {
+ require.ErrorIs(t, err, io.EOF)
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| buffer := make([]byte, 1024) | |
| n, err := bodyStream.Read(buffer) | |
| require.NoError(t, err) | |
| require.Greater(t, n, 0) | |
| content := string(buffer[:n]) | |
| require.Contains(t, content, "data: message 1") | |
| } | |
| buffer := make([]byte, 1024) | |
| n, err := bodyStream.Read(buffer) | |
| require.Greater(t, n, 0) | |
| if err != nil { | |
| require.ErrorIs(t, err, io.EOF) | |
| } | |
| content := string(buffer[:n]) | |
| require.Contains(t, content, "data: message 1") | |
| } |
🤖 Prompt for AI Agents
In client/client_test.go around lines 1781-1788, the test currently requires
NoError on bodyStream.Read which is flaky because io.Reader may return (n>0,
err==io.EOF); change the assertion to allow io.EOF when bytes were read: after
reading and asserting n>0, assert either err==nil or errors.Is(err, io.EOF) (or
use require.NoError when err==nil else require.True(errors.Is(err, io.EOF))),
and add imports for "io" and/or "errors" if not already present.
client/response_test.go
Outdated
| func Test_Response_BodyStream(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| server := startTestServer(t, func(app *fiber.App) { | ||
| app.Get("/stream", func(c fiber.Ctx) error { | ||
| return c.SendString("streaming data") | ||
| }) | ||
| app.Get("/large", func(c fiber.Ctx) error { | ||
| data := make([]byte, 1024) | ||
| for i := range data { | ||
| data[i] = byte('A' + i%26) | ||
| } | ||
| return c.Send(data) | ||
| }) | ||
| }) | ||
| defer server.stop() | ||
|
|
||
| t.Run("basic streaming", func(t *testing.T) { | ||
| client := New().SetDial(server.dial()).SetStreamResponseBody(true) | ||
|
|
||
| resp, err := client.Get("http://example.com/stream") | ||
| require.NoError(t, err) | ||
| defer resp.Close() | ||
| bodyStream := resp.BodyStream() | ||
| require.NotNil(t, bodyStream) | ||
| data, err := io.ReadAll(bodyStream) | ||
| require.NoError(t, err) | ||
| require.Equal(t, "streaming data", string(data)) | ||
| }) | ||
|
|
||
| t.Run("large response streaming", func(t *testing.T) { | ||
| client := New().SetDial(server.dial()).SetStreamResponseBody(true) | ||
| resp, err := client.Get("http://example.com/large") | ||
| require.NoError(t, err) | ||
| defer resp.Close() | ||
| bodyStream := resp.BodyStream() | ||
| require.NotNil(t, bodyStream) | ||
| buffer := make([]byte, 256) | ||
| var totalRead []byte | ||
| for { | ||
| n, err := bodyStream.Read(buffer) | ||
| if n > 0 { | ||
| totalRead = append(totalRead, buffer[:n]...) | ||
| } | ||
| if err == io.EOF { | ||
| break | ||
| } | ||
| require.NoError(t, err) | ||
| } | ||
| require.Equal(t, 1024, len(totalRead)) | ||
| for i := 0; i < 1024; i++ { | ||
| expected := byte('A' + i%26) | ||
| require.Equal(t, expected, totalRead[i]) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("compare with regular body", func(t *testing.T) { | ||
| client1 := New().SetDial(server.dial()) | ||
| resp1, err := client1.Get("http://example.com/stream") | ||
| require.NoError(t, err) | ||
| defer resp1.Close() | ||
| normalBody := resp1.Body() | ||
| client2 := New().SetDial(server.dial()).SetStreamResponseBody(true) | ||
| resp2, err := client2.Get("http://example.com/stream") | ||
| require.NoError(t, err) | ||
| defer resp2.Close() | ||
| streamedBody, err := io.ReadAll(resp2.BodyStream()) | ||
| require.NoError(t, err) | ||
| require.Equal(t, normalBody, streamedBody) | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix flakiness: don’t retain Body() buffer after closing the Response
In “compare with regular body”, normalBody references the internal buffer of resp1. After resp1.Close(), that memory can be reused by the pool, leading to flaky comparisons. Copy the bytes before closing.
t.Run("compare with regular body", func(t *testing.T) {
client1 := New().SetDial(server.dial())
resp1, err := client1.Get("http://example.com/stream")
require.NoError(t, err)
defer resp1.Close()
- normalBody := resp1.Body()
+ normalBody := append([]byte(nil), resp1.Body()...)
client2 := New().SetDial(server.dial()).SetStreamResponseBody(true)
resp2, err := client2.Get("http://example.com/stream")
require.NoError(t, err)
defer resp2.Close()
streamedBody, err := io.ReadAll(resp2.BodyStream())
require.NoError(t, err)
require.Equal(t, normalBody, streamedBody)
})Optional coverage: add an assertion that len(resp.Body()) == 0 when streaming is enabled to codify the documented behavior.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func Test_Response_BodyStream(t *testing.T) { | |
| t.Parallel() | |
| server := startTestServer(t, func(app *fiber.App) { | |
| app.Get("/stream", func(c fiber.Ctx) error { | |
| return c.SendString("streaming data") | |
| }) | |
| app.Get("/large", func(c fiber.Ctx) error { | |
| data := make([]byte, 1024) | |
| for i := range data { | |
| data[i] = byte('A' + i%26) | |
| } | |
| return c.Send(data) | |
| }) | |
| }) | |
| defer server.stop() | |
| t.Run("basic streaming", func(t *testing.T) { | |
| client := New().SetDial(server.dial()).SetStreamResponseBody(true) | |
| resp, err := client.Get("http://example.com/stream") | |
| require.NoError(t, err) | |
| defer resp.Close() | |
| bodyStream := resp.BodyStream() | |
| require.NotNil(t, bodyStream) | |
| data, err := io.ReadAll(bodyStream) | |
| require.NoError(t, err) | |
| require.Equal(t, "streaming data", string(data)) | |
| }) | |
| t.Run("large response streaming", func(t *testing.T) { | |
| client := New().SetDial(server.dial()).SetStreamResponseBody(true) | |
| resp, err := client.Get("http://example.com/large") | |
| require.NoError(t, err) | |
| defer resp.Close() | |
| bodyStream := resp.BodyStream() | |
| require.NotNil(t, bodyStream) | |
| buffer := make([]byte, 256) | |
| var totalRead []byte | |
| for { | |
| n, err := bodyStream.Read(buffer) | |
| if n > 0 { | |
| totalRead = append(totalRead, buffer[:n]...) | |
| } | |
| if err == io.EOF { | |
| break | |
| } | |
| require.NoError(t, err) | |
| } | |
| require.Equal(t, 1024, len(totalRead)) | |
| for i := 0; i < 1024; i++ { | |
| expected := byte('A' + i%26) | |
| require.Equal(t, expected, totalRead[i]) | |
| } | |
| }) | |
| t.Run("compare with regular body", func(t *testing.T) { | |
| client1 := New().SetDial(server.dial()) | |
| resp1, err := client1.Get("http://example.com/stream") | |
| require.NoError(t, err) | |
| defer resp1.Close() | |
| normalBody := resp1.Body() | |
| client2 := New().SetDial(server.dial()).SetStreamResponseBody(true) | |
| resp2, err := client2.Get("http://example.com/stream") | |
| require.NoError(t, err) | |
| defer resp2.Close() | |
| streamedBody, err := io.ReadAll(resp2.BodyStream()) | |
| require.NoError(t, err) | |
| require.Equal(t, normalBody, streamedBody) | |
| }) | |
| } | |
| t.Run("compare with regular body", func(t *testing.T) { | |
| client1 := New().SetDial(server.dial()) | |
| resp1, err := client1.Get("http://example.com/stream") | |
| require.NoError(t, err) | |
| defer resp1.Close() | |
| normalBody := append([]byte(nil), resp1.Body()...) | |
| client2 := New().SetDial(server.dial()).SetStreamResponseBody(true) | |
| resp2, err := client2.Get("http://example.com/stream") | |
| require.NoError(t, err) | |
| defer resp2.Close() | |
| streamedBody, err := io.ReadAll(resp2.BodyStream()) | |
| require.NoError(t, err) | |
| require.Equal(t, normalBody, streamedBody) | |
| }) |
🤖 Prompt for AI Agents
In client/response_test.go around lines 542 to 612, the "compare with regular
body" subtest keeps normalBody as a reference to resp1's internal buffer and
then closes resp1, which can make the comparison flaky because the buffer may be
reused; fix by making a copy of resp1.Body() (e.g., allocate and copy the bytes)
before calling resp1.Close(), then use that copied slice for the equality check;
optionally add an assertion that when streaming is enabled the non-streaming
response's Body() is emptied (len(resp.Body()) == 0) to codify documented
behavior.
|
@Abhirup-99 The tests are failing. You can run them locally by running |
a725b97 to
9c12637
Compare
|
@gaby fixed the failing tests |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3711 +/- ##
==========================================
- Coverage 91.79% 91.41% -0.38%
==========================================
Files 114 113 -1
Lines 11539 11884 +345
==========================================
+ Hits 10592 10864 +272
- Misses 685 750 +65
- Partials 262 270 +8
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
client/client.go (2)
538-557: Reset should also clear streamResponseBody to default falseClient.Reset clears most flags but leaves streamResponseBody unchanged, leading to surprising behavior post-reset.
func (c *Client) Reset() { c.fasthttp = &fasthttp.Client{} c.baseURL = "" c.timeout = 0 c.userAgent = "" c.referer = "" c.retryConfig = nil c.debug = false + c.streamResponseBody = false
58-61: Guard temporary per-request flips of fasthttp.Client.StreamResponseBody with a dedicated mutex on ClientIf core executes by toggling the shared fasthttp.Client.StreamResponseBody between requests, concurrent requests can race. Add a small private mutex dedicated to this toggle. This keeps the public API unchanged and avoids overloading c.mu, which guards other state.
Apply:
type Client struct { logger log.CommonLogger fasthttp *fasthttp.Client @@ - timeout time.Duration - mu sync.RWMutex - debug bool - streamResponseBody bool + timeout time.Duration + mu sync.RWMutex + debug bool + streamResponseBody bool + // protects temporary flips of fasthttp.Client.StreamResponseBody during per-request execution + streamRespMu sync.Mutex }To confirm whether the toggle approach is used (vs. cloning a temp fasthttp.Client), run:
#!/bin/bash # Inspect how StreamResponseBody is applied during execution. rg -n -C2 --type=go 'StreamResponseBody' client/core.go client | sed -n '1,200p'client/response_test.go (1)
598-611: Fix flakiness: copy Body() before closing resp1normalBody aliases resp1’s internal buffer; after Close(), pooled buffers may be reused, causing nondeterministic failures. Copy the bytes before closing.
- defer resp1.Close() - normalBody := resp1.Body() + defer resp1.Close() + normalBody := append([]byte(nil), resp1.Body()...)
🧹 Nitpick comments (2)
client/client.go (2)
439-451: API shape LGTM; document precedence explicitly in the commentThe getter/setter are clear and chainable. Add one line noting that a request-level setting overrides the client-level default for discoverability.
// SetStreamResponseBody enables or disables response body streaming. // When enabled, the response body can be read as a stream using BodyStream() // instead of being fully loaded into memory. This is useful for large responses -// or server-sent events. +// or server-sent events. Note: a per-request setting (Request.SetStreamResponseBody) +// overrides this client-level default. func (c *Client) SetStreamResponseBody(enable bool) *Client { c.streamResponseBody = enable return c }
563-575: Optional: plumb StreamResponseBody through Config for convenienceMany helpers accept Config; adding an optional pointer makes it easy to set streaming per call without constructing a Request manually.
type Config struct { Ctx context.Context //nolint:containedctx // It's needed to be stored in the config. Body any Header map[string]string Param map[string]string Cookie map[string]string PathParam map[string]string FormData map[string]string UserAgent string Referer string File []*File Timeout time.Duration MaxRedirects int + // Optional per-request override; when set, overrides the client default. + StreamResponseBody *bool } @@ if cfg.MaxRedirects != 0 { req.SetMaxRedirects(cfg.MaxRedirects) } + if cfg.StreamResponseBody != nil { + req.SetStreamResponseBody(*cfg.StreamResponseBody) + } + if cfg.Body != nil { req.SetJSON(cfg.Body) return }Also applies to: 577-635
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
client/client.go(2 hunks)client/client_test.go(1 hunks)client/core.go(2 hunks)client/request.go(3 hunks)client/response.go(1 hunks)client/response_test.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- client/response.go
- client/client_test.go
- client/core.go
🔇 Additional comments (5)
client/response_test.go (1)
542-596: Solid coverage for basic and large streaming pathsThe test server and the two subtests validate BodyStream() on small and larger responses. Clear and deterministic.
client/request.go (4)
70-72: *Good choice using bool to represent tri-state (unset vs true/false)This enables request-level override while preserving client default. No action needed.
594-605: Precedence and fallback logic reads correctlyReturns request override when set; otherwise falls back to client-level; defaults to false if no client. Matches the intended design.
606-612: Setter is fine; pointer escape is safeUsing &enable is idiomatic (escapes to heap). Chainable API is consistent with others.
679-679: Reset correctly clears the per-request overrideResetting streamResponseBody to nil ensures the request returns to default behavior. Good.
9c12637 to
4d32c17
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (1)
client/core.go (1)
132-141: Minor duplication: unify Do vs DoRedirects callsitesThe four callsites using
fastHTTPClient.Do/DoRedirectsunder retry and non-retry paths duplicate logic. A tiny helper makes it harder to regress:- if cfg != nil { - err = retry.NewExponentialBackoff(*cfg).Retry(func() error { - if shouldRedirect { - return fastHTTPClient.DoRedirects(reqv, respv, c.req.maxRedirects) - } - return fastHTTPClient.Do(reqv, respv) - }) - } else { - if shouldRedirect { - err = fastHTTPClient.DoRedirects(reqv, respv, c.req.maxRedirects) - } else { - err = fastHTTPClient.Do(reqv, respv) - } - } +call := func() error { + if shouldRedirect { + return fastHTTPClient.DoRedirects(reqv, respv, c.req.maxRedirects) + } + return fastHTTPClient.Do(reqv, respv) +} +if cfg != nil { + err = retry.NewExponentialBackoff(*cfg).Retry(call) +} else { + err = call() +}Low impact, but improves readability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
client/client.go(2 hunks)client/client_test.go(1 hunks)client/core.go(2 hunks)client/request.go(3 hunks)client/response.go(1 hunks)client/response_test.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- client/response.go
- client/client.go
- client/request.go
- client/response_test.go
- client/client_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
client/core.go (1)
client/client.go (1)
Client(32-62)
🔇 Additional comments (3)
client/core.go (3)
84-118: Kudos: avoids flipping shared fasthttp.Client.StreamResponseBodyNot mutating the shared client’s flag sidesteps the prior data race concerns. Good move. If you adopt the dual-client approach suggested above, you’ll keep this benefit while restoring pooling.
149-151: No action needed: CopyTo + ReleaseResponse ordering is safeVerified that fasthttp.Response.CopyTo performs a synchronous copy of the body into resp.RawResponse (rather than merely reassigning a stream pointer), and once CopyTo returns, you can immediately call ReleaseResponse(respv) without affecting the destination’s BodyStream. The existing code at client/core.go (lines 149–151) is correct and requires no changes.
88-118: Verify comprehensive cloning offasthttp.Clientfields
I wasn’t able to locateclient/core.goin the repository, but manually copying each exported field fromfasthttp.Clientis brittle and prone to omissions (e.g., new fields likeMaxConnWaitTimeout,RetryIf, etc., won’t be copied and can silently change behavior). Please verify that you:
- Mirror all exported fields when creating a temporary client for per-request overrides.
- Centralize this logic in a helper (e.g., reflection-based) to automatically pick up new fields.
- Add a unit test that fails if any exported field in
fasthttp.Clientisn’t present in the clone.- Or consider toggling
StreamResponseBodyon the shared client under a lock instead of cloning.Let me know if you’d like a PR-ready helper and reflection-based test snippet for this.
4d32c17 to
28342fa
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
client/response.go (2)
92-101: Tighten BodyStream docs: lifetime, connection reuse, and Body() emptiness when streamingThe code is fine. Please clarify the contract and call order to prevent misuse with pooled responses and fasthttp’s connection reuse.
Apply this doc tweak:
-// BodyStream returns the response body as a stream reader. -// Note: When using BodyStream(), the response body is not copied to memory, -// so calling Body() afterwards may return an empty slice. +// BodyStream returns an io.Reader for the response body. +// If streaming was enabled (via Request.SetStreamResponseBody, Client.SetStreamResponseBody, +// or the underlying fasthttp.Client.StreamResponseBody), the reader yields bytes directly from +// the connection without buffering in memory. +// +// Important: +// - Consume the reader before calling Response.Close(). +// - Read the stream to EOF to allow connection reuse by the underlying client. +// - When streaming is enabled, Body() (and helpers like String/JSON/XML/CBOR/Save) may be empty; +// prefer io.Copy with BodyStream() for large responses. +// If streaming is not enabled, BodyStream returns a bytes.Reader over Body(). func (r *Response) BodyStream() io.Reader {
150-165: Save() ignores streaming and closes caller’s writer — stream from BodyStream() and don’t close external writers
- Using Body() makes Save() write an empty file/output when streaming is enabled.
- Closing an io.Writer that the function didn’t open is surprising and can break callers.
Apply this minimal change:
- if _, err = io.Copy(outFile, bytes.NewReader(r.Body())); err != nil { + if _, err = io.Copy(outFile, r.BodyStream()); err != nil { return fmt.Errorf("failed to write response body to file: %w", err) } @@ - case io.Writer: - if _, err := io.Copy(p, bytes.NewReader(r.Body())); err != nil { + case io.Writer: + if _, err := io.Copy(p, r.BodyStream()); err != nil { return fmt.Errorf("failed to write response body to io.Writer: %w", err) } - defer func() { - if pc, ok := p.(io.WriteCloser); ok { - _ = pc.Close() //nolint:errcheck // not needed - } - }() return nilFollow-up (optional): consider adding a short note in Save()’s doc that it respects streaming and doesn’t close external writers.
client/response_test.go (1)
606-629: Avoid retaining Body()’s internal buffer across Close; also assert Body() is empty when streamingTo prevent future flakiness with pooled buffers and to codify the streaming contract, copy the non-streamed body and add an assertion that Body() is empty when streaming is enabled.
Apply:
- defer resp1.Close() - normalBody := resp1.Body() + defer resp1.Close() + normalBody := append([]byte(nil), resp1.Body()...) // copy before pool reuse @@ - streamedBody, err := io.ReadAll(resp2.BodyStream()) + streamedBody, err := io.ReadAll(resp2.BodyStream()) require.NoError(t, err) require.Equal(t, normalBody, streamedBody) + // streaming mode shouldn't buffer Body() + require.Len(t, resp2.Body(), 0)
🧹 Nitpick comments (1)
client/response_test.go (1)
542-629: Add a streaming Save() regression test (uses BodyStream and does not close external writer)Given the Save() fix, add coverage to ensure we don’t regress: the writer must not be closed by Save(), and content must be streamed correctly when streaming is enabled.
Insert this new subtest at the end of Test_Response_BodyStream (before the final closing brace):
+ t.Run("Save streams and does not close external writer", func(t *testing.T) { + t.Parallel() + + server := startTestServer(t, func(app *fiber.App) { + app.Get("/stream", func(c fiber.Ctx) error { + return c.SendString("streaming data") + }) + }) + defer server.stop() + + client := New().SetDial(server.dial()).SetStreamResponseBody(true) + resp, err := client.Get("http://example.com/stream") + require.NoError(t, err) + defer resp.Close() + + type trackingBuf struct { + bytes.Buffer + closed bool + } + // implements io.WriteCloser to verify Save() does not close it + var w = &trackingBuf{} + // wrap with NopCloser-like shim + writer := struct { + io.Writer + io.Closer + }{ + Writer: w, + Closer: io.CloserFunc(func() error { + w.closed = true + return nil + }), + } + + // Save to writer; it must not close it + err = resp.Save(writer) + require.NoError(t, err) + require.Equal(t, "streaming data", w.String()) + require.False(t, w.closed, "Save must not close external writers") + })If you prefer, we can place a similar subtest under Test_Response_Save.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
client/client.go(2 hunks)client/client_test.go(1 hunks)client/core.go(2 hunks)client/request.go(3 hunks)client/response.go(1 hunks)client/response_test.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- client/client.go
- client/request.go
- client/core.go
- client/client_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
client/response_test.go (1)
client/client.go (2)
Get(706-708)New(647-652)
28342fa to
ddbb831
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (3)
client/response.go (2)
92-101: Tighten BodyStream contract; clarify lifetime and fallback behaviorDoc is still a bit vague about ordering/lifetime and the fallback to an in-memory reader. Propose expanding the comment to state the contract explicitly and reduce surprises.
-// BodyStream returns the response body as a stream reader. -// Note: When using BodyStream(), the response body is not copied to memory, -// so calling Body() afterwards may return an empty slice. +// BodyStream returns an io.Reader for the response body. +// +// If streaming was enabled (via Request.SetStreamResponseBody, Client.SetStreamResponseBody, +// or the underlying fasthttp.Client.StreamResponseBody), the reader yields data directly from +// the connection without buffering. Otherwise, it returns a bytes.Reader over Body(). +// +// Important: +// - Consume the reader before calling Response.Close(). +// - When streaming is enabled, Body() (and helpers like String/JSON/XML/CBOR/Save) may observe +// an empty body. Prefer io.Copy with BodyStream() for large responses. func (r *Response) BodyStream() io.Reader { if stream := r.RawResponse.BodyStream(); stream != nil { return stream } // If streaming is not enabled, return a bytes.Reader from the regular body return bytes.NewReader(r.RawResponse.Body()) }
161-172: Make Save() always stream via BodyStream(); also stream when saving to a fileThe writer branch partially streams, but the file path branch still buffers via Body(). Since BodyStream() already falls back to a bytes.Reader when streaming isn’t enabled, you can unconditionally copy from BodyStream() in both branches. This avoids large in-memory buffers and simplifies the code.
@@ - if _, err = io.Copy(outFile, bytes.NewReader(r.Body())); err != nil { + if _, err = io.Copy(outFile, r.BodyStream()); err != nil { return fmt.Errorf("failed to write response body to file: %w", err) } @@ - case io.Writer: - var err error - if r.IsStreaming() { - _, err = io.Copy(p, r.BodyStream()) - } else { - _, err = io.Copy(p, bytes.NewReader(r.Body())) - } - - if err != nil { - return fmt.Errorf("failed to write response body to writer: %w", err) - } - - return nil + case io.Writer: + if _, err := io.Copy(p, r.BodyStream()); err != nil { + return fmt.Errorf("failed to write response body to writer: %w", err) + } + return nilFollow-up: consider adding/adjusting tests to cover Save(filePath) with streaming enabled (e.g., large response) to ensure we don’t regress. I can draft that if helpful.
client/core.go (1)
84-120: Per-request allocating a new fasthttp.Client loses connection pooling and adds substantial latency/TLS overheadConstructing a fresh
*fasthttp.Clientwhenever the per-request streaming flag differs discards the shared connection pool and TLS session cache, causing extra TCP/TLS handshakes and reduced throughput under mixed workloads. This will regress performance for SSE/large downloads that toggle streaming on/off across requests.Consider maintaining two long-lived internal clients (streaming vs non-streaming) on
Clientand routing each request accordingly. Here's a suggested implementation:Add to
client/client.go:+// In Client struct: +streamingFHC *fasthttp.Client // lazily initialized clone with StreamResponseBody=true + +// pickFastHTTPClient returns a pooled client configured for the requested streaming mode. +func (c *Client) pickFastHTTPClient(stream bool) *fasthttp.Client { + c.mu.RLock() + defer c.mu.RUnlock() + + if stream == c.streamResponseBody { + return c.fasthttp + } + // Lazily build the streaming variant once and reuse it. + if stream && c.streamingFHC != nil { + return c.streamingFHC + } + return c.initStreamingClientLocked(stream) +} + +// initStreamingClientLocked clones exported config into a new fasthttp.Client. +// c.mu must be held by the caller. +func (c *Client) initStreamingClientLocked(stream bool) *fasthttp.Client { + o := c.fasthttp + clone := &fasthttp.Client{ + Dial: o.Dial, + DialDualStack: o.DialDualStack, + TLSConfig: o.TLSConfig, + MaxConnsPerHost: o.MaxConnsPerHost, + MaxIdleConnDuration: o.MaxIdleConnDuration, + MaxConnDuration: o.MaxConnDuration, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, + ReadBufferSize: o.ReadBufferSize, + WriteBufferSize: o.WriteBufferSize, + MaxResponseBodySize: o.MaxResponseBodySize, + NoDefaultUserAgentHeader: o.NoDefaultUserAgentHeader, + DisableHeaderNamesNormalizing: o.DisableHeaderNamesNormalizing, + DisablePathNormalizing: o.DisablePathNormalizing, + MaxIdemponentCallAttempts: o.MaxIdemponentCallAttempts, + Name: o.Name, + ConfigureClient: o.ConfigureClient, + StreamResponseBody: stream, + } + if stream { + c.streamingFHC = clone + } + return clone +}Then simplify this code block:
- // Determine which client to use - create a new one if StreamResponseBody differs - var fastHTTPClient *fasthttp.Client - requestStreamResponseBody := c.req.StreamResponseBody() - c.client.mu.RLock() - clientStream := c.client.streamResponseBody - original := c.client.fasthttp - - if requestStreamResponseBody != clientStream { - // Request setting differs from client setting, create a temporary client - - fastHTTPClient = &fasthttp.Client{ - Dial: original.Dial, - DialDualStack: original.DialDualStack, - TLSConfig: original.TLSConfig, - MaxConnsPerHost: original.MaxConnsPerHost, - MaxIdleConnDuration: original.MaxIdleConnDuration, - MaxConnDuration: original.MaxConnDuration, - ReadTimeout: original.ReadTimeout, - WriteTimeout: original.WriteTimeout, - ReadBufferSize: original.ReadBufferSize, - WriteBufferSize: original.WriteBufferSize, - MaxResponseBodySize: original.MaxResponseBodySize, - NoDefaultUserAgentHeader: original.NoDefaultUserAgentHeader, - DisableHeaderNamesNormalizing: original.DisableHeaderNamesNormalizing, - DisablePathNormalizing: original.DisablePathNormalizing, - MaxIdemponentCallAttempts: original.MaxIdemponentCallAttempts, - Name: original.Name, - ConfigureClient: original.ConfigureClient, - - // Request-specific override - StreamResponseBody: requestStreamResponseBody, - } - } else { - fastHTTPClient = original - } - c.client.mu.RUnlock() + // Determine which client to use without sacrificing connection pooling + requestStreamResponseBody := c.req.StreamResponseBody() + fastHTTPClient := c.client.pickFastHTTPClient(requestStreamResponseBody)This keeps pooling intact and avoids repeated allocation while maintaining thread safety.
🧹 Nitpick comments (1)
client/response.go (1)
103-106: IsStreaming semantics are OK; consider documenting that it reflects fasthttp.BodyStream presenceMinor: IsStreaming simply mirrors RawResponse.BodyStream() != nil. A short note in the comment that this reports whether the underlying fasthttp response is streaming (not whether data remains unread) would avoid confusion.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
client/client.go(2 hunks)client/client_test.go(1 hunks)client/core.go(2 hunks)client/request.go(3 hunks)client/response.go(2 hunks)client/response_test.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- client/response_test.go
- client/client_test.go
- client/client.go
- client/request.go
🧰 Additional context used
🧬 Code graph analysis (1)
client/core.go (1)
client/client.go (1)
Client(32-62)
🔇 Additional comments (1)
client/core.go (1)
133-142: LGTM!The routing to use
fastHTTPClientinstead of the hardcoded client is correct and properly handles both retry and non-retry paths with redirects.
|
We will check tomorrow(sickness weekend) |
|
@Abhirup-99 can you check the linting and the last hints |
9b16623 to
d297c79
Compare
|
@ReneWerner87 updated the code. Also, can I work on #3731 ? Checking lint is very difficult because make lint seems to fail for go 1.25. |
|
@Abhirup-99 How exactly is the server streaming a response back to the client during the tests? The creation of a BodyStream by the client is masking this |
client/client_test.go
Outdated
| app.Get("/sse", func(c fiber.Ctx) error { | ||
| c.Set("Content-Type", "text/event-stream") | ||
| c.Set("Cache-Control", "no-cache") | ||
| c.Set("Connection", "keep-alive") | ||
|
|
||
| messages := []string{ | ||
| "data: message 1\n\n", | ||
| "data: message 2\n\n", | ||
| "data: message 3\n\n", | ||
| } | ||
|
|
||
| for _, msg := range messages { | ||
| if _, err := c.WriteString(msg); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use c.SendStreamWriter() for server-sent events here since c.WriteString() does not flush on write.
https://docs.gofiber.io/next/api/ctx/#sendstreamwriter
Then, we can add a flush instruction to ensure that each message is flushed as soon as possible.
For example:
if _, err := w.WriteString(msg); err != nil {
return
}
if _, err := w.Flush(); err != nil {
return
}To ensure that the data is streaming, we could also add a time.Sleep() after each flush and timeout early. This way, we ensure that data is actually streaming and isn't sent as one large chunk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@grivera64 it seems like streaming doesn't work properly with sendstreamwriter.
t.Run("server sent events with incremental reads", func(t *testing.T) {
t.Parallel()
server := startTestServer(t, func(app *fiber.App) {
app.Get("/sse", func(c fiber.Ctx) error {
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")
return c.SendStreamWriter(func(w *bufio.Writer) {
messages := []string{
"data: event 1\n\n",
"data: event 2\n\n",
"data: event 3\n\n",
"data: event 4\n\n",
}
for i, msg := range messages {
// Write each SSE message
if _, err := w.WriteString(msg); err != nil {
return
}
// Flush to ensure immediate delivery
if err := w.Flush(); err != nil {
return
}
// Add delay to ensure proper streaming behavior
if i < len(messages)-1 {
time.Sleep(10 * time.Millisecond)
}
}
})
})
})
defer server.stop()
client := New().SetDial(server.dial()).SetStreamResponseBody(true)
resp, err := client.Get("http://example.com/sse")
require.NoError(t, err)
defer resp.Close()
bodyStream := resp.BodyStream()
require.NotNil(t, bodyStream)
content, err := io.ReadAll(bodyStream)
require.NoError(t, err)
contentStr := string(content)
require.Contains(t, contentStr, "event 1")
require.Contains(t, contentStr, "event 2")
require.Contains(t, contentStr, "event 3")
require.Contains(t, contentStr, "event 4")
require.Contains(t, contentStr, "data: event")
require.Contains(t, contentStr, "\n\n")
})
this doesn't seem to work, but droping streamresponsebody and check for body makes this work.
Let me know what can be done here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@grivera64 can you check this out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@grivera64 can you check this out?
@Abhirup-99 It seems that Client currently uses fasthttp.Response.CopyTo() on client/core.go:113 which does not copy bodyStream's from fasthttp.Response to client.Response.RawResponse. This causes resp.BodyStream() to always return nil and lead to an EOF error whenever StreamResponseBody is set to true.
Lines 85 to 116 in c1f7afe
| go func() { | |
| respv := fasthttp.AcquireResponse() | |
| defer func() { | |
| fasthttp.ReleaseRequest(reqv) | |
| fasthttp.ReleaseResponse(respv) | |
| }() | |
| if cfg != nil { | |
| // Use an exponential backoff retry strategy. | |
| err = retry.NewExponentialBackoff(*cfg).Retry(func() error { | |
| if c.req.maxRedirects > 0 && (string(reqv.Header.Method()) == fiber.MethodGet || string(reqv.Header.Method()) == fiber.MethodHead) { | |
| return c.client.fasthttp.DoRedirects(reqv, respv, c.req.maxRedirects) | |
| } | |
| return c.client.fasthttp.Do(reqv, respv) | |
| }) | |
| } else { | |
| if c.req.maxRedirects > 0 && (string(reqv.Header.Method()) == fiber.MethodGet || string(reqv.Header.Method()) == fiber.MethodHead) { | |
| err = c.client.fasthttp.DoRedirects(reqv, respv, c.req.maxRedirects) | |
| } else { | |
| err = c.client.fasthttp.Do(reqv, respv) | |
| } | |
| } | |
| if atomic.CompareAndSwapInt32(&done, 0, 1) { | |
| if err != nil { | |
| errCh <- err | |
| return | |
| } | |
| respv.CopyTo(resp.RawResponse) | |
| errCh <- nil | |
| } | |
| }() |
To fix this, I found a solution, though it isn't the cleanest approach (maybe @gaby or @ReneWerner87 are more familiar with these changes from PR #1986, and can help with a cleaner approach):
git diff
diff --git a/client/client_test.go b/client/client_test.go
index eb9662fe..edd4b1f4 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -1,6 +1,7 @@
package client
import (
+ "bufio"
"context"
"crypto/tls"
"encoding/hex"
@@ -1758,14 +1759,20 @@ func Test_Client_StreamResponseBody_ServerSentEvents(t *testing.T) {
"data: message 2\n\n",
"data: message 3\n\n",
}
-
- for _, msg := range messages {
- if _, err := c.WriteString(msg); err != nil {
- return err
+ return c.SendStreamWriter(func(w *bufio.Writer) {
+ for i, msg := range messages {
+ if _, err := w.WriteString(msg); err != nil {
+ return
+ }
+ if err := w.Flush(); err != nil {
+ return
+ }
+ // Add delay to ensure proper streaming behavior
+ if i < len(messages)-1 {
+ time.Sleep(10 * time.Millisecond)
+ }
}
- }
-
- return nil
+ })
})
})
defer func() { require.NoError(t, app.Shutdown()) }()
diff --git a/client/core.go b/client/core.go
index 38db93bf..593b1185 100644
--- a/client/core.go
+++ b/client/core.go
@@ -84,10 +84,7 @@ func (c *core) execFunc() (*Response, error) {
var err error
go func() {
respv := fasthttp.AcquireResponse()
- defer func() {
- fasthttp.ReleaseRequest(reqv)
- fasthttp.ReleaseResponse(respv)
- }()
+ defer fasthttp.ReleaseRequest(reqv)
if cfg != nil {
// Use an exponential backoff retry strategy.
@@ -108,9 +105,11 @@ func (c *core) execFunc() (*Response, error) {
if atomic.CompareAndSwapInt32(&done, 0, 1) {
if err != nil {
errCh <- err
+ fasthttp.ReleaseResponse(respv)
return
}
- respv.CopyTo(resp.RawResponse)
+ fasthttp.ReleaseResponse(resp.RawResponse)
+ resp.RawResponse = respv
errCh <- nil
}
}()
Essentially, this directly uses the fasthttp.Response as our resp.RawResponse rather than copying the data, as we will lose the body stream if we attempt to copy the response. If a failure happened, do not use the fasthttp.Response and simply release it. If it succeeded, release the current resp.RawResponse and replace it with the successful fasthttp.Response.
On a side note: While fiber uses c.fasthttp.DoXXX() internally, c.fasthttp.Get(...) seems to not be compatible with StreamResponseBody: true either from my testing. I am not sure if this is intended or a bug, but it may be a good idea to write an issue about this in the upstream valyala/fasthttp repo for in case it's not an intended feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After some more testing, the git diff I had recommended above seems to cause some race conditions when trying it out with #3799., let's not go with that solution.
We might have to solve this in the upstream fasthttp repository, since there doesn't seem to be any interrupt logic in fashttp.Client.Do() that we can use currently. Right now, any interruption leads to discarding the full response.
We can also try to do something similar to how app.Test() deals with this, but that may require re-writing some client logic from scratch.
Whichever way we try, we would need to have a way to access the raw client net.Conn to be able to close it after the timeout duration and read any partial response that the connection read before it closed
d297c79 to
d310851
Compare
Fixed this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
client/response.go (1)
155-158: Bug: Save(path) ignores streaming and writes 0 bytes when streaming is enabled. Use BodyStream().When streaming is active, Body() may be empty. Save must copy from BodyStream() for file paths too.
- if _, err = io.Copy(outFile, bytes.NewReader(r.Body())); err != nil { + if _, err = io.Copy(outFile, r.BodyStream()); err != nil { return fmt.Errorf("failed to write response body to file: %w", err) }
🧹 Nitpick comments (5)
client/client_test.go (1)
1790-1819: Optional: also assert Body() is empty in streaming mode.To codify the documented behavior, after reading BodyStream(), assert len(resp.Body()) == 0 when streaming is enabled.
client/response.go (2)
92-101: Clarify BodyStream docs: note fallback and lifetime.Doc claims “not copied to memory”, which isn’t true in fallback. Make it explicit and state consumption/Close order.
-// BodyStream returns the response body as a stream reader. -// Note: When using BodyStream(), the response body is not copied to memory, -// so calling Body() afterwards may return an empty slice. +// BodyStream returns an io.Reader for the response body. +// - When streaming is enabled (underlying fasthttp.Response has a BodyStream), the reader yields data directly from the connection without buffering. In this case, calling Body() afterwards may return an empty slice. +// - When streaming is not enabled, it returns a bytes.Reader over the in-memory body. +// Consume the reader before calling Response.Close().
161-171: Simplify writer path: always copy from BodyStream().IsStreaming() check is unnecessary; BodyStream() already picks the correct source.
- var err error - if r.IsStreaming() { - _, err = io.CopyBuffer(p, r.BodyStream(), nil) - } else { - _, err = io.Copy(p, bytes.NewReader(r.Body())) - } + _, err := io.CopyBuffer(p, r.BodyStream(), nil) if err != nil { return fmt.Errorf("failed to write response body to writer: %w", err) }client/response_test.go (2)
1114-1136: Optional: assert IsStreaming implies Body() drained.After reading BodyStream() in streaming mode, consider asserting len(resp.Body()) == 0 to lock in the contract.
458-505: Add a Save(path) streaming test to cover file output.Given Save(path) previously read from Body(), add a subtest enabling streaming that writes to a file path and validates content/size.
I can draft the subtest snippet if helpful.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
client/client.go(1 hunks)client/client_test.go(1 hunks)client/request.go(1 hunks)client/response.go(2 hunks)client/response_test.go(2 hunks)docs/client/rest.md(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- docs/client/rest.md
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Format Go code using gofumpt (enforced viamake format)
Ensure code passes golangci-lint checks (enforced viamake lint)
Optimize struct field alignment using betteralign (enforced viamake betteralign)
Modernize Go code using gopls modernize (enforced viamake modernize)
Files:
client/client.goclient/response.goclient/client_test.goclient/response_test.goclient/request.go
🧬 Code graph analysis (2)
client/response_test.go (2)
client/client.go (1)
New(646-651)client/response.go (1)
Response(19-25)
client/request.go (1)
client/client.go (1)
Client(32-61)
🔇 Additional comments (7)
client/client_test.go (4)
1736-1745: LGTM: toggling API works as expected.
1747-1788: SSE test isn’t streaming; stream from server and assert progressive reads (allow EOF when n > 0).The handler buffers all messages and the client reads once. This can pass without true streaming. Use a stream writer with flushes and read incrementally; accept io.EOF when bytes were read.
Apply this diff within the test:
func Test_Client_StreamResponseBody_ServerSentEvents(t *testing.T) { t.Parallel() - app, addr := startTestServerWithPort(t, func(app *fiber.App) { + app, addr := startTestServerWithPort(t, func(app *fiber.App) { app.Get("/sse", func(c fiber.Ctx) error { c.Set("Content-Type", "text/event-stream") c.Set("Cache-Control", "no-cache") c.Set("Connection", "keep-alive") - - messages := []string{ - "data: message 1\n\n", - "data: message 2\n\n", - "data: message 3\n\n", - } - - for _, msg := range messages { - if _, err := c.WriteString(msg); err != nil { - return err - } - } - - return nil + c.Context().SetBodyStreamWriter(func(w *bufio.Writer) { + msgs := []string{"data: message 1\n\n", "data: message 2\n\n", "data: message 3\n\n"} + for i, m := range msgs { + _, _ = w.WriteString(m) + _ = w.Flush() + if i < len(msgs)-1 { + time.Sleep(5 * time.Millisecond) + } + } + }) + return nil }) }) @@ bodyStream := resp.BodyStream() require.NotNil(t, bodyStream) - buffer := make([]byte, 1024) - n, err := bodyStream.Read(buffer) - require.NoError(t, err) - require.Positive(t, n) - - content := string(buffer[:n]) - require.Contains(t, content, "data: message 1") + buffer := make([]byte, 1024) + n, err := bodyStream.Read(buffer) + require.Greater(t, n, 0) + if err != nil { + require.ErrorIs(t, err, io.EOF) + } + content := string(buffer[:n]) + require.Contains(t, content, "data: message 1")Also add:
import "bufio"at the top imports.
1821-1841: LGTM: non-streaming fallback validated.
1843-1852: LGTM: chaining preserves last writer wins.client/response_test.go (2)
704-726: Fix flakiness: copy resp1.Body() before closing.normalBody aliases internal buffer; after resp1.Close() it may be reused. Copy bytes first.
- normalBody := resp1.Body() + normalBody := append([]byte(nil), resp1.Body()...)
779-828: SSE “incremental reads” should actually stream and read progressively.The test name says incremental, but it uses io.ReadAll. Stream with a writer and read line by line.
- data, err := io.ReadAll(bodyStream) - require.NoError(t, err) - - content := string(data) - require.Contains(t, content, "event 1") - require.Contains(t, content, "event 2") - require.Contains(t, content, "event 3") - require.Contains(t, content, "event 4") - require.Contains(t, content, "data: event") - require.Contains(t, content, "\n\n") + r := bufio.NewReader(bodyStream) + for i := 1; i <= 4; i++ { + line, err := r.ReadString('\n') + require.NoError(t, err) + require.Contains(t, line, fmt.Sprintf("data: event %d", i)) + _, _ = r.ReadString('\n') // consume blank line + }And change the handler to stream and flush:
- messages := []string{ + messages := []string{ "data: event 1\n\n", "data: event 2\n\n", "data: event 3\n\n", "data: event 4\n\n", } - - for i, msg := range messages { - if _, err := c.WriteString(msg); err != nil { - return err - } - c.Response().ImmediateHeaderFlush = true - if i < len(messages)-1 { - time.Sleep(5 * time.Millisecond) - } - } - return nil + c.Context().SetBodyStreamWriter(func(w *bufio.Writer) { + for i, msg := range messages { + _, _ = w.WriteString(msg) + _ = w.Flush() + if i < len(messages)-1 { + time.Sleep(5 * time.Millisecond) + } + } + }) + return nilAdd:
import "bufio"if not already imported.
client/request.go (1)
47-65: LGTM: field reordering only; keep betteralign happy.No behavioral change observed. Ensure make betteralign/make format pass on this struct layout.
d310851 to
72ece4b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
client/client_test.go (1)
1750-1788: SSE test isn’t actually streaming; stream from server and assert incremental reads (allow EOF with partial read)Use SendStreamWriter with flushes and gate the stream to deterministically verify incremental delivery. Also, io.Reader may return (n>0, EOF); don’t require NoError in that case.
Apply this diff:
- app, addr := startTestServerWithPort(t, func(app *fiber.App) { + firstSent := make(chan struct{}) + cont := make(chan struct{}) + app, addr := startTestServerWithPort(t, func(app *fiber.App) { app.Get("/sse", func(c fiber.Ctx) error { c.Set("Content-Type", "text/event-stream") c.Set("Cache-Control", "no-cache") c.Set("Connection", "keep-alive") - messages := []string{ - "data: message 1\n\n", - "data: message 2\n\n", - "data: message 3\n\n", - } - - for _, msg := range messages { - if _, err := c.WriteString(msg); err != nil { - return err - } - } - - return nil + return c.SendStreamWriter(func(w *bufio.Writer) { + _, _ = w.WriteString("data: message 1\n\n") + _ = w.Flush() + close(firstSent) // signal first event flushed + <-cont // wait for client to continue + _, _ = w.WriteString("data: message 2\n\n") + _ = w.Flush() + _, _ = w.WriteString("data: message 3\n\n") + _ = w.Flush() + }) }) }) @@ bodyStream := resp.BodyStream() require.NotNil(t, bodyStream) - buffer := make([]byte, 1024) - n, err := bodyStream.Read(buffer) - require.NoError(t, err) - require.Positive(t, n) - - content := string(buffer[:n]) - require.Contains(t, content, "data: message 1") + // Wait for first event to be flushed, then read it + select { + case <-firstSent: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for first SSE message") + } + r := bufio.NewReader(bodyStream) + line, err := r.ReadString('\n') + require.Contains(t, line, "data: message 1") + _, _ = r.ReadString('\n') // consume blank line + + // Unblock the server to send the rest, then read and verify + close(cont) + line, _ = r.ReadString('\n') + require.Contains(t, line, "data: message 2") + _, _ = r.ReadString('\n') + line, err = r.ReadString('\n') + require.Contains(t, line, "data: message 3") + if err != nil { + require.ErrorIs(t, err, io.EOF) + }Add this import at the top of the file if missing:
import "bufio"client/response.go (2)
92-101: Tighten BodyStream contractClarify that the reader is tied to the underlying connection when streaming is enabled, must be consumed before Close(), and Body() may be empty after consumption. Current comment is fine but could be more explicit.
155-176: Simplify Save(): always copy from BodyStream()BodyStream() already returns the correct reader for both streaming and non‑streaming. Remove the IsStreaming branches and copy from BodyStream() in both file and writer paths.
Apply this diff:
- if r.IsStreaming() { - _, err = io.CopyBuffer(outFile, r.BodyStream(), nil) - } else { - _, err = io.Copy(outFile, bytes.NewReader(r.Body())) - } + _, err = io.CopyBuffer(outFile, r.BodyStream(), nil) @@ - var err error - if r.IsStreaming() { - _, err = io.CopyBuffer(p, r.BodyStream(), nil) - } else { - _, err = io.Copy(p, bytes.NewReader(r.Body())) - } + _, err := io.CopyBuffer(p, r.BodyStream(), nil)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
client/client.go(1 hunks)client/client_test.go(1 hunks)client/request.go(1 hunks)client/response.go(2 hunks)client/response_test.go(2 hunks)docs/client/rest.md(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- docs/client/rest.md
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Format Go code using gofumpt (enforced viamake format)
Ensure code passes golangci-lint checks (enforced viamake lint)
Optimize struct field alignment using betteralign (enforced viamake betteralign)
Modernize Go code using gopls modernize (enforced viamake modernize)
Files:
client/client_test.goclient/client.goclient/response.goclient/request.goclient/response_test.go
🧬 Code graph analysis (3)
client/client_test.go (1)
client/client.go (2)
New(646-651)Get(705-707)
client/request.go (1)
client/client.go (1)
Client(32-61)
client/response_test.go (2)
client/client.go (1)
New(646-651)client/response.go (1)
Response(19-25)
🔇 Additional comments (3)
client/response_test.go (2)
714-726: Avoid retaining RawResponse buffer after Close()normalBody points at resp1’s internal buffer, which may be reused after Close(), causing flaky equality. Copy it before closing.
Apply this diff:
- normalBody := resp1.Body() + normalBody := append([]byte(nil), resp1.Body()...)
779-828: “Incremental reads” test uses io.ReadAll; stream and read progressivelyUse a streaming writer with flushes and read line-by-line to actually assert incremental availability.
Apply this diff:
- server := startTestServer(t, func(app *fiber.App) { + server := startTestServer(t, func(app *fiber.App) { app.Get("/sse", func(c fiber.Ctx) error { c.Set("Content-Type", "text/event-stream") c.Set("Cache-Control", "no-cache") c.Set("Connection", "keep-alive") - - messages := []string{ - "data: event 1\n\n", - "data: event 2\n\n", - "data: event 3\n\n", - "data: event 4\n\n", - } - - for i, msg := range messages { - if _, err := c.WriteString(msg); err != nil { - return err - } - c.Response().ImmediateHeaderFlush = true - if i < len(messages)-1 { - time.Sleep(5 * time.Millisecond) - } - } - return nil + return c.SendStreamWriter(func(w *bufio.Writer) { + for i := 1; i <= 4; i++ { + _, _ = w.WriteString(fmt.Sprintf("data: event %d\n\n", i)) + _ = w.Flush() + if i < 4 { + time.Sleep(5 * time.Millisecond) + } + } + }) }) }) @@ - data, err := io.ReadAll(bodyStream) - require.NoError(t, err) - - content := string(data) - require.Contains(t, content, "event 1") - require.Contains(t, content, "event 2") - require.Contains(t, content, "event 3") - require.Contains(t, content, "event 4") - require.Contains(t, content, "data: event") - require.Contains(t, content, "\n\n") + r := bufio.NewReader(bodyStream) + for i := 1; i <= 4; i++ { + line, err := r.ReadString('\n') + require.NoError(t, err) + require.Contains(t, line, fmt.Sprintf("data: event %d", i)) + _, _ = r.ReadString('\n') // consume blank line + }Add this import at the top of the file if missing:
import "bufio"client/request.go (1)
47-65: Ensure exported struct field ordering remains unchanged or documented
- Reordering exported
Requestfields breaks unkeyed literals and downstream binary layouts; revert to the original order or document this breaking change in the v3 release notes.- Run
make betteralignto validate struct alignment.
| // StreamResponseBody returns the current StreamResponseBody setting. | ||
| func (c *Client) StreamResponseBody() bool { | ||
| return c.fasthttp.StreamResponseBody | ||
| } | ||
|
|
||
| // SetStreamResponseBody enables or disables response body streaming. | ||
| // When enabled, the response body can be read as a stream using BodyStream() | ||
| // instead of being fully loaded into memory. This is useful for large responses | ||
| // or server-sent events. | ||
| func (c *Client) SetStreamResponseBody(enable bool) *Client { | ||
| c.fasthttp.StreamResponseBody = enable | ||
| return c | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
🧩 Analysis chain
Data race: unsynchronized toggling of fasthttp.Client.StreamResponseBody
SetStreamResponseBody writes to c.fasthttp.StreamResponseBody without locking; concurrent requests can read/flip this flag in core and cause races. Guard both getter and setter with the existing client mutex.
Apply this diff:
-// StreamResponseBody returns the current StreamResponseBody setting.
-func (c *Client) StreamResponseBody() bool {
- return c.fasthttp.StreamResponseBody
-}
+// StreamResponseBody returns the current StreamResponseBody setting.
+func (c *Client) StreamResponseBody() bool {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.fasthttp.StreamResponseBody
+}
// SetStreamResponseBody enables or disables response body streaming.
// When enabled, the response body can be read as a stream using BodyStream()
// instead of being fully loaded into memory. This is useful for large responses
// or server-sent events.
func (c *Client) SetStreamResponseBody(enable bool) *Client {
- c.fasthttp.StreamResponseBody = enable
+ c.mu.Lock()
+ c.fasthttp.StreamResponseBody = enable
+ c.mu.Unlock()
return c
}Additionally, ensure per-request temporary flips in core are serialized with a dedicated mutex on Client (e.g., streamRespMu) to avoid races with setters.
🏁 Script executed:
#!/bin/bash
# Locate the Client struct definition to inspect fields (including mu and fasthttp)
rg -nP '^type\s+Client\s+struct' -C10 --type go client/client.goLength of output: 643
Add synchronization to StreamResponseBody getter/setter
StreamResponseBody() and SetStreamResponseBody() directly read/write c.fasthttp.StreamResponseBody without locking, causing data races under concurrent use. Add a sync.RWMutex (e.g. streamRespMu) to Client, wrap the getter in RLock()/RUnlock() and the setter in Lock()/Unlock().
🤖 Prompt for AI Agents
In client/client.go around lines 438 to 450, StreamResponseBody() and
SetStreamResponseBody() access c.fasthttp.StreamResponseBody without
synchronization; add a sync.RWMutex field to Client (name it e.g. streamRespMu),
wrap the getter with streamRespMu.RLock()/RUnlock() and the setter with
streamRespMu.Lock()/Unlock(), and keep returning *Client from the setter; ensure
the mutex is a value in the Client struct so it is available to both methods and
prevents data races under concurrent use.
|
@gaby can you check this out? |
client/client_test.go
Outdated
| app.Get("/sse", func(c fiber.Ctx) error { | ||
| c.Set("Content-Type", "text/event-stream") | ||
| c.Set("Cache-Control", "no-cache") | ||
| c.Set("Connection", "keep-alive") | ||
|
|
||
| messages := []string{ | ||
| "data: message 1\n\n", | ||
| "data: message 2\n\n", | ||
| "data: message 3\n\n", | ||
| } | ||
|
|
||
| for _, msg := range messages { | ||
| if _, err := c.WriteString(msg); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@grivera64 can you check this out?
@Abhirup-99 It seems that Client currently uses fasthttp.Response.CopyTo() on client/core.go:113 which does not copy bodyStream's from fasthttp.Response to client.Response.RawResponse. This causes resp.BodyStream() to always return nil and lead to an EOF error whenever StreamResponseBody is set to true.
Lines 85 to 116 in c1f7afe
| go func() { | |
| respv := fasthttp.AcquireResponse() | |
| defer func() { | |
| fasthttp.ReleaseRequest(reqv) | |
| fasthttp.ReleaseResponse(respv) | |
| }() | |
| if cfg != nil { | |
| // Use an exponential backoff retry strategy. | |
| err = retry.NewExponentialBackoff(*cfg).Retry(func() error { | |
| if c.req.maxRedirects > 0 && (string(reqv.Header.Method()) == fiber.MethodGet || string(reqv.Header.Method()) == fiber.MethodHead) { | |
| return c.client.fasthttp.DoRedirects(reqv, respv, c.req.maxRedirects) | |
| } | |
| return c.client.fasthttp.Do(reqv, respv) | |
| }) | |
| } else { | |
| if c.req.maxRedirects > 0 && (string(reqv.Header.Method()) == fiber.MethodGet || string(reqv.Header.Method()) == fiber.MethodHead) { | |
| err = c.client.fasthttp.DoRedirects(reqv, respv, c.req.maxRedirects) | |
| } else { | |
| err = c.client.fasthttp.Do(reqv, respv) | |
| } | |
| } | |
| if atomic.CompareAndSwapInt32(&done, 0, 1) { | |
| if err != nil { | |
| errCh <- err | |
| return | |
| } | |
| respv.CopyTo(resp.RawResponse) | |
| errCh <- nil | |
| } | |
| }() |
To fix this, I found a solution, though it isn't the cleanest approach (maybe @gaby or @ReneWerner87 are more familiar with these changes from PR #1986, and can help with a cleaner approach):
git diff
diff --git a/client/client_test.go b/client/client_test.go
index eb9662fe..edd4b1f4 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -1,6 +1,7 @@
package client
import (
+ "bufio"
"context"
"crypto/tls"
"encoding/hex"
@@ -1758,14 +1759,20 @@ func Test_Client_StreamResponseBody_ServerSentEvents(t *testing.T) {
"data: message 2\n\n",
"data: message 3\n\n",
}
-
- for _, msg := range messages {
- if _, err := c.WriteString(msg); err != nil {
- return err
+ return c.SendStreamWriter(func(w *bufio.Writer) {
+ for i, msg := range messages {
+ if _, err := w.WriteString(msg); err != nil {
+ return
+ }
+ if err := w.Flush(); err != nil {
+ return
+ }
+ // Add delay to ensure proper streaming behavior
+ if i < len(messages)-1 {
+ time.Sleep(10 * time.Millisecond)
+ }
}
- }
-
- return nil
+ })
})
})
defer func() { require.NoError(t, app.Shutdown()) }()
diff --git a/client/core.go b/client/core.go
index 38db93bf..593b1185 100644
--- a/client/core.go
+++ b/client/core.go
@@ -84,10 +84,7 @@ func (c *core) execFunc() (*Response, error) {
var err error
go func() {
respv := fasthttp.AcquireResponse()
- defer func() {
- fasthttp.ReleaseRequest(reqv)
- fasthttp.ReleaseResponse(respv)
- }()
+ defer fasthttp.ReleaseRequest(reqv)
if cfg != nil {
// Use an exponential backoff retry strategy.
@@ -108,9 +105,11 @@ func (c *core) execFunc() (*Response, error) {
if atomic.CompareAndSwapInt32(&done, 0, 1) {
if err != nil {
errCh <- err
+ fasthttp.ReleaseResponse(respv)
return
}
- respv.CopyTo(resp.RawResponse)
+ fasthttp.ReleaseResponse(resp.RawResponse)
+ resp.RawResponse = respv
errCh <- nil
}
}()
Essentially, this directly uses the fasthttp.Response as our resp.RawResponse rather than copying the data, as we will lose the body stream if we attempt to copy the response. If a failure happened, do not use the fasthttp.Response and simply release it. If it succeeded, release the current resp.RawResponse and replace it with the successful fasthttp.Response.
On a side note: While fiber uses c.fasthttp.DoXXX() internally, c.fasthttp.Get(...) seems to not be compatible with StreamResponseBody: true either from my testing. I am not sure if this is intended or a bug, but it may be a good idea to write an issue about this in the upstream valyala/fasthttp repo for in case it's not an intended feature.
72ece4b to
250b1c2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
client/response_test.go (2)
704-726: Fix buffer retention issue before closing Response.Line 718 stores a reference to
resp1.Body()innormalBody, then Line 717 defersresp1.Close(). After closing, the internal buffer can be reused by the pool, leading to flaky comparisons.Apply this diff to copy the bytes before closing:
client1 := New().SetDial(server.dial()) resp1, err := client1.Get("http://example.com/stream") require.NoError(t, err) defer resp1.Close() - normalBody := resp1.Body() + normalBody := append([]byte(nil), resp1.Body()...)
779-828: Test name says "incremental reads" but doesn't test incremental behavior.The subtest uses
io.ReadAll(line 818) which reads the entire response at once, not incrementally. Additionally, the server handler doesn't useSetBodyStreamWriterto truly stream the response—it usesImmediateHeaderFlushwhich may not force streaming semantics.To properly test incremental reads:
- Use a buffered reader and read line-by-line or in chunks
- Use
SetBodyStreamWriteron the server to ensure true streaming:c.Context().SetBodyStreamWriter(func(w *bufio.Writer) { messages := []string{ "data: event 1\n\n", "data: event 2\n\n", "data: event 3\n\n", "data: event 4\n\n", } for i, msg := range messages { _, _ = w.WriteString(msg) _ = w.Flush() if i < len(messages)-1 { time.Sleep(5 * time.Millisecond) } } })Then read incrementally:
r := bufio.NewReader(bodyStream) for i := 1; i <= 4; i++ { line, err := r.ReadString('\n') require.NoError(t, err) require.Contains(t, line, fmt.Sprintf("data: event %d", i)) _, _ = r.ReadString('\n') // consume blank line }
🧹 Nitpick comments (2)
client/response_test.go (2)
728-777: Consider using SetBodyStreamWriter for guaranteed streaming behavior.The test uses
ImmediateHeaderFlush(line 740) which may not guarantee true streaming. For more robust streaming tests, consider usingSetBodyStreamWriter:c.Context().SetBodyStreamWriter(func(w *bufio.Writer) { chunks := []string{"chunk1", "chunk2", "chunk3"} for i, chunk := range chunks { _, _ = w.WriteString(chunk) _ = w.Flush() if i < len(chunks)-1 { time.Sleep(10 * time.Millisecond) } } })This ensures the server actually streams rather than buffering and sending at once.
830-876: Consider using SetBodyStreamWriter for true progressive streaming.Similar to the chunked test, this test uses
ImmediateHeaderFlushwhich may not force true streaming behavior. UsingSetBodyStreamWriterwould guarantee progressive writes and better validate the streaming feature.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
client/client.go(1 hunks)client/client_test.go(1 hunks)client/request.go(1 hunks)client/response.go(2 hunks)client/response_test.go(2 hunks)docs/client/rest.md(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- client/client_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
- docs/client/rest.md
- client/client.go
- client/response.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Format Go code using gofumpt (Make target:make format).
Run golangci-lint for linting (Make target:make lint).
Rungo vetas part of audit to catch suspicious constructs (Make target:make audit).
Optimize struct field alignment with betteralign (Make target:make betteralign).
Applygopls modernizeto update code patterns (Make target:make modernize).
Files:
client/response_test.goclient/request.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Run the test suite with gotestsum (Make target:make test).
Run benchmarks withgo test(Make target:make benchmark).
Files:
client/response_test.go
🧬 Code graph analysis (2)
client/response_test.go (2)
client/client.go (1)
New(745-750)client/response.go (1)
Response(19-25)
client/request.go (1)
client/client.go (1)
Client(37-67)
🔇 Additional comments (11)
client/response_test.go (8)
6-15: LGTM: Standard library imports support new test scenarios.The new imports are appropriate for the expanded streaming tests and error handling validation.
546-604: LGTM: Good coverage of streaming Save() scenarios.The tests correctly verify that
Save()doesn't close the writer and properly handles copy errors. The error handling and test helpers are well-implemented.
606-638: LGTM: Well-designed test helpers.Both helper types are focused and appropriate for their use cases. The implementations correctly simulate the behaviors needed for testing.
643-663: LGTM: Basic streaming test is well-structured.The test correctly enables streaming, verifies the stream is not nil, and validates the content.
665-702: LGTM: Progressive reading properly validates streaming behavior.This test correctly reads the response in chunks using a buffer and validates both the total size and the data pattern. The loop with explicit buffer reads confirms streaming semantics.
878-917: LGTM: Comprehensive coverage of streaming edge cases.These subtests effectively cover:
- Connection interruption and cleanup (lines 878-917)
- Large response streaming with multiple reads (lines 919-981)
- Stream object identity guarantees (lines 983-1023)
The tests properly validate streaming semantics through progressive reads and appropriate assertions.
Also applies to: 919-981, 983-1023
1026-1109: LGTM: Fallback behavior is thoroughly tested.The subtests correctly verify that
BodyStream()returns abytes.Readerwhen streaming is disabled, and that this fallback works correctly for empty, regular, and large responses. The progressive reading in the "large non-streaming response" test confirms the Reader interface works as expected.
1111-1184: LGTM: Complete coverage of IsStreaming() API.The tests thoroughly validate the
IsStreaming()method and confirm thatBodyStream()provides a consistent interface regardless of whether streaming is enabled or disabled. The comparison between streaming and non-streaming clients (lines 1163-1182) is particularly valuable.client/request.go (3)
605-615: LGTM: DisablePathNormalizing methods follow established patterns.The getter and setter are well-implemented. The setter correctly propagates the setting to the underlying
RawRequest.URI().DisablePathNormalizing(line 613), ensuring consistency across the request chain.
670-697: LGTM: Reset method correctly clears the new field.Line 683 properly resets
disablePathNormalizingtofalse, maintaining the Reset contract of returning the Request to its default state.
46-68: Confirmmake betteralignwas run per project guidelines.The Makefile includes a betteralign target using v0.7.2. The Request struct has been extensively reordered and the new
disablePathNormalizingfield correctly mirrors the Client struct layout.However, I cannot verify in this environment whether the field alignment is actually optimal. Please confirm you ran
make betteralign(or included it in your build process) to ensure the struct meets the project's optimization standards, as required by the coding guidelines.
|
@codex review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
client/client.go
Outdated
| // StreamResponseBody returns the current StreamResponseBody setting. | ||
| func (c *Client) StreamResponseBody() bool { | ||
| return c.fasthttp.StreamResponseBody | ||
| } | ||
|
|
||
| // SetStreamResponseBody enables or disables response body streaming. | ||
| // When enabled, the response body can be read as a stream using BodyStream() | ||
| // instead of being fully loaded into memory. This is useful for large responses | ||
| // or server-sent events. | ||
| func (c *Client) SetStreamResponseBody(enable bool) *Client { | ||
| c.fasthttp.StreamResponseBody = enable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reference undefined fasthttp client in streaming helpers
StreamResponseBody() and SetStreamResponseBody() try to read and write c.fasthttp.StreamResponseBody, but Client has no fasthttp field—only a transport interface. This code does not compile (*Client has no field or method fasthttp) and therefore the new streaming API cannot be built. The implementation should operate on the existing transport (e.g., via FasthttpClient()/HostClient()/LBClient()) or store the required field on the struct.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
client/client.go
Outdated
| // StreamResponseBody returns the current StreamResponseBody setting. | ||
| func (c *Client) StreamResponseBody() bool { | ||
| return c.fasthttp.StreamResponseBody | ||
| } | ||
|
|
||
| // SetStreamResponseBody enables or disables response body streaming. | ||
| // When enabled, the response body can be read as a stream using BodyStream() | ||
| // instead of being fully loaded into memory. This is useful for large responses | ||
| // or server-sent events. | ||
| func (c *Client) SetStreamResponseBody(enable bool) *Client { | ||
| c.fasthttp.StreamResponseBody = enable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix compile error when toggling streaming flag
Both StreamResponseBody() and SetStreamResponseBody() attempt to read and write c.fasthttp.StreamResponseBody, but Client has no fasthttp field—only the transport interface. As written, the file does not build (*Client has no field or method fasthttp). To expose this option you must obtain the underlying fasthttp client via the transport (e.g. FasthttpClient(), HostClient(), LBClient()) and set their StreamResponseBody flags instead of referencing a nonexistent field.
Useful? React with 👍 / 👎.
|
PR #3830 got merged, and seems to start some work on refactoring the core execution logic. @Abhirup-99 Can you merge the most recent changes to this PR and retry the Stream Writer change I requested some time ago? I still think the CopyToResponse method from fasthttp still doesn't copy the stream body in this case, but I think we can start experimenting with how to add this via fasthttp or Fiber if we merge the most recent updates to this PR. |
f8dce32 to
efd9696
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
client/core.go (1)
100-114: Per-request stream flag never applied
execFunccallsc.client.Do/DoRedirectswith whatever streaming mode the transport already has, so a request-levelSetStreamResponseBody(...)override is silently ignored. Before invoking the transport you need to inspect the request’s override, temporarily flip the transport’sStreamResponseBodyflag, and restore it afterwards (ideally under a small mutex to avoid interleaving with other in-flight requests). Without that handoff the new per-request API is a no-op.ctx_test.go (2)
3828-3832: Duplicate test case in Subdomains table; remove the second copyThe "offset equal len returns empty" case appears twice back-to-back. Drop the duplicate to avoid redundant execution.
@@ - { - name: "offset equal len returns empty", - host: "john.doe.com", - offset: []int{3}, - want: []string{}, - },
4312-4314: Duplicate '/test2' entry in testCasesThe same test case is listed twice. Remove one to keep the table minimal and unambiguous.
@@ - {url: "/test2", body: "type DefaultCtx struct", contentDisposition: "attachment"},
♻️ Duplicate comments (2)
client/response_test.go (2)
779-828: Read SSE progressively; avoid io.ReadAll for “incremental reads”Use a bufio.Reader and consume event by event to validate streaming semantics.
@@ - bodyStream := resp.BodyStream() - require.NotNil(t, bodyStream) - - data, err := io.ReadAll(bodyStream) - require.NoError(t, err) - - content := string(data) - require.Contains(t, content, "event 1") - require.Contains(t, content, "event 2") - require.Contains(t, content, "event 3") - require.Contains(t, content, "event 4") - require.Contains(t, content, "data: event") - require.Contains(t, content, "\n\n") + bodyStream := resp.BodyStream() + require.NotNil(t, bodyStream) + r := bufio.NewReader(bodyStream) + for i := 1; i <= 4; i++ { + line, err := r.ReadString('\n') + require.NoError(t, err) + require.Contains(t, line, fmt.Sprintf("data: event %d", i)) + _, _ = r.ReadString('\n') // blank line + }Add import if missing:
@@ -import ( +import ( + "bufio"
714-726: Fix flakiness: copy Body() before closing the pooled response
The slice points to pooled memory; copy it before Close.- defer resp1.Close() - normalBody := resp1.Body() + defer resp1.Close() + normalBody := append([]byte(nil), resp1.Body()...)
🧹 Nitpick comments (3)
ctx_test.go (1)
5585-5596: Release acquired context to avoid pool leakage in testsAdd a deferred ReleaseCtx to mirror prevailing patterns and keep pools healthy under -race.
- c := app.AcquireCtx(&fasthttp.RequestCtx{}) + c := app.AcquireCtx(&fasthttp.RequestCtx{}) + defer app.ReleaseCtx(c)client/response.go (2)
92-101: Tighten BodyStream contract docsClarify lifetime/ordering and fallback to bytes.Reader for non-streaming responses.
-// BodyStream returns the response body as a stream reader. -// Note: When using BodyStream(), the response body is not copied to memory, -// so calling Body() afterwards may return an empty slice. +// BodyStream returns an io.Reader for the response body. +// When the underlying client/request enabled streaming, the reader yields data +// directly from the connection without buffering; consume it before Close(). +// When streaming is disabled, BodyStream returns a bytes.Reader over Body(). +// Note: If you read via BodyStream while streaming is enabled, Body() may be empty.
155-160: Simplify Save: always copy from BodyStream()BodyStream already returns a bytes.Reader when not streaming; drop the IsStreaming branches and copy once.
- if r.IsStreaming() { - _, err = io.Copy(outFile, r.BodyStream()) - } else { - _, err = io.Copy(outFile, bytes.NewReader(r.Body())) - } + _, err = io.Copy(outFile, r.BodyStream()) @@ - var err error - if r.IsStreaming() { - _, err = io.Copy(p, r.BodyStream()) - } else { - _, err = io.Copy(p, bytes.NewReader(r.Body())) - } + _, err := io.Copy(p, r.BodyStream())[gaby suggested io.CopyBuffer; using plain io.Copy keeps it simple and idiomatic. Based on learnings]
Also applies to: 169-177
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
client/client.go(1 hunks)client/core.go(2 hunks)client/core_test.go(1 hunks)client/response.go(3 hunks)client/response_test.go(2 hunks)client/transport.go(4 hunks)ctx_test.go(23 hunks)docs/client/rest.md(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- docs/client/rest.md
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Apply formatting using gofumpt (Make target: format)
Optimize struct field alignment using betteralign (Make target: betteralign)
Modernize code using gopls modernize (Make target: modernize)
Files:
client/client.goclient/core_test.goclient/response.goclient/response_test.goctx_test.goclient/core.goclient/transport.go
🧠 Learnings (14)
📓 Common learnings
Learnt from: ReneWerner87
Repo: gofiber/fiber PR: 0
File: :0-0
Timestamp: 2024-12-01T10:28:36.011Z
Learning: Feature request #3224 has been created to add support for square bracket notation and comma-separated values in multipart form data in Fiber, while maintaining binary data transfer capabilities. This would bring parity with the existing form-urlencoded functionality.
📚 Learning: 2025-10-16T07:19:52.418Z
Learnt from: grivera64
Repo: gofiber/fiber PR: 3807
File: adapter_test.go:118-144
Timestamp: 2025-10-16T07:19:52.418Z
Learning: In the Fiber codebase, the linter does not allow `require` assertions from within HTTP handlers (including net/http-style handlers). Use `t.Fatalf`, `t.Errorf`, or similar `testing.T` methods for error handling inside handler functions instead.
Applied to files:
client/response.goclient/response_test.goctx_test.go
📚 Learning: 2025-09-28T17:39:26.644Z
Learnt from: arturmelanchyk
Repo: gofiber/fiber PR: 3768
File: client/request.go:162-166
Timestamp: 2025-09-28T17:39:26.644Z
Learning: In fasthttp-based code like gofiber, using utils.UnsafeString(key) instead of string(key) is acceptable when values are used immediately within the same call stack and not stored for later use, as it avoids allocations and aligns with fasthttp's zero-allocation design philosophy.
Applied to files:
client/response.go
📚 Learning: 2025-10-16T07:15:26.529Z
Learnt from: grivera64
Repo: gofiber/fiber PR: 3807
File: adapter_test.go:118-144
Timestamp: 2025-10-16T07:15:26.529Z
Learning: In Fiber v3, net/http handlers (http.Handler, http.HandlerFunc, or raw func(http.ResponseWriter, *http.Request)) can be passed directly to routing methods like app.Get(), app.Post(), etc. The framework automatically detects and wraps them internally via toFiberHandler/collectHandlers. The github.com/gofiber/fiber/v3/middleware/adaptor package is legacy and should not be suggested for tests or code using native net/http handler support.
Applied to files:
client/response.goctx_test.go
📚 Learning: 2024-11-29T12:37:27.581Z
Learnt from: efectn
Repo: gofiber/fiber PR: 3162
File: app_test.go:893-895
Timestamp: 2024-11-29T12:37:27.581Z
Learning: In the `Test_App_ShutdownWithContext` function in `app_test.go`, the `clientDone` channel is used to synchronize the client's request completion before proceeding, eliminating the need for additional `time.Sleep` calls.
Applied to files:
client/response_test.goctx_test.go
📚 Learning: 2024-07-26T21:00:12.902Z
Learnt from: sixcolors
Repo: gofiber/fiber PR: 2922
File: middleware/cors/utils.go:63-71
Timestamp: 2024-07-26T21:00:12.902Z
Learning: The project uses the testify/assert package for assertions in unit tests.
Applied to files:
ctx_test.go
📚 Learning: 2024-12-13T08:14:22.851Z
Learnt from: efectn
Repo: gofiber/fiber PR: 3162
File: hooks_test.go:228-228
Timestamp: 2024-12-13T08:14:22.851Z
Learning: In Go test files, prefer using the `require` methods from the `testify` package for assertions instead of manual comparisons and calls to `t.Fatal` or `t.Fatalf`.
Applied to files:
ctx_test.go
📚 Learning: 2024-11-15T07:56:21.623Z
Learnt from: ReneWerner87
Repo: gofiber/fiber PR: 3161
File: app.go:923-932
Timestamp: 2024-11-15T07:56:21.623Z
Learning: In the Fiber framework, breaking changes are acceptable when moving from version 2 to version 3, including modifications to method signatures such as in the `Test` method in `app.go`.
Applied to files:
ctx_test.go
📚 Learning: 2024-10-08T19:06:06.583Z
Learnt from: sixcolors
Repo: gofiber/fiber PR: 3016
File: middleware/session/store.go:164-167
Timestamp: 2024-10-08T19:06:06.583Z
Learning: Unit tests in this project use testify require.
Applied to files:
ctx_test.go
📚 Learning: 2025-10-22T13:48:12.485Z
Learnt from: CR
Repo: gofiber/fiber PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-10-22T13:48:12.485Z
Learning: Run tests with gotestsum (Make target: test)
Applied to files:
ctx_test.go
📚 Learning: 2024-11-10T23:44:13.704Z
Learnt from: gaby
Repo: gofiber/fiber PR: 3193
File: middleware/adaptor/adaptor.go:111-111
Timestamp: 2024-11-10T23:44:13.704Z
Learning: In the `middleware/adaptor/adaptor.go` file of the Fiber framework, when updating context handling, replacing `c.Context()` with `c.RequestCtx()` is appropriate to access the `fasthttp.RequestCtx`.
Applied to files:
ctx_test.go
📚 Learning: 2024-09-25T15:57:10.221Z
Learnt from: sixcolors
Repo: gofiber/fiber PR: 3016
File: middleware/csrf/csrf_test.go:188-193
Timestamp: 2024-09-25T15:57:10.221Z
Learning: In the Fiber framework tests, using `ctx.Response.Header.Cookie` may not be suitable for parsing cookies from the response header, as it requires a `*Cookie` and fills it rather than returning a string value; thus, manual parsing of the `Set-Cookie` header may be necessary.
Applied to files:
ctx_test.go
📚 Learning: 2024-09-25T17:05:06.991Z
Learnt from: sixcolors
Repo: gofiber/fiber PR: 3016
File: middleware/session/middleware_test.go:190-191
Timestamp: 2024-09-25T17:05:06.991Z
Learning: When testing session `IdleTimeout` expiration, it's acceptable to use `time.Sleep` to simulate the passage of time in tests.
Applied to files:
ctx_test.go
📚 Learning: 2024-10-08T19:06:06.583Z
Learnt from: sixcolors
Repo: gofiber/fiber PR: 3051
File: middleware/session/session.go:215-216
Timestamp: 2024-10-08T19:06:06.583Z
Learning: Parallel tests for `Session.Save` already exist in the `middleware/session/session_test.go` file, specifically in the `Test_Session_Save` and `Test_Session_Save_Expiration` functions.
Applied to files:
ctx_test.go
🧬 Code graph analysis (3)
client/response_test.go (2)
client/client.go (1)
New(752-757)client/response.go (1)
Response(19-25)
ctx_test.go (3)
req_interface_gen.go (1)
Req(12-188)req.go (1)
Params(619-625)app.go (1)
TestConfig(1090-1101)
client/core.go (1)
client/response.go (1)
ReleaseResponse(232-235)
🔇 Additional comments (1)
ctx_test.go (1)
676-701: Toolchain compatibility verified — no action requiredThe go.mod file correctly specifies Go 1.25.0, which exceeds the minimum Go 1.23 requirement for
strings.SplitSeqand range-over-iterators. The code at the flagged lines (ctx_test.go 676-701 and 912-936) is compatible with the configured Go version.
efd9696 to
79e9380
Compare
This pull request introduces support for streaming HTTP response bodies in the client, allowing responses to be read as streams rather than being fully loaded into memory. This is particularly useful for handling large responses or server-sent events. The changes include new configuration options at both the client and request levels, implementation of the streaming logic, and comprehensive tests to ensure correct behavior.
Streaming response body support:
streamResponseBodyfield to theClientstruct, along withSetStreamResponseBodyandStreamResponseBodymethods to enable or disable response body streaming at the client level (client/client.go). [1] [2]streamResponseBodyfield to theRequeststruct, with correspondingSetStreamResponseBodyandStreamResponseBodymethods to allow per-request configuration that overrides the client setting (client/request.go). [1] [2]client/core.go).BodyStreammethod on theResponsestruct to provide anio.Readerfor streaming the response body, falling back to an in-memory reader if streaming is not enabled (client/response.go).Testing and validation:
client/client_test.go,client/response_test.go). [1] [2]streamResponseBodyoverride (client/request.go).# DescriptionPlease provide a clear and concise description of the changes you've made and the problem they address. Include the purpose of the change, any relevant issues it solves, and the benefits it brings to the project. If this change introduces new features or adjustments, highlight them here.
Related #3425