Skip to content

Commit 99e7561

Browse files
committed
fix: move saving decision into Save and add jitter
1 parent 36de1b7 commit 99e7561

File tree

7 files changed

+75
-58
lines changed

7 files changed

+75
-58
lines changed

cmd/statshouse-agg/statshouse-agg.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,13 @@ main_loop:
174174
log.Printf("4. Waiting RPC clients to receive responses and disconnect...")
175175
agg.WaitRPCServer(10 * time.Second)
176176
shutdownInfo.StopRPCServer = agent.ShutdownInfoDuration(&now).Nanoseconds()
177-
log.Printf("5. Saving mappings...")
178-
_ = mappingsCache.Save()
177+
log.Printf("5. Saving mappings cache...")
178+
ok, err := mappingsCache.Save()
179+
if ok {
180+
log.Printf("5. Mappings cache saved")
181+
} else if err != nil {
182+
log.Printf("5. Failed to save mappings cache: %v", err)
183+
}
179184
shutdownInfo.SaveMappings = agent.ShutdownInfoDuration(&now).Nanoseconds()
180185
log.Printf("6. Saving journals...")
181186
agg.SaveJournals()

cmd/statshouse/statshouse.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -420,11 +420,21 @@ loop:
420420
logOk.Printf("7. Waiting preprocessor to save %d buckets of historic data...", nonEmpty)
421421
main.agent.WaitPreprocessor()
422422
shutdownInfo.StopPreprocessor = agent.ShutdownInfoDuration(&now).Nanoseconds()
423-
logOk.Printf("8. Saving mappings...")
424-
_ = mappingsCache.Save()
423+
logOk.Printf("8. Saving mappings cache...")
424+
ok, err := mappingsCache.Save()
425+
if ok {
426+
logOk.Printf("8. Mappings cache saved")
427+
} else if err != nil {
428+
logErr.Printf("8. Failed to save mappings cache: %v", err)
429+
}
425430
shutdownInfo.SaveMappings = agent.ShutdownInfoDuration(&now).Nanoseconds()
426431
logOk.Printf("9. Saving journal...")
427-
_ = journalFast.Save()
432+
ok, version, err := journalFast.Save()
433+
if ok {
434+
logOk.Printf("9. Journal saved, new version %d", version)
435+
} else if err != nil {
436+
logErr.Printf("9. Failed to save journal: %v", err)
437+
}
428438
shutdownInfo.SaveJournal = agent.ShutdownInfoDuration(&now).Nanoseconds()
429439
shutdownInfo.FinishShutdownTime = now.UnixNano()
430440
agent.ShutdownInfoSave(argv.cacheDir, shutdownInfo)

internal/aggregator/aggregator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ func MakeAggregator(dc pcache.DiskCache, fj *os.File, fjCompact *os.File, mappin
366366
go func() {
367367
for {
368368
time.Sleep(time.Hour) // arbitrary
369-
_ = mappingsCache.Save()
369+
_, _ = mappingsCache.Save()
370370
a.SaveJournals()
371371
}
372372
}()
@@ -375,8 +375,8 @@ func MakeAggregator(dc pcache.DiskCache, fj *os.File, fjCompact *os.File, mappin
375375
}
376376

377377
// SaveJournals calls Save on a.journalFast and then on a.journalCompact.
// Both calls are best-effort: every value returned by Save, including the
// error, is discarded, so a failed save is not reported to the caller.
func (a *Aggregator) SaveJournals() {
378-
_ = a.journalFast.Save()
379-
_ = a.journalCompact.Save()
378+
_, _, _ = a.journalFast.Save()
379+
_, _, _ = a.journalCompact.Save()
380380
}
381381

382382
func (a *Aggregator) Agent() *agent.Agent {

internal/metajournal/journal_fast.go

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/google/btree"
2222
"github.com/mailru/easyjson"
2323
"github.com/zeebo/xxh3"
24+
"pgregory.net/rand"
2425

2526
"github.com/VKCOM/statshouse/internal/agent"
2627
"github.com/VKCOM/statshouse/internal/data_model"
@@ -71,7 +72,7 @@ type JournalFast struct {
7172

7273
// periodic saving
7374
lastSavedVersion int64
74-
periodicSaveInterval time.Duration
75+
periodicSaveInterval time.Duration // we run between one and two times this interval
7576

7677
BuiltinLongPollImmediateOK data_model.ItemValue
7778
BuiltinLongPollImmediateError data_model.ItemValue
@@ -209,7 +210,7 @@ func (ms *JournalFast) loadImpl(fileSize int64, readAt func(b []byte, offset int
209210
return src, nil
210211
}
211212

212-
func (ms *JournalFast) Save() error {
213+
func (ms *JournalFast) Save() (bool, int64, error) {
213214
// We exclude writers so that they do not block on Lock() while code below runs in RLock().
214215
// If we allow this, all new readers (GetValue) block on RLock(), effectively waiting for Save to finish.
215216
ms.modifyMu.Lock()
@@ -218,11 +219,20 @@ func (ms *JournalFast) Save() error {
218219
ms.mu.RLock()
219220
defer ms.mu.RUnlock()
220221

222+
// If the journal is not changed, do not save it
223+
if ms.lastSavedVersion == ms.currentVersion {
224+
return false, ms.currentVersion, nil
225+
}
221226
saver := data_model.ChunkedStorageSaver{
222227
WriteAt: ms.writeAt,
223228
Truncate: ms.truncate,
224229
}
225-
return ms.save(&saver, 0)
230+
err := ms.save(&saver, 0)
231+
if err != nil {
232+
return false, ms.currentVersion, err
233+
}
234+
ms.lastSavedVersion = ms.currentVersion
235+
return true, ms.currentVersion, nil
226236
}
227237

228238
func (ms *JournalFast) save(saver *data_model.ChunkedStorageSaver, maxChunkSize int) error {
@@ -561,29 +571,15 @@ func (ms *JournalFast) StartPeriodicSaving() {
561571
}
562572

563573
func (ms *JournalFast) goPeriodicSaving() {
564-
// Fixed 10-minute delay to prevent simultaneous saves with mappings cache
565-
time.Sleep(10 * time.Minute)
566-
567-
ticker := time.NewTicker(ms.periodicSaveInterval)
568-
defer ticker.Stop()
569-
570-
for range ticker.C {
571-
ms.mu.RLock()
572-
currentVersion := ms.currentVersion
573-
lastSaved := ms.lastSavedVersion
574-
ms.mu.RUnlock()
575-
576-
// Only save if there are changes
577-
if currentVersion > lastSaved {
578-
err := ms.Save()
579-
if err != nil {
580-
log.Printf("Periodic journal save failed: %v", err)
581-
} else {
582-
ms.mu.Lock()
583-
ms.lastSavedVersion = currentVersion
584-
ms.mu.Unlock()
585-
log.Printf("Periodic journal save completed, version %d", currentVersion)
586-
}
574+
for {
575+
sleepDuration := ms.periodicSaveInterval + time.Duration(rand.Intn(int(ms.periodicSaveInterval)))*time.Second
576+
time.Sleep(sleepDuration)
577+
578+
ok, version, err := ms.Save()
579+
if err != nil {
580+
log.Printf("Periodic journal save failed: %v", err)
581+
} else if ok {
582+
log.Printf("Periodic journal save completed, new version %d", version)
587583
}
588584
}
589585
}

internal/metajournal/journal_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func Benchmark_SaveJournal(b *testing.B) {
9898
b.ResetTimer()
9999
b.ReportAllocs()
100100
for i := 0; i < b.N; i++ {
101-
err := journal.Save()
101+
_, _, err := journal.Save()
102102
require.NoError(b, err)
103103
}
104104
}
@@ -139,7 +139,8 @@ func Benchmark_LoadFast(b *testing.B) {
139139
break
140140
}
141141
}
142-
require.NoError(b, j.Save())
142+
_, _, err = j.Save()
143+
require.NoError(b, err)
143144
_ = fp.Close()
144145
fp, err = os.OpenFile(testFile, os.O_CREATE|os.O_RDWR, 0666)
145146
require.NoError(b, err)

internal/pcache/mappings_cache.go

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ type MappingsCache struct {
7878
adds atomic.Int64
7979

8080
// periodic saving
81-
hasChanges atomic.Bool
81+
version uint64 // accessed under modifyMu
82+
lastSavedVersion uint64 // accessed under modifyMu
8283
periodicSaveInterval time.Duration
8384

8485
// custom FS
@@ -391,6 +392,7 @@ func (c *MappingsCache) AddValues(nowUnix uint32, pairs []MappingPair) {
391392
c.addItem(p.Str, p.Value, nowUnix)
392393
// ins++
393394
}
395+
c.version += 1
394396
// d4 := time.Since(a)
395397
// fmt.Printf("addPairsLocked len(pairs) len(items) dels ins: %d %d %d %d %v %v %v %v %v\n", len(pairs), len(items), dels, ins, d0, d1-d0, d2-d1, d3-d2, d4-d3)
396398
// 1024 2048 1023 1024 104.589µs 54.722µs 459.022µs 74.654µs 93.05µs
@@ -408,7 +410,6 @@ func (c *MappingsCache) addItem(k string, v int32, accessTS uint32) {
408410
c.addSumTSLocked(int64(accessTS))
409411
c.cache[k] = cacheValue{value: v, accessTS: accessTS}
410412
c.adds.Add(1)
411-
c.hasChanges.Store(true) // Mark that cache has changes
412413
}
413414

414415
func (c *MappingsCache) removeItem(k string, v int32, accessTS uint32) {
@@ -426,14 +427,13 @@ func (c *MappingsCache) removeItem(k string, v int32, accessTS uint32) {
426427
c.addSumTSLocked(-int64(accessTS))
427428
delete(c.cache, k)
428429
c.evicts.Add(1)
429-
c.hasChanges.Store(true) // Mark that cache has changes
430430
}
431431

432432
func elementSizeDisk(k string) int64 { // max, can be less if short string, requires no padding, etc.
433433
return 4 + int64(len(k)) + 3 + 4 + 4 // strlen, string, padding, value accessTS
434434
}
435435

436-
func (c *MappingsCache) Save() error {
436+
func (c *MappingsCache) Save() (bool, error) {
437437
// We exclude writers so that they do not block on Lock() while code below runs in RLock().
438438
// If we allow this, all new readers (GetValue) block on RLock(), effectively waiting for Save to finish.
439439
c.modifyMu.Lock()
@@ -442,6 +442,10 @@ func (c *MappingsCache) Save() error {
442442
c.mu.RLock()
443443
defer c.mu.RUnlock()
444444

445+
if c.version == c.lastSavedVersion {
446+
return false, nil
447+
}
448+
445449
saver := data_model.ChunkedStorageSaver{
446450
WriteAt: c.writeAt,
447451
Truncate: c.truncate,
@@ -471,17 +475,22 @@ func (c *MappingsCache) Save() error {
471475
})
472476
for _, p := range items {
473477
if err := appendItem(p.str, p.val.value, p.val.accessTS); err != nil {
474-
return err
478+
return false, err
475479
}
476480
}
477481
} else {
478482
for k, p := range c.cache {
479483
if err := appendItem(k, p.value, p.accessTS); err != nil {
480-
return err
484+
return false, err
481485
}
482486
}
483487
}
484-
return saver.FinishWrite(chunk)
488+
err := saver.FinishWrite(chunk)
489+
if err != nil {
490+
return false, err
491+
}
492+
c.lastSavedVersion = c.version
493+
return true, nil
485494
}
486495

487496
// callers are expected to ignore error from this method
@@ -532,19 +541,15 @@ func (c *MappingsCache) StartPeriodicSaving() {
532541
}
533542

534543
func (c *MappingsCache) goPeriodicSaving() {
535-
ticker := time.NewTicker(c.periodicSaveInterval)
536-
defer ticker.Stop()
537-
538-
for range ticker.C {
539-
// Only save if there are changes
540-
if c.hasChanges.Load() {
541-
err := c.Save()
542-
if err != nil {
543-
log.Printf("Periodic mappings cache save failed: %v", err)
544-
} else {
545-
c.hasChanges.Store(false) // Reset the flag after successful save
546-
log.Printf("Periodic mappings cache save completed")
547-
}
544+
for {
545+
sleepDuration := c.periodicSaveInterval + time.Duration(rand.Intn(int(c.periodicSaveInterval)))*time.Second
546+
time.Sleep(sleepDuration)
547+
548+
ok, err := c.Save()
549+
if err != nil {
550+
log.Printf("Periodic mappings cache save failed: %v", err)
551+
} else if ok {
552+
log.Printf("Periodic mappings cache save completed")
548553
}
549554
}
550555
}

internal/pcache/mappings_cache_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ func TestAddValues(t *testing.T) {
7575
t.Fail()
7676
}
7777
cache.debugPrint(now, os.Stdout)
78-
if err := cache.Save(); err != nil {
78+
if _, err := cache.Save(); err != nil {
7979
t.Error(err)
8080
}
8181
fp2 := []byte(string(fp))
8282
cache = newTestMappingsCache(&fp2)
8383
cache.debugPrint(now, os.Stdout)
84-
if err := cache.Save(); err != nil {
84+
if _, err := cache.Save(); err != nil {
8585
t.Error(err)
8686
}
8787
if string(fp) != string(fp2) {

0 commit comments

Comments
 (0)