Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3050,7 +3050,7 @@ func TestEnginePlanCompactions(t *testing.T) {
},
{
name: "Small group size with single generation",
/* These files are supposed to have 0 block counts */
// These files are supposed to have 0 block counts
files: []tsm1.ExtFileStat{
{
FileStat: tsm1.FileStat{
Expand Down Expand Up @@ -3103,7 +3103,7 @@ func TestEnginePlanCompactions(t *testing.T) {
},
{
name: "Small group size with single generation and levels under 4",
/* These files are supposed to have block counts of 0 */
// These files are supposed to have block counts of 0
files: tsm1.FileStats{
{
Path: "01-02.tsm1",
Expand Down Expand Up @@ -3328,7 +3328,7 @@ func TestEnginePlanCompactions(t *testing.T) {
// TestDefaultPlanner_FullyCompacted_SmallSingleGeneration
// will need to ensure that once we have single TSM file under 2 GB we stop
name: "Single TSM file",
/* These files are supposed to have 0 blocks */
// These files are supposed to have 0 blocks
files: tsm1.FileStats{
{
Path: "01-09.tsm1",
Expand Down Expand Up @@ -4654,25 +4654,39 @@ func TestEnginePlanCompactions(t *testing.T) {

// Arbitrary group length to use in Scheduler.SetDepth
mockGroupLen := 5
// Set the scheduler depth for our lower level groups.
// During PT_Standard this should still plan a level5 compaction group
// but during PT_SmartOptimize this should not.
e.Scheduler.SetDepth(1, mockGroupLen)
e.Scheduler.SetDepth(2, mockGroupLen)

// Normally this is called within PlanCompactions but because we want to simulate already running
// some compactions we will set them manually here.
atomic.StoreInt64(&e.Stats.TSMCompactionsActive[0], int64(mockGroupLen))
atomic.StoreInt64(&e.Stats.TSMCompactionsActive[1], int64(mockGroupLen))

// Should use PlanType 0 (PT_Standard), 1(PT_SmartOptimize), 2(PT_NoOptimize)
planType := tsm1.PlanType(i)
level1Groups, level2Groups, Level3Groups, Level4Groups, Level5Groups := e.PlanCompactions(planType)
compareLevelGroups(t, test.getResultByPlanType(planType).level1Groups, level1Groups, "unexpected level 1 Group")
compareLevelGroups(t, test.getResultByPlanType(planType).level2Groups, level2Groups, "unexpected level 2 Group")
compareLevelGroups(t, test.getResultByPlanType(planType).level3Groups, Level3Groups, "unexpected level 3 Group")
compareLevelGroups(t, test.getResultByPlanType(planType).level4Groups, Level4Groups, "unexpected level 4 Group")
compareLevelGroups(t, test.getResultByPlanType(planType).level5Groups, Level5Groups, "unexpected level 5 Group")

// Run the compaction planner twice and verify we get the same results each time. This checks for issues
// with releasing TSM files so they are eligible to be planned again. If the first run succeeds but the
// second run fails, its an issue with releasing TSM files back to the compactor.
for run := range 2 {
runDisplay := run + 1

e.Stats = &tsm1.EngineStatistics{}

// Set the scheduler depth for our lower level groups.
// During PT_Standard this should still plan a level5 compaction group
// but during PT_SmartOptimize this should not.
e.Scheduler.SetDepth(1, mockGroupLen)
e.Scheduler.SetDepth(2, mockGroupLen)

// Normally this is called within PlanCompactions but because we want to simulate already running
// some compactions we will set them manually here.
atomic.StoreInt64(&e.Stats.TSMCompactionsActive[0], int64(mockGroupLen))
atomic.StoreInt64(&e.Stats.TSMCompactionsActive[1], int64(mockGroupLen))

// Should use PlanType 0 (PT_Standard), 1(PT_SmartOptimize), 2(PT_NoOptimize)
planType := tsm1.PlanType(i)
// Calling PlanCompactions is normally done by e.compact.
level1Groups, level2Groups, Level3Groups, Level4Groups, Level5Groups := e.PlanCompactions(planType)
compareLevelGroups(t, test.getResultByPlanType(planType).level1Groups, level1Groups, fmt.Sprintf("unexpected level 1 Group on run %d", runDisplay))
compareLevelGroups(t, test.getResultByPlanType(planType).level2Groups, level2Groups, fmt.Sprintf("unexpected level 2 Group on run %d", runDisplay))
compareLevelGroups(t, test.getResultByPlanType(planType).level3Groups, Level3Groups, fmt.Sprintf("unexpected level 3 Group on run %d", runDisplay))
compareLevelGroups(t, test.getResultByPlanType(planType).level4Groups, Level4Groups, fmt.Sprintf("unexpected level 4 Group on run %d", runDisplay))
compareLevelGroups(t, test.getResultByPlanType(planType).level5Groups, Level5Groups, fmt.Sprintf("unexpected level 5 Group on run %d", runDisplay))

// Calling ReleaseCompactionPlans is normally done by e.compact.
e.ReleaseCompactionPlans(level1Groups, level2Groups, Level3Groups, Level4Groups, Level5Groups)
}
})
}
}
Expand Down
22 changes: 16 additions & 6 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2302,7 +2302,7 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
zap.Int("level5Groups", len(level5Groups)),
)
}
e.releaseCompactionPlans(level1Groups, level2Groups, level3Groups, level4Groups, level5Groups)
e.ReleaseCompactionPlans(level1Groups, level2Groups, level3Groups, level4Groups, level5Groups)
if len(level1Groups)+len(level2Groups)+len(level3Groups)+len(level4Groups)+len(level5Groups) > 0 {
e.traceLogger.Debug("Finished releasing compaction plans",
zap.Int("level1Groups", len(level1Groups)),
Expand All @@ -2317,7 +2317,7 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
}
}

func (e *Engine) releaseCompactionPlans(
func (e *Engine) ReleaseCompactionPlans(
level1Groups []PlannedCompactionGroup,
level2Groups []PlannedCompactionGroup,
level3Groups []PlannedCompactionGroup,
Expand Down Expand Up @@ -2408,12 +2408,17 @@ func (e *Engine) planCompactionsInner(planType PlanType) ([]PlannedCompactionGro
// We don't stop if level 4 is runnable because we need to continue on and check for group 4 to group 5 promotions if
// group 4 is the runnable group.
if runnable && level <= 3 {
// We don't expect run any level 4 groups, but to avoid a potential race condition that could downgrade the points-per-block
// on a level 4 group, we don't return the level 4 groups here. If we don't return them, we must release them here so
// they are elligible for future compaction plannings.
e.traceLogger.Debug("releasing any level 4 groups due to PT_SmartOptimize and level <= 3", zap.Int("level", level), zap.Int("level4Groups", len(l4Groups)))
e.CompactionPlan.Release(l4Groups)

if len(level1Groups)+len(level2Groups)+len(level3Groups)+len(l4Groups) > 0 {
e.traceLogger.Debug("Compaction planning is PT_SmartOptimize with level 1, 2, and 3 compactions", zap.Int("id", int(e.id)),
zap.Int("level1Groups", len(level1Groups)),
zap.Int("level2Groups", len(level2Groups)),
zap.Int("level3Groups", len(level3Groups)),
zap.Int("level4Groups", len(l4Groups)),
)
}
// We know that the compaction loop will pull a compaction group from levels 1-4, so no need to plan level 5.
Expand Down Expand Up @@ -2456,14 +2461,19 @@ func (e *Engine) planCompactionsInner(planType PlanType) ([]PlannedCompactionGro

if planType == PT_NoOptimize {
// For PT_NoOptimize, throw away any promoted level 5 groups and return what we have for level 1 through 4.
// Our behavior changes depending what the plan type is.
if len(level1Groups)+len(level2Groups)+len(level3Groups)+len(l4Groups)+len(level5Groups) > 0 {
// Since we can't return the level 5 groups to avoid them getting scheduled, we must release all the groups
// in level 5 first. If we did not do this, then the groups in level5Groups would no longer be elligible for
// any compactions.
e.traceLogger.Debug("releasing any level 5 groups due to PT_NoOptimize", zap.Int("level5Groups", len(level5Groups)))
for _, compactGroup := range level5Groups {
e.CompactionPlan.Release([]CompactionGroup{compactGroup.Group})
}
if len(level1Groups)+len(level2Groups)+len(level3Groups)+len(l4Groups) > 0 {
e.traceLogger.Debug("Compaction planning is PT_NoOptimize", zap.Int("id", int(e.id)),
zap.Int("level1Groups", len(level1Groups)),
zap.Int("level2Groups", len(level2Groups)),
zap.Int("level3Groups", len(level3Groups)),
zap.Int("level4Groups", len(l4Groups)),
zap.Int("level5Groups", len(level5Groups)),
)
}
return level1Groups, level2Groups, level3Groups, level4Groups, nil
Expand Down