Skip to content

Commit db78219

Browse files
committed
transport: remove per-connection Tx/Rx stats; proto errors vs eof
* transport pkg: - remove Stats, RxStats, and GetStats() - simplify - collapse handler interface into single struct - split IsEOF() into IsOkEOF/IsAnyEOF for more precise semantics - distinguish benign shutdown (n=0 + EOF) from protocol errors - add unified _loghdr() helper (both Tx and Rx) - amend x-moss Rx draining - prep for protocol error callbacks (ref 020943, disabled for now) Signed-off-by: Alex Aizman <[email protected]>
1 parent 007a783 commit db78219

File tree

19 files changed

+112
-394
lines changed

19 files changed

+112
-394
lines changed

ais/prxclu.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2096,7 +2096,7 @@ func (p *proxy) rmNodeFinal(msg *apc.ActMsg, si *meta.Snode, ctx *smapModifier)
20962096
emsg := fmt.Sprintf("%s: (%s %s) final: %v - proceeding anyway...", p, msg, sname, err)
20972097
switch msg.Action {
20982098
case apc.ActShutdownNode, apc.ActDecommissionNode: // expecting EOF
2099-
if !cos.IsEOF(err) {
2099+
if !cos.IsAnyEOF(err) {
21002100
nlog.Errorln(emsg)
21012101
}
21022102
case apc.ActRmNodeUnsafe:

api/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ func DecommissionCluster(bp BaseParams, rmUserData bool) error {
543543
}
544544
err := reqParams.DoRequest()
545545
FreeRp(reqParams)
546-
if err != nil && cos.IsEOF(err) {
546+
if err != nil && cos.IsAnyEOF(err) {
547547
err = nil
548548
}
549549
return err

cmn/cos/err.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func IsUnreachable(err error, status int) bool {
263263
errors.Is(err, context.DeadlineExceeded) ||
264264
status == http.StatusRequestTimeout ||
265265
status == http.StatusServiceUnavailable ||
266-
IsEOF(err) ||
266+
IsAnyEOF(err) ||
267267
status == http.StatusBadGateway
268268
}
269269

cmn/cos/ioutils.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,16 @@ func ReadAll(r io.Reader) ([]byte, error) {
5050

5151
// including "unexpecting EOF" to accommodate unsized streaming and
5252
// early termination of the other side (prior to sending the first byte)
53-
func IsEOF(err error) bool {
53+
func IsAnyEOF(err error) bool {
5454
return err == io.EOF || err == io.ErrUnexpectedEOF ||
5555
errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
5656
}
5757

58+
// graceful
59+
func IsOkEOF(err error) bool {
60+
return err == io.EOF || errors.Is(err, io.EOF)
61+
}
62+
5863
// ExpandPath replaces common abbreviations in file path (eg. `~` with absolute
5964
// path to the current user home directory) and cleans the path.
6065
func ExpandPath(path string) string {

ec/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (mgr *Manager) recvRequest(hdr *transport.ObjHdr, objReader io.Reader, err
193193
// the body must be drained to avoid errors
194194
if hdr.ObjAttrs.Size != 0 {
195195
n, err := io.Copy(io.Discard, objReader)
196-
if err != nil && !cos.IsEOF(err) {
196+
if err != nil && !cos.IsOkEOF(err) {
197197
nlog.Errorf("failed to read request body: %v", err)
198198
return err
199199
}

transport/api.go

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"github.com/NVIDIA/aistore/cmn/cos"
1818
"github.com/NVIDIA/aistore/cmn/debug"
1919
"github.com/NVIDIA/aistore/core"
20-
"github.com/NVIDIA/aistore/hk"
2120
"github.com/NVIDIA/aistore/memsys"
2221
)
2322

@@ -115,12 +114,6 @@ type (
115114
RecvObj func(hdr *ObjHdr, objReader io.Reader, err error) error
116115
)
117116

118-
// receive-side session stats indexed by session ID (see recv.go for "uid")
119-
// optional, currently tests only
120-
type (
121-
RxStats map[uint64]*Stats
122-
)
123-
124117
// shared data mover (SDM)
125118
type (
126119
Receiver interface {
@@ -204,16 +197,8 @@ func (s *Stream) Fin() {
204197
// receive-side API //
205198
//////////////////////
206199

207-
func Handle(trname string, rxObj RecvObj, withStats ...bool) error {
208-
var h handler
209-
if len(withStats) > 0 && withStats[0] {
210-
hkName := ObjURLPath(trname)
211-
hex := &hdlExtra{hdl: hdl{trname: trname, rxObj: rxObj}, hkName: hkName}
212-
hk.Reg(hkName+hk.NameSuffix, hex.cleanup, sessionIsOld)
213-
h = hex
214-
} else {
215-
h = &hdl{trname: trname, rxObj: rxObj}
216-
}
200+
func Handle(trname string, rxObj RecvObj) error {
201+
h := &handler{trname: trname, rxObj: rxObj}
217202
return oput(trname, h)
218203
}
219204

@@ -231,17 +216,3 @@ func _urlPath(endp, trname string) string {
231216
}
232217
return cos.JoinW0(apc.Version, endp, trname)
233218
}
234-
235-
func GetRxStats() (netstats map[string]RxStats) {
236-
netstats = make(map[string]RxStats)
237-
for i, hmap := range hmaps {
238-
hmtxs[i].Lock()
239-
for trname, h := range hmap {
240-
if s := h.getStats(); s != nil {
241-
netstats[trname] = s
242-
}
243-
}
244-
hmtxs[i].Unlock()
245-
}
246-
return
247-
}

transport/base.go

Lines changed: 29 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/NVIDIA/aistore/cmn/cos"
2727
"github.com/NVIDIA/aistore/cmn/debug"
2828
"github.com/NVIDIA/aistore/cmn/nlog"
29-
"github.com/NVIDIA/aistore/core"
3029
)
3130

3231
// stream TCP/HTTP session: inactive <=> active transitions
@@ -88,9 +87,9 @@ type (
8887
reason string
8988
mu sync.Mutex
9089
done atomic.Bool
90+
yelp atomic.Bool
9191
}
92-
stats Stats // stream stats (send side - compare with rxStats)
93-
time struct {
92+
time struct {
9493
idleTeardown time.Duration // idle timeout
9594
inSend atomic.Bool // true upon Send() or Read() - info for Collector to delay cleanup
9695
ticks int // num 1s ticks until idle timeout
@@ -138,6 +137,7 @@ func newBase(client Client, dstURL, dstID string, extra *Extra) (s *streamBase)
138137
buf, _ := g.mm.AllocSize(int64(extra.SizePDU))
139138
s.pdu = newSendPDU(buf)
140139
}
140+
// idle time
141141
if extra.IdleTeardown > 0 {
142142
s.time.idleTeardown = extra.IdleTeardown
143143
} else {
@@ -146,35 +146,14 @@ func newBase(client Client, dstURL, dstID string, extra *Extra) (s *streamBase)
146146
debug.Assert(s.time.idleTeardown >= dfltTick, s.time.idleTeardown, " vs ", dfltTick)
147147
s.time.ticks = int(s.time.idleTeardown / dfltTick)
148148

149-
s._loghdr(sid, dstID, extra)
150-
149+
s.loghdr = _loghdr(s.trname, sid, dstID, true, extra.Compressed())
151150
s.maxhdr, _ = g.mm.AllocSize(_sizeHdr(extra.Config, int64(extra.MaxHdrSize)))
152151

153-
s.sessST.Store(inactive) // initiate HTTP session upon the first arrival
152+
// fsm: initiate HTTP session upon the first arrival
153+
s.sessST.Store(inactive)
154154
return s
155155
}
156156

157-
func (s *streamBase) _loghdr(sid, dstID string, extra *Extra) {
158-
var (
159-
sb strings.Builder
160-
l = 2 + len(s.trname) + len(sid) + 32 + len(dstID)
161-
)
162-
sb.Grow(l)
163-
164-
sb.WriteString("s-")
165-
sb.WriteString(s.trname)
166-
sb.WriteString(sid)
167-
sb.WriteByte('[')
168-
sb.WriteString(core.T.SID()) // (consider adding back session ID, as in: [node-ID/110])
169-
170-
extra.Lid(&sb) // + compressed
171-
172-
sb.WriteString("]=>")
173-
sb.WriteString(dstID)
174-
175-
s.loghdr = sb.String() // looks like: s-<trname><sid>[<sender-id>]=><dest-id>
176-
}
177-
178157
// (used on the receive side as well)
179158
func _sizeHdr(config *cmn.Config, size int64) int64 {
180159
if size != 0 {
@@ -193,7 +172,9 @@ func (s *streamBase) startSend(streamable fmt.Stringer) (err error) {
193172
// slow path
194173
reason, errT := s.TermInfo()
195174
err = cmn.NewErrStreamTerminated(s.String(), errT, reason, "dropping "+streamable.String())
196-
nlog.Errorln(err)
175+
if s.term.yelp.CAS(false, true) { // only once
176+
nlog.Errorln(err)
177+
}
197178
return
198179
}
199180

@@ -230,15 +211,6 @@ func (s *streamBase) TermInfo() (reason string, err error) {
230211
return
231212
}
232213

233-
func (s *streamBase) GetStats() (stats Stats) {
234-
// byte-num transfer stats
235-
stats.Num.Store(s.stats.Num.Load())
236-
stats.Offset.Store(s.stats.Offset.Load())
237-
stats.Size.Store(s.stats.Size.Load())
238-
stats.CompressedSize.Store(s.stats.CompressedSize.Load())
239-
return
240-
}
241-
242214
func (s *streamBase) isNextReq() (reason string) {
243215
for {
244216
select {
@@ -267,7 +239,7 @@ func (s *streamBase) isNextReq() (reason string) {
267239
func (s *streamBase) deactivate() (n int, err error) {
268240
err = io.EOF
269241
if cmn.Rom.V(5, cos.ModTransport) {
270-
nlog.Infoln(s.String(), "connection teardown: [", s.numCur, s.stats.Num.Load(), "]")
242+
nlog.Infoln(s.String(), "connection teardown: [", s.numCur, "]")
271243
}
272244
return
273245
}
@@ -384,14 +356,6 @@ func (extra *Extra) Compressed() bool {
384356
return extra.Compression != "" && extra.Compression != apc.CompressNever
385357
}
386358

387-
func (extra *Extra) Lid(sb *strings.Builder) {
388-
if extra.Compressed() {
389-
sb.WriteByte('[')
390-
sb.WriteString(extra.Config.Transport.LZ4BlockMaxSize.String())
391-
sb.WriteByte(']')
392-
}
393-
}
394-
395359
//
396360
// misc
397361
//
@@ -406,6 +370,25 @@ func dryrun() (dryrun bool) {
406370
return
407371
}
408372

373+
func _loghdr(trname, from, to string, transmit, compressed bool) string {
374+
var (
375+
sb strings.Builder
376+
l = len(trname) + len(from) + len(to) + 16
377+
)
378+
sb.Grow(l)
379+
380+
sb.WriteString(trname)
381+
sb.WriteByte('[')
382+
if compressed {
383+
sb.WriteString("(z)")
384+
}
385+
sb.WriteString(from)
386+
sb.WriteString(cos.Ternary(transmit, "=>", "<="))
387+
sb.WriteString(to)
388+
sb.WriteByte(']')
389+
return sb.String()
390+
}
391+
409392
//////////
410393
// rtry //
411394
//////////

transport/bundle/bundle.go

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ type (
5151
multiplier int // optionally: multiple streams per destination (round-robin)
5252
manualResync bool
5353
}
54-
Stats map[string]*transport.Stats // by DaemonID
5554

5655
Args struct {
5756
Extra *transport.Extra // additional parameters
@@ -103,7 +102,7 @@ func New(cl transport.Client, args Args) (sb *Streams) {
103102
sb.Resync()
104103
sb.smaplock.Unlock()
105104

106-
sb._lid()
105+
sb.lid = sb._lid()
107106
nlog.Infoln("open", sb.lid)
108107

109108
// for auto-resync, register this stream-bundle as Smap listener
@@ -114,10 +113,14 @@ func New(cl transport.Client, args Args) (sb *Streams) {
114113
return sb
115114
}
116115

117-
func (sb *Streams) _lid() {
116+
func (sb *Streams) String() string { return sb.lid }
117+
118+
// (compare w/ transport._loghdr)
119+
func (sb *Streams) _lid() string {
118120
var s strings.Builder
121+
s.Grow(20 + len(sb.trname))
119122

120-
s.WriteString("sb-[")
123+
s.WriteString(cos.Ternary(sb.extra.Compressed(), "sb(z)-[", "sb-["))
121124
s.WriteString(core.T.SID())
122125
if sb.network != cmn.NetIntraData {
123126
s.WriteByte('-')
@@ -128,11 +131,9 @@ func (sb *Streams) _lid() {
128131
s.WriteByte('-')
129132
s.WriteString(sb.trname)
130133

131-
sb.extra.Lid(&s)
132-
133134
s.WriteByte(']')
134135

135-
sb.lid = s.String() // approx. "sb[%s-%s-%s...]"
136+
return s.String()
136137
}
137138

138139
// Close closes all contained streams and unregisters the bundle from Smap listeners;
@@ -232,7 +233,6 @@ func _doCmpl(obj *transport.Obj, roc cos.ReadOpenCloser, err error) {
232233
}
233234
}
234235

235-
func (sb *Streams) String() string { return sb.lid }
236236
func (sb *Streams) Smap() *meta.Smap { return sb.smap }
237237

238238
// keep streams to => (clustered nodes as per rxNodeType) in sync at all times
@@ -247,17 +247,6 @@ func (sb *Streams) ListenSmapChanged() {
247247
sb.smaplock.Unlock()
248248
}
249249

250-
func (sb *Streams) GetStats() Stats {
251-
streams := sb.get()
252-
stats := make(Stats, len(streams))
253-
for id, robin := range streams {
254-
s := robin.stsdest[0]
255-
tstat := s.GetStats()
256-
stats[id] = &tstat
257-
}
258-
return stats
259-
}
260-
261250
//
262251
// private methods
263252
//

transport/handlers.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const (
2323
numOld = 32
2424
)
2525

26-
type hmap map[string]handler
26+
type hmap map[string]*handler
2727

2828
type (
2929
errTrname struct {
@@ -52,7 +52,7 @@ func _idx(trname string) byte {
5252
return byte(hash & mskHmaps)
5353
}
5454

55-
func oget(trname string) (h handler, err error) {
55+
func oget(trname string) (h *handler, err error) {
5656
i := _idx(trname)
5757
hmtxs[i].Lock()
5858
hmap := hmaps[i]
@@ -77,7 +77,7 @@ func _lookup(trname string) error {
7777
return &errUnknownTrname{errTrname{trname}}
7878
}
7979

80-
func oput(trname string, h handler) (err error) {
80+
func oput(trname string, h *handler) (err error) {
8181
i := _idx(trname)
8282
hmtxs[i].Lock()
8383
hmap := hmaps[i]
@@ -95,7 +95,7 @@ func odel(trname string) (err error) {
9595
i := _idx(trname)
9696
hmtxs[i].Lock()
9797
hmap := hmaps[i]
98-
h, ok := hmap[trname]
98+
_, ok := hmap[trname]
9999
if !ok {
100100
hmtxs[i].Unlock()
101101
return &errAlreadyRemovedTrname{errTrname{trname}}
@@ -104,8 +104,6 @@ func odel(trname string) (err error) {
104104
delete(hmap, trname)
105105
hmtxs[i].Unlock()
106106

107-
h.unreg()
108-
109107
oldMtx.Lock()
110108
old[oldIdx] = trname
111109
oldIdx++

0 commit comments

Comments
 (0)