Skip to content

Commit e98d7d3

Browse files
committed
Removes run_op from runner and renames runner to resp3_handshaker.
1 parent 8c9c9e2 commit e98d7d3

File tree

2 files changed

+152
-170
lines changed

2 files changed

+152
-170
lines changed

include/boost/redis/detail/connection_base.hpp

Lines changed: 141 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
#include <boost/asio/read_until.hpp>
3333
#include <boost/asio/buffer.hpp>
3434
#include <boost/asio/experimental/channel.hpp>
35+
#include <boost/asio/prepend.hpp>
36+
#include <boost/asio/cancel_after.hpp>
37+
#include <boost/asio/experimental/parallel_group.hpp>
3538

3639
#include <algorithm>
3740
#include <array>
@@ -318,6 +321,135 @@ struct reader_op {
318321
}
319322
};
320323

324+
template <class Conn, class Logger>
325+
class run_op {
326+
private:
327+
Conn* conn_ = nullptr;
328+
Logger logger_;
329+
asio::coroutine coro_{};
330+
331+
using order_t = std::array<std::size_t, 5>;
332+
333+
public:
334+
run_op(Conn* conn, Logger l)
335+
: conn_{conn}
336+
, logger_{l}
337+
{}
338+
339+
template <class Self>
340+
void operator()( Self& self
341+
, order_t order = {}
342+
, system::error_code ec0 = {}
343+
, system::error_code ec1 = {}
344+
, system::error_code ec2 = {}
345+
, system::error_code ec3 = {}
346+
, system::error_code ec4 = {})
347+
{
348+
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
349+
{
350+
BOOST_ASIO_CORO_YIELD
351+
conn_->resv_.async_resolve(asio::prepend(std::move(self), order_t {}));
352+
353+
logger_.on_resolve(ec0, conn_->resv_.results());
354+
355+
if (ec0) {
356+
self.complete(ec0);
357+
return;
358+
}
359+
360+
BOOST_ASIO_CORO_YIELD
361+
conn_->ctor_.async_connect(
362+
conn_->next_layer().next_layer(),
363+
conn_->resv_.results(),
364+
asio::prepend(std::move(self), order_t {}));
365+
366+
logger_.on_connect(ec0, conn_->ctor_.endpoint());
367+
368+
if (ec0) {
369+
self.complete(ec0);
370+
return;
371+
}
372+
373+
if (conn_->use_ssl()) {
374+
BOOST_ASIO_CORO_YIELD
375+
conn_->next_layer().async_handshake(
376+
asio::ssl::stream_base::client,
377+
asio::prepend(
378+
asio::cancel_after(
379+
conn_->cfg_.ssl_handshake_timeout,
380+
std::move(self)
381+
),
382+
order_t {}
383+
)
384+
);
385+
386+
logger_.on_ssl_handshake(ec0);
387+
388+
if (ec0) {
389+
self.complete(ec0);
390+
return;
391+
}
392+
}
393+
394+
conn_->reset();
395+
396+
// Note: Oder is important here because the writer might
397+
// trigger an async_write before the async_hello thereby
398+
// causing an authentication problem.
399+
BOOST_ASIO_CORO_YIELD
400+
asio::experimental::make_parallel_group(
401+
[this](auto token) { return conn_->handshaker_.async_hello(*conn_, logger_, token); },
402+
[this](auto token) { return conn_->health_checker_.async_ping(*conn_, logger_, token); },
403+
[this](auto token) { return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);},
404+
[this](auto token) { return conn_->reader(logger_, token);},
405+
[this](auto token) { return conn_->writer(logger_, token);}
406+
).async_wait(
407+
asio::experimental::wait_for_one_error(),
408+
std::move(self));
409+
410+
if (order[0] == 0 && !!ec0) {
411+
self.complete(ec0);
412+
return;
413+
}
414+
415+
if (order[0] == 2 && ec2 == error::pong_timeout) {
416+
self.complete(ec1);
417+
return;
418+
}
419+
420+
// The receive operation must be cancelled because channel
421+
// subscription does not survive a reconnection but requires
422+
// re-subscription.
423+
conn_->cancel(operation::receive);
424+
425+
if (!conn_->will_reconnect()) {
426+
conn_->cancel(operation::reconnection);
427+
self.complete(ec3);
428+
return;
429+
}
430+
431+
// It is safe to use the writer timer here because we are not
432+
// connected.
433+
conn_->writer_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
434+
435+
BOOST_ASIO_CORO_YIELD
436+
conn_->writer_timer_.async_wait(asio::prepend(std::move(self), order_t {}));
437+
if (ec0) {
438+
self.complete(ec0);
439+
return;
440+
}
441+
442+
if (!conn_->will_reconnect()) {
443+
self.complete(asio::error::operation_aborted);
444+
return;
445+
}
446+
447+
conn_->reset_stream();
448+
}
449+
}
450+
};
451+
452+
321453
/** @brief Base class for high level Redis asynchronous connections.
322454
* @ingroup high-level-api
323455
*
@@ -350,7 +482,6 @@ class connection_base {
350482
, receive_channel_{ex, 256}
351483
, resv_{ex}
352484
, health_checker_{ex}
353-
, runner_{ex, {}}
354485
, dbuf_{read_buffer_, max_read_size}
355486
{
356487
set_receive_response(ignore);
@@ -464,9 +595,13 @@ class connection_base {
464595
resv_.set_config(cfg);
465596
ctor_.set_config(cfg);
466597
health_checker_.set_config(cfg);
467-
runner_.set_config(cfg);
598+
handshaker_.set_config(cfg);
468599
l.set_prefix(cfg.log_prefix);
469-
return runner_.async_run(*this, l, std::move(token));
600+
601+
return asio::async_compose
602+
< CompletionToken
603+
, void(system::error_code)
604+
>(run_op<this_type, Logger>{this, l}, token, writer_timer_);
470605
}
471606

472607
template <class Response>
@@ -490,7 +625,7 @@ class connection_base {
490625
using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
491626
using resolver_type = resolver<Executor>;
492627
using health_checker_type = health_checker<Executor>;
493-
using runner_type = runner<executor_type>;
628+
using resp3_handshaker_type = resp3_handshaker<executor_type>;
494629
using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
495630
using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
496631
using exec_notifier_type = receive_channel_type;
@@ -669,7 +804,7 @@ class connection_base {
669804
template <class, class> friend struct reader_op;
670805
template <class, class> friend struct writer_op;
671806
template <class> friend struct exec_op;
672-
template <class, class, class> friend struct runner_op;
807+
template <class, class> friend class run_op;
673808

674809
void cancel_push_requests()
675810
{
@@ -899,7 +1034,7 @@ class connection_base {
8991034
resolver_type resv_;
9001035
connector ctor_;
9011036
health_checker_type health_checker_;
902-
runner_type runner_;
1037+
resp3_handshaker_type handshaker_;
9031038
receiver_adapter_type receive_adapter_;
9041039

9051040
using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;

0 commit comments

Comments
 (0)