Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public class AmoroManagementConf {
.defaultValue("admin")
.withDescription("The administrator password");

/**
 * Whether to enable master-slave mode, which supports horizontal scaling of AMS.
 *
 * <p>Configuration key: {@code use-master-slave-mode}. Boolean option, defaults to {@code false}.
 */
public static final ConfigOption<Boolean> USE_MASTER_SLAVE_MODE =
ConfigOptions.key("use-master-slave-mode")
.booleanType()
.defaultValue(false)
.withDescription("Enable master & slave mode, which supports horizontal scaling of AMS.");

public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
ConfigOptions.key("catalog-meta-cache.expiration-interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server;

import static org.apache.amoro.server.AmoroManagementConf.USE_MASTER_SLAVE_MODE;

import io.javalin.Javalin;
import io.javalin.http.HttpCode;
import io.javalin.http.staticfiles.Location;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class AmoroServiceContainer {
public static final Logger LOG = LoggerFactory.getLogger(AmoroServiceContainer.class);

public static final String SERVER_CONFIG_FILENAME = "config.yaml";
private static boolean IS_MASTER_SLAVE_MODE = false;

private final HighAvailabilityContainer haContainer;
private DataSource dataSource;
Expand Down Expand Up @@ -128,15 +131,22 @@ public static void main(String[] args) {
LOG.info("AMS service has been shut down");
}));
service.startRestServices();
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
if (IS_MASTER_SLAVE_MODE) {
// Failing to become the master must not block the subsequent logic.
service.registAndElect();
// Regardless of whether it becomes the master, the service needs to be activated.
service.startOptimizingService();
} else {
while (true) {
try {
service.waitLeaderShip();
service.startOptimizingService();
service.waitFollowerShip();
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.disposeOptimizingService();
}
}
}
} catch (Throwable t) {
Expand All @@ -145,6 +155,10 @@ public static void main(String[] args) {
}
}

/**
 * Registers this AMS instance through the high-availability container.
 *
 * <p>Pure delegation to {@code haContainer.registAndElect()}.
 *
 * @throws Exception if the high-availability container fails to register the node
 */
public void registAndElect() throws Exception {
  this.haContainer.registAndElect();
}

/**
 * Waits for leadership through the high-availability container.
 *
 * <p>Pure delegation to {@code haContainer.waitLeaderShip()}.
 *
 * @throws Exception if the high-availability container fails while waiting
 */
public void waitLeaderShip() throws Exception {
  this.haContainer.waitLeaderShip();
}
Expand Down Expand Up @@ -256,6 +270,7 @@ public void dispose() {
/**
 * Initializes the service configuration and caches the master-slave mode flag.
 *
 * @throws Exception if configuration initialization fails
 */
private void initConfig() throws Exception {
  LOG.info("initializing configurations...");
  ConfigurationHelper configurationHelper = new ConfigurationHelper();
  configurationHelper.init();
  // The flag is read only after init() has run; presumably init() populates serviceConfig
  // (TODO confirm against ConfigurationHelper).
  final boolean masterSlaveEnabled = serviceConfig.getBoolean(USE_MASTER_SLAVE_MODE);
  IS_MASTER_SLAVE_MODE = masterSlaveEnabled;
}

private void startThriftService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class HighAvailabilityContainer implements LeaderLatchListener {
Expand All @@ -43,13 +45,27 @@ public class HighAvailabilityContainer implements LeaderLatchListener {

private final LeaderLatch leaderLatch;
private final CuratorFramework zkClient;

// Package-private accessors for testing
/**
 * Exposes the ZooKeeper client to tests in the same package.
 *
 * @return the Curator client, or {@code null} when HA is disabled
 */
CuratorFramework getZkClient() {
  return this.zkClient;
}

/**
 * Exposes the leader latch to tests in the same package.
 *
 * @return the leader latch; may be {@code null} (other methods of this class null-check it,
 *     presumably for the HA-disabled case)
 */
LeaderLatch getLeaderLatch() {
  return this.leaderLatch;
}

private final String tableServiceMasterPath;
private final String optimizingServiceMasterPath;
private final String nodesPath;
private final AmsServerInfo tableServiceServerInfo;
private final AmsServerInfo optimizingServiceServerInfo;
private final boolean isMasterSlaveMode;
private volatile CountDownLatch followerLatch;
private String registeredNodePath;

public HighAvailabilityContainer(Configurations serviceConfig) throws Exception {
this.isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
String zkServerAddress = serviceConfig.getString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS);
int zkSessionTimeout =
Expand All @@ -59,6 +75,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception
String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
tableServiceMasterPath = AmsHAProperties.getTableServiceMasterPath(haClusterName);
optimizingServiceMasterPath = AmsHAProperties.getOptimizingServiceMasterPath(haClusterName);
nodesPath = AmsHAProperties.getNodesPath(haClusterName);
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
this.zkClient =
CuratorFrameworkFactory.builder()
Expand All @@ -70,6 +87,7 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception
zkClient.start();
createPathIfNeeded(tableServiceMasterPath);
createPathIfNeeded(optimizingServiceMasterPath);
createPathIfNeeded(nodesPath);
String leaderPath = AmsHAProperties.getLeaderPath(haClusterName);
createPathIfNeeded(leaderPath);
leaderLatch = new LeaderLatch(zkClient, leaderPath);
Expand All @@ -90,8 +108,10 @@ public HighAvailabilityContainer(Configurations serviceConfig) throws Exception
zkClient = null;
tableServiceMasterPath = null;
optimizingServiceMasterPath = null;
nodesPath = null;
tableServiceServerInfo = null;
optimizingServiceServerInfo = null;
registeredNodePath = null;
// block follower latch forever when ha is disabled
followerLatch = new CountDownLatch(1);
}
Expand Down Expand Up @@ -126,6 +146,27 @@ public void waitLeaderShip() throws Exception {
LOG.info("Became the leader of AMS");
}

/**
 * Registers this AMS node under the cluster's nodes path in ZooKeeper.
 *
 * <p>The node is created as an ephemeral sequential znode whose payload is the JSON form of the
 * table service server info, so it is removed automatically when the ZooKeeper session expires.
 * Despite the method name, no election call is made in this method.
 *
 * <p>The call is a no-op when master-slave mode is disabled, when HA is disabled (there is no
 * ZooKeeper client), or when the znode previously registered by this instance still exists.
 *
 * @throws Exception if a ZooKeeper operation fails
 */
public void registAndElect() throws Exception {
  if (!isMasterSlaveMode) {
    LOG.debug("Master-slave mode is not enabled, skip node registration");
    return;
  }
  if (zkClient == null || nodesPath == null) {
    LOG.warn("HA is not enabled, skip node registration");
    return;
  }
  // Sequential znodes always get a fresh name, so registering again while the previous znode is
  // still alive would leave a duplicate entry for this server and lose track of the old path.
  // If the previous znode is gone (e.g. the session expired), fall through and register again.
  if (registeredNodePath != null && zkClient.checkExists().forPath(registeredNodePath) != null) {
    LOG.info("AMS node is already registered to ZK: {}", registeredNodePath);
    return;
  }
  // Register node to ZK using ephemeral node
  // The node will be automatically deleted when the session expires
  String nodeInfo = JacksonUtil.toJSONString(tableServiceServerInfo);
  registeredNodePath =
      zkClient
          .create()
          .creatingParentsIfNeeded()
          .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
          .forPath(nodesPath + "/node-", nodeInfo.getBytes(StandardCharsets.UTF_8));
  LOG.info("Registered AMS node to ZK: {}", registeredNodePath);
}

public void waitFollowerShip() throws Exception {
LOG.info("Waiting to become the follower of AMS");
if (followerLatch != null) {
Expand All @@ -137,6 +178,18 @@ public void waitFollowerShip() throws Exception {
public void close() {
if (leaderLatch != null) {
try {
// Unregister node from ZK
if (registeredNodePath != null) {
try {
zkClient.delete().forPath(registeredNodePath);
LOG.info("Unregistered AMS node from ZK: {}", registeredNodePath);
} catch (KeeperException.NoNodeException e) {
// Node already deleted, ignore
LOG.debug("Node {} already deleted", registeredNodePath);
} catch (Exception e) {
LOG.warn("Failed to unregister node from ZK: {}", registeredNodePath, e);
}
}
this.leaderLatch.close();
this.zkClient.close();
} catch (IOException e) {
Expand Down Expand Up @@ -171,6 +224,58 @@ private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restB
return amsServerInfo;
}

/**
 * Lists the AMS nodes currently registered under the nodes path in ZooKeeper.
 *
 * <p>Only the leader gets a populated result. An empty list is returned when master-slave mode is
 * disabled, when HA is disabled, when this node does not hold leadership, or when the nodes path
 * does not exist. Children whose data is empty or cannot be read or parsed are skipped with a
 * warning instead of failing the whole call.
 *
 * @return information of the alive nodes; never {@code null}
 * @throws InterruptedException if the calling thread is interrupted while reading a node; the
 *     interrupt flag is restored before rethrowing
 * @throws Exception if listing the children of the nodes path fails for a reason other than the
 *     path being absent
 */
public List<AmsServerInfo> getAliveNodes() throws Exception {
  List<AmsServerInfo> aliveNodes = new ArrayList<>();
  if (!isMasterSlaveMode) {
    LOG.debug("Master-slave mode is not enabled, return empty node list");
    return aliveNodes;
  }
  if (zkClient == null || nodesPath == null) {
    LOG.warn("HA is not enabled, return empty node list");
    return aliveNodes;
  }
  if (!leaderLatch.hasLeadership()) {
    LOG.warn("Only leader node can get alive nodes list");
    return aliveNodes;
  }

  List<String> nodePaths;
  try {
    nodePaths = zkClient.getChildren().forPath(nodesPath);
  } catch (KeeperException.NoNodeException e) {
    LOG.debug("Nodes path {} does not exist", nodesPath);
    return aliveNodes;
  }

  for (String nodePath : nodePaths) {
    try {
      String fullPath = nodesPath + "/" + nodePath;
      byte[] data = zkClient.getData().forPath(fullPath);
      if (data != null && data.length > 0) {
        String nodeInfoJson = new String(data, StandardCharsets.UTF_8);
        AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, AmsServerInfo.class);
        aliveNodes.add(nodeInfo);
      }
    } catch (InterruptedException e) {
      // Do not swallow interruption: restore the flag and stop issuing further ZK calls.
      Thread.currentThread().interrupt();
      throw e;
    } catch (Exception e) {
      // Best effort: one unreadable node must not hide the remaining ones.
      LOG.warn("Failed to get node info for path: {}", nodePath, e);
    }
  }
  return aliveNodes;
}

/**
 * Tells whether this node currently holds leadership.
 *
 * @return {@code true} only when a leader latch exists and it reports leadership; {@code false}
 *     when there is no leader latch or leadership is not held
 */
public boolean hasLeadership() {
  return leaderLatch != null && leaderLatch.hasLeadership();
}

private void createPathIfNeeded(String path) throws Exception {
try {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
Expand Down
Loading
Loading