Skip to content

Commit 008dea5

Browse files
committed
transport: add stream termination callback (recovery; part one)
* introduce `transport.Parent` - parent-child between higher-level entities (xactions, data movers) - and their underlying streams * add Parent.TermedCB() to notify parent when stream aborts * add ReopenPeerStream() to replace the failed stream (or round-robin streams) to a given destination * with substantial refactoring Signed-off-by: Alex Aizman <[email protected]>
1 parent d42ec2f commit 008dea5

File tree

19 files changed

+189
-99
lines changed

19 files changed

+189
-99
lines changed

ais/tgtobj.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1957,17 +1957,16 @@ func (coi *coi) _send(t *target, lom *core.LOM, sargs *sendArgs) (res xs.CoiRes)
19571957

19581958
// use data mover to transmit objects to other targets
19591959
// (compare with coi.put())
1960-
func (coi *coi) _dm(lom *core.LOM, sargs *sendArgs) error {
1960+
func (*coi) _dm(lom *core.LOM, sargs *sendArgs) error {
19611961
debug.Assert(sargs.dm.OWT() == sargs.owt)
1962-
debug.Assert(sargs.dm.GetXact() == coi.Xact || sargs.dm.GetXact().ID() == coi.Xact.ID())
19631962
o := transport.AllocSend()
19641963
hdr, oa := &o.Hdr, sargs.objAttrs
19651964
{
19661965
hdr.Bck.Copy(sargs.bckTo.Bucket())
19671966
hdr.ObjName = sargs.objNameTo
19681967
hdr.ObjAttrs.CopyFrom(oa, false /*skip cksum*/)
19691968
}
1970-
o.Callback = func(_ *transport.ObjHdr, _ io.ReadCloser, _ any, _ error) {
1969+
o.SentCB = func(_ *transport.ObjHdr, _ io.ReadCloser, _ any, _ error) {
19711970
core.FreeLOM(lom)
19721971
}
19731972
return sargs.dm.Send(o, sargs.reader, sargs.tsi)

ec/getjogger.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ func (c *getJogger) uploadRestoredSlices(ctx *restoreCtx, slices []*slice) error
761761
}
762762

763763
// Every slice's SGL is freed upon transfer completion
764-
cb := func(daemonID string, s *slice, rdr cos.ReadOpenCloser) transport.ObjSentCB {
764+
cb := func(daemonID string, s *slice, rdr cos.ReadOpenCloser) transport.SentCB {
765765
return func(_ *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
766766
if err != nil {
767767
nlog.Errorf("%s failed to send %s to %v: %v", core.T, ctx.lom, daemonID, err)

ec/manager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ func (mgr *Manager) OpenStreams(withRefc bool) {
101101
client = transport.NewIntraDataClient()
102102
config = cmn.GCO.Get()
103103
compression = config.EC.Compression
104-
extraReq = transport.Extra{Callback: cbReq, Compression: compression, Config: config}
104+
extraReq = transport.Extra{
105+
Parent: &transport.Parent{SentCB: cbReq},
106+
Compression: compression,
107+
Config: config,
108+
}
105109
)
106110
reqSbArgs := bundle.Args{
107111
Multiplier: config.EC.SbundleMult,

ec/putjogger.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ func (c *putJogger) cleanup(lom *core.LOM) error {
346346
o := transport.AllocSend()
347347
o.Hdr = transport.ObjHdr{ObjName: lom.ObjName, Opaque: request, Opcode: reqDel}
348348
o.Hdr.Bck.Copy(lom.Bucket())
349-
o.Callback = c.ctSendCallback
349+
o.SentCB = c.ctSendCallback
350350
c.parent.IncPending()
351351
return c.parent.mgr.req().Send(o, nil, nodes...)
352352
}

ec/xaction.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (r *xactECBase) dataResponse(act intraReqType, hdr *transport.ObjHdr, fqn s
175175
rHdr.Opaque = ireq.NewPack(g.smm)
176176

177177
o := transport.AllocSend()
178-
o.Hdr, o.Callback = rHdr, r.sendCb
178+
o.Hdr, o.SentCB = rHdr, r.sendCb
179179

180180
r.ObjsAdd(1, objAttrs.Size)
181181
r.IncPending()
@@ -312,7 +312,7 @@ func (r *xactECBase) unregWriter(uname string) {
312312
// The counter is used for sending slices of one big SGL to a few nodes. In
313313
// this case every slice must be sent to only one target, and transport bundle
314314
// cannot help to track automatically when SGL should be freed.
315-
func (r *xactECBase) writeRemote(daemonIDs []string, lom *core.LOM, src *dataSource, cb transport.ObjSentCB) error {
315+
func (r *xactECBase) writeRemote(daemonIDs []string, lom *core.LOM, src *dataSource, cb transport.SentCB) error {
316316
if src.metadata != nil && src.metadata.ObjVersion == "" {
317317
src.metadata.ObjVersion = lom.Version()
318318
}
@@ -351,7 +351,7 @@ func (r *xactECBase) writeRemote(daemonIDs []string, lom *core.LOM, src *dataSou
351351
}
352352

353353
o := transport.AllocSend()
354-
o.Hdr, o.Callback = hdr, cb
354+
o.Hdr, o.SentCB = hdr, cb
355355

356356
r.IncPending()
357357
return r.sendByDaemonID(daemonIDs, o, src.reader, false)

ext/dsort/dsort.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ func (m *Manager) createShard(s *shard.Shard, lom *core.LOM) error {
381381
// Make send synchronous.
382382
streamWg := &sync.WaitGroup{}
383383
errCh := make(chan error, 1)
384-
o.Callback = func(_ *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
384+
o.SentCB = func(_ *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
385385
errCh <- err
386386
streamWg.Done()
387387
}

ext/dsort/dsort_general.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ func (ds *dsorterGeneral) loadRemote(w io.Writer, rec *shard.Record, obj *shard.
329329
opaque := cos.MustMarshal(req)
330330
o := transport.AllocSend()
331331
o.Hdr = transport.ObjHdr{Opaque: opaque}
332-
o.Callback, o.CmplArg = ds.sentCallback, &req
332+
o.SentCB, o.CmplArg = ds.sentCallback, &req
333333

334334
if err := ds.streams.request.Send(o, nil, tsi); err != nil {
335335
return 0, errors.WithStack(err)
@@ -431,7 +431,7 @@ func (ds *dsorterGeneral) recvReq(hdr *transport.ObjHdr, objReader io.Reader, er
431431

432432
o := transport.AllocSend()
433433
o.Hdr = transport.ObjHdr{ObjName: req.Record.MakeUniqueName(req.RecordObj)}
434-
o.Callback = ds.responseCallback
434+
o.SentCB = ds.responseCallback
435435

436436
fullContentPath := ds.m.recm.FullContentPath(req.RecordObj)
437437

ext/dsort/dsort_mem.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ func (resp *dsmCS) connectOrSend(r cos.ReadOpenCloser) (err error) {
628628
} else {
629629
o := transport.AllocSend()
630630
o.Hdr = resp.hdr
631-
o.Callback, o.CmplArg = resp.ds.sentCallback, &resp.rsp
631+
o.SentCB, o.CmplArg = resp.ds.sentCallback, &resp.rsp
632632
err = resp.ds.streams.response.Send(o, r, resp.tsi)
633633
resp.decRef = true // sentCallback will call decrementRef
634634
}

reb/globrun.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,6 @@ func (rj *rebJogger) doSend(lom *core.LOM, tsi *meta.Snode, roc cos.ReadOpenClos
912912
o.Hdr.ObjName = lom.ObjName
913913
o.Hdr.Opaque = opaque
914914
o.Hdr.ObjAttrs.CopyFrom(lom.ObjAttrs(), false /*skip cksum*/)
915-
o.Callback, o.CmplArg = rj.objSentCallback, lom
915+
o.SentCB, o.CmplArg = rj.objSentCallback, lom
916916
return rj.m.dm.Send(o, roc, tsi)
917917
}

transport/api.go

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,28 @@ const sizeofh = int(unsafe.Sizeof(Obj{}))
5858
// Correct usage: consume ObjHdr synchronously and inline inside the recv() callback.
5959

6060
type (
61-
// advanced usage: additional stream control
61+
// object-sent callback that has the following signature can optionally be defined on a:
62+
// a) per-stream basis (via NewStream constructor - see Extra struct above)
63+
// b) for a given object that is being sent (for instance, to support a call-per-batch semantics)
64+
// Naturally, object callback "overrides" the per-stream one: when object callback is defined
65+
// (i.e., non-nil), the stream callback is ignored/skipped.
66+
// NOTE: if defined, the callback executes asynchronously as far as the sending part is concerned
67+
SentCB func(*ObjHdr, io.ReadCloser, any, error)
68+
69+
// scope: stream
70+
TermedCB func(error)
71+
72+
// usage and scope:
73+
// - entire stream's lifetime (all Send() calls)
74+
// - additional stream control
75+
// - global or optional params (to override defaults)
76+
Parent struct {
77+
Xact core.Xact // sender ID; abort
78+
SentCB SentCB // to free SGLs, close files, etc. cleanup
79+
TermedCB TermedCB // when err-ed
80+
}
6281
Extra struct {
63-
Xact core.Xact // usage: sender ID; abort
64-
Callback ObjSentCB // typical usage: to free SGLs, close files
82+
Parent *Parent
6583
Config *cmn.Config // (to optimize-out GCO.Get())
6684
Compression string // see CompressAlways, etc. enum
6785
IdleTeardown time.Duration // when exceeded, causes PUT to terminate (and to renew upon the very next send)
@@ -70,11 +88,7 @@ type (
7088
MaxHdrSize int32 // overrides config.Transport.MaxHeaderSize
7189
}
7290

73-
// receive-side session stats indexed by session ID (see recv.go for "uid")
74-
// optional, currently tests only
75-
RxStats map[uint64]*Stats
76-
77-
// object header
91+
// _object_ header (not to confuse w/ objects in buckets)
7892
ObjHdr struct {
7993
Bck cmn.Bck
8094
ObjName string
@@ -86,35 +100,27 @@ type (
86100
}
87101
// object to transmit
88102
Obj struct {
89-
Reader io.ReadCloser // reader (to read the object, and close when done)
90-
CmplArg any // optional context passed to the ObjSentCB callback
91-
Callback ObjSentCB // called when the last byte is sent _or_ when the stream terminates (see term.reason)
92-
prc *atomic.Int64 // private; if present, ref-counts so that we call ObjSentCB only once
93-
Hdr ObjHdr
94-
}
95-
96-
// object-sent callback that has the following signature can optionally be defined on a:
97-
// a) per-stream basis (via NewStream constructor - see Extra struct above)
98-
// b) for a given object that is being sent (for instance, to support a call-per-batch semantics)
99-
// Naturally, object callback "overrides" the per-stream one: when object callback is defined
100-
// (i.e., non-nil), the stream callback is ignored/skipped.
101-
// NOTE: if defined, the callback executes asynchronously as far as the sending part is concerned
102-
ObjSentCB func(*ObjHdr, io.ReadCloser, any, error)
103-
104-
Msg struct {
105-
SID string
106-
Body []byte
107-
Opcode int
103+
Reader io.ReadCloser // reader (to read the object, and close when done)
104+
CmplArg any // optional context passed to the SentCB callback
105+
SentCB SentCB // called when the last byte is sent _or_ when the stream terminates (see term.reason)
106+
prc *atomic.Int64 // private; if present, ref-counts so that we call SentCB only once
107+
Hdr ObjHdr
108108
}
109109

110110
// stream collector
111111
StreamCollector struct{}
112112

113113
// Rx callbacks
114114
RecvObj func(hdr *ObjHdr, objReader io.Reader, err error) error
115-
RecvMsg func(msg Msg, err error) error
116115
)
117116

117+
// receive-side session stats indexed by session ID (see recv.go for "uid")
118+
// optional, currently tests only
119+
type (
120+
RxStats map[uint64]*Stats
121+
)
122+
123+
// shared data mover (SDM)
118124
type (
119125
Receiver interface {
120126
ID() string
@@ -134,7 +140,9 @@ func NewObjStream(client Client, dstURL, dstID string, extra *Extra) (s *Stream)
134140
}
135141
s = &Stream{streamBase: *newBase(client, dstURL, dstID, extra)}
136142
s.streamBase.streamer = s
137-
s.callback = extra.Callback
143+
if extra.Parent != nil {
144+
s.sentCB = extra.Parent.SentCB
145+
}
138146
if extra.Compressed() {
139147
s.initCompression(extra)
140148
}
@@ -162,10 +170,10 @@ func NewObjStream(client Client, dstURL, dstID string, extra *Extra) (s *Stream)
162170
// when the header's Dsize field is set to zero), the reader is not required and the
163171
// corresponding argument in Send() can be set to nil.
164172
// - object reader is *always* closed irrespectively of whether the Send() succeeds
165-
// or fails. On success, if send-completion (ObjSentCB) callback is provided
173+
// or fails. On success, if send-completion (SentCB) callback is provided
166174
// (i.e., non-nil), the closing is done by doCmpl().
167175
// - Optional reference counting is also done by (and in) the doCmpl, so that the
168-
// ObjSentCB gets called if and only when the refcount (if provided i.e., non-nil)
176+
// SentCB gets called if and only when the refcount (if provided i.e., non-nil)
169177
// reaches zero.
170178
// - For every transmission of every object there's always an doCmpl() completion
171179
// (with its refcounting and reader-closing). This holds true in all cases including

0 commit comments

Comments
 (0)