File tree Expand file tree Collapse file tree 3 files changed +33
-9
lines changed Expand file tree Collapse file tree 3 files changed +33
-9
lines changed Original file line number Diff line number Diff line change @@ -87,8 +87,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
8787 }
8888
8989 void Cancel () {
90- // Close socket for clean disconnect .
91- CloseSocket ();
90+ // Shutdown socket and allow IO loops to return .
91+ ShutdownSocket ();
9292 streamer_.Cancel ();
9393 }
9494
@@ -100,6 +100,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
100100 return exec_st_.GetError ();
101101 }
102102
103+ using ProtocolClient::CloseSocket;
104+
103105 private:
104106 RestoreStreamer streamer_;
105107};
@@ -120,7 +122,10 @@ OutgoingMigration::~OutgoingMigration() {
120122 exec_st_.JoinErrorHandler ();
121123 // Destroy each flow in its dedicated thread, because we could be the last
122124 // owner of the db tables
123- OnAllShards ([](auto & migration) { migration.reset (); });
125+ OnAllShards ([](auto & migration) {
126+ migration->CloseSocket ();
127+ migration.reset ();
128+ });
124129}
125130
126131bool OutgoingMigration::ChangeState (MigrationState new_state) {
Original file line number Diff line number Diff line change @@ -220,21 +220,30 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time
220220 return error_code{};
221221}
222222
223- void ProtocolClient::CloseSocket ( ) {
223+ void ProtocolClient::ShutdownSocketImpl ( bool should_close ) {
224224 unique_lock lk (sock_mu_);
225225 if (sock_) {
226- sock_->proactor ()->Await ([this ] {
226+ sock_->proactor ()->Await ([this , should_close ] {
227227 if (sock_->IsOpen ()) {
228228 auto ec = sock_->Shutdown (SHUT_RDWR);
229229 LOG_IF (ERROR, ec) << " Could not shutdown socket " << ec;
230230 }
231-
232- auto ec = sock_->Close (); // Quietly close.
233- LOG_IF (WARNING, ec) << " Error closing socket " << ec << " /" << ec.message ();
231+ if (should_close) {
232+ auto ec = sock_->Close (); // Quietly close.
233+ LOG_IF (WARNING, ec) << " Error closing socket " << ec << " /" << ec.message ();
234+ }
234235 });
235236 }
236237}
237238
239+ void ProtocolClient::CloseSocket () {
240+ return ShutdownSocketImpl (true );
241+ }
242+
243+ void ProtocolClient::ShutdownSocket () {
244+ return ShutdownSocketImpl (false );
245+ }
246+
238247void ProtocolClient::DefaultErrorHandler (const GenericError& err) {
239248 LOG (WARNING) << " Socket error: " << err.Format () << " in " << server_context_.Description ()
240249 << " , socket info: " << GetSocketInfo (sock_ ? sock_->native_handle () : -1 );
Original file line number Diff line number Diff line change @@ -40,7 +40,15 @@ class ProtocolClient {
4040 ProtocolClient (std::string master_host, uint16_t port);
4141 virtual ~ProtocolClient ();
4242
43- void CloseSocket (); // Close replica sockets.
43+ // First Shutdown() the socket and immediately Close() it.
44+ // Any attempt for IO in the socket after Close() will crash with CHECK fail.
45+ void CloseSocket ();
46+
47+ // Shutdown the underline socket but do not Close() it. By decoupling this, api
48+ // callers can shutdown the socket, wait for the relevant flows to gracefully exit
49+ // (by observing during an IO operation that the socket was shut down) and then finally
50+ // Close() the socket.
51+ void ShutdownSocket ();
4452
4553 uint64_t LastIoTime () const ;
4654 void TouchIoTime ();
@@ -117,6 +125,8 @@ class ProtocolClient {
117125 private:
118126 std::error_code Recv (util::FiberSocketBase* input, base::IoBuf* dest);
119127
128+ void ShutdownSocketImpl (bool should_close);
129+
120130 ServerContext server_context_;
121131
122132 std::unique_ptr<facade::RedisParser> parser_;
You can’t perform that action at this time.
0 commit comments