Skip to content

Commit 19fbcba

Browse files
lwfacebook-github-bot
authored andcommitted
Rely on enrollment to propagate closing from ctx to conn/list
Summary: And thus retire the ClosingEmitter/Receiver from transports. Differential Revision: D25495914 fbshipit-source-id: 46451235ce678e85a17bc6f8d356573ea8157e6d
1 parent 0da0632 commit 19fbcba

File tree

3 files changed

+23
-25
lines changed

3 files changed

+23
-25
lines changed

tensorpipe/transport/connection_impl_boilerplate.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,6 @@ class ConnectionImplBoilerplate : public std::enable_shared_from_this<TConn> {
126126
// Deal with an error.
127127
void handleError();
128128

129-
ClosingReceiver closingReceiver_;
130-
131129
// A sequence number for the calls to read and write.
132130
uint64_t nextBufferBeingRead_{0};
133131
uint64_t nextBufferBeingWritten_{0};
@@ -145,9 +143,7 @@ ConnectionImplBoilerplate<TCtx, TList, TConn>::ConnectionImplBoilerplate(
145143
ConstructorToken /* unused */,
146144
std::shared_ptr<TCtx> context,
147145
std::string id)
148-
: context_(std::move(context)),
149-
id_(std::move(id)),
150-
closingReceiver_(context_, context_->getClosingEmitter()) {}
146+
: context_(std::move(context)), id_(std::move(id)) {}
151147

152148
template <typename TCtx, typename TList, typename TConn>
153149
void ConnectionImplBoilerplate<TCtx, TList, TConn>::init() {
@@ -166,8 +162,6 @@ void ConnectionImplBoilerplate<TCtx, TList, TConn>::initFromLoop() {
166162
return;
167163
}
168164

169-
closingReceiver_.activate(*this);
170-
171165
initImplFromLoop();
172166
}
173167

tensorpipe/transport/context_impl_boilerplate.h

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
#include <unordered_map>
1616
#include <utility>
1717

18-
#include <tensorpipe/common/callback.h>
1918
#include <tensorpipe/common/defs.h>
2019
#include <tensorpipe/transport/connection_boilerplate.h>
2120
#include <tensorpipe/transport/listener_boilerplate.h>
@@ -56,8 +55,6 @@ class ContextImplBoilerplate : public virtual DeferredExecutor,
5655
// this must be called from within the loop.
5756
bool closed();
5857

59-
ClosingEmitter& getClosingEmitter();
60-
6158
void setId(std::string id);
6259

6360
void close();
@@ -78,7 +75,6 @@ class ContextImplBoilerplate : public virtual DeferredExecutor,
7875
private:
7976
std::atomic<bool> closed_{false};
8077
std::atomic<bool> joined_{false};
81-
ClosingEmitter closingEmitter_;
8278

8379
const std::string domainDescriptor_;
8480

@@ -172,12 +168,6 @@ bool ContextImplBoilerplate<TCtx, TList, TConn>::closed() {
172168
return closed_;
173169
};
174170

175-
template <typename TCtx, typename TList, typename TConn>
176-
ClosingEmitter& ContextImplBoilerplate<TCtx, TList, TConn>::
177-
getClosingEmitter() {
178-
return closingEmitter_;
179-
};
180-
181171
template <typename TCtx, typename TList, typename TConn>
182172
void ContextImplBoilerplate<TCtx, TList, TConn>::setId(std::string id) {
183173
TP_VLOG(7) << "Transport context " << id_ << " was renamed to " << id;
@@ -192,7 +182,21 @@ void ContextImplBoilerplate<TCtx, TList, TConn>::close() {
192182
if (!closed_.exchange(true)) {
193183
TP_VLOG(7) << "Transport context " << id_ << " is closing";
194184

195-
closingEmitter_.close();
185+
// Make a copy as they could unenroll themselves inline.
186+
decltype(listeners_) listenersCopy = listeners_;
187+
decltype(connections_) connectionsCopy = connections_;
188+
// We call closeFromLoop, rather than just close, because we need these
189+
// objects to transition _immediately_ to error, "atomically". If we just
190+
// deferred closing to later, this could come after some already-enqueued
191+
// operations that could try to access the context, which would be closed,
192+
// and this could fail.
193+
for (auto& iter : listenersCopy) {
194+
iter.second->closeFromLoop();
195+
}
196+
for (auto& iter : connectionsCopy) {
197+
iter.second->closeFromLoop();
198+
}
199+
196200
closeImpl();
197201

198202
TP_VLOG(7) << "Transport context " << id_ << " done closing";

tensorpipe/transport/listener_impl_boilerplate.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,25 +104,27 @@ class ListenerImplBoilerplate : public std::enable_shared_from_this<TList> {
104104
// Deal with an error.
105105
void handleError();
106106

107-
ClosingReceiver closingReceiver_;
108-
109107
// A sequence number for the calls to accept.
110108
uint64_t nextConnectionBeingAccepted_{0};
111109

112110
// Sequence numbers for the connections created by this listener, used to
113111
// create their identifiers based off this listener's identifier. They will
114112
// only be used for logging and debugging.
115113
std::atomic<uint64_t> connectionCounter_{0};
114+
115+
// Contexts do sometimes need to call directly into closeForLoop, in order to
116+
// make sure that some of their operations can happen "atomically" on the
117+
// connection, without possibly other operations occurring in between (e.g.,
118+
// an error).
119+
friend ContextImplBoilerplate<TCtx, TList, TConn>;
116120
};
117121

118122
template <typename TCtx, typename TList, typename TConn>
119123
ListenerImplBoilerplate<TCtx, TList, TConn>::ListenerImplBoilerplate(
120124
ConstructorToken /* unused */,
121125
std::shared_ptr<TCtx> context,
122126
std::string id)
123-
: context_(std::move(context)),
124-
id_(std::move(id)),
125-
closingReceiver_(context_, context_->getClosingEmitter()) {}
127+
: context_(std::move(context)), id_(std::move(id)) {}
126128

127129
template <typename TCtx, typename TList, typename TConn>
128130
void ListenerImplBoilerplate<TCtx, TList, TConn>::init() {
@@ -141,8 +143,6 @@ void ListenerImplBoilerplate<TCtx, TList, TConn>::initFromLoop() {
141143
return;
142144
}
143145

144-
closingReceiver_.activate(*this);
145-
146146
initImplFromLoop();
147147
}
148148

0 commit comments

Comments
 (0)