Skip to content

Commit 20dca25

Browse files
authored
HDDS-13648. Update NSSummary rebuilding implementation to queue based approach. (#9009)
1 parent 93fad7c commit 20dca25

File tree

9 files changed

+201
-158
lines changed

9 files changed

+201
-158
lines changed

hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@
4949
import java.util.Set;
5050
import java.util.TimeZone;
5151
import java.util.concurrent.BlockingQueue;
52-
import java.util.concurrent.ExecutorService;
53-
import java.util.concurrent.Executors;
5452
import java.util.stream.Collectors;
5553
import javax.ws.rs.core.Response;
5654
import org.apache.commons.io.FileUtils;
@@ -97,15 +95,6 @@ public class ReconUtils {
9795
private static Logger log = LoggerFactory.getLogger(
9896
ReconUtils.class);
9997

100-
// Use NSSummaryTask's unified rebuild control instead of separate tracking
101-
private static final ExecutorService NSSUMMARY_REBUILD_EXECUTOR =
102-
Executors.newSingleThreadExecutor(r -> {
103-
Thread t = new Thread(r);
104-
t.setName("RebuildNSSummaryThread");
105-
t.setDaemon(true); // Optional: allows JVM to exit without waiting
106-
return t;
107-
});
108-
10998
public ReconUtils() {
11099
}
111100

@@ -119,33 +108,6 @@ public static org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.RebuildState get
119108
return org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.getRebuildState();
120109
}
121110

122-
/**
123-
* Convenience method to trigger asynchronous NSSummary tree rebuild.
124-
* Uses the unified control mechanism in NSSummaryTask.
125-
*
126-
* @param reconNamespaceSummaryManager The namespace summary manager
127-
* @param omMetadataManager The OM metadata manager
128-
* @return true if rebuild was triggered successfully, false otherwise
129-
*/
130-
public static boolean triggerAsyncNSSummaryRebuild(
131-
ReconNamespaceSummaryManager reconNamespaceSummaryManager,
132-
ReconOMMetadataManager omMetadataManager) {
133-
134-
// Submit rebuild task to single thread executor for async execution
135-
NSSUMMARY_REBUILD_EXECUTOR.submit(() -> {
136-
try {
137-
138-
// This will go through NSSummaryTask's unified control mechanism
139-
reconNamespaceSummaryManager.rebuildNSSummaryTree(omMetadataManager);
140-
log.info("Async NSSummary tree rebuild completed successfully.");
141-
} catch (Exception e) {
142-
log.error("Async NSSummary tree rebuild failed.", e);
143-
}
144-
});
145-
146-
return true;
147-
}
148-
149111
public static File getReconScmDbDir(ConfigurationSource conf) {
150112
return new ReconUtils().getReconDbDir(conf, OZONE_RECON_SCM_DB_DIR);
151113
}
@@ -299,13 +261,6 @@ public static StringBuilder constructFullPathPrefix(long initialParentId, String
299261
"deletion, returning empty string for path construction.");
300262
throw new ServiceNotReadyException("Service is initializing. Please try again later.");
301263
}
302-
if (nsSummary.getParentId() == -1) {
303-
// Trigger async rebuild using unified control mechanism
304-
triggerAsyncNSSummaryRebuild(reconNamespaceSummaryManager, omMetadataManager);
305-
log.warn(
306-
"NSSummary tree corruption detected, rebuild triggered. Returning empty string for path construction.");
307-
throw new ServiceNotReadyException("Service is initializing. Please try again later.");
308-
}
309264
// On the last pass, dir-name will be empty and parent will be zero, indicating the loop should end.
310265
if (!nsSummary.getDirName().isEmpty()) {
311266
pathSegments.add(nsSummary.getDirName());

hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconNamespaceSummaryManager.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.hadoop.hdds.utils.db.BatchOperation;
2323
import org.apache.hadoop.hdds.utils.db.DBStore;
2424
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
25-
import org.apache.hadoop.ozone.om.OMMetadataManager;
2625
import org.apache.hadoop.ozone.recon.api.types.NSSummary;
2726
import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
2827

@@ -52,6 +51,4 @@ void batchStoreNSSummaries(BatchOperation batch, long objectId,
5251

5352
void commitBatchOperation(RDBBatchOperation rdbBatchOperation)
5453
throws IOException;
55-
56-
void rebuildNSSummaryTree(OMMetadataManager omMetadataManager);
5754
}

hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconNamespaceSummaryManagerImpl.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.hadoop.hdds.utils.db.DBStore;
2727
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
2828
import org.apache.hadoop.hdds.utils.db.Table;
29-
import org.apache.hadoop.ozone.om.OMMetadataManager;
3029
import org.apache.hadoop.ozone.recon.api.types.NSSummary;
3130
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
3231
import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask;
@@ -105,13 +104,6 @@ public void commitBatchOperation(RDBBatchOperation rdbBatchOperation)
105104
this.namespaceDbStore.commitBatchOperation(rdbBatchOperation);
106105
}
107106

108-
@Override
109-
public void rebuildNSSummaryTree(OMMetadataManager omMetadataManager) {
110-
// This method is called by the unified ReconUtils.triggerNSSummaryRebuild
111-
// It should only handle the actual rebuild logic without state management
112-
nsSummaryTask.reprocess(omMetadataManager);
113-
}
114-
115107
public Table getNSSummaryTable() {
116108
return nsSummaryTable;
117109
}

hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,14 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) {
205205
LOG.info("NSSummary tree rebuild is already in progress, skipping duplicate request.");
206206
return buildTaskResult(false);
207207
}
208-
208+
209209
if (!REBUILD_STATE.compareAndSet(currentState, RebuildState.RUNNING)) {
210-
LOG.info("Failed to acquire rebuild lock, another thread may have started rebuild.");
210+
// Check if another thread successfully started the rebuild
211+
if (REBUILD_STATE.get() == RebuildState.RUNNING) {
212+
LOG.info("Rebuild already in progress by another thread, returning success");
213+
return buildTaskResult(true);
214+
}
215+
LOG.info("Failed to acquire rebuild lock, unknown state");
211216
return buildTaskResult(false);
212217
}
213218

@@ -250,7 +255,7 @@ protected TaskResult executeReprocess(OMMetadataManager omMetadataManager, long
250255
ThreadFactory threadFactory = new ThreadFactoryBuilder()
251256
.setNameFormat("Recon-NSSummaryTask-%d")
252257
.build();
253-
ExecutorService executorService = Executors.newFixedThreadPool(2,
258+
ExecutorService executorService = Executors.newFixedThreadPool(3,
254259
threadFactory);
255260
boolean success = false;
256261
try {
@@ -263,14 +268,31 @@ protected TaskResult executeReprocess(OMMetadataManager omMetadataManager, long
263268
}
264269
}
265270
success = true;
266-
267-
} catch (InterruptedException | ExecutionException ex) {
268-
LOG.error("Error while reprocessing NSSummary table in Recon DB.", ex);
271+
272+
} catch (InterruptedException ex) {
273+
Thread.currentThread().interrupt();
274+
LOG.error("NSSummaryTask was interrupted.", ex);
275+
REBUILD_STATE.set(RebuildState.FAILED);
276+
return buildTaskResult(false);
277+
} catch (ExecutionException ex) {
278+
LOG.error("Error while reprocessing NSSummary table in Recon DB.", ex.getCause());
269279
REBUILD_STATE.set(RebuildState.FAILED);
270280
return buildTaskResult(false);
271-
272281
} finally {
273282
executorService.shutdown();
283+
// Deterministic resource cleanup with timeout
284+
try {
285+
// get() ensures the work is done. awaitTermination ensures the workers are also verifiably gone.
286+
// It turns an asynchronous shutdown into a synchronous, deterministic one
287+
if (!executorService.awaitTermination(5, TimeUnit.MINUTES)) {
288+
LOG.warn("Executor service for NSSummaryTask did not terminate in the specified time.");
289+
executorService.shutdownNow();
290+
}
291+
} catch (InterruptedException ex) {
292+
LOG.error("NSSummaryTask executor service termination was interrupted.", ex);
293+
executorService.shutdownNow();
294+
Thread.currentThread().interrupt();
295+
}
274296

275297
long endTime = System.nanoTime();
276298
// Convert to milliseconds

hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ protected boolean flushAndCommitUpdatedNSToDB(Map<Long, NSSummary> nsSummaryMap,
261261
try {
262262
updateNSSummariesToDB(nsSummaryMap, objectIdsToBeDeleted);
263263
} catch (IOException e) {
264-
LOG.error("Unable to write Namespace Summary data in Recon DB.", e);
264+
LOG.error("Unable to write Namespace Summary data in Recon DB. batchSize={}", nsSummaryMap.size(), e);
265265
return false;
266266
} finally {
267267
nsSummaryMap.clear();

hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/upgrade/NSSummaryAggregatedTotalsUpgrade.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@
2222
import com.google.inject.Injector;
2323
import javax.sql.DataSource;
2424
import org.apache.hadoop.ozone.recon.ReconGuiceServletContextListener;
25-
import org.apache.hadoop.ozone.recon.ReconUtils;
26-
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
27-
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
25+
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
26+
import org.apache.hadoop.ozone.recon.tasks.ReconTaskReInitializationEvent;
2827
import org.slf4j.Logger;
2928
import org.slf4j.LoggerFactory;
3029

@@ -50,12 +49,16 @@ public void execute(DataSource source) throws Exception {
5049
"Guice injector not initialized. NSSummary rebuild cannot proceed during upgrade.");
5150
}
5251

53-
ReconNamespaceSummaryManager nsMgr = injector.getInstance(ReconNamespaceSummaryManager.class);
54-
ReconOMMetadataManager omMgr = injector.getInstance(ReconOMMetadataManager.class);
55-
56-
// Fire and forget: unified control using ReconUtils -> NSSummaryTask
52+
ReconTaskController reconTaskController = injector.getInstance(ReconTaskController.class);
5753
LOG.info("Triggering asynchronous NSSummary tree rebuild for materialized totals (upgrade action).");
58-
ReconUtils.triggerAsyncNSSummaryRebuild(nsMgr, omMgr);
54+
ReconTaskController.ReInitializationResult result = reconTaskController.queueReInitializationEvent(
55+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
56+
if (result != ReconTaskController.ReInitializationResult.SUCCESS) {
57+
LOG.error(
58+
"Failed to queue reinitialization event for manual trigger (result: {}), failing the reinitialization " +
59+
"during NSSummaryAggregatedTotalsUpgrade action, will be retried as part of syncDataFromOM " +
60+
"scheduler task.", result);
61+
}
5962
}
6063

6164
@Override

hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
3030
import static org.junit.jupiter.api.Assertions.assertEquals;
3131
import static org.junit.jupiter.api.Assertions.assertThrows;
32-
import static org.mockito.Mockito.anyLong;
3332
import static org.mockito.Mockito.mock;
34-
import static org.mockito.Mockito.verify;
3533
import static org.mockito.Mockito.when;
3634

3735
import java.io.File;
@@ -88,8 +86,6 @@
8886
import org.junit.jupiter.api.BeforeEach;
8987
import org.junit.jupiter.api.Test;
9088
import org.junit.jupiter.api.io.TempDir;
91-
import org.mockito.ArgumentCaptor;
92-
import org.slf4j.Logger;
9389

9490
/**
9591
* Test for NSSummary REST APIs with FSO.
@@ -813,40 +809,6 @@ public void testConstructFullPathWithNegativeParentIdTriggersRebuild() throws IO
813809
ReconUtils.constructFullPath(keyInfo, mockSummaryManager, mockMetadataManager));
814810
}
815811

816-
@Test
817-
public void testLoggingWhenParentIdIsNegative() throws IOException {
818-
ReconNamespaceSummaryManager mockManager =
819-
mock(ReconNamespaceSummaryManager.class);
820-
Logger mockLogger = mock(Logger.class);
821-
ReconUtils.setLogger(mockLogger);
822-
823-
NSSummary mockSummary = new NSSummary();
824-
mockSummary.setParentId(-1);
825-
when(mockManager.getNSSummary(anyLong())).thenReturn(mockSummary);
826-
827-
OmKeyInfo keyInfo = new OmKeyInfo.Builder()
828-
.setKeyName("testKey")
829-
.setVolumeName("vol")
830-
.setBucketName("bucket")
831-
.setObjectID(1L)
832-
.setParentObjectID(1L)
833-
.build();
834-
835-
assertThrows(ServiceNotReadyException.class, () ->
836-
ReconUtils.constructFullPath(keyInfo, mockManager, null));
837-
838-
// Assert
839-
ArgumentCaptor<String> logCaptor = ArgumentCaptor.forClass(String.class);
840-
verify(mockLogger).warn(logCaptor.capture());
841-
String loggedMessage = logCaptor.getValue();
842-
843-
// Here we can assert the exact message we expect to see in the logs.
844-
// Since we set parentId = -1, this triggers the corruption detection path
845-
assertEquals(
846-
"NSSummary tree corruption detected, rebuild triggered. Returning empty string " +
847-
"for path construction.", loggedMessage);
848-
}
849-
850812
/**
851813
* Write directories and keys info into OM DB.
852814
* @throws Exception

hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -434,54 +434,6 @@ void testReconUtilsIntegration() throws Exception {
434434
"State should return to IDLE after completion");
435435
}
436436

437-
/**
438-
* Test that ReconUtils async trigger respects unified control.
439-
*/
440-
@Test
441-
void testReconUtilsRespectsUnifiedControl() throws Exception {
442-
CountDownLatch firstRebuildStarted = new CountDownLatch(1);
443-
CountDownLatch firstRebuildCanFinish = new CountDownLatch(1);
444-
445-
// Setup first rebuild to block
446-
doAnswer(invocation -> {
447-
firstRebuildStarted.countDown();
448-
boolean awaitSuccess = firstRebuildCanFinish.await(5, TimeUnit.SECONDS);
449-
if (!awaitSuccess) {
450-
LOG.warn("firstRebuildCanFinish.await() timed out");
451-
}
452-
return null;
453-
}).when(mockNamespaceSummaryManager).clearNSSummaryTable();
454-
455-
// Start first rebuild via NSSummaryTask directly
456-
CompletableFuture<TaskResult> directRebuild = CompletableFuture.supplyAsync(() ->
457-
nsSummaryTask.reprocess(mockOMMetadataManager));
458-
459-
// Wait for first rebuild to start
460-
assertTrue(firstRebuildStarted.await(5, TimeUnit.SECONDS),
461-
"First rebuild should start");
462-
assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
463-
"State should be RUNNING");
464-
465-
// Try to trigger via ReconUtils - should still respect the running state
466-
// (The async execution will be queued but the actual rebuild will be rejected)
467-
boolean triggered = ReconUtils.triggerAsyncNSSummaryRebuild(
468-
mockNamespaceSummaryManager, mockReconOMMetadataManager);
469-
assertTrue(triggered, "ReconUtils should queue the async request");
470-
471-
// State should still be RUNNING from the first rebuild
472-
assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
473-
"State should still be RUNNING from first rebuild");
474-
475-
// Complete first rebuild
476-
firstRebuildCanFinish.countDown();
477-
TaskResult result = directRebuild.get(5, TimeUnit.SECONDS);
478-
assertTrue(result.isTaskSuccess(), "First rebuild should succeed");
479-
480-
// Final state should be IDLE
481-
assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
482-
"Final state should be IDLE");
483-
}
484-
485437
/**
486438
* Test state transitions during exception scenarios.
487439
*/

0 commit comments

Comments
 (0)