Skip to content

Commit 29c4ed1

Browse files
committed
transport Rx: error upcall on proto errors; Tx: abort xaction _or_ call back
* transport.RecvObj API change: calling now with protocol error - filter clean EOF (opcFin) - provide an empty header with only sender ID and a nil reader * Tx: handle stream error differently: - use TermedCB XOR abort xaction * refactor transport/err.go -------------- * get-batch: - add gfnMaxCount=2, improve logging, prep for per-WID abort ------------- * dsort: - check transport.RecvObj() error Signed-off-by: Alex Aizman <[email protected]>
1 parent 92dbe79 commit 29c4ed1

File tree

21 files changed

+132
-100
lines changed

21 files changed

+132
-100
lines changed

cmd/cli/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
33
go 1.25
44

55
require (
6-
github.com/NVIDIA/aistore v1.4.1-0.20251103150813-c24776bd0c0f
6+
github.com/NVIDIA/aistore v1.4.1-0.20251103154500-92dbe79ff899
77
github.com/fatih/color v1.18.0
88
github.com/json-iterator/go v1.1.12
99
github.com/onsi/ginkgo/v2 v2.26.0

cmd/cli/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
22
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
33
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
4-
github.com/NVIDIA/aistore v1.4.1-0.20251103150813-c24776bd0c0f h1:iOIIlciARfjdnKZ9izmRIGwEoE9oOIYX0ixs7IPeBjk=
5-
github.com/NVIDIA/aistore v1.4.1-0.20251103150813-c24776bd0c0f/go.mod h1:+lmRm0p5uEVUZ5KCGwneVKIEu3LiBV7hzId6Rmc0F+M=
4+
github.com/NVIDIA/aistore v1.4.1-0.20251103154500-92dbe79ff899 h1:5O3UWSTuwiGhaYluU9X8z3vXyj98M2mXTbYhTLEJI6g=
5+
github.com/NVIDIA/aistore v1.4.1-0.20251103154500-92dbe79ff899/go.mod h1:+lmRm0p5uEVUZ5KCGwneVKIEu3LiBV7hzId6Rmc0F+M=
66
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
77
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
88
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=

cmd/ishard/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/ishard
33
go 1.25
44

55
require (
6-
github.com/NVIDIA/aistore v1.4.1-0.20251024202506-aaa8f8953f96
6+
github.com/NVIDIA/aistore v1.4.1-0.20251103154500-92dbe79ff899
77
github.com/json-iterator/go v1.1.12
88
github.com/vbauerster/mpb/v4 v4.12.2
99
)

cmd/ishard/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
22
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
3-
github.com/NVIDIA/aistore v1.4.1-0.20251024202506-aaa8f8953f96 h1:glOli5bvdW2DUYQw85o+Vzm15pS5CbkzRjNFS2ZW8bA=
4-
github.com/NVIDIA/aistore v1.4.1-0.20251024202506-aaa8f8953f96/go.mod h1:+lmRm0p5uEVUZ5KCGwneVKIEu3LiBV7hzId6Rmc0F+M=
3+
github.com/NVIDIA/aistore v1.4.1-0.20251103154500-92dbe79ff899 h1:5O3UWSTuwiGhaYluU9X8z3vXyj98M2mXTbYhTLEJI6g=
4+
github.com/NVIDIA/aistore v1.4.1-0.20251103154500-92dbe79ff899/go.mod h1:+lmRm0p5uEVUZ5KCGwneVKIEu3LiBV7hzId6Rmc0F+M=
55
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
66
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
77
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=

ext/dsort/dsort_general.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -408,16 +408,17 @@ func (ds *dsorterGeneral) recvReq(hdr *transport.ObjHdr, objReader io.Reader, er
408408
transport.FreeRecv(objReader)
409409
}()
410410
req := remoteRequest{}
411-
if err := jsoniter.Unmarshal(hdr.Opaque, &req); err != nil {
412-
err := fmt.Errorf(cmn.FmtErrUnmarshal, apc.ActDsort, "recv request", cos.BHead(hdr.Opaque), err)
411+
if errM := jsoniter.Unmarshal(hdr.Opaque, &req); errM != nil {
412+
if err == nil {
413+
err = fmt.Errorf(cmn.FmtErrUnmarshal, apc.ActDsort, "recv request", cos.BHead(hdr.Opaque), errM)
414+
}
413415
ds.m.abort(err)
414416
return err
415417
}
416418

417419
fromNode := ds.m.smap.GetTarget(hdr.SID)
418420
if fromNode == nil {
419-
err := fmt.Errorf("received request (%v) from %q not present in the %s", req.Record, hdr.SID, ds.m.smap)
420-
return err
421+
return fmt.Errorf("received request (%v) from %q not present in the %s", req.Record, hdr.SID, ds.m.smap)
421422
}
422423

423424
if err != nil {

transport/api.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
// Package transport provides long-lived http/tcp connections for
2-
// intra-cluster communications (see README for details and usage example).
1+
// Package transport provides long-lived http/tcp connections for intra-cluster communications
32
/*
43
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
54
*/

transport/base.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
// Package transport provides long-lived http/tcp connections for
2-
// intra-cluster communications (see README for details and usage example).
1+
// Package transport provides long-lived http/tcp connections for intra-cluster communications
32
/*
43
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
54
*/
@@ -183,14 +182,14 @@ func (s *streamBase) startSend(streamable fmt.Stringer) (err error) {
183182
return
184183
}
185184

186-
func (s *streamBase) newErr(detail string) error {
185+
func (s *streamBase) newErr(ctx string) error {
187186
reason, errT := s.TermInfo()
188187
return &ErrStreamTerm{
189188
err: errT,
190189
dst: s.dstID,
191190
loghdr: s.loghdr,
192191
reason: reason,
193-
detail: detail,
192+
ctx: ctx,
194193
}
195194
}
196195

@@ -306,16 +305,17 @@ func (s *streamBase) sendLoop(config *cmn.Config, dryrun bool) {
306305
return
307306
}
308307

309-
// termination is caused by anything other than Fin()
310-
// (reasonStopped is, effectively, abort via Stop() - totally legit)
311-
var errExt error
312-
if reason != reasonStopped {
313-
errExt = fmt.Errorf("%s[term-reason: %s, err: %w]", s, reason, err)
314-
nlog.Errorln(errExt)
308+
// termination is caused by anything other than Fin() ---------------
309+
// 1) reasonStopped via Stop(), or
310+
// 2) broken pipe, connection reset, etc.
311+
// steps:
312+
// - abort parent xaction if defined AND the parent's TermedCB is nil
313+
// - wait and complete
314+
// - call parent's TermedCB if defined ---------- (notice "either/OR")
315315

316-
// NOTE: aborting grandparent xaction
317-
if s.parent != nil && s.parent.Xact != nil {
318-
s.parent.Xact.Abort(errExt)
316+
if reason != reasonStopped {
317+
if s.parent != nil && s.parent.Xact != nil && s.parent.TermedCB == nil {
318+
s.parent.Xact.Abort(s.newErr(""))
319319
}
320320
}
321321

@@ -325,10 +325,9 @@ func (s *streamBase) sendLoop(config *cmn.Config, dryrun bool) {
325325
// cleanup
326326
s.streamer.abortPending(err, false /*completions*/)
327327

328-
// notify parent if defined
329328
if reason != reasonStopped {
330329
if s.parent != nil && s.parent.TermedCB != nil {
331-
s.parent.TermedCB(s.dstID, errExt)
330+
s.parent.TermedCB(s.dstID, err)
332331
}
333332
}
334333

transport/client_fasthttp.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//go:build !nethttp
22

3-
// Package transport provides long-lived http/tcp connections for
4-
// intra-cluster communications (see README for details and usage example).
3+
// Package transport provides long-lived http/tcp connections for intra-cluster communications
54
/*
65
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
76
*/

transport/client_nethttp.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//go:build nethttp
22

3-
// Package transport provides long-lived http/tcp connections for
4-
// intra-cluster communications (see README for details and usage example).
3+
// Package transport provides long-lived http/tcp connections for intra-cluster communications
54
/*
65
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
76
*/

transport/collect.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
// Package transport provides long-lived http/tcp connections for
2-
// intra-cluster communications (see README for details and usage example).
1+
// Package transport provides long-lived http/tcp connections for intra-cluster communications
32
/*
43
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
54
*/

0 commit comments

Comments
 (0)