Skip to content

Commit 82e5105

Browse files
committed
Reuses async_run parallel group for health-checker tasks.
1 parent 320c2ab commit 82e5105

File tree

3 files changed

+32
-75
lines changed

3 files changed

+32
-75
lines changed

include/boost/redis/detail/connection_base.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include <boost/system.hpp>
2121
#include <boost/asio/basic_stream_socket.hpp>
2222
#include <boost/asio/bind_executor.hpp>
23-
#include <boost/asio/experimental/parallel_group.hpp>
2423
#include <boost/asio/ip/tcp.hpp>
2524
#include <boost/asio/steady_timer.hpp>
2625
#include <boost/asio/write.hpp>

include/boost/redis/detail/health_checker.hpp

Lines changed: 18 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#include <boost/asio/consign.hpp>
1818
#include <boost/asio/coroutine.hpp>
1919
#include <boost/asio/post.hpp>
20-
#include <boost/asio/experimental/parallel_group.hpp>
2120
#include <memory>
2221
#include <chrono>
2322

@@ -36,6 +35,14 @@ class ping_op {
3635
{
3736
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
3837
{
38+
if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
39+
logger_.trace("ping-op: timeout disabled. Exiting ...");
40+
BOOST_ASIO_CORO_YIELD
41+
asio::post(std::move(self));
42+
self.complete({});
43+
return;
44+
}
45+
3946
if (checker_->checker_has_exited_) {
4047
logger_.trace("ping_op: checker has exited. Exiting ...");
4148
self.complete({});
@@ -77,6 +84,14 @@ class check_timeout_op {
7784
{
7885
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
7986
{
87+
if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
88+
logger_.trace("check-timeout-op: timeout disabled. Exiting ...");
89+
BOOST_ASIO_CORO_YIELD
90+
asio::post(std::move(self));
91+
self.complete({});
92+
return;
93+
}
94+
8095
checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
8196
BOOST_ASIO_CORO_YIELD
8297
checker_->wait_timer_.async_wait(std::move(self));
@@ -108,51 +123,6 @@ class check_timeout_op {
108123
}
109124
};
110125

111-
template <class HealthChecker, class Connection, class Logger>
112-
class check_health_op {
113-
public:
114-
HealthChecker* checker_ = nullptr;
115-
Connection* conn_ = nullptr;
116-
Logger logger_;
117-
asio::coroutine coro_{};
118-
119-
template <class Self>
120-
void
121-
operator()(
122-
Self& self,
123-
std::array<std::size_t, 2> order = {},
124-
system::error_code ec1 = {},
125-
system::error_code ec2 = {})
126-
{
127-
BOOST_ASIO_CORO_REENTER (coro_)
128-
{
129-
if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
130-
logger_.trace("check-health-op: timeout disabled.");
131-
BOOST_ASIO_CORO_YIELD
132-
asio::post(std::move(self));
133-
self.complete({});
134-
return;
135-
}
136-
137-
BOOST_ASIO_CORO_YIELD
138-
asio::experimental::make_parallel_group(
139-
[this](auto token) { return checker_->async_ping(*conn_, logger_, token); },
140-
[this](auto token) { return checker_->async_check_timeout(*conn_, logger_, token);}
141-
).async_wait(
142-
asio::experimental::wait_for_one(),
143-
std::move(self));
144-
145-
logger_.on_check_health(ec1, ec2);
146-
147-
switch (order[0]) {
148-
case 0: self.complete(ec1); return;
149-
case 1: self.complete(ec2); return;
150-
default: BOOST_ASSERT(false);
151-
}
152-
}
153-
}
154-
};
155-
156126
template <class Executor>
157127
class health_checker {
158128
private:
@@ -177,24 +147,6 @@ class health_checker {
177147
ping_interval_ = cfg.health_check_interval;
178148
}
179149

180-
template <
181-
class Connection,
182-
class Logger,
183-
class CompletionToken = asio::default_completion_token_t<Executor>
184-
>
185-
auto
186-
async_check_health(
187-
Connection& conn,
188-
Logger l,
189-
CompletionToken token = CompletionToken{})
190-
{
191-
checker_has_exited_ = false;
192-
return asio::async_compose
193-
< CompletionToken
194-
, void(system::error_code)
195-
>(check_health_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn);
196-
}
197-
198150
std::size_t cancel(operation op)
199151
{
200152
switch (op) {
@@ -209,7 +161,6 @@ class health_checker {
209161
return 0;
210162
}
211163

212-
private:
213164
template <class Connection, class Logger, class CompletionToken>
214165
auto async_ping(Connection& conn, Logger l, CompletionToken token)
215166
{
@@ -228,9 +179,10 @@ class health_checker {
228179
>(check_timeout_op<health_checker, Connection, Logger>{this, &conn, l}, token, conn, wait_timer_);
229180
}
230181

182+
private:
183+
231184
template <class, class, class> friend class ping_op;
232185
template <class, class, class> friend class check_timeout_op;
233-
template <class, class, class> friend class check_health_op;
234186

235187
timer_type ping_timer_;
236188
timer_type wait_timer_;

include/boost/redis/detail/runner.hpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ namespace boost::redis::detail
3232

3333
void push_hello(config const& cfg, request& req);
3434

35+
// TODO: Can we avoid this whole function whose only purpose is to
36+
// check for an error in the hello response and complete with an error
37+
// so that the parallel group that starts it can exit?
3538
template <class Runner, class Connection, class Logger>
3639
struct hello_op {
3740
Runner* runner_ = nullptr;
@@ -50,10 +53,10 @@ struct hello_op {
5053
conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
5154
logger_.on_hello(ec, runner_->hello_resp_);
5255

53-
if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
56+
if (ec || runner_->has_error_in_response()) {
5457
logger_.trace("hello-op: error/canceled. Exiting ...");
5558
conn_->cancel(operation::run);
56-
self.complete(!!ec ? ec : asio::error::operation_aborted);
59+
self.complete(ec);
5760
return;
5861
}
5962

@@ -70,7 +73,7 @@ class runner_op {
7073
Logger logger_;
7174
asio::coroutine coro_{};
7275

73-
using order_t = std::array<std::size_t, 4>;
76+
using order_t = std::array<std::size_t, 5>;
7477

7578
public:
7679
runner_op(Runner* runner, Connection* conn, Logger l)
@@ -85,7 +88,8 @@ class runner_op {
8588
, system::error_code ec0 = {}
8689
, system::error_code ec1 = {}
8790
, system::error_code ec2 = {}
88-
, system::error_code ec3 = {})
91+
, system::error_code ec3 = {}
92+
, system::error_code ec4 = {})
8993
{
9094
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
9195
{
@@ -130,18 +134,20 @@ class runner_op {
130134

131135
// Note: Oder is important here because the writer might
132136
// trigger an async_write before the async_hello thereby
133-
// causing authentication problems.
137+
// causing an authentication problem.
134138
BOOST_ASIO_CORO_YIELD
135139
asio::experimental::make_parallel_group(
136140
[this](auto token) { return runner_->async_hello(*conn_, logger_, token); },
137-
[this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
141+
[this](auto token) { return runner_->health_checker_.async_ping(*conn_, logger_, token); },
142+
[this](auto token) { return runner_->health_checker_.async_check_timeout(*conn_, logger_, token);},
138143
[this](auto token) { return conn_->reader(logger_, token);},
139144
[this](auto token) { return conn_->writer(logger_, token);}
140145
).async_wait(
141146
asio::experimental::wait_for_one_error(),
142147
std::move(self));
143148

144149
// TODO: Unify these lines.
150+
logger_.on_check_health(ec1, ec2);
145151
logger_.on_run(ec0, ec1);
146152
logger_.on_runner(ec0, ec1, ec2);
147153
logger_.on_connection_lost({});
@@ -151,7 +157,7 @@ class runner_op {
151157
return;
152158
}
153159

154-
if (order[0] == 1 && ec1 == error::pong_timeout) {
160+
if (order[0] == 2 && ec2 == error::pong_timeout) {
155161
self.complete(ec1);
156162
return;
157163
}
@@ -163,7 +169,7 @@ class runner_op {
163169

164170
if (!conn_->will_reconnect()) {
165171
conn_->cancel(operation::reconnection);
166-
self.complete(ec2);
172+
self.complete(ec3);
167173
return;
168174
}
169175

0 commit comments

Comments
 (0)