diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 3767f892e73..fde5abeab4a 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -3050,7 +3050,7 @@ func TestEnginePlanCompactions(t *testing.T) { }, { name: "Small group size with single generation", - /* These files are supposed to have 0 block counts */ + // These files are supposed to have 0 block counts files: []tsm1.ExtFileStat{ { FileStat: tsm1.FileStat{ @@ -3103,7 +3103,7 @@ func TestEnginePlanCompactions(t *testing.T) { }, { name: "Small group size with single generation and levels under 4", - /* These files are supposed to have block counts of 0 */ + // These files are supposed to have block counts of 0 files: tsm1.FileStats{ { Path: "01-02.tsm1", @@ -3328,7 +3328,7 @@ func TestEnginePlanCompactions(t *testing.T) { // TestDefaultPlanner_FullyCompacted_SmallSingleGeneration // will need to ensure that once we have single TSM file under 2 GB we stop name: "Single TSM file", - /* These files are supposed to have 0 blocks */ + // These files are supposed to have 0 blocks files: tsm1.FileStats{ { Path: "01-09.tsm1", @@ -4654,25 +4654,39 @@ func TestEnginePlanCompactions(t *testing.T) { // Arbitrary group length to use in Scheduler.SetDepth mockGroupLen := 5 - // Set the scheduler depth for our lower level groups. - // During PT_Standard this should still plan a level5 compaction group - // but during PT_SmartOptimize this should not. - e.Scheduler.SetDepth(1, mockGroupLen) - e.Scheduler.SetDepth(2, mockGroupLen) - - // Normally this is called within PlanCompactions but because we want to simulate already running - // some compactions we will set them manually here. - atomic.StoreInt64(&e.Stats.TSMCompactionsActive[0], int64(mockGroupLen)) - atomic.StoreInt64(&e.Stats.TSMCompactionsActive[1], int64(mockGroupLen)) - - // Should use PlanType 0 (PT_Standard), 1(PT_SmartOptimize), 2(PT_NoOptimize) - planType := tsm1.PlanType(i) - level1Groups, level2Groups, Level3Groups, Level4Groups, Level5Groups := e.PlanCompactions(planType) - compareLevelGroups(t, test.getResultByPlanType(planType).level1Groups, level1Groups, "unexpected level 1 Group") - compareLevelGroups(t, test.getResultByPlanType(planType).level2Groups, level2Groups, "unexpected level 2 Group") - compareLevelGroups(t, test.getResultByPlanType(planType).level3Groups, Level3Groups, "unexpected level 3 Group") - compareLevelGroups(t, test.getResultByPlanType(planType).level4Groups, Level4Groups, "unexpected level 4 Group") - compareLevelGroups(t, test.getResultByPlanType(planType).level5Groups, Level5Groups, "unexpected level 5 Group") + + // Run the compaction planner twice and verify we get the same results each time. This checks for issues + // with releasing TSM files so they are eligible to be planned again. If the first run succeeds but the + // second run fails, its an issue with releasing TSM files back to the compactor. + for run := range 2 { + runDisplay := run + 1 + + e.Stats = &tsm1.EngineStatistics{} + + // Set the scheduler depth for our lower level groups. + // During PT_Standard this should still plan a level5 compaction group + // but during PT_SmartOptimize this should not. + e.Scheduler.SetDepth(1, mockGroupLen) + e.Scheduler.SetDepth(2, mockGroupLen) + + // Normally this is called within PlanCompactions but because we want to simulate already running + // some compactions we will set them manually here. + atomic.StoreInt64(&e.Stats.TSMCompactionsActive[0], int64(mockGroupLen)) + atomic.StoreInt64(&e.Stats.TSMCompactionsActive[1], int64(mockGroupLen)) + + // Should use PlanType 0 (PT_Standard), 1(PT_SmartOptimize), 2(PT_NoOptimize) + planType := tsm1.PlanType(i) + // Calling PlanCompactions is normally done by e.compact. + level1Groups, level2Groups, Level3Groups, Level4Groups, Level5Groups := e.PlanCompactions(planType) + compareLevelGroups(t, test.getResultByPlanType(planType).level1Groups, level1Groups, fmt.Sprintf("unexpected level 1 Group on run %d", runDisplay)) + compareLevelGroups(t, test.getResultByPlanType(planType).level2Groups, level2Groups, fmt.Sprintf("unexpected level 2 Group on run %d", runDisplay)) + compareLevelGroups(t, test.getResultByPlanType(planType).level3Groups, Level3Groups, fmt.Sprintf("unexpected level 3 Group on run %d", runDisplay)) + compareLevelGroups(t, test.getResultByPlanType(planType).level4Groups, Level4Groups, fmt.Sprintf("unexpected level 4 Group on run %d", runDisplay)) + compareLevelGroups(t, test.getResultByPlanType(planType).level5Groups, Level5Groups, fmt.Sprintf("unexpected level 5 Group on run %d", runDisplay)) + + // Calling ReleaseCompactionPlans is normally done by e.compact. + e.ReleaseCompactionPlans(level1Groups, level2Groups, Level3Groups, Level4Groups, Level5Groups) + } }) } } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 40bf0dbc720..b3f0df05ac1 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -2302,7 +2302,7 @@ func (e *Engine) compact(wg *sync.WaitGroup) { zap.Int("level5Groups", len(level5Groups)), ) } - e.releaseCompactionPlans(level1Groups, level2Groups, level3Groups, level4Groups, level5Groups) + e.ReleaseCompactionPlans(level1Groups, level2Groups, level3Groups, level4Groups, level5Groups) if len(level1Groups)+len(level2Groups)+len(level3Groups)+len(level4Groups)+len(level5Groups) > 0 { e.traceLogger.Debug("Finished releasing compaction plans", zap.Int("level1Groups", len(level1Groups)), @@ -2317,7 +2317,7 @@ func (e *Engine) compact(wg *sync.WaitGroup) { } } -func (e *Engine) releaseCompactionPlans( +func (e *Engine) ReleaseCompactionPlans( level1Groups []PlannedCompactionGroup, level2Groups []PlannedCompactionGroup, level3Groups []PlannedCompactionGroup, @@ -2408,12 +2408,17 @@ func (e *Engine) planCompactionsInner(planType PlanType) ([]PlannedCompactionGro // We don't stop if level 4 is runnable because we need to continue on and check for group 4 to group 5 promotions if // group 4 is the runnable group. if runnable && level <= 3 { + // We don't expect run any level 4 groups, but to avoid a potential race condition that could downgrade the points-per-block + // on a level 4 group, we don't return the level 4 groups here. If we don't return them, we must release them here so + // they are elligible for future compaction plannings. + e.traceLogger.Debug("releasing any level 4 groups due to PT_SmartOptimize and level <= 3", zap.Int("level", level), zap.Int("level4Groups", len(l4Groups))) + e.CompactionPlan.Release(l4Groups) + if len(level1Groups)+len(level2Groups)+len(level3Groups)+len(l4Groups) > 0 { e.traceLogger.Debug("Compaction planning is PT_SmartOptimize with level 1, 2, and 3 compactions", zap.Int("id", int(e.id)), zap.Int("level1Groups", len(level1Groups)), zap.Int("level2Groups", len(level2Groups)), zap.Int("level3Groups", len(level3Groups)), - zap.Int("level4Groups", len(l4Groups)), ) } // We know that the compaction loop will pull a compaction group from levels 1-4, so no need to plan level 5. @@ -2456,14 +2461,19 @@ func (e *Engine) planCompactionsInner(planType PlanType) ([]PlannedCompactionGro if planType == PT_NoOptimize { // For PT_NoOptimize, throw away any promoted level 5 groups and return what we have for level 1 through 4. - // Our behavior changes depending what the plan type is. - if len(level1Groups)+len(level2Groups)+len(level3Groups)+len(l4Groups)+len(level5Groups) > 0 { + // Since we can't return the level 5 groups to avoid them getting scheduled, we must release all the groups + // in level 5 first. If we did not do this, then the groups in level5Groups would no longer be elligible for + // any compactions. + e.traceLogger.Debug("releasing any level 5 groups due to PT_NoOptimize", zap.Int("level5Groups", len(level5Groups))) + for _, compactGroup := range level5Groups { + e.CompactionPlan.Release([]CompactionGroup{compactGroup.Group}) + } + if len(level1Groups)+len(level2Groups)+len(level3Groups)+len(l4Groups) > 0 { e.traceLogger.Debug("Compaction planning is PT_NoOptimize", zap.Int("id", int(e.id)), zap.Int("level1Groups", len(level1Groups)), zap.Int("level2Groups", len(level2Groups)), zap.Int("level3Groups", len(level3Groups)), zap.Int("level4Groups", len(l4Groups)), - zap.Int("level5Groups", len(level5Groups)), ) } return level1Groups, level2Groups, level3Groups, level4Groups, nil