Skip to content

Commit 7628bb9

Browse files
committed
Simplifies async operations.
1 parent 6eba0ea commit 7628bb9

File tree

8 files changed

+61
-266
lines changed

8 files changed

+61
-266
lines changed

include/boost/redis/connection.hpp

Lines changed: 3 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -21,43 +21,6 @@
2121
#include <limits>
2222

2323
namespace boost::redis {
24-
namespace detail
25-
{
26-
template <class Connection, class Logger>
27-
struct reconnection_op {
28-
Connection* conn_ = nullptr;
29-
Logger logger_;
30-
asio::coroutine coro_{};
31-
32-
template <class Self>
33-
void operator()(Self& self, system::error_code ec = {})
34-
{
35-
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
36-
{
37-
BOOST_ASIO_CORO_YIELD
38-
conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self));
39-
conn_->cancel(operation::receive);
40-
logger_.on_connection_lost(ec);
41-
if (!conn_->will_reconnect()) {
42-
conn_->cancel(operation::reconnection);
43-
self.complete(ec);
44-
return;
45-
}
46-
47-
conn_->timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
48-
BOOST_ASIO_CORO_YIELD
49-
conn_->timer_.async_wait(std::move(self));
50-
BOOST_REDIS_CHECK_OP0(;)
51-
if (!conn_->will_reconnect()) {
52-
self.complete(asio::error::operation_aborted);
53-
return;
54-
}
55-
56-
conn_->reset_stream();
57-
}
58-
}
59-
};
60-
} // detail
6124

6225
/** @brief A SSL connection to the Redis server.
6326
* @ingroup high-level-api
@@ -100,7 +63,6 @@ class basic_connection {
10063
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
10164
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
10265
: impl_{ex, std::move(ctx), max_read_size}
103-
, timer_{ex}
10466
{ }
10567

10668
/// Contructs from a context.
@@ -158,14 +120,7 @@ class basic_connection {
158120
Logger l = Logger{},
159121
CompletionToken token = CompletionToken{})
160122
{
161-
using this_type = basic_connection<executor_type>;
162-
163-
cfg_ = cfg;
164-
l.set_prefix(cfg_.log_prefix);
165-
return asio::async_compose
166-
< CompletionToken
167-
, void(system::error_code)
168-
>(detail::reconnection_op<this_type, Logger>{this, l}, token, timer_);
123+
return impl_.async_run(cfg, l, std::move(token));
169124
}
170125

171126
/** @brief Receives server side pushes asynchronously.
@@ -272,22 +227,11 @@ class basic_connection {
272227
* @param op: The operation to be cancelled.
273228
*/
274229
void cancel(operation op = operation::all)
275-
{
276-
switch (op) {
277-
case operation::reconnection:
278-
case operation::all:
279-
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
280-
timer_.cancel();
281-
break;
282-
default: /* ignore */;
283-
}
284-
285-
impl_.cancel(op);
286-
}
230+
{ impl_.cancel(op); }
287231

288232
/// Returns true if the connection was canceled.
289233
bool will_reconnect() const noexcept
290-
{ return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
234+
{ return impl_.will_reconnect();}
291235

292236
/// Returns the ssl context.
293237
auto const& get_ssl_context() const noexcept
@@ -315,17 +259,7 @@ class basic_connection {
315259
{ return impl_.get_usage(); }
316260

317261
private:
318-
using timer_type =
319-
asio::basic_waitable_timer<
320-
std::chrono::steady_clock,
321-
asio::wait_traits<std::chrono::steady_clock>,
322-
Executor>;
323-
324-
template <class, class> friend struct detail::reconnection_op;
325-
326-
config cfg_;
327262
detail::connection_base<executor_type> impl_;
328-
timer_type timer_;
329263
};
330264

331265
/** \brief A basic_connection that type erases the executor.

include/boost/redis/detail/connection_base.hpp

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -313,13 +313,6 @@ struct reader_op {
313313

314314
logger_.on_read(ec, n);
315315

316-
// EOF is not treated as error.
317-
if (ec == asio::error::eof) {
318-
logger_.trace("reader-op: EOF received. Exiting ...");
319-
conn_->cancel(operation::run);
320-
return self.complete(ec);
321-
}
322-
323316
// The connection is not viable after an error.
324317
if (ec) {
325318
logger_.trace("reader-op: error. Exiting ...");
@@ -429,7 +422,17 @@ class connection_base {
429422
/// Cancels specific operations.
430423
void cancel(operation op)
431424
{
425+
// TODO: Simplify
426+
switch (op) {
427+
case operation::reconnection:
428+
case operation::all:
429+
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
430+
break;
431+
default: /* ignore */;
432+
}
433+
432434
runner_.cancel(op);
435+
433436
if (op == operation::all) {
434437
cancel_impl(operation::run);
435438
cancel_impl(operation::receive);
@@ -490,8 +493,9 @@ class connection_base {
490493
template <class Logger, class CompletionToken>
491494
auto async_run(config const& cfg, Logger l, CompletionToken token)
492495
{
496+
cfg_ = cfg;
493497
runner_.set_config(cfg);
494-
l.set_prefix(runner_.get_config().log_prefix);
498+
l.set_prefix(cfg.log_prefix);
495499
return runner_.async_run(*this, l, std::move(token));
496500
}
497501

@@ -509,6 +513,9 @@ class connection_base {
509513
auto run_is_canceled() const noexcept
510514
{ return cancel_run_called_; }
511515

516+
bool will_reconnect() const noexcept
517+
{ return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
518+
512519
private:
513520
using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
514521
using runner_type = runner<executor_type>;
@@ -517,7 +524,7 @@ class connection_base {
517524
using exec_notifier_type = receive_channel_type;
518525

519526
auto use_ssl() const noexcept
520-
{ return runner_.get_config().use_ssl;}
527+
{ return cfg_.use_ssl;}
521528

522529
auto cancel_on_conn_lost() -> std::size_t
523530
{
@@ -760,10 +767,8 @@ class connection_base {
760767
}
761768

762769
template <class Logger, class CompletionToken>
763-
auto async_run_lean(config const& cfg, Logger l, CompletionToken token)
770+
auto async_run_lean(Logger l, CompletionToken token)
764771
{
765-
runner_.set_config(cfg);
766-
l.set_prefix(runner_.get_config().log_prefix);
767772
return asio::async_compose
768773
< CompletionToken
769774
, void(system::error_code)
@@ -948,6 +953,7 @@ class connection_base {
948953

949954
using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
950955

956+
config cfg_;
951957
std::string read_buffer_;
952958
dyn_buffer_type dbuf_;
953959
std::string write_buffer_;

include/boost/redis/detail/runner.hpp

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class runner_op {
8787
, system::error_code ec2 = {}
8888
, std::size_t = 0)
8989
{
90-
BOOST_ASIO_CORO_REENTER (coro_)
90+
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
9191
{
9292
BOOST_ASIO_CORO_YIELD
9393
runner_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
@@ -133,17 +133,14 @@ class runner_op {
133133
asio::experimental::make_parallel_group(
134134
[this](auto token) { return runner_->async_hello(*conn_, logger_, token); },
135135
[this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
136-
[this](auto token) { return conn_->async_run_lean(runner_->cfg_, logger_, token); }
136+
[this](auto token) { return conn_->async_run_lean(logger_, token); }
137137
).async_wait(
138138
asio::experimental::wait_for_one_error(),
139139
std::move(self));
140140

141+
// TODO: Unify these lines.
141142
logger_.on_runner(ec0, ec1, ec2);
142-
143-
if (is_cancelled(self)) {
144-
self.complete(asio::error::operation_aborted);
145-
return;
146-
}
143+
logger_.on_connection_lost({});
147144

148145
if (order[0] == 0 && !!ec0) {
149146
self.complete(ec0);
@@ -155,7 +152,34 @@ class runner_op {
155152
return;
156153
}
157154

158-
self.complete(ec2);
155+
// The receive operation must be cancelled because channel
156+
// subscription does not survive a reconnection but requires
157+
// re-subscription.
158+
conn_->cancel(operation::receive);
159+
160+
if (!conn_->will_reconnect()) {
161+
conn_->cancel(operation::reconnection);
162+
self.complete(ec2);
163+
return;
164+
}
165+
166+
// It is safe to use the writer timer here because we are not
167+
// connected.
168+
conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
169+
170+
BOOST_ASIO_CORO_YIELD
171+
conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
172+
if (ec0) {
173+
self.complete(ec0);
174+
return;
175+
}
176+
177+
if (!conn_->will_reconnect()) {
178+
self.complete(asio::error::operation_aborted);
179+
return;
180+
}
181+
182+
conn_->reset_stream();
159183
}
160184
}
161185
};
@@ -196,8 +220,6 @@ class runner {
196220
>(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
197221
}
198222

199-
config const& get_config() const noexcept {return cfg_;}
200-
201223
private:
202224
using resolver_type = resolver<Executor>;
203225
using handshaker_type = detail::handshaker<Executor>;

test/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ make_test(test_conn_reconnect 20)
4646
make_test(test_conn_exec_cancel 20)
4747
make_test(test_conn_exec_cancel2 20)
4848
make_test(test_conn_echo_stress 20)
49-
make_test(test_conn_run_cancel 20)
5049
make_test(test_issue_50 20)
5150
make_test(test_issue_181 17)
5251

test/common.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ run(
2424
boost::redis::config cfg = make_test_config(),
2525
boost::system::error_code ec = boost::asio::error::operation_aborted,
2626
boost::redis::operation op = boost::redis::operation::receive,
27-
boost::redis::logger::level l = boost::redis::logger::level::disabled);
27+
boost::redis::logger::level l = boost::redis::logger::level::debug);
2828

test/test_conn_exec_retry.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,13 @@ BOOST_AUTO_TEST_CASE(request_retry_false)
7575
conn->async_exec(req0, ignore, c0);
7676

7777
auto cfg = make_test_config();
78-
cfg.health_check_interval = 5s;
79-
run(conn);
78+
conn->async_run(cfg, {boost::redis::logger::level::debug},
79+
[&](boost::system::error_code const& ec)
80+
{
81+
std::cout << "async_run: " << ec.message() << std::endl;
82+
conn->cancel();
83+
}
84+
);
8085

8186
ioc.run();
8287
}

test/test_conn_quit.cpp

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,6 @@ using boost::redis::response;
2020
using boost::redis::ignore;
2121
using namespace std::chrono_literals;
2222

23-
BOOST_AUTO_TEST_CASE(test_eof_no_error)
24-
{
25-
request req;
26-
req.get_config().cancel_on_connection_lost = false;
27-
req.push("QUIT");
28-
29-
net::io_context ioc;
30-
auto conn = std::make_shared<connection>(ioc);
31-
32-
conn->async_exec(req, ignore, [&](auto ec, auto) {
33-
BOOST_TEST(!ec);
34-
conn->cancel(operation::reconnection);
35-
});
36-
37-
run(conn);
38-
ioc.run();
39-
}
40-
4123
// Test if quit causes async_run to exit.
4224
BOOST_AUTO_TEST_CASE(test_async_run_exits)
4325
{

0 commit comments

Comments
 (0)