From adc84aa02570bcd13e5814ac36a32790dd1d9372 Mon Sep 17 00:00:00 2001 From: kostas Date: Mon, 27 Oct 2025 15:59:53 +0000 Subject: [PATCH] chore: close socket on destructor for outgoing migrations Signed-off-by: kostas --- src/server/cluster/outgoing_slot_migration.cc | 19 +++++++++++------ src/server/protocol_client.cc | 21 +++++++++++++------ src/server/protocol_client.h | 12 ++++++++++- src/server/replica.cc | 6 +++++- 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index c5ad9525bbdb..ee7342ec459b 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -41,7 +41,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { } ~SliceSlotMigration() { - Cancel(); + CloseSocket(); exec_st_.JoinErrorHandler(); } @@ -87,8 +87,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { } void Cancel() { - // Close socket for clean disconnect. - CloseSocket(); + // Shutdown socket and allow IO loops to return. + ShutdownSocket(); streamer_.Cancel(); } @@ -100,6 +100,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { return exec_st_.GetError(); } + using ProtocolClient::CloseSocket; + private: RestoreStreamer streamer_; }; @@ -120,7 +122,13 @@ OutgoingMigration::~OutgoingMigration() { exec_st_.JoinErrorHandler(); // Destroy each flow in its dedicated thread, because we could be the last // owner of the db tables - OnAllShards([](auto& migration) { migration.reset(); }); + OnAllShards([](auto& migration) { + if (migration) { + migration.reset(); + } + }); + + CloseSocket(); } bool OutgoingMigration::ChangeState(MigrationState new_state) { @@ -161,7 +169,7 @@ void OutgoingMigration::Finish(const GenericError& error) { } bool should_cancel_flows = false; - absl::Cleanup on_exit([this]() { CloseSocket(); }); + absl::Cleanup on_exit([this]() { ShutdownSocket(); }); { util::fb2::LockGuard lk(state_mu_); @@ -315,7 +323,6 @@ void OutgoingMigration::SyncFb() { break; } - CloseSocket(); VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString(); } diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index 166d439d2ae5..e6fb5052d25e 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -220,25 +220,34 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time return error_code{}; } -void ProtocolClient::CloseSocket() { +void ProtocolClient::ShutdownSocketImpl(bool should_close) { unique_lock lk(sock_mu_); if (sock_) { - sock_->proactor()->Await([this] { + sock_->proactor()->Await([this, should_close] { if (sock_->IsOpen()) { auto ec = sock_->Shutdown(SHUT_RDWR); LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec; } - - auto ec = sock_->Close(); // Quietly close. - LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message(); + if (should_close) { + auto ec = sock_->Close(); // Quietly close. + LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message(); + } }); } } +void ProtocolClient::CloseSocket() { + return ShutdownSocketImpl(true); +} + +void ProtocolClient::ShutdownSocket() { + return ShutdownSocketImpl(false); +} + void ProtocolClient::DefaultErrorHandler(const GenericError& err) { LOG(WARNING) << "Socket error: " << err.Format() << " in " << server_context_.Description() << ", socket info: " << GetSocketInfo(sock_ ? sock_->native_handle() : -1); - CloseSocket(); + ShutdownSocket(); } io::Result ProtocolClient::ReadRespReply(base::IoBuf* buffer, diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index 53d487866657..c521856a9310 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -40,7 +40,15 @@ class ProtocolClient { ProtocolClient(std::string master_host, uint16_t port); virtual ~ProtocolClient(); - void CloseSocket(); // Close replica sockets. + // First Shutdown() the socket and immediately Close() it. + // Any attempt for IO in the socket after Close() will crash with CHECK fail. + void CloseSocket(); + + // Shutdown the underline socket but do not Close() it. By decoupling this, api + // callers can shutdown the socket, wait for the relevant flows to gracefully exit + // (by observing during an IO operation that the socket was shut down) and then finally + // Close() the socket. + void ShutdownSocket(); uint64_t LastIoTime() const; void TouchIoTime(); @@ -117,6 +125,8 @@ class ProtocolClient { private: std::error_code Recv(util::FiberSocketBase* input, base::IoBuf* dest); + void ShutdownSocketImpl(bool should_close); + ServerContext server_context_; std::unique_ptr parser_; diff --git a/src/server/replica.cc b/src/server/replica.cc index 57f72cf8e911..da792cdf23b7 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -112,9 +112,11 @@ GenericError Replica::Start() { auto check_connection_error = [this](error_code ec, const char* msg) -> GenericError { if (!exec_st_.IsRunning()) { + CloseSocket(); return {"replication cancelled"}; } if (ec) { + CloseSocket(); exec_st_.ReportCancelError(); return {absl::StrCat(msg, ec.message())}; } @@ -169,6 +171,7 @@ std::optional Replica::Stop() { // Make sure the replica fully stopped and did all cleanup, // so we can freely release resources (connections). sync_fb_.JoinIfNeeded(); + CloseSocket(); DVLOG(1) << "MainReplicationFb stopped " << this; acks_fb_.JoinIfNeeded(); for (auto& flow : shard_flows_) { @@ -1091,6 +1094,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m } DflyShardReplica::~DflyShardReplica() { + CloseSocket(); JoinFlow(); } @@ -1328,7 +1332,7 @@ void DflyShardReplica::JoinFlow() { void DflyShardReplica::Cancel() { if (rdb_loader_) rdb_loader_->stop(); - CloseSocket(); + ShutdownSocket(); shard_replica_waker_.notifyAll(); }