Skip to content

Commit 1b15360

Browse files
committed
Working thread pool strategy
1 parent db4a1ca commit 1b15360

File tree

6 files changed

+131
-169
lines changed

6 files changed

+131
-169
lines changed

src/main/java/net/himeki/mcmtfabric/ParallelProcessor.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import net.himeki.mcmtfabric.debug.WorldTickStats;
77
import net.himeki.mcmtfabric.parallelised.BotRegionManager;
88
import net.himeki.mcmtfabric.parallelised.threads.GlobalAffinityThreadPool;
9-
import net.himeki.mcmtfabric.parallelised.threads.SharedThreadPools;
109
import net.himeki.mcmtfabric.parallelised.threads.ThreadedChunksRegion;
1110
import net.himeki.mcmtfabric.serdes.pools.PostExecutePool;
1211
import net.minecraft.block.entity.*;
@@ -134,11 +133,11 @@ private static ThreadedChunksRegion findMatchingRegion(int chunkX, int chunkZ, W
134133
}
135134

136135
public static void setupThreadPool(int parallelism) {
137-
SharedThreadPools.getSharedTickPool();
136+
GlobalAffinityThreadPool.getAffinitySharedPool();
138137

139-
worldPool = GlobalAffinityThreadPool.getAffinityThreadPool();
138+
worldPool = GlobalAffinityThreadPool.getAffinityWorldAndRegionPool();
140139
for (int i = 0; i < 2; i++)
141-
GlobalAffinityThreadPool.increasePoolSize();
140+
GlobalAffinityThreadPool.increaseWorldAndRegionPoolSize();
142141
}
143142

144143
// Statistics
@@ -184,7 +183,7 @@ public static void preTick(int size, MinecraftServer server) {
184183
synchronized (threadedChunksRegions) {
185184
for (ThreadedChunksRegion region : pendingRegionsToAdd) {
186185
threadedChunksRegions.add(region);
187-
GlobalAffinityThreadPool.increasePoolSize();
186+
GlobalAffinityThreadPool.increaseWorldAndRegionPoolSize();
188187
}
189188
chunkRegionCache.invalidateAll();
190189
pendingRegionsToAdd.clear();
@@ -197,7 +196,7 @@ public static void preTick(int size, MinecraftServer server) {
197196
for (ThreadedChunksRegion region : pendingRegionsToRemove) {
198197
threadedChunksRegions.remove(region);
199198
region.shutdownExecutors();
200-
GlobalAffinityThreadPool.decreasePoolSize();
199+
GlobalAffinityThreadPool.decreaseWorldAndRegionPoolSize();
201200
}
202201
chunkRegionCache.invalidateAll();
203202
pendingRegionsToRemove.clear();
@@ -340,7 +339,7 @@ public static void postChunkTick(ServerWorld world) {
340339
for (ThreadedChunksRegion region : threadedChunksRegions) {
341340
// Region's post-chunk-tick handler
342341
if (region.getWorldId().equals(world.getRegistryKey().getValue().toString())) {
343-
SharedThreadPools.getSharedTickPool().execute(region::postChunkTick);
342+
GlobalAffinityThreadPool.getAffinitySharedPool().execute(region::postChunkTick);
344343
}
345344
}
346345
}
@@ -449,7 +448,7 @@ public static void postEntityTick(ServerWorld world) {
449448
synchronized (threadedChunksRegions) {
450449
for (ThreadedChunksRegion region : threadedChunksRegions) {
451450
if (region.getWorldId().equals(world.getRegistryKey().getValue().toString())) {
452-
SharedThreadPools.getSharedTickPool().execute(region::postEntityTick);
451+
GlobalAffinityThreadPool.getAffinitySharedPool().execute(region::postEntityTick);
453452

454453
}
455454
}
@@ -559,7 +558,7 @@ public static void postBlockEntityTick(ServerWorld world) {
559558
synchronized (threadedChunksRegions) {
560559
for (ThreadedChunksRegion region : threadedChunksRegions) {
561560
if (region.getWorldId().equals(world.getRegistryKey().getValue().toString())) {
562-
SharedThreadPools.getSharedTickPool().execute(region::postBlockEntityTick);
561+
GlobalAffinityThreadPool.getAffinitySharedPool().execute(region::postBlockEntityTick);
563562
}
564563
}
565564
}

src/main/java/net/himeki/mcmtfabric/parallelised/threads/AffinityThreadFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public void run() {
3838
affinityLock.release();
3939
}
4040
// Core release is managed globally on shutdown
41+
CPUCoreManager.releaseCore(assignedCpuCore, prefix);
42+
assignedCpuCores.remove(Integer.valueOf(assignedCpuCore));
4143
}
4244
}
4345
};

src/main/java/net/himeki/mcmtfabric/parallelised/threads/GlobalAffinityThreadPool.java

Lines changed: 117 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@
66
import java.util.concurrent.TimeUnit;
77

88
public class GlobalAffinityThreadPool {
9-
private static ThreadPoolExecutor affinityThreadPool;
9+
private static ThreadPoolExecutor affinityWorldAndRegionPool;
10+
private static ThreadPoolExecutor affinitySharedPool;
11+
private static final Object poolSizeLock = new Object();
1012

11-
public static synchronized ExecutorService getAffinityThreadPool() {
12-
if (affinityThreadPool == null || affinityThreadPool.isShutdown()) {
13+
14+
public static synchronized ExecutorService getAffinityWorldAndRegionPool() {
15+
if (affinityWorldAndRegionPool == null || affinityWorldAndRegionPool.isShutdown()) {
1316
int totalCores = Runtime.getRuntime().availableProcessors();
1417

15-
AffinityThreadFactory threadFactory = new AffinityThreadFactory("MCMT-AffinityThread");
18+
AffinityThreadFactory threadFactory = new AffinityThreadFactory("MCMT-AffWorldRegionThread");
1619

17-
affinityThreadPool = new ThreadPoolExecutor(
20+
affinityWorldAndRegionPool = new ThreadPoolExecutor(
1821
1,
1922
1,
2023
0L, TimeUnit.MILLISECONDS,
@@ -23,36 +26,130 @@ public static synchronized ExecutorService getAffinityThreadPool() {
2326
new ThreadPoolExecutor.AbortPolicy()
2427
);
2528
}
26-
return affinityThreadPool;
29+
return affinityWorldAndRegionPool;
30+
}
31+
32+
public static synchronized ExecutorService getAffinitySharedPool() {
33+
if (affinitySharedPool == null || affinitySharedPool.isShutdown()) {
34+
int totalCores = Runtime.getRuntime().availableProcessors();
35+
36+
AffinityThreadFactory threadFactory = new AffinityThreadFactory("MCMT-AffSharedThread");
37+
38+
affinitySharedPool = new ThreadPoolExecutor(
39+
totalCores - 2,
40+
totalCores - 2,
41+
0L, TimeUnit.MILLISECONDS,
42+
new LinkedBlockingQueue<>(),
43+
threadFactory,
44+
new ThreadPoolExecutor.AbortPolicy()
45+
);
46+
}
47+
return affinitySharedPool;
48+
}
49+
50+
public static void decreaseWorldAndRegionPoolSize() {
51+
synchronized (poolSizeLock) {
52+
if (affinityWorldAndRegionPool != null) {
53+
int targetSize = affinityWorldAndRegionPool.getCorePoolSize() - 1;
54+
55+
// First decrease the pool sizes
56+
affinityWorldAndRegionPool.setCorePoolSize(targetSize);
57+
affinityWorldAndRegionPool.setMaximumPoolSize(targetSize);
58+
59+
// Wait for active thread count to reach the new target
60+
while (affinityWorldAndRegionPool.getActiveCount() > targetSize) {
61+
try {
62+
Thread.sleep(2); // Short sleep to prevent busy waiting
63+
} catch (InterruptedException e) {
64+
Thread.currentThread().interrupt();
65+
return;
66+
}
67+
}
68+
69+
// Now it's safe to increase the shared pool
70+
if (affinitySharedPool != null) {
71+
increaseSharedPoolSize();
72+
}
73+
}
74+
}
75+
}
76+
77+
public static void increaseWorldAndRegionPoolSize() {
78+
synchronized (poolSizeLock) {
79+
if (affinityWorldAndRegionPool != null) {
80+
if (affinitySharedPool != null) {
81+
// First decrease shared pool
82+
decreaseSharedPoolSize();
83+
84+
// Wait for shared pool to properly decrease
85+
int targetSharedSize = affinitySharedPool.getCorePoolSize();
86+
while (affinitySharedPool.getActiveCount() > targetSharedSize) {
87+
try {
88+
Thread.sleep(2);
89+
} catch (InterruptedException e) {
90+
Thread.currentThread().interrupt();
91+
return;
92+
}
93+
}
94+
}
95+
96+
// Now safely increase WorldAndRegion pool
97+
int newSize = affinityWorldAndRegionPool.getCorePoolSize() + 1;
98+
affinityWorldAndRegionPool.setMaximumPoolSize(newSize);
99+
affinityWorldAndRegionPool.setCorePoolSize(newSize);
100+
}
101+
}
27102
}
28103

29-
public static void increasePoolSize() {
30-
if (affinityThreadPool != null) {
31-
affinityThreadPool.setMaximumPoolSize(affinityThreadPool.getMaximumPoolSize() + 1);
32-
affinityThreadPool.setCorePoolSize(affinityThreadPool.getCorePoolSize() + 1);
104+
private static void increaseSharedPoolSize() {
105+
synchronized (poolSizeLock) {
106+
if (affinitySharedPool != null) {
107+
int newSize = affinitySharedPool.getMaximumPoolSize() + 1;
108+
affinitySharedPool.setMaximumPoolSize(newSize);
109+
affinitySharedPool.setCorePoolSize(newSize);
110+
}
33111
}
34112
}
35113

36-
public static void decreasePoolSize() {
37-
if (affinityThreadPool != null) {
38-
affinityThreadPool.setCorePoolSize(affinityThreadPool.getCorePoolSize() - 1);
39-
affinityThreadPool.setMaximumPoolSize(affinityThreadPool.getMaximumPoolSize() - 1);
114+
private static void decreaseSharedPoolSize() {
115+
synchronized (poolSizeLock) {
116+
if (affinitySharedPool != null) {
117+
int newSize = affinitySharedPool.getCorePoolSize() - 1;
118+
affinitySharedPool.setCorePoolSize(newSize);
119+
affinitySharedPool.setMaximumPoolSize(newSize);
120+
}
40121
}
41122
}
42123

124+
43125
public static void shutdown() {
44-
if (affinityThreadPool != null) {
45-
affinityThreadPool.shutdown();
126+
if (affinityWorldAndRegionPool != null) {
127+
affinityWorldAndRegionPool.shutdown();
128+
try {
129+
if (!affinityWorldAndRegionPool.awaitTermination(5, TimeUnit.SECONDS)) {
130+
affinityWorldAndRegionPool.shutdownNow();
131+
}
132+
} catch (InterruptedException e) {
133+
affinityWorldAndRegionPool.shutdownNow();
134+
Thread.currentThread().interrupt();
135+
}
136+
if (affinityWorldAndRegionPool.getThreadFactory() instanceof AffinityThreadFactory factory) {
137+
factory.releaseAllCores();
138+
}
139+
}
140+
141+
if (affinitySharedPool != null) {
142+
affinitySharedPool.shutdown();
46143
try {
47-
if (!affinityThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
48-
affinityThreadPool.shutdownNow();
144+
if (!affinitySharedPool.awaitTermination(5, TimeUnit.SECONDS)) {
145+
affinitySharedPool.shutdownNow();
49146
}
50147
} catch (InterruptedException e) {
51-
affinityThreadPool.shutdownNow();
148+
affinitySharedPool.shutdownNow();
52149
Thread.currentThread().interrupt();
53150
}
54151

55-
if (affinityThreadPool.getThreadFactory() instanceof AffinityThreadFactory factory) {
152+
if (affinitySharedPool.getThreadFactory() instanceof AffinityThreadFactory factory) {
56153
factory.releaseAllCores();
57154
}
58155
}

src/main/java/net/himeki/mcmtfabric/parallelised/threads/MCMTThreads.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import java.util.concurrent.ThreadFactory;
88

99
public class MCMTThreads {
10-
private static final Logger LOGGER = LogManager.getLogger();
11-
1210
public static ThreadFactory createNamedVirtualThreadFactory(String name) {
1311
return Thread.ofVirtual()
1412
.name(name, 0)
@@ -20,29 +18,4 @@ public static ThreadFactory createNamedPlatformThreadFactory(String name) {
2018
.name(name, 0)
2119
.factory();
2220
}
23-
24-
public static ThreadFactory createNamedPlatformAffinityThreadFactoryForRegion(ThreadedChunksRegion region) {
25-
return runnable -> {
26-
LOGGER.debug("Region {} requesting core assignment", region.getName());
27-
int cpuCore = CPUCoreManager.acquireCore("REGION");
28-
if (cpuCore == -1) {
29-
LOGGER.error("Failed to acquire CPU core for region {} even after preemption attempts", region.getName());
30-
throw new RuntimeException("No available CPU cores for thread affinity, even after attempting preemption");
31-
}
32-
LOGGER.info("Region {} assigned to core {}", region.getName(), cpuCore);
33-
region.setAssignedCpuCore(cpuCore);
34-
SharedThreadPools.adjustSharedPoolSize();
35-
36-
Thread thread = new Thread(() -> {
37-
try (AffinityLock al = AffinityLock.acquireLock(cpuCore)) {
38-
runnable.run();
39-
} finally {
40-
// Release the core when the executor is shutting down
41-
}
42-
}, "Region-" + region.getName() + "-PlatformThread");
43-
44-
thread.setDaemon(true);
45-
return thread;
46-
};
47-
}
4821
}

src/main/java/net/himeki/mcmtfabric/parallelised/threads/SharedThreadPools.java

Lines changed: 0 additions & 107 deletions
This file was deleted.

0 commit comments

Comments
 (0)