Skip to content

Commit 340467f

Browse files
committed
f [skip-ci]
1 parent aa327c1 commit 340467f

29 files changed

+491
-373
lines changed

README.md

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,24 +106,23 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
106106
while (conn->will_reconnect()) {
107107

108108
// Reconnect to channels.
109-
co_await conn->async_exec(req, ignore);
109+
co_await conn->async_exec(req);
110110

111111
// Loop reading Redis pushes.
112-
for (;;) {
113-
error_code ec;
114-
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
112+
for (error_code ec;;) {
113+
co_await conn->async_receive2(resp, redirect_error(ec));
115114
if (ec)
116115
break; // Connection lost, break so we can reconnect to channels.
117116

118117
// Use the response resp in some way and then clear it.
119118
...
120119

121-
consume_one(resp);
120+
resp.value().clear();
122121
}
123122
}
124123
}
125124
```
126125
127126
## Further reading
128127
129-
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).
128+
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).

doc/modules/ROOT/pages/index.adoc

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,19 +117,18 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
117117
while (conn->will_reconnect()) {
118118
119119
// Reconnect to channels.
120-
co_await conn->async_exec(req, ignore);
120+
co_await conn->async_exec(req);
121121
122122
// Loop reading Redis pushes.
123-
for (;;) {
124-
error_code ec;
125-
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
123+
for (error_code ec;;) {
124+
co_await conn->async_receive2(resp, redirect_error(ec));
126125
if (ec)
127126
break; // Connection lost, break so we can reconnect to channels.
128127
129128
// Use the response resp in some way and then clear it.
130129
...
131130
132-
consume_one(resp);
131+
resp.value().clear();
133132
}
134133
}
135134
}

example/cpp20_chat_room.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@ using boost::asio::consign;
2929
using boost::asio::detached;
3030
using boost::asio::dynamic_buffer;
3131
using boost::asio::redirect_error;
32-
using boost::asio::use_awaitable;
3332
using boost::redis::config;
3433
using boost::redis::connection;
35-
using boost::redis::generic_flat_response;
36-
using boost::redis::ignore;
34+
using boost::redis::generic_response;
3735
using boost::redis::request;
3836
using boost::system::error_code;
3937
using namespace std::chrono_literals;
@@ -46,20 +44,22 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
4644
request req;
4745
req.push("SUBSCRIBE", "channel");
4846

49-
generic_flat_response resp;
47+
generic_response resp;
5048
conn->set_receive_response(resp);
5149

5250
while (conn->will_reconnect()) {
5351
// Subscribe to channels.
54-
co_await conn->async_exec(req, ignore);
52+
co_await conn->async_exec(req);
5553

5654
// Loop reading Redis push messages.
5755
for (error_code ec;;) {
58-
co_await conn->async_receive(redirect_error(use_awaitable, ec));
56+
co_await conn->async_receive2(redirect_error(ec));
5957
if (ec)
6058
break; // Connection lost, break so we can reconnect to channels.
59+
6160
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
6261
<< resp.value().at(3).value << std::endl;
62+
6363
resp.value().clear();
6464
}
6565
}
@@ -73,7 +73,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
7373
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
7474
request req;
7575
req.push("PUBLISH", "channel", msg);
76-
co_await conn->async_exec(req, ignore);
76+
co_await conn->async_exec(req);
7777
msg.erase(0, n);
7878
}
7979
}

example/cpp20_streams.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
namespace net = boost::asio;
2525
using boost::redis::config;
26-
using boost::redis::generic_flat_response;
26+
using boost::redis::generic_response;
2727
using boost::redis::operation;
2828
using boost::redis::request;
2929
using boost::redis::connection;
@@ -33,7 +33,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
3333
{
3434
std::string redisStreamKey_;
3535
request req;
36-
generic_flat_response resp;
36+
generic_response resp;
3737

3838
std::string stream_id{"$"};
3939
std::string const field = "myfield";
@@ -51,7 +51,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
5151
// The following approach was taken in order to be able to
5252
// deal with the responses, as generated by redis in the case
5353
// that there are multiple stream 'records' within a single
54-
// generic_flat_response. The nesting and number of values in
54+
// generic_response. The nesting and number of values in
5555
// resp.value() are different, depending on the contents
5656
// of the stream in redis. Uncomment the above commented-out
5757
// code for examples while running the XADD command.

example/cpp20_subscriber.cpp

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,8 @@ namespace asio = boost::asio;
2323
using namespace std::chrono_literals;
2424
using boost::redis::request;
2525
using boost::redis::generic_flat_response;
26-
using boost::redis::consume_one;
2726
using boost::redis::logger;
2827
using boost::redis::config;
29-
using boost::redis::ignore;
30-
using boost::redis::error;
3128
using boost::system::error_code;
3229
using boost::redis::connection;
3330
using asio::signal_set;
@@ -60,24 +57,23 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
6057
// Loop while reconnection is enabled
6158
while (conn->will_reconnect()) {
6259
// Reconnect to the channels.
63-
co_await conn->async_exec(req, ignore);
60+
co_await conn->async_exec(req);
6461

65-
// Loop reading Redis pushs messages.
62+
// Loop reading Redis push messages.
6663
for (error_code ec;;) {
67-
// First tries to read any buffered pushes.
68-
conn->receive(ec);
69-
if (ec == error::sync_receive_push_failed) {
70-
ec = {};
71-
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
72-
}
73-
64+
// Wait for a push
65+
co_await conn->async_receive2(asio::redirect_error(ec));
7466
if (ec)
7567
break; // Connection lost, break so we can reconnect to channels.
7668

77-
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
78-
<< resp.value().at(3).value << std::endl;
69+
// The response must be consumed without suspending the
70+
// coroutine i.e. without the use of async operations.
71+
for (auto const& elem: resp.value().get_view())
72+
std::cout << elem.value.data << "\n";
73+
74+
std::cout << std::endl;
7975

80-
consume_one(resp);
76+
resp.value().clear();
8177
}
8278
}
8379
}

include/boost/redis/adapter/any_adapter.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class any_adapter {
5353
static auto create_impl(T& resp) -> impl_t
5454
{
5555
using namespace boost::redis::adapter;
56-
5756
return [adapter2 = boost_redis_adapt(resp)](
5857
any_adapter::parse_event ev,
5958
resp3::node_view const& nd,

include/boost/redis/adapter/detail/adapters.hpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,14 @@
1212
#include <boost/redis/resp3/node.hpp>
1313
#include <boost/redis/resp3/serialization.hpp>
1414
#include <boost/redis/resp3/type.hpp>
15-
#include <boost/redis/response.hpp>
15+
#include <boost/redis/generic_flat_response_value.hpp>
1616

1717
#include <boost/assert.hpp>
1818

1919
#include <array>
2020
#include <charconv>
2121
#include <deque>
2222
#include <forward_list>
23-
#include <iostream>
2423
#include <list>
2524
#include <map>
2625
#include <optional>
@@ -138,6 +137,8 @@ void boost_redis_from_bulk(T& t, resp3::basic_node<String> const& node, system::
138137
from_bulk_impl<T>::apply(t, node, ec);
139138
}
140139

140+
//================================================
141+
141142
template <class Result>
142143
class general_aggregate {
143144
private:
@@ -177,20 +178,20 @@ class general_aggregate {
177178
};
178179

179180
template <>
180-
class general_aggregate<result<flat_response_value>> {
181+
class general_aggregate<result<generic_flat_response_value>> {
181182
private:
182-
result<flat_response_value>* result_;
183+
result<generic_flat_response_value>* result_ = nullptr;
183184

184185
public:
185-
explicit general_aggregate(result<flat_response_value>* c = nullptr)
186+
explicit general_aggregate(result<generic_flat_response_value>* c = nullptr)
186187
: result_(c)
187188
{ }
188189

189190
void on_init() { }
190191
void on_done()
191192
{
192193
if (result_->has_value()) {
193-
result_->value().set_view();
194+
result_->value().notify_done();
194195
}
195196
}
196197

@@ -206,7 +207,7 @@ class general_aggregate<result<flat_response_value>> {
206207
std::string{std::cbegin(nd.value), std::cend(nd.value)}
207208
};
208209
break;
209-
default: result_->value().add_node(nd);
210+
default: result_->value().push(nd);
210211
}
211212
}
212213
};

include/boost/redis/adapter/detail/result_traits.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ struct result_traits<result<std::vector<resp3::basic_node<String>, Allocator>>>
6666
template <>
6767
struct result_traits<generic_flat_response> {
6868
using response_type = generic_flat_response;
69-
using adapter_type = adapter::detail::general_aggregate<response_type>;
69+
using adapter_type = general_aggregate<response_type>;
7070
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
7171
};
7272

include/boost/redis/connection.hpp

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -743,7 +743,7 @@ class basic_connection {
743743
return async_run(config{}, std::forward<CompletionToken>(token));
744744
}
745745

746-
/** @brief Receives server side pushes asynchronously.
746+
/** @brief (Deprecated) Receives server side pushes asynchronously.
747747
*
748748
* When pushes arrive and there is no `async_receive` operation in
749749
* progress, pushed data, requests, and responses will be paused
@@ -773,12 +773,76 @@ class basic_connection {
773773
* @param token Completion token.
774774
*/
775775
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
776+
BOOST_DEPRECATED("Please use async_receive2 instead.")
776777
auto async_receive(CompletionToken&& token = {})
777778
{
778779
return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
779780
}
780781

781-
/** @brief Receives server pushes synchronously without blocking.
782+
/** @brief Wait for server pushes asynchronously
783+
*
784+
* This function suspends until a server push is received by the
785+
* connection. On completion an unspecified number of pushes will
786+
* have been added to the response object set with @ref
787+
* boost::redis::connection::set_receive_response.
788+
*
789+
* To prevent receiving an unbound number of pushes the connection
790+
* blocks further read operations on the socket when 256 pushes
791+
* accumulate internally (we don't make any commitment to this
792+
* exact number). When that happens calls to `async_exec` won't be
793+
* able to make progress including the health-checks, which might
794+
* result in a connection timeout. To avoid that Apps should call
795+
* async_receive2 continuously in a loop.
796+
*
797+
* @Note To avoid deadlocks the task calling `async_receive2 should
798+
* not call `async_exec` in a way where they could block each
799+
* other.
800+
*
801+
* For an example see cpp20_subscriber.cpp. The completion token
802+
* must have the following signature
803+
*
804+
* @code
805+
* void f(system::error_code);
806+
* @endcode
807+
*
808+
* @par Per-operation cancellation
809+
* This operation supports the following cancellation types:
810+
*
811+
* @li `asio::cancellation_type_t::terminal`.
812+
* @li `asio::cancellation_type_t::partial`.
813+
* @li `asio::cancellation_type_t::total`.
814+
*
815+
* Calling `basic_connection::cancel(operation::receive)` will
816+
* also cancel any ongoing receive operations.
817+
*
818+
* @param token Completion token.
819+
*/
820+
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
821+
auto async_receive2(CompletionToken&& token = {})
822+
{
823+
return
824+
impl_->receive_channel_.async_receive(
825+
asio::deferred(
826+
[&conn = *this](system::error_code ec, std::size_t)
827+
{
828+
if (!ec) {
829+
auto f = [](system::error_code, std::size_t) {
830+
// There is no point in checking for errors
831+
// here since async_receive just completed
832+
// without errors.
833+
};
834+
835+
// We just want to drain the channel.
836+
while (conn.impl_->receive_channel_.try_receive(f));
837+
}
838+
839+
return asio::deferred.values(ec);
840+
}
841+
)
842+
)(std::forward<CompletionToken>(token));
843+
}
844+
845+
/** @brief (Deprecated) Receives server pushes synchronously without blocking.
782846
*
783847
* Receives a server push synchronously by calling `try_receive` on
784848
* the underlying channel. If the operation fails because
@@ -788,6 +852,7 @@ class basic_connection {
788852
* @param ec Contains the error if any occurred.
789853
* @returns The number of bytes read from the socket.
790854
*/
855+
BOOST_DEPRECATED("Please use async_receive2.")
791856
std::size_t receive(system::error_code& ec)
792857
{
793858
std::size_t size = 0;
@@ -987,7 +1052,7 @@ class basic_connection {
9871052
"the other member functions to interact with the connection.")
9881053
auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); }
9891054

990-
/// Sets the response object of @ref async_receive operations.
1055+
/// Sets the response object of @ref async_receive2 operations.
9911056
template <class Response>
9921057
void set_receive_response(Response& resp)
9931058
{
@@ -1178,12 +1243,21 @@ class connection {
11781243

11791244
/// @copydoc basic_connection::async_receive
11801245
template <class CompletionToken = asio::deferred_t>
1246+
BOOST_DEPRECATED("Please use async_receive2 instead.")
11811247
auto async_receive(CompletionToken&& token = {})
11821248
{
11831249
return impl_.async_receive(std::forward<CompletionToken>(token));
11841250
}
11851251

1252+
/// @copydoc basic_connection::async_receive2
1253+
template <class CompletionToken = asio::deferred_t>
1254+
auto async_receive2(CompletionToken&& token = {})
1255+
{
1256+
return impl_.async_receive2(std::forward<CompletionToken>(token));
1257+
}
1258+
11861259
/// @copydoc basic_connection::receive
1260+
BOOST_DEPRECATED("Please use async_receive2 instead.")
11871261
std::size_t receive(system::error_code& ec) { return impl_.receive(ec); }
11881262

11891263
/**

include/boost/redis/detail/writer_fsm.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//
22
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
3-
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
3+
// Nikolai Vladimirov (TODO)
44
//
55
// Distributed under the Boost Software License, Version 1.0. (See accompanying
66
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

0 commit comments

Comments
 (0)