Skip to content

Commit 56a8b02

Browse files
committed
transport: revise retry logic; add stream reconnect (major)
* add `_shouldRetry()` to only retry on write timeouts
* complete failed sends immediately before retry check
* wire `TermedCB()` via shared data mover's reconnect callback
* transport/recv errors will now wrap `io.ErrUnexpectedEOF` (something that libraries may in fact recognize)

Signed-off-by: Alex Aizman <[email protected]>
1 parent 9242f29 commit 56a8b02

File tree

6 files changed

+83
-31
lines changed

6 files changed

+83
-31
lines changed

transport/api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ type (
6767
SentCB func(*ObjHdr, io.ReadCloser, any, error)
6868

6969
// scope: stream
70-
TermedCB func(error)
70+
// flow: connection dead => terminate => TermedCB => [reconnect() => fresh stream to same peer]
71+
TermedCB func(dstID string, err error)
7172

7273
// usage and scope:
7374
// - entire stream's lifetime (all Send() calls)

transport/base.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@
66
package transport
77

88
import (
9+
"errors"
910
"fmt"
1011
"io"
12+
"net"
1113
"net/url"
1214
"os"
1315
"path"
1416
"runtime"
1517
"strconv"
1618
"strings"
1719
"sync"
20+
"syscall"
1821
"time"
1922

2023
"github.com/NVIDIA/aistore/api/apc"
@@ -290,13 +293,21 @@ func (s *streamBase) sendLoop(config *cmn.Config, dryrun bool) {
290293
retry = nil
291294
}
292295
} else {
293-
retriable := cos.IsRetriableConnErr(err)
294-
if retriable && retry == nil {
296+
// the current send failed - complete right away
297+
s.streamer.errCmpl(err)
298+
299+
if !_shouldRetry(err) {
300+
if cmn.Rom.V(4, cos.ModTransport) {
301+
nlog.Errorln(s.String(), "not retriable:", err)
302+
}
303+
reason = reasonError
304+
break
305+
}
306+
if retry == nil {
295307
retry = newRtry(config, s.String())
296308
}
297-
if !retriable || retry.timeout(err) {
309+
if retry.timeout(err) {
298310
reason = reasonError
299-
s.streamer.errCmpl(err)
300311
break
301312
}
302313

@@ -338,7 +349,7 @@ func (s *streamBase) sendLoop(config *cmn.Config, dryrun bool) {
338349
// notify parent if defined
339350
if reason != reasonStopped {
340351
if s.parent != nil && s.parent.TermedCB != nil {
341-
s.parent.TermedCB(errExt)
352+
s.parent.TermedCB(s.dstID, errExt)
342353
}
343354
}
344355

@@ -350,6 +361,15 @@ func (s *streamBase) sendLoop(config *cmn.Config, dryrun bool) {
350361
}
351362
}
352363

364+
// only for timeouts on *in-flight writes*
365+
func _shouldRetry(err error) bool {
366+
var nerr net.Error
367+
if errors.As(err, &nerr) && nerr.Timeout() {
368+
return true
369+
}
370+
return errors.Is(err, syscall.ETIMEDOUT)
371+
}
372+
353373
func (s *streamBase) yelp(err error) {
354374
nlog.WarningDepth(1, "Error:", s.String(), "[", err, "]")
355375
}

transport/bundle/bundle.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -440,10 +440,8 @@ func mdiff(oldMaps, newMaps []meta.NodeMap) (added, removed meta.NodeMap) {
440440
return
441441
}
442442

443-
// replace streams to a single peer (dstID) in the bundle:
444-
// - optional parentOverride
445-
// - otherwise, use sb.extra.Parent as-is
446-
func (sb *Streams) ReopenPeerStream(dstID string, parentOverride *transport.Parent) error {
443+
// renew stream (or streams) to a given peer
444+
func (sb *Streams) ReopenPeerStream(dstID string) error {
447445
// 1) validate
448446
old := sb.get()
449447
orobin, ok := old[dstID]
@@ -467,13 +465,10 @@ func (sb *Streams) ReopenPeerStream(dstID string, parentOverride *transport.Pare
467465
}
468466
dstURL := si.URL(sb.network) + transport.ObjURLPath(sb.trname)
469467

470-
// 2) build new `robin` with same multiplier
468+
// 2) build new `robin` (same multiplier; consider setting nrobin.i)
471469
nrobin := &robin{stsdest: make(stsdest, len(orobin.stsdest))}
472470
for k := range nrobin.stsdest {
473-
extra := sb.extra // copy struct by value
474-
if parentOverride != nil {
475-
extra.Parent = parentOverride
476-
}
471+
extra := sb.extra // by value
477472
ns := transport.NewObjStream(sb.client, dstURL, dstID, &extra)
478473
nrobin.stsdest[k] = ns
479474
}
@@ -483,10 +478,10 @@ func (sb *Streams) ReopenPeerStream(dstID string, parentOverride *transport.Pare
483478
}
484479
nbundle[dstID] = nrobin
485480

486-
// 3) set new CoW instance immediately
481+
// 3) switch over
487482
sb.streams.Store(&nbundle)
488483

489-
// 4) stop old streams (may take a few millis)
484+
// 4) stop old streams async
490485
for _, os := range orobin.stsdest {
491486
if !os.IsTerminated() {
492487
os.Stop() // via stopCh

transport/bundle/shared_dm.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ func (sdm *sharedDM) Open() error {
103103
debug.AssertNoErr(err)
104104
return err
105105
}
106+
sdm.dm.parent = &transport.Parent{
107+
TermedCB: sdm.reconnect,
108+
}
106109
sdm.dm.Open()
107110
sdm.ocmu.Unlock()
108111

@@ -112,6 +115,18 @@ func (sdm *sharedDM) Open() error {
112115
return nil
113116
}
114117

118+
func (sdm *sharedDM) reconnect(dstID string, err error) {
119+
if !sdm.isOpen() {
120+
return
121+
}
122+
if e := sdm.dm.data.streams.ReopenPeerStream(dstID); e != nil {
123+
err = fmt.Errorf("%v => %v", err, e)
124+
nlog.Errorf("%s: failed to reconnect to %s: %v", sdm.trname(), dstID, err)
125+
sdm.Close(err)
126+
}
127+
nlog.Warningln(core.T.String(), "reconnect", sdm.trname(), "=>", dstID)
128+
}
129+
115130
func (sdm *sharedDM) housekeep(now int64) time.Duration {
116131
if !sdm.isOpen() {
117132
return hk.UnregInterval
@@ -125,15 +140,20 @@ func (sdm *sharedDM) housekeep(now int64) time.Duration {
125140
}
126141

127142
// nothing running + cmn.SharedStreamsDflt (10m) inactivity
128-
func (sdm *sharedDM) Close() error {
143+
func (sdm *sharedDM) Close(err ...error) error {
129144
sdm.ocmu.Lock()
130145
sdm.rxmu.Lock()
131146

132-
if xid := sdm.getActive(); xid != "" {
133-
sdm.rxmu.Unlock()
134-
sdm.ocmu.Unlock()
135-
debug.Assert(cos.IsValidUUID(xid), xid)
136-
return fmt.Errorf("cannot close %s: xid %s is still active (num: %d)", sdm.trname(), xid, len(sdm.receivers))
147+
xid := sdm.getActive()
148+
if xid != "" {
149+
msg := fmt.Sprintf("xid %s is still active (num: %d)", xid, len(sdm.receivers))
150+
if len(err) == 0 {
151+
sdm.rxmu.Unlock()
152+
sdm.ocmu.Unlock()
153+
debug.Assert(cos.IsValidUUID(xid), xid)
154+
return fmt.Errorf("cannot close %s: %s", sdm.trname(), msg)
155+
}
156+
nlog.Errorln(sdm.trname(), "closing despite", msg, "[", err[0], "]")
137157
}
138158

139159
sdm.dm.Close(nil)

transport/recv.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package transport
77

88
import (
9+
"errors"
910
"fmt"
1011
"io"
1112
"math"
@@ -318,10 +319,10 @@ func eofOK(err error) error {
318319
func (it *iterator) nextProtoHdr(loghdr string) (int, uint64, error) {
319320
n, err := it.Read(it.hbuf[:sizeProtoHdr])
320321
if n < sizeProtoHdr {
321-
if err == nil {
322-
err = fmt.Errorf("sbr3 %s: failed to receive proto hdr (n=%d)", loghdr, n)
322+
if err == nil || errors.Is(err, io.EOF) {
323+
err = io.ErrUnexpectedEOF
323324
}
324-
return 0, 0, err
325+
return 0, 0, fmt.Errorf("sbr3 %s: failed to receive proto hdr (n=%d): %w", loghdr, n, err)
325326
}
326327
// extract and validate hlen
327328
return extProtoHdr(it.hbuf, loghdr)
@@ -349,7 +350,10 @@ func (it *iterator) nextObj(loghdr string, hlen int) (*objReader, error) {
349350
}
350351
}
351352
if n < hlen {
352-
return nil, fmt.Errorf("sbr4 %s: failed to receive obj hdr (%d < %d)", loghdr, n, hlen)
353+
if err == nil || errors.Is(err, io.EOF) {
354+
err = io.ErrUnexpectedEOF
355+
}
356+
return nil, fmt.Errorf("sbr4 %s: failed to receive obj hdr (%d < %d): %w", loghdr, n, hlen, err)
353357
}
354358
}
355359
hdr := ExtObjHeader(it.hbuf, hlen)
@@ -371,23 +375,34 @@ func (obj *objReader) Read(b []byte) (n int, err error) {
371375
return obj.readPDU(b)
372376
}
373377
debug.Assert(obj.Size() >= 0)
378+
374379
rem := obj.Size() - obj.off
375-
if rem < int64(len(b)) {
380+
if rem < int64(len(b)) && rem >= 0 {
376381
b = b[:int(rem)]
377382
}
383+
378384
n, err = obj.body.Read(b)
379385
obj.off += int64(n) // NOTE: `GORACE` complaining here can be safely ignored
386+
380387
switch err {
381388
case nil:
382389
if obj.off >= obj.Size() {
390+
// ok (w/ EOF to the caller)
383391
err = io.EOF
384392
}
385393
case io.EOF:
394+
// premature EOF: expected size not reached
386395
if obj.off != obj.Size() {
387-
err = fmt.Errorf("sbr6 %s: premature eof %d != %s, err %w", obj.loghdr, obj.off, obj, err)
396+
err = fmt.Errorf("sbr6 %s: premature eof %d != %s: %w", obj.loghdr, obj.off, obj, io.ErrUnexpectedEOF)
388397
}
389398
default:
390-
err = fmt.Errorf("sbr7 %s: off %d, obj %s, err %w", obj.loghdr, obj.off, obj, err)
399+
// we haven't consumed the header-specified size
400+
if obj.off < obj.Size() {
401+
err = fmt.Errorf("sbr7 %s: off %d, obj %s, cause: %v: %w", obj.loghdr, obj.off, obj, err, io.ErrUnexpectedEOF)
402+
} else {
403+
// read the full payload; just annotate
404+
err = fmt.Errorf("sbr7 %s: off %d, obj %s, err %w", obj.loghdr, obj.off, obj, err)
405+
}
391406
}
392407
return n, err
393408
}
@@ -433,7 +448,7 @@ func (obj *objReader) readPDU(b []byte) (n int, err error) {
433448
if obj.IsUnsized() {
434449
obj.hdr.ObjAttrs.Size = obj.off
435450
} else if obj.Size() != obj.off {
436-
err = fmt.Errorf("sbr9 %s: off %d != %s", obj.loghdr, obj.off, obj)
451+
err = fmt.Errorf("sbr9 %s: off %d != %s: %w", obj.loghdr, obj.off, obj, io.ErrUnexpectedEOF)
437452
nlog.Warningln(err)
438453
}
439454
} else {

xact/xs/moss.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,7 @@ func (r *XactMoss) RecvObj(hdr *transport.ObjHdr, reader io.Reader, err error) e
698698
nlog.Errorln(r.Name(), core.T.String(), "RecvObj:", err)
699699
r.BcastAbort(err)
700700
r.Abort(err)
701+
return err
701702
}
702703
if hdr.ObjAttrs.Size > 0 {
703704
r.InObjsAdd(1, hdr.ObjAttrs.Size)

0 commit comments

Comments
 (0)