Skip to content

Commit 43db3ec

Browse files
committed
[config change] keepalive validation and (new) get-batch
* add get-batch config section: - configurable max_wait (default 30s, range [1s, 1m]) - warmup_workers (default 2, -1 to disable) * keepalive: - add interval validation with cross-section timeout checks - validate node-offline detection window * cluster config: - add context-validator, to perform cross-sections * get-batch/x-moss: - replace hardcoded max-wait and warmup workers count Signed-off-by: Alex Aizman <[email protected]>
1 parent c0b5aee commit 43db3ec

File tree

3 files changed

+160
-64
lines changed

3 files changed

+160
-64
lines changed

cmn/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (bp *Bprops) Validate(targetCnt int) error {
239239

240240
// run assorted props validators
241241
var softErr error
242-
for _, pv := range []PropsValidator{&bp.Cksum, &bp.Mirror, &bp.EC, &bp.Extra, &bp.WritePolicy, &bp.RateLimit, &bp.Chunks, &bp.LRU, &bp.Features} {
242+
for _, pv := range []propsValidator{&bp.Cksum, &bp.Mirror, &bp.EC, &bp.Extra, &bp.WritePolicy, &bp.RateLimit, &bp.Chunks, &bp.LRU, &bp.Features} {
243243
var err error
244244
switch {
245245
case pv == &bp.EC:

cmn/config.go

Lines changed: 149 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,13 @@ const (
3535
)
3636

3737
type (
38-
Validator interface {
38+
validator interface {
3939
Validate() error
4040
}
41-
PropsValidator interface {
41+
contextValidator interface {
42+
Validate(*Config) error
43+
}
44+
propsValidator interface {
4245
ValidateAsProps(arg ...any) error
4346
}
4447
)
@@ -128,6 +131,7 @@ type (
128131
Version int64 `json:"config_version,string"`
129132
Versioning VersionConf `json:"versioning" allow:"cluster"`
130133
Resilver ResilverConf `json:"resilver"`
134+
GetBatch GetBatchConf `json:"get_batch" allow:"cluster"`
131135
}
132136
ConfigToSet struct {
133137
// ClusterConfig
@@ -160,6 +164,7 @@ type (
160164
Proxy *ProxyConfToSet `json:"proxy,omitempty"`
161165
RateLimit *RateLimitConfToSet `json:"rate_limit"`
162166
Features *feat.Flags `json:"features,string,omitempty"`
167+
GetBatch *GetBatchConfToSet `json:"get_batch,omitempty"`
163168

164169
// LocalConfig
165170
FSP *FSPConf `json:"fspaths,omitempty"`
@@ -637,9 +642,9 @@ type (
637642

638643
AuthConf struct {
639644
Secret string `json:"secret"`
640-
Enabled bool `json:"enabled"`
641645
PubKey string `json:"public_key"`
642646
Aud []string `json:"aud"`
647+
Enabled bool `json:"enabled"`
643648
}
644649
AuthConfToSet struct {
645650
Secret *string `json:"secret,omitempty"`
@@ -833,6 +838,31 @@ type (
833838
}
834839
)
835840

841+
// ref: https://github.com/NVIDIA/aistore/releases/tag/v1.4.0#getbatch-api-ml-endpoint
842+
type (
843+
GetBatchConf struct {
844+
// Maximum time to wait for remote targets to send their data during
845+
// distributed multi-object/multi-file retrieval. When a sending target
846+
// fails or is slow to respond, the designated target (DT) waits up to
847+
// this duration before attempting recovery.
848+
// Longer values increase tolerance for transient slowdowns but delay
849+
// error detection. Shorter values fail faster but may be too aggressive.
850+
MaxWait cos.Duration `json:"max_wait"`
851+
852+
// Number of worker goroutines for pagecache read-ahead warming.
853+
// Helps reduce cold-read latency by pre-loading file data into memory.
854+
// Always auto-disabled under extreme system load.
855+
// - (-1): disabled
856+
// - 0: default (enabled w/ 2 (numWarmupWorkersDflt) workers)
857+
// - >0: enabled with N workers
858+
NumWarmupWorkers int `json:"warmup_workers"`
859+
}
860+
GetBatchConfToSet struct {
861+
MaxWait *cos.Duration `json:"max_wait,omitempty"`
862+
NumWarmupWorkers *int `json:"warmup_workers,omitempty"`
863+
}
864+
)
865+
836866
// assorted named fields that require (cluster | node) restart for changes to make an effect
837867
// (used by CLI)
838868
var ConfigRestartRequired = [...]string{"auth.secret", "memsys", "net"}
@@ -859,48 +889,50 @@ func (*ConfigToSet) JspOpts() jsp.Options { return _jspOpts() }
859889

860890
// interface guard: config validators
861891
var (
862-
_ Validator = (*BackendConf)(nil)
863-
_ Validator = (*CksumConf)(nil)
864-
_ Validator = (*LogConf)(nil)
865-
_ Validator = (*LRUConf)(nil)
866-
_ Validator = (*SpaceConf)(nil)
867-
_ Validator = (*MirrorConf)(nil)
868-
_ Validator = (*ECConf)(nil)
869-
_ Validator = (*ChunksConf)(nil)
870-
_ Validator = (*VersionConf)(nil)
871-
_ Validator = (*KeepaliveConf)(nil)
872-
_ Validator = (*PeriodConf)(nil)
873-
_ Validator = (*TimeoutConf)(nil)
874-
_ Validator = (*ClientConf)(nil)
875-
_ Validator = (*RebalanceConf)(nil)
876-
_ Validator = (*ResilverConf)(nil)
877-
_ Validator = (*NetConf)(nil)
878-
_ Validator = (*FSHCConf)(nil)
879-
_ Validator = (*HTTPConf)(nil)
880-
_ Validator = (*DownloaderConf)(nil)
881-
_ Validator = (*DsortConf)(nil)
882-
_ Validator = (*TransportConf)(nil)
883-
_ Validator = (*MemsysConf)(nil)
884-
_ Validator = (*TCBConf)(nil)
885-
_ Validator = (*WritePolicyConf)(nil)
886-
_ Validator = (*TracingConf)(nil)
887-
888-
_ Validator = (*feat.Flags)(nil)
892+
_ validator = (*BackendConf)(nil)
893+
_ validator = (*CksumConf)(nil)
894+
_ validator = (*LogConf)(nil)
895+
_ validator = (*LRUConf)(nil)
896+
_ validator = (*SpaceConf)(nil)
897+
_ validator = (*MirrorConf)(nil)
898+
_ validator = (*ECConf)(nil)
899+
_ validator = (*ChunksConf)(nil)
900+
_ validator = (*VersionConf)(nil)
901+
_ validator = (*PeriodConf)(nil)
902+
_ validator = (*TimeoutConf)(nil)
903+
_ validator = (*ClientConf)(nil)
904+
_ validator = (*RebalanceConf)(nil)
905+
_ validator = (*ResilverConf)(nil)
906+
_ validator = (*NetConf)(nil)
907+
_ validator = (*FSHCConf)(nil)
908+
_ validator = (*HTTPConf)(nil)
909+
_ validator = (*DownloaderConf)(nil)
910+
_ validator = (*DsortConf)(nil)
911+
_ validator = (*TransportConf)(nil)
912+
_ validator = (*MemsysConf)(nil)
913+
_ validator = (*TCBConf)(nil)
914+
_ validator = (*WritePolicyConf)(nil)
915+
_ validator = (*TracingConf)(nil)
916+
_ validator = (*GetBatchConf)(nil)
917+
918+
_ validator = (*feat.Flags)(nil) // is called explicitly from main config validator
919+
920+
_ contextValidator = (*KeepaliveConf)(nil)
889921
)
890922

891923
// interface guard: bucket-level validators-as-props
892924
var (
893-
_ PropsValidator = (*CksumConf)(nil)
894-
_ PropsValidator = (*MirrorConf)(nil)
895-
_ PropsValidator = (*ECConf)(nil)
925+
_ propsValidator = (*CksumConf)(nil)
926+
_ propsValidator = (*MirrorConf)(nil)
927+
_ propsValidator = (*ECConf)(nil)
896928

897-
_ PropsValidator = (*ExtraProps)(nil)
898-
_ PropsValidator = (*feat.Flags)(nil)
929+
_ propsValidator = (*ExtraProps)(nil)
930+
_ propsValidator = (*feat.Flags)(nil)
899931

900-
_ PropsValidator = (*WritePolicyConf)(nil)
901-
_ PropsValidator = (*RateLimitConf)(nil)
902-
_ PropsValidator = (*ChunksConf)(nil)
903-
_ PropsValidator = (*LRUConf)(nil)
932+
_ propsValidator = (*WritePolicyConf)(nil)
933+
_ propsValidator = (*RateLimitConf)(nil)
934+
_ propsValidator = (*ChunksConf)(nil)
935+
_ propsValidator = (*LRUConf)(nil)
904936
)
905937

906938
// interface guard: special (un)marshaling
@@ -924,8 +956,9 @@ func (c *Config) Validate() error {
924956
return errors.New("invalid log dir value (must be non-empty)")
925957
}
926958

927-
// NOTE: These two validations require more context and so we call them explicitly;
928-
// The rest all implement generic interface.
959+
//
960+
// NOTE: the following validations perform cross-sections checks - call them explicitly
961+
//
929962
if err := c.LocalConfig.HostNet.Validate(c); err != nil {
930963
return err
931964
}
@@ -938,16 +971,16 @@ func (c *Config) Validate() error {
938971
if err := c.Features.Validate(); err != nil {
939972
return err
940973
}
941-
942974
opts := IterOpts{VisitAll: true}
943-
return IterFields(c, _validateFld, opts)
975+
return IterFields(c, c.validateFld, opts)
944976
}
945977

946-
func _validateFld(_ string, field IterField) (error, bool) {
947-
if v, ok := field.Value().(Validator); ok {
948-
if err := v.Validate(); err != nil {
949-
return err, false
950-
}
978+
func (c *Config) validateFld(_ string, field IterField) (error, bool) {
979+
switch v := field.Value().(type) {
980+
case contextValidator:
981+
return v.Validate(c), false
982+
case validator:
983+
return v.Validate(), false
951984
}
952985
return nil, false
953986
}
@@ -1639,7 +1672,14 @@ func (c *WritePolicyConf) ValidateAsProps(...any) error { return c.Validate() }
16391672
// see palive.retry in re "total number of failures prior to removing"
16401673
const kaNumRetries = 3
16411674

1642-
func (c *KeepaliveConf) Validate() error {
1675+
// interval bounds
1676+
const (
1677+
kaliveIvalMin = 1 * time.Second
1678+
kaliveIvalMax = 1 * time.Minute
1679+
kaliveToutMax = 2 * time.Minute
1680+
)
1681+
1682+
func (c *KeepaliveConf) Validate(config *Config) error {
16431683
if c.Proxy.Name != "heartbeat" {
16441684
return fmt.Errorf("invalid keepalivetracker.proxy.name %s", c.Proxy.Name)
16451685
}
@@ -1655,14 +1695,37 @@ func (c *KeepaliveConf) Validate() error {
16551695
if c.NumRetries < 1 || c.NumRetries > 10 {
16561696
return fmt.Errorf("invalid keepalivetracker.num_retries %d (expecting range [1, 10])", c.NumRetries)
16571697
}
1658-
return nil
1698+
1699+
if err := c.Proxy.validate(&config.Timeout); err != nil {
1700+
return err
1701+
}
1702+
return c.Target.validate(&config.Timeout)
16591703
}
16601704

16611705
func KeepaliveRetryDuration(c *Config) time.Duration {
16621706
d := c.Timeout.CplaneOperation.D() * time.Duration(c.Keepalive.RetryFactor)
16631707
return min(d, c.Timeout.MaxKeepalive.D()+time.Second)
16641708
}
16651709

1710+
func (c *KeepaliveTrackerConf) validate(timeoutConf *TimeoutConf) error {
1711+
if c.Interval.D() < kaliveIvalMin || c.Interval.D() > kaliveIvalMax {
1712+
return fmt.Errorf("invalid keepalivetracker.interval=%s (expected range [%v, %v])",
1713+
c.Interval, kaliveIvalMin, kaliveIvalMax)
1714+
}
1715+
//
1716+
// must be consistent w/ assorted timeout knobs
1717+
//
1718+
if c.Interval.D() < timeoutConf.MaxKeepalive.D() {
1719+
return fmt.Errorf("keepalivetracker.interval=%s should be >= timeout.max_keepalive=%s",
1720+
c.Interval, timeoutConf.MaxKeepalive)
1721+
}
1722+
kwin := c.Interval.D() * time.Duration(c.Factor)
1723+
if kwin > kaliveToutMax {
1724+
return fmt.Errorf("keepalive detection window=%v (interval * factor) exceeds %v", kwin, kaliveToutMax)
1725+
}
1726+
return nil
1727+
}
1728+
16661729
/////////////
16671730
// NetConf and NetConf.HTTPConf
16681731
/////////////
@@ -2316,6 +2379,42 @@ func (*RateLimitConf) verbs(tag, name, value string) error {
23162379
// NOTE: separately, frontend-rate-limiter validation in `makeNewBckProps`
23172380
func (c *RateLimitConf) ValidateAsProps(...any) error { return c.Validate() }
23182381

2382+
//////////////////
2383+
// GetBatchConf //
2384+
//////////////////
2385+
2386+
const (
2387+
getBatchMaxWaitDflt = 30 * time.Second
2388+
getBatchMaxWaitMin = time.Second
2389+
getBatchMaxWaitMax = time.Minute
2390+
2391+
numWarmupWorkersDisabled = -1
2392+
numWarmupWorkersDflt = 2
2393+
)
2394+
2395+
func (c *GetBatchConf) WarmupWorkers() int {
2396+
return cos.Ternary(c.NumWarmupWorkers == numWarmupWorkersDisabled, 0, c.NumWarmupWorkers)
2397+
}
2398+
2399+
func (c *GetBatchConf) Validate() error {
2400+
debug.Assert(numWarmupWorkersDisabled < 0 && getBatchMaxWaitDflt > getBatchMaxWaitMin && getBatchMaxWaitMin < getBatchMaxWaitMax)
2401+
if c.MaxWait == 0 {
2402+
c.MaxWait = cos.Duration(getBatchMaxWaitDflt)
2403+
} else if c.MaxWait.D() < getBatchMaxWaitMin || c.MaxWait.D() > getBatchMaxWaitMax {
2404+
return fmt.Errorf("invalid get_batch.max_wait=%s (must be in range [%v, %v])", c.MaxWait, getBatchMaxWaitMin, getBatchMaxWaitMax)
2405+
}
2406+
switch c.NumWarmupWorkers {
2407+
case numWarmupWorkersDisabled:
2408+
case 0:
2409+
c.NumWarmupWorkers = numWarmupWorkersDflt
2410+
default:
2411+
if c.NumWarmupWorkers < 0 || c.NumWarmupWorkers > 10 {
2412+
return fmt.Errorf("invalid get_batch.warmup_workers=%d (expecting range [%d, %d])", c.NumWarmupWorkers, 0, 10)
2413+
}
2414+
}
2415+
return nil
2416+
}
2417+
23192418
//
23202419
// misc config utilities ---------------------------------------------------------
23212420
//

xact/xs/moss.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,6 @@ where data-item = (object | archived file) // range read TBD
7272
// - range read
7373
// - finer-grained wi abort
7474

75-
// TODO configurable "hole-plug" waiting timeout: max(host-busy, 30s); see also logStats
76-
const (
77-
maxwait = 30 * time.Second
78-
)
7975
const (
8076
sparseLog = 30 * time.Second
8177
)
@@ -85,7 +81,6 @@ const (
8581
// - bewarm pool is created once at x-moss initialization based on current system load;
8682
// - it stays enabled/disabled for the entire lifetime of this xaction instance across all work items
8783
const (
88-
bewarmCnt = 2
8984
bewarmSize = cos.MiB
9085
)
9186

@@ -248,14 +243,16 @@ func newMoss(p *mossFactory) *XactMoss {
248243
r.DemandBase.Init(p.UUID(), p.Kind(), "" /*ctlmsg*/, p.Bck, mossIdleTime, r.fini)
249244

250245
// best-effort `bewarm` for pagecache warming-up
251-
load, isExtreme := sys.MaxLoad2()
252246
s := "without"
253-
if !isExtreme {
254-
ncpu := sys.NumCPU()
255-
highcpu := ncpu * sys.HighLoad / 100
256-
if load < float64(highcpu) {
257-
r.bewarm = work.New(bewarmCnt, bewarmCnt, r.bewarmFQN, r.ChanAbort())
258-
s = "with"
247+
if num := r.config.GetBatch.WarmupWorkers(); num > 0 {
248+
load, isExtreme := sys.MaxLoad2()
249+
if !isExtreme {
250+
ncpu := sys.NumCPU()
251+
highcpu := ncpu * sys.HighLoad / 100
252+
if load < float64(highcpu) {
253+
r.bewarm = work.New(num, num /*work-chan cap*/, r.bewarmFQN, r.ChanAbort())
254+
s = "with"
255+
}
259256
}
260257
}
261258

@@ -1001,7 +998,7 @@ func (wi *basewi) waitFlushRx(i int) (int, error) {
1001998
elapsed time.Duration
1002999
start = mono.NanoTime()
10031000
recvDrainBurst = cos.ClampInt(8, 1, cap(wi.recv.ch)>>1)
1004-
timeout = max(maxwait, wi.r.config.Timeout.MaxHostBusy.D())
1001+
timeout = wi.r.config.GetBatch.MaxWait.D()
10051002
)
10061003
for {
10071004
wi.recv.mtx.Lock()

0 commit comments

Comments
 (0)