Skip to content

Commit 977335e

Browse files
committed
shared data mover: non-primary activation; stale xaction cleanup
* non-primary get-batch endpoint: signal SDM active before phase 2
  - via fast-kalive
* streams-toggle:
  - use MSB in streamsToggle.last to track non-primary state
  - nonpSetActive/nonpResetActive/nonpUndo for state transitions
* shared data mover:
  * track per-XID last-called timestamp
  * isActive() with respect to age
  * Close() now automatically cleans up stale entries

Signed-off-by: Alex Aizman <[email protected]>
1 parent c01851a commit 977335e

File tree

13 files changed

+134
-90
lines changed

13 files changed

+134
-90
lines changed

ais/kalive.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (tkr *talive) cluUptime(now int64) (elapsed time.Duration) {
131131
return
132132
}
133133

134-
func (tkr *talive) sendKalive(smap *smapX, timeout time.Duration, _ int64, fast bool) (pid string, status int, err error) {
134+
func (tkr *talive) sendKalive(smap *smapX, timeout time.Duration, now int64, fast bool) (pid string, status int, err error) {
135135
t := tkr.t
136136
if fast {
137137
// additionally
@@ -140,7 +140,7 @@ func (tkr *talive) sendKalive(smap *smapX, timeout time.Duration, _ int64, fast
140140
}
141141
if fast {
142142
debug.Assert(ec.ECM != nil)
143-
pid, _, err = t.fastKalive(smap, timeout, ec.ECM.IsActive(), bundle.SDM.IsActive())
143+
pid, _, err = t.fastKalive(smap, timeout, ec.ECM.IsActive(), bundle.SDM.IsActive(now))
144144
return pid, 0, err
145145
}
146146
return t.slowKalive(smap, tkr.t, timeout)
@@ -198,7 +198,8 @@ func (pkr *palive) sendKalive(smap *smapX, timeout time.Duration, now int64, fas
198198
debug.Assert(!smap.isPrimary(pkr.p.si))
199199

200200
if fast {
201-
pid, hdr, err := pkr.p.fastKalive(smap, timeout, false, false /*shared streams*/)
201+
last, dmActive := pkr.p.dm.nonpResetActive()
202+
pid, hdr, err := pkr.p.fastKalive(smap, timeout, false, dmActive /*shared streams*/)
202203
if err == nil {
203204
// (shared streams; EC streams)
204205
if pkr.p.ec.isActive(hdr) {
@@ -207,6 +208,8 @@ func (pkr *palive) sendKalive(smap *smapX, timeout time.Duration, now int64, fas
207208
if pkr.p.dm.isActive(hdr) {
208209
pkr.p.dm.setActive(now)
209210
}
211+
} else if dmActive {
212+
pkr.p.dm.nonpUndo(last) // (unlikely)
210213
}
211214
return pid, 0, err
212215
}

ais/ml.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/NVIDIA/aistore/cmn/archive"
1818
"github.com/NVIDIA/aistore/cmn/cos"
1919
"github.com/NVIDIA/aistore/cmn/debug"
20+
"github.com/NVIDIA/aistore/cmn/mono"
2021
"github.com/NVIDIA/aistore/cmn/nlog"
2122
"github.com/NVIDIA/aistore/core/meta"
2223
"github.com/NVIDIA/aistore/transport/bundle"
@@ -168,6 +169,12 @@ func (p *proxy) httpmlget(w http.ResponseWriter, r *http.Request) {
168169
if cmn.Rom.V(5, cos.ModAIS) {
169170
nlog.Infoln(p.String(), apc.Moss, "DT", tsi.String(), "xid", xid, "wid", wid, "[", hreq.Path, hreq.Method, "]")
170171
}
172+
173+
// toggle SDM (fast-kalive => primary)
174+
if !smap.isPrimary(p.si) {
175+
p.dm.nonpSetActive(mono.NanoTime())
176+
}
177+
171178
// phase 2: async broadcast -> all except DT
172179
if nat > 1 {
173180
args := allocBcArgs()
@@ -385,7 +392,6 @@ func (ctx *mossCtx) phase2(w http.ResponseWriter, r *http.Request, smap *smapX,
385392
xmoss.BcastAbort(err)
386393
xmoss.Abort(err)
387394
t.writeErr(w, r, err)
388-
return
389395
}
390396
}
391397

ais/streams_toggle.go

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/NVIDIA/aistore/api/apc"
1313
"github.com/NVIDIA/aistore/cmn"
1414
"github.com/NVIDIA/aistore/cmn/atomic"
15+
"github.com/NVIDIA/aistore/cmn/debug"
1516
"github.com/NVIDIA/aistore/cmn/mono"
1617
"github.com/NVIDIA/aistore/cmn/nlog"
1718
"github.com/NVIDIA/aistore/core/meta"
@@ -21,17 +22,18 @@ import (
2122
// auto-deactivates after a timeout;
2223
// negative timeout (see `tout` below) => never deactivate
2324

24-
// [TODO] shared-DM:
25-
// - apc.HdrActiveDM
26-
// - dmToggle.on()
27-
// - extra checks on then target side (unregIf, etc.)
25+
// non-primary action
26+
const (
27+
nonpBit = uint64(1) << 63
28+
nonpMask = ^nonpBit
29+
)
2830

2931
type (
3032
streamsToggle struct {
31-
hdrActive string // apc.HdrActiveEC, ...
32-
actOn string // apc.ActOpenEC, ...
33-
actOff string // apc.ActCloseEC, ...
34-
last atomic.Int64 // mono-time of last positive refresh
33+
hdrActive string // apc.HdrActiveEC, ...
34+
actOn string // apc.ActOpenEC, ...
35+
actOff string // apc.ActCloseEC, ...
36+
last atomic.Uint64 // mono-time of last positive refresh
3537
}
3638

3739
ecToggle struct {
@@ -56,10 +58,34 @@ func (f *dmToggle) init() {
5658
f.actOff = apc.ActCloseSDM
5759
}
5860

59-
func (*dmToggle) timeout() time.Duration { return cmn.SharedStreamsDflt }
61+
func (*dmToggle) timeout() time.Duration { return cmn.SharedStreamsDflt } // see also: sharedDM.getActive
6062

6163
func (f *streamsToggle) isActive(h http.Header) bool { _, ok := h[f.hdrActive]; return ok }
62-
func (f *streamsToggle) setActive(now int64) { f.last.Store(now) }
64+
65+
func (f *streamsToggle) setActive(now int64) {
66+
f.last.Store(uint64(now))
67+
}
68+
69+
// non-primary: executed user call requiring SDM
70+
func (f *streamsToggle) nonpSetActive(now int64) {
71+
debug.Assert(now > 0, "invalid timestamp: ", now)
72+
f.last.Store((uint64(now) & nonpMask) | nonpBit)
73+
}
74+
75+
// non-primary: fast-keepalive => primary
76+
func (f *streamsToggle) nonpResetActive() (uint64, bool) {
77+
last := f.last.Load()
78+
if last&nonpBit == 0 {
79+
return last, false
80+
}
81+
last = f.last.Swap(0)
82+
return last, true
83+
}
84+
85+
// non-primary: fast-keepalive => primary
86+
func (f *streamsToggle) nonpUndo(last uint64) {
87+
_ = f.last.CAS(0, last)
88+
}
6389

6490
// target => primary keep-alive
6591
func (f *streamsToggle) recvKalive(p *proxy, hdr http.Header, now int64, tout time.Duration) {
@@ -70,7 +96,7 @@ func (f *streamsToggle) recvKalive(p *proxy, hdr http.Header, now int64, tout ti
7096
if tout < 0 {
7197
return
7298
}
73-
last := f.last.Load()
99+
last := int64(f.last.Load() & nonpMask)
74100
if last == 0 || time.Duration(now-last) < tout {
75101
return
76102
}
@@ -80,7 +106,10 @@ func (f *streamsToggle) recvKalive(p *proxy, hdr http.Header, now int64, tout ti
80106
// primary => target keep-alive
81107
func (f *streamsToggle) respKalive(hdr http.Header, now int64, tout time.Duration) {
82108
if tout > 0 {
83-
if last := f.last.Load(); last != 0 && time.Duration(now-last) < tout {
109+
v := f.last.Load()
110+
last := int64(v & nonpMask)
111+
debug.Assert(int64(v) >= 0, f.hdrActive, " invalid timestamp: ", v)
112+
if last != 0 && time.Duration(now-last) < tout {
84113
hdr.Set(f.hdrActive, "true")
85114
}
86115
}
@@ -96,7 +125,7 @@ func (f *streamsToggle) on(p *proxy, tout time.Duration) error {
96125
}
97126
var (
98127
now = mono.NanoTime()
99-
last = f.last.Load()
128+
last = int64(f.last.Load() & nonpMask)
100129
)
101130
if last != 0 && time.Duration(now-last) < cmn.SharedStreamsNack {
102131
return nil
@@ -109,7 +138,8 @@ func (f *streamsToggle) on(p *proxy, tout time.Duration) error {
109138
}
110139

111140
func (f *streamsToggle) off(p *proxy, last int64) {
112-
if !f.last.CAS(last, 0) {
141+
debug.Assert(last > 0, f.hdrActive, " invalid timestamp: ", last)
142+
if !f.last.CAS(uint64(last), 0) {
113143
return
114144
}
115145

ais/tgtec.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/NVIDIA/aistore/cmn"
1616
"github.com/NVIDIA/aistore/cmn/cos"
1717
"github.com/NVIDIA/aistore/cmn/debug"
18+
"github.com/NVIDIA/aistore/cmn/mono"
1819
"github.com/NVIDIA/aistore/cmn/nlog"
1920
"github.com/NVIDIA/aistore/core"
2021
"github.com/NVIDIA/aistore/core/meta"
@@ -161,7 +162,7 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) {
161162
if !t.ensureIntraControl(w, r, true /* from primary */) {
162163
return
163164
}
164-
if bundle.SDM.IsActive() {
165+
if bundle.SDM.IsActive(mono.NanoTime()) {
165166
t.writeErr(w, r, _errOff(bundle.SDMName))
166167
return
167168
}
@@ -184,10 +185,10 @@ func closeEc(int64) time.Duration {
184185
return hk.UnregInterval
185186
}
186187

187-
func closeSDM(int64) time.Duration {
188-
if bundle.SDM.IsActive() {
188+
func closeSDM(now int64) time.Duration {
189+
if bundle.SDM.IsActive(now) {
189190
nlog.Warningln("hk-cb:", _errOff(bundle.SDMName))
190-
} else if err := bundle.SDM.Close(); err != nil {
191+
} else if err := bundle.SDM.Close(now); err != nil {
191192
nlog.Errorln(err)
192193
}
193194
return hk.UnregInterval

bench/tools/aisloader/params.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type (
4646
duration DurationExt // benchmark duration
4747
putSizeUpperBoundStr string // total PUT size limit (unparsed)
4848
tmpDir string // temp directory for file-based readers
49-
readerType string // reader type: sg(default), file, rand, tar
49+
readerType string // reader type: sg(default), file, rand, tar (PUT only)
5050
putSizeUpperBound int64 // total PUT size limit (bytes)
5151
seed int64 // random seed for reproducibility
5252
maxputs uint64 // max number of PUT operations

bench/tools/aisloader/print.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,15 @@ func printRunParams(p *params) {
403403
}
404404
}
405405

406+
// omit PUT-only
407+
var (
408+
minsize, maxsize int64
409+
readerType string
410+
)
411+
if p.putPct > 0 {
412+
minsize, maxsize = p.minSize, p.maxSize
413+
readerType = p.readerType
414+
}
406415
b, err := jsoniter.MarshalIndent(ppEssential{
407416
URL: p.proxyURL,
408417
Bucket: p.bck.Cname(""),
@@ -413,12 +422,12 @@ func printRunParams(p *params) {
413422
UpdatePct: p.updateExistingPct,
414423
MultipartPct: p.multipartPct,
415424
GetBatchSize: p.getBatchSize,
416-
MinSize: p.minSize,
417-
MaxSize: p.maxSize,
425+
MinSize: minsize,
426+
MaxSize: maxsize,
418427
MaxPutBytes: p.putSizeUpperBound,
419428
Arch: arch,
420429
NameGetter: ngLabel(),
421-
ReaderType: p.readerType,
430+
ReaderType: readerType,
422431
Cleanup: p.cleanUp.Val,
423432
}, "", " ")
424433

bench/tools/aisloader/run.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func Start(version, buildtime string) (err error) {
200200
runParams.duration.Val = time.Duration(math.MaxInt64)
201201
}
202202

203-
if runParams.readerType == readers.File {
203+
if runParams.readerType == readers.File && runParams.putPct > 0 {
204204
if err := cos.CreateDir(runParams.tmpDir + "/" + myName); err != nil {
205205
return fmt.Errorf("failed to create local test directory %q, err = %s", runParams.tmpDir, err.Error())
206206
}
@@ -707,7 +707,7 @@ func cleanupObjs(objs []string, wg *sync.WaitGroup) {
707707
}
708708
}
709709

710-
if runParams.readerType == readers.File {
710+
if runParams.readerType == readers.File && runParams.putPct > 0 {
711711
for _, obj := range objs {
712712
if err := os.Remove(runParams.tmpDir + "/" + obj); err != nil {
713713
fmt.Println("delete local file err:", err)

core/lom_xattr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ const (
8787
DumpLomEnvVar = "AIS_DUMP_LOM"
8888
)
8989

90-
const lomDirtyMask = uint64(1 << 63)
90+
const lomDirtyMask = uint64(1) << 63
9191

9292
const (
9393
badLmeta = "bad lmeta"

core/lombid.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
// [63]=AisBID | [62:60]=legacy flags (3 bits) | [59:0]=serial
1616

1717
const (
18-
AisBID = uint64(1 << 63)
18+
AisBID = uint64(1) << 63
1919

2020
bitshift = 60
2121
flagMaskV1 = math.MaxUint64 >> bitshift // 0xf

core/meta/bck.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (b *Bck) String() string {
121121
// add BID
122122
// for the mask to clear "ais" bit and/or other high bits reserved for LOM flags, see core/lombid
123123
const (
124-
aisBID = uint64(1 << 63)
124+
aisBID = uint64(1) << 63
125125
)
126126
var sb strings.Builder
127127
sb.Grow(64)

0 commit comments

Comments
 (0)