|
3 | 3 | * SPDX-License-Identifier: Apache-2.0. |
4 | 4 | */ |
5 | 5 |
|
6 | | -#include <aws/core/http/curl/CurlHttpClient.h> |
7 | 6 | #include <aws/core/http/HttpRequest.h> |
| 7 | +#include <aws/core/http/curl/CurlHttpClient.h> |
8 | 8 | #include <aws/core/http/standard/StandardHttpResponse.h> |
9 | | -#include <aws/core/utils/StringUtils.h> |
| 9 | +#include <aws/core/monitoring/HttpClientMetrics.h> |
| 10 | +#include <aws/core/utils/DateTime.h> |
10 | 11 | #include <aws/core/utils/HashingUtils.h> |
| 12 | +#include <aws/core/utils/Outcome.h> |
| 13 | +#include <aws/core/utils/StringUtils.h> |
| 14 | +#include <aws/core/utils/crypto/Hash.h> |
11 | 15 | #include <aws/core/utils/logging/LogMacros.h> |
12 | 16 | #include <aws/core/utils/ratelimiter/RateLimiterInterface.h> |
13 | | -#include <aws/core/utils/DateTime.h> |
14 | | -#include <aws/core/utils/crypto/Hash.h> |
15 | | -#include <aws/core/utils/Outcome.h> |
16 | | -#include <aws/core/monitoring/HttpClientMetrics.h> |
17 | | -#include <cassert> |
| 17 | +#include <aws/core/utils/stream/AwsChunkedStream.h> |
| 18 | + |
18 | 19 | #include <algorithm> |
| 20 | +#include <cassert> |
19 | 21 | #include <thread> |
20 | 22 |
|
21 | | - |
22 | 23 | using namespace Aws::Client; |
23 | 24 | using namespace Aws::Http; |
24 | 25 | using namespace Aws::Http::Standard; |
25 | 26 | using namespace Aws::Utils; |
26 | 27 | using namespace Aws::Utils::Logging; |
| 28 | +using namespace Aws::Utils::Stream; |
27 | 29 | using namespace Aws::Monitoring; |
28 | 30 |
|
29 | 31 | #ifdef USE_AWS_MEMORY_MANAGEMENT |
@@ -144,25 +146,28 @@ struct CurlWriteCallbackContext |
144 | 146 | int64_t m_numBytesResponseReceived; |
145 | 147 | }; |
146 | 148 |
|
| 149 | +static const char* CURL_HTTP_CLIENT_TAG = "CurlHttpClient"; |
| 150 | + |
147 | 151 | struct CurlReadCallbackContext |
148 | 152 | { |
149 | | - CurlReadCallbackContext(const CurlHttpClient* client, CURL* curlHandle, HttpRequest* request, Aws::Utils::RateLimits::RateLimiterInterface* limiter) : |
150 | | - m_client(client), |
| 153 | + CurlReadCallbackContext(const CurlHttpClient* client, CURL* curlHandle, HttpRequest* request, |
| 154 | + Aws::Utils::RateLimits::RateLimiterInterface* limiter, |
| 155 | + std::shared_ptr<AwsChunkedStream<>> chunkedStream = nullptr) |
| 156 | + : m_client(client), |
151 | 157 | m_curlHandle(curlHandle), |
152 | 158 | m_rateLimiter(limiter), |
153 | 159 | m_request(request), |
154 | | - m_chunkEnd(false) |
155 | | - {} |
156 | | - |
157 | | - const CurlHttpClient* m_client; |
158 | | - CURL* m_curlHandle; |
159 | | - Aws::Utils::RateLimits::RateLimiterInterface* m_rateLimiter; |
160 | | - HttpRequest* m_request; |
161 | | - bool m_chunkEnd; |
| 160 | + m_chunkEnd(false), |
| 161 | + m_chunkedStream{std::move(chunkedStream)} {} |
| 162 | + |
| 163 | + const CurlHttpClient* m_client; |
| 164 | + CURL* m_curlHandle; |
| 165 | + Aws::Utils::RateLimits::RateLimiterInterface* m_rateLimiter; |
| 166 | + HttpRequest* m_request; |
| 167 | + bool m_chunkEnd; |
| 168 | + std::shared_ptr<Stream::AwsChunkedStream<>> m_chunkedStream; |
162 | 169 | }; |
163 | 170 |
|
164 | | -static const char* CURL_HTTP_CLIENT_TAG = "CurlHttpClient"; |
165 | | - |
166 | 171 | static int64_t GetContentLengthFromHeader(CURL* connectionHandle, |
167 | 172 | bool& hasContentLength) { |
168 | 173 | #if LIBCURL_VERSION_NUM >= 0x073700 // 7.55.0 |
@@ -293,67 +298,24 @@ static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, boo |
293 | 298 | size_t amountToRead = size * nmemb; |
294 | 299 | bool isAwsChunked = request->HasHeader(Aws::Http::CONTENT_ENCODING_HEADER) && |
295 | 300 | request->GetHeaderValue(Aws::Http::CONTENT_ENCODING_HEADER) == Aws::Http::AWS_CHUNKED_VALUE; |
296 | | - // aws-chunk = hex(chunk-size) + CRLF + chunk-data + CRLF |
297 | | - // Needs to reserve bytes of sizeof(hex(chunk-size)) + sizeof(CRLF) + sizeof(CRLF) |
298 | | - if (isAwsChunked) |
299 | | - { |
300 | | - Aws::String amountToReadHexString = Aws::Utils::StringUtils::ToHexString(amountToRead); |
301 | | - amountToRead -= (amountToReadHexString.size() + 4); |
302 | | - } |
303 | 301 |
|
304 | 302 | if (ioStream != nullptr && amountToRead > 0) |
305 | 303 | { |
306 | 304 | size_t amountRead = 0; |
307 | | - if (isStreaming) |
308 | | - { |
309 | | - if (!ioStream->eof() && ioStream->peek() != EOF) |
310 | | - { |
311 | | - amountRead = (size_t) ioStream->readsome(ptr, amountToRead); |
312 | | - } |
313 | | - if (amountRead == 0 && !ioStream->eof()) |
314 | | - { |
315 | | - return CURL_READFUNC_PAUSE; |
316 | | - } |
317 | | - } |
318 | | - else |
319 | | - { |
320 | | - ioStream->read(ptr, amountToRead); |
321 | | - amountRead = static_cast<size_t>(ioStream->gcount()); |
322 | | - } |
323 | | - |
324 | | - if (isAwsChunked) |
325 | | - { |
326 | | - if (amountRead > 0) |
327 | | - { |
328 | | - if (request->GetRequestHash().second != nullptr) |
329 | | - { |
330 | | - request->GetRequestHash().second->Update(reinterpret_cast<unsigned char*>(ptr), amountRead); |
331 | | - } |
332 | | - |
333 | | - Aws::String hex = Aws::Utils::StringUtils::ToHexString(amountRead); |
334 | | - memmove(ptr + hex.size() + 2, ptr, amountRead); |
335 | | - memmove(ptr + hex.size() + 2 + amountRead, "\r\n", 2); |
336 | | - memmove(ptr, hex.c_str(), hex.size()); |
337 | | - memmove(ptr + hex.size(), "\r\n", 2); |
338 | | - amountRead += hex.size() + 4; |
339 | | - } |
340 | | - else if (!context->m_chunkEnd) |
341 | | - { |
342 | | - Aws::StringStream chunkedTrailer; |
343 | | - chunkedTrailer << "0\r\n"; |
344 | | - if (request->GetRequestHash().second != nullptr) |
345 | | - { |
346 | | - chunkedTrailer << "x-amz-checksum-" |
347 | | - << request->GetRequestHash().first |
348 | | - << ":" |
349 | | - << HashingUtils::Base64Encode(request->GetRequestHash().second->GetHash().GetResult()) |
350 | | - << "\r\n"; |
351 | | - } |
352 | | - chunkedTrailer << "\r\n"; |
353 | | - amountRead = chunkedTrailer.str().size(); |
354 | | - memcpy(ptr, chunkedTrailer.str().c_str(), amountRead); |
355 | | - context->m_chunkEnd = true; |
356 | | - } |
| 305 | + if (isStreaming) { |
| 306 | + if (!ioStream->eof() && ioStream->peek() != EOF) { |
| 307 | + amountRead = (size_t)ioStream->readsome(ptr, amountToRead); |
| 308 | + } |
| 309 | + if (amountRead == 0 && !ioStream->eof()) { |
| 310 | + return CURL_READFUNC_PAUSE; |
| 311 | + } |
| 312 | + } else if (isAwsChunked && context->m_chunkedStream != nullptr) { |
| 313 | + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Called with size: " << amountToRead); |
| 314 | + amountRead = context->m_chunkedStream->BufferedRead(ptr, amountToRead); |
| 315 | + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "read: " << amountRead); |
| 316 | + } else { |
| 317 | + ioStream->read(ptr, amountToRead); |
| 318 | + amountRead = static_cast<size_t>(ioStream->gcount()); |
357 | 319 | } |
358 | 320 |
|
359 | 321 | auto& sentHandler = request->GetDataSentEventHandler(); |
@@ -724,7 +686,14 @@ std::shared_ptr<HttpResponse> CurlHttpClient::MakeRequest(const std::shared_ptr< |
724 | 686 | } |
725 | 687 |
|
726 | 688 | CurlWriteCallbackContext writeContext(this, request.get(), response.get(), readLimiter); |
727 | | - CurlReadCallbackContext readContext(this, connectionHandle, request.get(), writeLimiter); |
| 689 | + |
| 690 | + const auto readContext = [this, &connectionHandle, &request, &writeLimiter]() -> CurlReadCallbackContext { |
| 691 | + if (request->GetContentBody() != nullptr) { |
| 692 | + auto chunkedBodyPtr = Aws::MakeShared<AwsChunkedStream<>>(CURL_HTTP_CLIENT_TAG, request.get(), request->GetContentBody()); |
| 693 | + return {this, connectionHandle, request.get(), writeLimiter, std::move(chunkedBodyPtr)}; |
| 694 | + } |
| 695 | + return {this, connectionHandle, request.get(), writeLimiter}; |
| 696 | + }(); |
728 | 697 |
|
729 | 698 | SetOptCodeForHttpMethod(connectionHandle, request); |
730 | 699 |
|
|
0 commit comments