Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
}

~SliceSlotMigration() {
Cancel();
CloseSocket();
exec_st_.JoinErrorHandler();
}

Expand Down Expand Up @@ -87,8 +87,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
}

void Cancel() {
// Close socket for clean disconnect.
CloseSocket();
// Shutdown socket and allow IO loops to return.
ShutdownSocket();
streamer_.Cancel();
}

Expand All @@ -100,6 +100,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
return exec_st_.GetError();
}

using ProtocolClient::CloseSocket;

private:
RestoreStreamer streamer_;
};
Expand All @@ -120,7 +122,13 @@ OutgoingMigration::~OutgoingMigration() {
exec_st_.JoinErrorHandler();
// Destroy each flow in its dedicated thread, because we could be the last
// owner of the db tables
OnAllShards([](auto& migration) { migration.reset(); });
OnAllShards([](auto& migration) {
if (migration) {
migration.reset();
}
});

CloseSocket();
}

bool OutgoingMigration::ChangeState(MigrationState new_state) {
Expand Down Expand Up @@ -161,7 +169,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
}

bool should_cancel_flows = false;
absl::Cleanup on_exit([this]() { CloseSocket(); });
absl::Cleanup on_exit([this]() { ShutdownSocket(); });

{
util::fb2::LockGuard lk(state_mu_);
Expand Down Expand Up @@ -315,7 +323,6 @@ void OutgoingMigration::SyncFb() {
break;
}

CloseSocket();
VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
}

Expand Down
21 changes: 15 additions & 6 deletions src/server/protocol_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,25 +220,34 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time
return error_code{};
}

void ProtocolClient::CloseSocket() {
void ProtocolClient::ShutdownSocketImpl(bool should_close) {
unique_lock lk(sock_mu_);
if (sock_) {
sock_->proactor()->Await([this] {
sock_->proactor()->Await([this, should_close] {
if (sock_->IsOpen()) {
auto ec = sock_->Shutdown(SHUT_RDWR);
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
}

auto ec = sock_->Close(); // Quietly close.
LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
if (should_close) {
auto ec = sock_->Close(); // Quietly close.
LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
}
});
}
}

void ProtocolClient::CloseSocket() {
return ShutdownSocketImpl(true);
}

void ProtocolClient::ShutdownSocket() {
return ShutdownSocketImpl(false);
}

void ProtocolClient::DefaultErrorHandler(const GenericError& err) {
LOG(WARNING) << "Socket error: " << err.Format() << " in " << server_context_.Description()
<< ", socket info: " << GetSocketInfo(sock_ ? sock_->native_handle() : -1);
CloseSocket();
ShutdownSocket();
}

io::Result<ProtocolClient::ReadRespRes> ProtocolClient::ReadRespReply(base::IoBuf* buffer,
Expand Down
12 changes: 11 additions & 1 deletion src/server/protocol_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,15 @@ class ProtocolClient {
ProtocolClient(std::string master_host, uint16_t port);
virtual ~ProtocolClient();

void CloseSocket(); // Close replica sockets.
// First Shutdown() the socket and immediately Close() it.
// Any attempt for IO in the socket after Close() will crash with CHECK fail.
void CloseSocket();

// Shutdown the underline socket but do not Close() it. By decoupling this, api
// callers can shutdown the socket, wait for the relevant flows to gracefully exit
// (by observing during an IO operation that the socket was shut down) and then finally
// Close() the socket.
void ShutdownSocket();

uint64_t LastIoTime() const;
void TouchIoTime();
Expand Down Expand Up @@ -117,6 +125,8 @@ class ProtocolClient {
private:
std::error_code Recv(util::FiberSocketBase* input, base::IoBuf* dest);

void ShutdownSocketImpl(bool should_close);

ServerContext server_context_;

std::unique_ptr<facade::RedisParser> parser_;
Expand Down
6 changes: 5 additions & 1 deletion src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ GenericError Replica::Start() {

auto check_connection_error = [this](error_code ec, const char* msg) -> GenericError {
if (!exec_st_.IsRunning()) {
CloseSocket();
return {"replication cancelled"};
}
if (ec) {
CloseSocket();
exec_st_.ReportCancelError();
return {absl::StrCat(msg, ec.message())};
}
Expand Down Expand Up @@ -169,6 +171,7 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
// Make sure the replica fully stopped and did all cleanup,
// so we can freely release resources (connections).
sync_fb_.JoinIfNeeded();
CloseSocket();
DVLOG(1) << "MainReplicationFb stopped " << this;
acks_fb_.JoinIfNeeded();
for (auto& flow : shard_flows_) {
Expand Down Expand Up @@ -1091,6 +1094,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
}

DflyShardReplica::~DflyShardReplica() {
CloseSocket();
JoinFlow();
}

Expand Down Expand Up @@ -1328,7 +1332,7 @@ void DflyShardReplica::JoinFlow() {
void DflyShardReplica::Cancel() {
if (rdb_loader_)
rdb_loader_->stop();
CloseSocket();
ShutdownSocket();
shard_replica_waker_.notifyAll();
}

Expand Down
Loading