Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,20 @@ func (c *Client) DisableDebug() *Client {
return c
}

// StreamResponseBody returns the current StreamResponseBody setting.
func (c *Client) StreamResponseBody() bool {
return c.transport.StreamResponseBody()
}

// SetStreamResponseBody enables or disables response body streaming.
// When enabled, the response body can be read as a stream using BodyStream()
// instead of being fully loaded into memory. This is useful for large responses
// or server-sent events.
func (c *Client) SetStreamResponseBody(enable bool) *Client {
c.transport.SetStreamResponseBody(enable)
return c
}

// SetCookieJar sets the cookie jar for the client.
func (c *Client) SetCookieJar(cookieJar *CookieJar) *Client {
c.cookieJar = cookieJar
Expand Down
62 changes: 62 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2343,3 +2343,65 @@ func Benchmark_Client_Request_Send_ContextCancel(b *testing.B) {
require.ErrorIs(b, <-errCh, ErrTimeoutOrCancel)
}
}

func Test_Client_StreamResponseBody(t *testing.T) {
t.Parallel()

t.Run("default value", func(t *testing.T) {
t.Parallel()
client := New()
require.False(t, client.StreamResponseBody())
})

t.Run("enable streaming", func(t *testing.T) {
t.Parallel()
client := New()
result := client.SetStreamResponseBody(true)
require.True(t, client.StreamResponseBody())
require.Equal(t, client, result)
})

t.Run("disable streaming", func(t *testing.T) {
t.Parallel()
client := New()
client.SetStreamResponseBody(true)
require.True(t, client.StreamResponseBody())
client.SetStreamResponseBody(false)
require.False(t, client.StreamResponseBody())
})

t.Run("with standard client", func(t *testing.T) {
t.Parallel()
client := New()
client.SetStreamResponseBody(true)
require.True(t, client.StreamResponseBody())
})

t.Run("with host client", func(t *testing.T) {
t.Parallel()
hostClient := &fasthttp.HostClient{}
client := NewWithHostClient(hostClient)
client.SetStreamResponseBody(true)
require.True(t, client.StreamResponseBody())
require.True(t, hostClient.StreamResponseBody)
})

t.Run("with lb client", func(t *testing.T) {
t.Parallel()
lbClient := &fasthttp.LBClient{
Clients: []fasthttp.BalancingClient{
&fasthttp.HostClient{Addr: "example.com:80"},
},
}
client := NewWithLBClient(lbClient)
client.SetStreamResponseBody(true)
require.True(t, client.StreamResponseBody())
})

t.Run("getter with standard client without setter", func(t *testing.T) {
t.Parallel()
client := New()
// Test getter directly without calling setter
require.False(t, client.StreamResponseBody())
})
}
16 changes: 14 additions & 2 deletions client/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,16 @@ func (c *core) execFunc() (*Response, error) {
defer fasthttp.ReleaseRequest(reqv)

respv := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(respv)
defer func() {
if respv != nil {
fasthttp.ReleaseResponse(respv)
}
}()

c.req.RawRequest.CopyTo(reqv)
if bodyStream := c.req.RawRequest.BodyStream(); bodyStream != nil {
reqv.SetBodyStream(bodyStream, c.req.RawRequest.Header.ContentLength())
}

var err error
if cfg != nil {
Expand All @@ -115,7 +122,12 @@ func (c *core) execFunc() (*Response, error) {
resp := AcquireResponse()
resp.setClient(c.client)
resp.setRequest(c.req)
respv.CopyTo(resp.RawResponse)
originalRaw := resp.RawResponse
resp.RawResponse = respv
respv = nil
if originalRaw != nil {
fasthttp.ReleaseResponse(originalRaw)
}
respChan <- resp
}()

Expand Down
7 changes: 7 additions & 0 deletions client/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,13 @@ func (*blockingErrTransport) Client() any {
return nil
}

func (*blockingErrTransport) StreamResponseBody() bool {
return false
}

func (*blockingErrTransport) SetStreamResponseBody(_ bool) {
}

func (b *blockingErrTransport) release() {
b.releaseOnce.Do(func() { close(b.unblock) })
}
49 changes: 37 additions & 12 deletions client/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"path/filepath"
"sync"

"github.com/gofiber/utils/v2"
utils "github.com/gofiber/utils/v2"
"github.com/valyala/fasthttp"
)

Expand Down Expand Up @@ -89,22 +89,38 @@ func (r *Response) Body() []byte {
return r.RawResponse.Body()
}

// BodyStream returns the response body as a stream reader.
// Note: When using BodyStream(), the response body is not copied to memory,
// so calling Body() afterwards may return an empty slice.
func (r *Response) BodyStream() io.Reader {
if stream := r.RawResponse.BodyStream(); stream != nil {
return stream
}
// If streaming is not enabled, return a bytes.Reader from the regular body
return bytes.NewReader(r.RawResponse.Body())
}

// IsStreaming returns true if the response body is being streamed.
func (r *Response) IsStreaming() bool {
return r.RawResponse.BodyStream() != nil
}

// String returns the response body as a trimmed string.
func (r *Response) String() string {
return utils.Trim(string(r.Body()), ' ')
}

// JSON unmarshal the response body into the given interface{} using JSON.
// JSON unmarshals the response body into the given interface{} using JSON.
func (r *Response) JSON(v any) error {
return r.client.jsonUnmarshal(r.Body(), v)
}

// CBOR unmarshal the response body into the given interface{} using CBOR.
// CBOR unmarshals the response body into the given interface{} using CBOR.
func (r *Response) CBOR(v any) error {
return r.client.cborUnmarshal(r.Body(), v)
}

// XML unmarshal the response body into the given interface{} using XML.
// XML unmarshals the response body into the given interface{} using XML.
func (r *Response) XML(v any) error {
return r.client.xmlUnmarshal(r.Body(), v)
}
Expand Down Expand Up @@ -136,21 +152,30 @@ func (r *Response) Save(v any) error {
}
defer func() { _ = outFile.Close() }() //nolint:errcheck // not needed

if _, err = io.Copy(outFile, bytes.NewReader(r.Body())); err != nil {
if r.IsStreaming() {
_, err = io.Copy(outFile, r.BodyStream())
} else {
_, err = io.Copy(outFile, bytes.NewReader(r.Body()))
}

if err != nil {
return fmt.Errorf("failed to write response body to file: %w", err)
}

return nil

case io.Writer:
if _, err := io.Copy(p, bytes.NewReader(r.Body())); err != nil {
return fmt.Errorf("failed to write response body to io.Writer: %w", err)
var err error
if r.IsStreaming() {
_, err = io.Copy(p, r.BodyStream())
} else {
_, err = io.Copy(p, bytes.NewReader(r.Body()))
}
defer func() {
if pc, ok := p.(io.WriteCloser); ok {
_ = pc.Close() //nolint:errcheck // not needed
}
}()

if err != nil {
return fmt.Errorf("failed to write response body to writer: %w", err)
}

return nil

default:
Expand Down
Loading