Skip to content

Commit fc657ca

Browse files
authored
chore: separate shutdown and close for ProtocolClient (#5969)
* separate Close() and Shutdown() in ProtocolClient Signed-off-by: kostas <[email protected]>
1 parent 8ce10fe commit fc657ca

File tree

4 files changed

+44
-14
lines changed

4 files changed

+44
-14
lines changed

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
4141
}
4242

4343
~SliceSlotMigration() {
44-
Cancel();
44+
CloseSocket();
4545
exec_st_.JoinErrorHandler();
4646
}
4747

@@ -87,8 +87,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
8787
}
8888

8989
void Cancel() {
90-
// Close socket for clean disconnect.
91-
CloseSocket();
90+
// Shutdown socket and allow IO loops to return.
91+
ShutdownSocket();
9292
streamer_.Cancel();
9393
}
9494

@@ -100,6 +100,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
100100
return exec_st_.GetError();
101101
}
102102

103+
using ProtocolClient::CloseSocket;
104+
103105
private:
104106
RestoreStreamer streamer_;
105107
};
@@ -120,7 +122,13 @@ OutgoingMigration::~OutgoingMigration() {
120122
exec_st_.JoinErrorHandler();
121123
// Destroy each flow in its dedicated thread, because we could be the last
122124
// owner of the db tables
123-
OnAllShards([](auto& migration) { migration.reset(); });
125+
OnAllShards([](auto& migration) {
126+
if (migration) {
127+
migration.reset();
128+
}
129+
});
130+
131+
CloseSocket();
124132
}
125133

126134
bool OutgoingMigration::ChangeState(MigrationState new_state) {
@@ -161,7 +169,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
161169
}
162170

163171
bool should_cancel_flows = false;
164-
absl::Cleanup on_exit([this]() { CloseSocket(); });
172+
absl::Cleanup on_exit([this]() { ShutdownSocket(); });
165173

166174
{
167175
util::fb2::LockGuard lk(state_mu_);
@@ -315,7 +323,6 @@ void OutgoingMigration::SyncFb() {
315323
break;
316324
}
317325

318-
CloseSocket();
319326
VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
320327
}
321328

src/server/protocol_client.cc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,25 +220,34 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time
220220
return error_code{};
221221
}
222222

223-
void ProtocolClient::CloseSocket() {
223+
void ProtocolClient::ShutdownSocketImpl(bool should_close) {
224224
unique_lock lk(sock_mu_);
225225
if (sock_) {
226-
sock_->proactor()->Await([this] {
226+
sock_->proactor()->Await([this, should_close] {
227227
if (sock_->IsOpen()) {
228228
auto ec = sock_->Shutdown(SHUT_RDWR);
229229
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
230230
}
231-
232-
auto ec = sock_->Close(); // Quietly close.
233-
LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
231+
if (should_close) {
232+
auto ec = sock_->Close(); // Quietly close.
233+
LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
234+
}
234235
});
235236
}
236237
}
237238

239+
void ProtocolClient::CloseSocket() {
240+
return ShutdownSocketImpl(true);
241+
}
242+
243+
void ProtocolClient::ShutdownSocket() {
244+
return ShutdownSocketImpl(false);
245+
}
246+
238247
void ProtocolClient::DefaultErrorHandler(const GenericError& err) {
239248
LOG(WARNING) << "Socket error: " << err.Format() << " in " << server_context_.Description()
240249
<< ", socket info: " << GetSocketInfo(sock_ ? sock_->native_handle() : -1);
241-
CloseSocket();
250+
ShutdownSocket();
242251
}
243252

244253
io::Result<ProtocolClient::ReadRespRes> ProtocolClient::ReadRespReply(base::IoBuf* buffer,

src/server/protocol_client.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,15 @@ class ProtocolClient {
4040
ProtocolClient(std::string master_host, uint16_t port);
4141
virtual ~ProtocolClient();
4242

43-
void CloseSocket(); // Close replica sockets.
43+
// First Shutdown() the socket and then immediately Close() it.
44+
// Any attempt to perform IO on the socket after Close() will crash with a CHECK failure.
45+
void CloseSocket();
46+
47+
// Shutdown the underlying socket but do not Close() it. By decoupling this, API
48+
// callers can shutdown the socket, wait for the relevant flows to gracefully exit
49+
// (by observing during an IO operation that the socket was shut down) and then finally
50+
// Close() the socket.
51+
void ShutdownSocket();
4452

4553
uint64_t LastIoTime() const;
4654
void TouchIoTime();
@@ -117,6 +125,8 @@ class ProtocolClient {
117125
private:
118126
std::error_code Recv(util::FiberSocketBase* input, base::IoBuf* dest);
119127

128+
void ShutdownSocketImpl(bool should_close);
129+
120130
ServerContext server_context_;
121131

122132
std::unique_ptr<facade::RedisParser> parser_;

src/server/replica.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,11 @@ GenericError Replica::Start() {
112112

113113
auto check_connection_error = [this](error_code ec, const char* msg) -> GenericError {
114114
if (!exec_st_.IsRunning()) {
115+
CloseSocket();
115116
return {"replication cancelled"};
116117
}
117118
if (ec) {
119+
CloseSocket();
118120
exec_st_.ReportCancelError();
119121
return {absl::StrCat(msg, ec.message())};
120122
}
@@ -169,6 +171,7 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
169171
// Make sure the replica fully stopped and did all cleanup,
170172
// so we can freely release resources (connections).
171173
sync_fb_.JoinIfNeeded();
174+
CloseSocket();
172175
DVLOG(1) << "MainReplicationFb stopped " << this;
173176
acks_fb_.JoinIfNeeded();
174177
for (auto& flow : shard_flows_) {
@@ -1091,6 +1094,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
10911094
}
10921095

10931096
DflyShardReplica::~DflyShardReplica() {
1097+
CloseSocket();
10941098
JoinFlow();
10951099
}
10961100

@@ -1328,7 +1332,7 @@ void DflyShardReplica::JoinFlow() {
13281332
void DflyShardReplica::Cancel() {
13291333
if (rdb_loader_)
13301334
rdb_loader_->stop();
1331-
CloseSocket();
1335+
ShutdownSocket();
13321336
shard_replica_waker_.notifyAll();
13331337
}
13341338

0 commit comments

Comments
 (0)