Skip to content

Commit 1749312

Browse files
committed
get-batch: don't abort; rename internal intra-cluster headers
* get-batch: do not abort (multi-wid) job upon failing to:
  - assemble
  - send
* refactor: rename caller-id => sender-id etc. headers and variables

Signed-off-by: Alex Aizman <[email protected]>
1 parent bb78144 commit 1749312

File tree

19 files changed

+203
-205
lines changed

19 files changed

+203
-205
lines changed

ais/bucketmeta.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func (m *bucketMD) clone() *bucketMD {
201201
return dst
202202
}
203203

204-
func (m *bucketMD) validateUUID(nbmd *bucketMD, si, nsi *meta.Snode, caller string) (err error) {
204+
func (m *bucketMD) validateUUID(nbmd *bucketMD, si, nsi *meta.Snode, sender string) (err error) {
205205
if nbmd == nil || nbmd.Version == 0 || m.Version == 0 {
206206
return
207207
}
@@ -211,7 +211,7 @@ func (m *bucketMD) validateUUID(nbmd *bucketMD, si, nsi *meta.Snode, caller stri
211211
if m.UUID == nbmd.UUID {
212212
return
213213
}
214-
nsiname := caller
214+
nsiname := sender
215215
if nsi != nil {
216216
nsiname = nsi.StringEx()
217217
} else if nsiname == "" {

ais/clustermap.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ func (m *smapX) merge(dst *smapX, override bool) (added int, err error) {
404404
}
405405

406406
// detect duplicate URL and/or IP
407-
// if `del` is true delete the old one so that the caller can update Snode
407+
// if `del` is true delete the old one so that the sender can update Snode
408408
func (m *smapX) handleDuplicateNode(nsi *meta.Snode, del bool) (err error) {
409409
var osi *meta.Snode
410410
if osi, err = m.IsDupNet(nsi); err == nil {
@@ -425,7 +425,7 @@ func (m *smapX) handleDuplicateNode(nsi *meta.Snode, del bool) (err error) {
425425
return
426426
}
427427

428-
func (m *smapX) validateUUID(si *meta.Snode, newSmap *smapX, caller string, cieNum int) (err error) {
428+
func (m *smapX) validateUUID(si *meta.Snode, newSmap *smapX, sender string, cieNum int) (err error) {
429429
if m == nil || newSmap == nil || newSmap.Version == 0 {
430430
return
431431
}
@@ -436,11 +436,11 @@ func (m *smapX) validateUUID(si *meta.Snode, newSmap *smapX, caller string, cieN
436436
return
437437
}
438438
// cluster integrity error (cie)
439-
if caller == "" {
440-
caller = "???"
439+
if sender == "" {
440+
sender = "???"
441441
}
442442
s := fmt.Sprintf("%s: Smaps have different UUIDs: local [%s, %s] vs from [%s, %s]",
443-
ciError(cieNum), si, m.StringEx(), caller, newSmap.StringEx())
443+
ciError(cieNum), si, m.StringEx(), sender, newSmap.StringEx())
444444
err = &errSmapUUIDDiffer{s}
445445
return
446446
}

ais/htrun.go

Lines changed: 63 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -696,12 +696,12 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {
696696
// req header
697697
if smap.vstr != "" {
698698
if smap.IsPrimary(h.si) {
699-
req.Header.Set(apc.HdrCallerIsPrimary, "true")
699+
req.Header.Set(apc.HdrSenderIsPrimary, "true")
700700
}
701-
req.Header.Set(apc.HdrCallerSmapVer, smap.vstr)
701+
req.Header.Set(apc.HdrSenderSmapVer, smap.vstr)
702702
}
703-
req.Header.Set(apc.HdrCallerID, h.SID())
704-
req.Header.Set(apc.HdrCallerName, h.si.Name())
703+
req.Header.Set(apc.HdrSenderID, h.SID())
704+
req.Header.Set(apc.HdrSenderName, h.si.Name())
705705
req.Header.Set(cos.HdrUserAgent, ua)
706706

707707
resp, res.err = client.Do(req)
@@ -1505,7 +1505,7 @@ func (h *htrun) warnMsync(r *http.Request, smap *smapX) {
15051505
if !smap.isValid() {
15061506
return
15071507
}
1508-
pid := r.Header.Get(apc.HdrCallerID)
1508+
pid := r.Header.Get(apc.HdrSenderID)
15091509
psi := smap.GetNode(pid)
15101510
if psi == nil {
15111511
err := &errNodeNotFound{msg: tag + " warning:", id: pid, si: h.si, smap: smap}
@@ -1515,36 +1515,35 @@ func (h *htrun) warnMsync(r *http.Request, smap *smapX) {
15151515
}
15161516
}
15171517

1518-
func logmsync(lver int64, revs revs, msg *actMsgExt, opts ...string) { // caller [, what, luuid]
1518+
func logmsync(lver int64, revs revs, msg *actMsgExt, sender string, opts ...string) { // sender [, what, luuid]
15191519
const tag = "msync Rx:"
15201520
var (
1521-
what string
1522-
caller = opts[0]
1523-
uuid = revs.uuid()
1524-
lv = "v" + strconv.FormatInt(lver, 10)
1525-
luuid string
1521+
what string
1522+
uuid = revs.uuid()
1523+
lv = "v" + strconv.FormatInt(lver, 10)
1524+
luuid string
15261525
)
15271526
switch len(opts) {
1528-
case 1:
1527+
case 0:
15291528
what = revs.String()
15301529
if uuid := revs.uuid(); uuid != "" {
15311530
what += "[" + uuid + "]"
15321531
}
1533-
case 2:
1534-
what = opts[1]
1532+
case 1:
1533+
what = opts[0]
15351534
if strings.IndexByte(what, '[') < 0 {
15361535
if uuid != "" {
15371536
what += "[" + uuid + "]"
15381537
}
15391538
}
1540-
case 3:
1541-
what = opts[1]
1542-
luuid := opts[2]
1539+
default:
1540+
what = opts[0]
1541+
luuid = opts[1]
15431542
lv += "[" + luuid + "]"
15441543
}
15451544
// different uuids (clusters) - versions cannot be compared
15461545
if luuid != "" && uuid != "" && uuid != luuid {
1547-
nlog.InfoDepth(1, "Warning", tag, what, "( different cluster", lv, msg.String(), "<--", caller, msg.String(), ")")
1546+
nlog.InfoDepth(1, "Warning", tag, what, "( different cluster", lv, msg.String(), "<--", sender, msg.String(), ")")
15481547
return
15491548
}
15501549

@@ -1555,16 +1554,16 @@ func logmsync(lver int64, revs revs, msg *actMsgExt, opts ...string) { // caller
15551554
if lver == 0 {
15561555
s = "( initial"
15571556
}
1558-
nlog.InfoDepth(1, tag, what, s, lv, msg.String(), "<--", caller, ")")
1557+
nlog.InfoDepth(1, tag, what, s, lv, msg.String(), "<--", sender, ")")
15591558
case lver > revs.version():
1560-
nlog.InfoDepth(1, "Warning", tag, what, "( down from", lv, msg.String(), "<--", caller, msg.String(), ")")
1559+
nlog.InfoDepth(1, "Warning", tag, what, "( down from", lv, msg.String(), "<--", sender, msg.String(), ")")
15611560
default:
1562-
nlog.InfoDepth(1, tag, "new", what, "( have", lv, msg.String(), "<--", caller, ")")
1561+
nlog.InfoDepth(1, tag, "new", what, "( have", lv, msg.String(), "<--", sender, ")")
15631562
}
15641563
}
15651564

15661565
// return extracted (new) config with associated action message; otherwise error
1567-
func (h *htrun) extractConfig(payload msPayload, caller string) (*globalConfig, *actMsgExt, error) {
1566+
func (h *htrun) extractConfig(payload msPayload, sender string) (*globalConfig, *actMsgExt, error) {
15681567
confValue, ok := payload[revsConfTag]
15691568
if !ok {
15701569
return nil, nil, nil
@@ -1587,7 +1586,7 @@ func (h *htrun) extractConfig(payload msPayload, caller string) (*globalConfig,
15871586
}
15881587
config := cmn.GCO.Get()
15891588
if cmn.Rom.V(4, cos.ModAIS) {
1590-
logmsync(config.Version, newConfig, msg, caller, newConfig.String(), config.UUID)
1589+
logmsync(config.Version, newConfig, msg, sender, newConfig.String(), config.UUID)
15911590
}
15921591
if newConfig.version() <= config.Version && msg.Action != apc.ActPrimaryForce {
15931592
if newConfig.version() < config.Version {
@@ -1600,7 +1599,7 @@ func (h *htrun) extractConfig(payload msPayload, caller string) (*globalConfig,
16001599
}
16011600

16021601
// return extracted (new) etl metadata with associated action message; otherwise error
1603-
func (h *htrun) extractEtlMD(payload msPayload, caller string) (*etlMD, *actMsgExt, error) {
1602+
func (h *htrun) extractEtlMD(payload msPayload, sender string) (*etlMD, *actMsgExt, error) {
16041603
etlMDValue, ok := payload[revsEtlMDTag]
16051604
if !ok {
16061605
return nil, nil, nil
@@ -1623,7 +1622,7 @@ func (h *htrun) extractEtlMD(payload msPayload, caller string) (*etlMD, *actMsgE
16231622

16241623
etlMD := h.owner.etl.get()
16251624
if cmn.Rom.V(4, cos.ModAIS) {
1626-
logmsync(etlMD.Version, newMD, msg, caller)
1625+
logmsync(etlMD.Version, newMD, msg, sender)
16271626
}
16281627
if newMD.version() <= etlMD.version() && msg.Action != apc.ActPrimaryForce {
16291628
if newMD.version() < etlMD.version() {
@@ -1636,7 +1635,7 @@ func (h *htrun) extractEtlMD(payload msPayload, caller string) (*etlMD, *actMsgE
16361635
}
16371636

16381637
// return extracted (new) Smap with associated action message; otherwise error
1639-
func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation bool) (*smapX, *actMsgExt, error) {
1638+
func (h *htrun) extractSmap(payload msPayload, sender string, skipValidation bool) (*smapX, *actMsgExt, error) {
16401639
const (
16411640
act = "extract-smap"
16421641
)
@@ -1685,12 +1684,12 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo
16851684
return newSmap, msg, nil
16861685
}
16871686

1688-
if err := smap.validateUUID(h.si, newSmap, caller, 50 /* ciError */); err != nil {
1687+
if err := smap.validateUUID(h.si, newSmap, sender, 50 /* ciError */); err != nil {
16891688
return newSmap, msg, err // FATAL: cluster integrity error
16901689
}
16911690

16921691
if cmn.Rom.V(4, cos.ModAIS) {
1693-
logmsync(smap.Version, newSmap, msg, caller, newSmap.String(), smap.UUID)
1692+
logmsync(smap.Version, newSmap, msg, sender, newSmap.String(), smap.UUID)
16941693
}
16951694
_, sameOrigin, _, eq := smap.Compare(&newSmap.Smap)
16961695
debug.Assert(sameOrigin)
@@ -1707,7 +1706,7 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo
17071706
}
17081707

17091708
// return extracted (new) RMD with associated action message; otherwise error
1710-
func (h *htrun) extractRMD(payload msPayload, caller string) (*rebMD, *actMsgExt, error) {
1709+
func (h *htrun) extractRMD(payload msPayload, sender string) (*rebMD, *actMsgExt, error) {
17111710
rmdValue, ok := payload[revsRMDTag]
17121711
if !ok {
17131712
return nil, nil, nil
@@ -1729,7 +1728,7 @@ func (h *htrun) extractRMD(payload msPayload, caller string) (*rebMD, *actMsgExt
17291728
}
17301729

17311730
rmd := h.owner.rmd.get()
1732-
logmsync(rmd.Version, newRMD, msg, caller, newRMD.String(), rmd.CluID)
1731+
logmsync(rmd.Version, newRMD, msg, sender, newRMD.String(), rmd.CluID)
17331732

17341733
if msg.Action == apc.ActPrimaryForce {
17351734
return newRMD, msg, nil
@@ -1752,7 +1751,7 @@ func (h *htrun) extractRMD(payload msPayload, caller string) (*rebMD, *actMsgExt
17521751
}
17531752

17541753
// return extracted (new) BMD with associated action message; otherwise error
1755-
func (h *htrun) extractBMD(payload msPayload, caller string) (*bucketMD, *actMsgExt, error) {
1754+
func (h *htrun) extractBMD(payload msPayload, sender string) (*bucketMD, *actMsgExt, error) {
17561755
bmdValue, ok := payload[revsBMDTag]
17571756
if !ok {
17581757
return nil, nil, nil
@@ -1776,7 +1775,7 @@ func (h *htrun) extractBMD(payload msPayload, caller string) (*bucketMD, *actMsg
17761775

17771776
bmd := h.owner.bmd.get()
17781777
if cmn.Rom.V(4, cos.ModAIS) {
1779-
logmsync(bmd.Version, newBMD, msg, caller, newBMD.String(), bmd.UUID)
1778+
logmsync(bmd.Version, newBMD, msg, sender, newBMD.String(), bmd.UUID)
17801779
}
17811780
// skip older iff not transactional - see t.receiveBMD()
17821781
if h.si.IsTarget() && msg.UUID != "" {
@@ -1792,25 +1791,25 @@ func (h *htrun) extractBMD(payload msPayload, caller string) (*bucketMD, *actMsg
17921791
return newBMD, msg, nil
17931792
}
17941793

1795-
func (h *htrun) receiveSmap(newSmap *smapX, msg *actMsgExt, payload msPayload, caller string, cb smapUpdatedCB) error {
1794+
func (h *htrun) receiveSmap(newSmap *smapX, msg *actMsgExt, payload msPayload, sender string, cb smapUpdatedCB) error {
17961795
if newSmap == nil {
17971796
return nil
17981797
}
17991798
smap := h.owner.smap.get()
1800-
logmsync(smap.Version, newSmap, msg, caller, newSmap.StringEx(), smap.UUID)
1799+
logmsync(smap.Version, newSmap, msg, sender, newSmap.StringEx(), smap.UUID)
18011800

18021801
if !newSmap.isPresent(h.si) {
18031802
return &errSelfNotFound{act: "receive-smap", si: h.si, tag: "new", smap: newSmap}
18041803
}
18051804
return h.owner.smap.synchronize(h.si, newSmap, payload, cb)
18061805
}
18071806

1808-
func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *actMsgExt, payload msPayload, caller string, cb func(ne, oe *etlMD)) (err error) {
1807+
func (h *htrun) receiveEtlMD(newEtlMD *etlMD, msg *actMsgExt, payload msPayload, sender string, cb func(ne, oe *etlMD)) (err error) {
18091808
if newEtlMD == nil {
18101809
return
18111810
}
18121811
etlMD := h.owner.etl.get()
1813-
logmsync(etlMD.Version, newEtlMD, msg, caller)
1812+
logmsync(etlMD.Version, newEtlMD, msg, sender)
18141813

18151814
h.owner.etl.Lock()
18161815
etlMD = h.owner.etl.get()
@@ -1853,7 +1852,7 @@ func (h *htrun) _recvCfg(newConfig *globalConfig, msg *actMsgExt, payload msPayl
18531852
return cmn.GCO.Update(&newConfig.ClusterConfig)
18541853
}
18551854

1856-
func (h *htrun) extractRevokedTokenList(payload msPayload, caller string) (*tokenList, error) {
1855+
func (h *htrun) extractRevokedTokenList(payload msPayload, sender string) (*tokenList, error) {
18571856
var (
18581857
msg actMsgExt
18591858
bytes, ok = payload[revsTokenTag]
@@ -1872,7 +1871,7 @@ func (h *htrun) extractRevokedTokenList(payload msPayload, caller string) (*toke
18721871
err = fmt.Errorf(cmn.FmtErrUnmarshal, h, "blocked token list", cos.BHead(bytes), err)
18731872
return nil, err
18741873
}
1875-
nlog.Infof("extract token list from %q (count: %d, action: %q, uuid: %q)", caller,
1874+
nlog.Infof("extract token list from %q (count: %d, action: %q, uuid: %q)", sender,
18761875
len(tokenList.Tokens), msg.Action, msg.UUID)
18771876
return tokenList, nil
18781877
}
@@ -2235,13 +2234,13 @@ func (h *htrun) rmSelf(smap *smapX, ignoreErr bool) error {
22352234
// via /health handler
22362235
func (h *htrun) externalWD(w http.ResponseWriter, r *http.Request) (responded bool) {
22372236
var (
2238-
callerID = r.Header.Get(apc.HdrCallerID)
2239-
caller = r.Header.Get(apc.HdrCallerName)
2237+
senderID = r.Header.Get(apc.HdrSenderID)
2238+
senderName = r.Header.Get(apc.HdrSenderName)
22402239
)
22412240
// external WD
22422241
// TODO: check receiving on PubNet
22432242
// NOTE: always ready for K8s
2244-
if callerID == "" && caller == "" {
2243+
if senderID == "" && senderName == "" {
22452244
if cmn.Rom.V(5, cos.ModKalive) {
22462245
readiness := strings.Contains(r.URL.RawQuery, apc.QparamHealthReady)
22472246
nlog.Infoln(h.String(), "external health-probe:", r.RemoteAddr, readiness, "[", r.URL.RawQuery, "]")
@@ -2267,72 +2266,73 @@ func (h *htrun) externalWD(w http.ResponseWriter, r *http.Request) (responded bo
22672266

22682267
func (h *htrun) ensureSameSmap(hdr http.Header, smap *smapX) (int, error) {
22692268
var (
2270-
callerID = hdr.Get(apc.HdrCallerID)
2271-
callerName = hdr.Get(apc.HdrCallerName)
2272-
callerSver = hdr.Get(apc.HdrCallerSmapVer)
2269+
senderID = hdr.Get(apc.HdrSenderID)
2270+
senderName = hdr.Get(apc.HdrSenderName)
2271+
senderSver = hdr.Get(apc.HdrSenderSmapVer)
22732272
)
22742273
if !h.ClusterStarted() {
22752274
return http.StatusServiceUnavailable, errors.New("not ready yet")
22762275
}
2277-
if ok := callerID != "" && callerName != ""; !ok {
2276+
if ok := senderID != "" && senderName != ""; !ok {
22782277
return 0, errIntraControl
22792278
}
22802279
if err := smap.validate(); err != nil {
22812280
return 0, err
22822281
}
2283-
caller := smap.GetNode(callerID)
2284-
if caller == nil {
2285-
return http.StatusConflict, fmt.Errorf("%s: caller %s (%s) not present in the local %s", h, callerID, callerName, smap.StringEx())
2282+
sender := smap.GetNode(senderID)
2283+
if sender == nil {
2284+
return http.StatusConflict, fmt.Errorf("%s: sender %s (%s) not present in the local %s", h, senderID, senderName, smap.StringEx())
22862285
}
2287-
if callerSver != smap.vstr {
2288-
return http.StatusConflict, fmt.Errorf("%s: different Smap version from %s(%s): %q vs local %s", h, callerID, callerName, callerSver, smap.StringEx())
2286+
if senderSver != smap.vstr {
2287+
return http.StatusConflict, fmt.Errorf("%s: different Smap version from %s(%s): %q vs local %s",
2288+
h, senderID, senderName, senderSver, smap.StringEx())
22892289
}
22902290
return 0, nil
22912291
}
22922292

22932293
func (h *htrun) checkIntraCall(hdr http.Header, fromPrimary bool) error {
22942294
var (
22952295
smap = h.owner.smap.get()
2296-
callerID = hdr.Get(apc.HdrCallerID)
2297-
callerName = hdr.Get(apc.HdrCallerName)
2298-
callerSver = hdr.Get(apc.HdrCallerSmapVer)
2296+
senderID = hdr.Get(apc.HdrSenderID)
2297+
senderName = hdr.Get(apc.HdrSenderName)
2298+
senderSver = hdr.Get(apc.HdrSenderSmapVer)
22992299
)
2300-
if ok := callerID != "" && callerName != ""; !ok {
2300+
if ok := senderID != "" && senderName != ""; !ok {
23012301
return errIntraControl
23022302
}
23032303
if !smap.isValid() {
23042304
return nil
23052305
}
2306-
caller := smap.GetNode(callerID)
2307-
if ok := caller != nil && (!fromPrimary || smap.isPrimary(caller)); ok {
2306+
node := smap.GetNode(senderID)
2307+
if ok := node != nil && (!fromPrimary || smap.isPrimary(node)); ok {
23082308
return nil
23092309
}
2310-
if callerSver != smap.vstr && callerSver != "" {
2311-
callerVer, err := strconv.ParseInt(callerSver, 10, 64)
2310+
if senderSver != smap.vstr && senderSver != "" {
2311+
ver, err := strconv.ParseInt(senderSver, 10, 64)
23122312
if err != nil { // (unlikely)
2313-
e := fmt.Errorf("%s: invalid caller's Smap ver [%s, %q, %w], %s", h, callerName, callerSver, err, smap)
2313+
e := fmt.Errorf("%s: invalid sender's Smap ver [%s, %q, %w], %s", h, senderName, senderSver, err, smap)
23142314
nlog.Errorln(e)
23152315
return e
23162316
}
23172317
// we still trust the request when the sender's Smap is more current
2318-
if callerVer > smap.version() {
2318+
if ver > smap.version() {
23192319
if h.ClusterStarted() {
23202320
// (exception: setting primary w/ force)
2321-
warn := h.String() + ": local " + smap.String() + " is older than (caller's) " + callerName + " Smap v" + callerSver
2321+
warn := h.String() + ": local " + smap.String() + " is older than (sender's) " + senderName + " Smap v" + senderSver
23222322
nlog.ErrorDepth(1, warn, "- proceeding anyway...")
23232323
}
23242324
runtime.Gosched()
23252325
return nil
23262326
}
23272327
}
2328-
if caller == nil {
2328+
if node == nil {
23292329
if !fromPrimary {
23302330
// assume request from a newly joined node and proceed
23312331
return nil
23322332
}
23332333
return fmt.Errorf("%s: expected %s from a valid node, %s", h, cmn.NetIntraControl, smap)
23342334
}
2335-
return fmt.Errorf("%s: expected %s from primary (and not %s), %s", h, cmn.NetIntraControl, caller, smap)
2335+
return fmt.Errorf("%s: expected %s from primary (and not %s), %s", h, cmn.NetIntraControl, node, smap)
23362336
}
23372337

23382338
func (h *htrun) ensureIntraControl(w http.ResponseWriter, r *http.Request, onlyPrimary bool) (isIntra bool) {

0 commit comments

Comments
 (0)