Skip to content

Commit 0552130

Browse files
committed
support specifying fe protocol
1 parent c61342f commit 0552130

File tree

14 files changed

+46
-30
lines changed

14 files changed

+46
-30
lines changed

flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ public class RestService implements Serializable {
9191
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
9292
private static final String FE_LOGIN = "/rest/v1/login";
9393
private static final ObjectMapper objectMapper = new ObjectMapper();
94-
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
95-
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
94+
private static final String TABLE_SCHEMA_API = "%s/api/%s/%s/_schema";
95+
private static final String QUERY_PLAN_API = "%s/api/%s/%s/_query_plan";
9696

9797
/**
9898
* send request to Doris FE and get response json string.
@@ -131,6 +131,7 @@ private static String send(
131131
RequestConfig.custom()
132132
.setConnectTimeout(connectTimeout)
133133
.setSocketTimeout(socketTimeout)
134+
.setRedirectsEnabled(true)
134135
.build();
135136

136137
request.setConfig(requestConfig);
@@ -310,6 +311,9 @@ public static String randomEndpoint(String feNodes, Logger logger)
310311
Collections.shuffle(nodes);
311312
for (String feNode : nodes) {
312313
String host = feNode.trim();
314+
if (!host.startsWith("http://") && !host.startsWith("https://")) {
315+
host = "http://" + host;
316+
}
313317
if (BackendUtil.tryHttpConnection(host)) {
314318
return host;
315319
}
@@ -359,7 +363,10 @@ public static List<BackendRowV2> getBackendsV2(
359363

360364
for (String feNode : feNodeList) {
361365
try {
362-
String beUrl = "http://" + feNode + BACKENDS_V2;
366+
if (!feNode.startsWith("http://") && !feNode.startsWith("https://")) {
367+
feNode = "http://" + feNode;
368+
}
369+
String beUrl = feNode + BACKENDS_V2;
363370
HttpGet httpGet = new HttpGet(beUrl);
364371
String response = send(options, readOptions, httpGet, logger);
365372
logger.info("Backend Info:{}", response);
@@ -387,8 +394,7 @@ public static List<BackendRowV2> getBackendsV2(
387394
private static List<BackendRowV2> convert(List<String> feNodeList) {
388395
List<BackendRowV2> nodeList = new ArrayList<>();
389396
for (String node : feNodeList) {
390-
String[] split = node.split(":");
391-
nodeList.add(BackendRowV2.of(split[0], Integer.valueOf(split[1]), true));
397+
nodeList.add(BackendRowV2.ofUrl(node, true));
392398
}
393399
return nodeList;
394400
}

flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ public String getIp() {
5252
}
5353

5454
public void setIp(String ip) {
55+
if (!ip.startsWith("http://") && !ip.startsWith("https://")) {
56+
ip = "http://" + ip;
57+
}
5558
this.ip = ip;
5659
}
5760

@@ -82,5 +85,13 @@ public static BackendRowV2 of(String ip, int httpPort, boolean alive) {
8285
rowV2.setAlive(alive);
8386
return rowV2;
8487
}
88+
89+
public static BackendRowV2 ofUrl(String url, boolean alive) {
90+
int lastColon = url.lastIndexOf(":");
91+
return BackendRowV2.of(
92+
url.substring(0, lastColon),
93+
Integer.valueOf(url.substring(lastColon + 1)),
94+
alive);
95+
}
8596
}
8697
}

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,7 @@ private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
5858
if (tryHttpConnection(node)) {
5959
LOG.info("{} backend http connection success.", node);
6060
node = node.trim();
61-
String[] ipAndPort = node.split(":");
62-
BackendRowV2 backendRowV2 = new BackendRowV2();
63-
backendRowV2.setIp(ipAndPort[0]);
64-
backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1]));
65-
backendRowV2.setAlive(true);
66-
backends.add(backendRowV2);
61+
backends.add(BackendRowV2.ofUrl(node, true));
6762
}
6863
});
6964
return backends;
@@ -98,8 +93,10 @@ public String getAvailableBackend(int subtaskId) {
9893

9994
public static boolean tryHttpConnection(String host) {
10095
try {
96+
if (!host.startsWith("http://") && !host.startsWith("https://")) {
97+
host = "http://" + host;
98+
}
10199
LOG.debug("try to connect host {}", host);
102-
host = "http://" + host;
103100
URL url = new URL(host);
104101
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
105102
connection.setRequestMethod("GET");

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class DorisCommittable implements DorisAbstractCommittable {
2626
private final long txnID;
2727

2828
public DorisCommittable(String hostPort, String db, long txnID) {
29-
this.hostPort = hostPort;
29+
this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort;
3030
this.db = db;
3131
this.txnID = txnID;
3232
}

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public class DorisBatchStreamLoad implements Serializable {
8989
private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE;
9090
private final LabelGenerator labelGenerator;
9191
private final byte[] lineDelimiter;
92-
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
92+
private static final String LOAD_URL_PATTERN = "%s/api/%s/%s/_stream_load";
9393
private String loadUrl;
9494
private String hostPort;
9595
private final String username;

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
/** The committer to commit transaction. */
5353
public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
5454
private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class);
55-
private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc";
55+
private static final String commitPattern = "%s/api/%s/_stream_load_2pc";
5656
private final CloseableHttpClient httpClient;
5757
private final DorisOptions dorisOptions;
5858
private final DorisReadOptions dorisReadOptions;

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class BatchStageLoad implements Serializable {
6767
private static final Logger LOG = LoggerFactory.getLogger(BatchStageLoad.class);
6868
private final LabelGenerator labelGenerator;
6969
private final byte[] lineDelimiter;
70-
private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
70+
private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
7171
private static final String LINE_DELIMITER_KEY_WITH_PRETIX = "file.line_delimiter";
7272
private String uploadUrl;
7373
private String hostPort;
@@ -96,7 +96,9 @@ public BatchStageLoad(
9696
this.password = dorisOptions.getPassword();
9797
this.loadProps = executionOptions.getStreamLoadProp();
9898
this.labelGenerator = labelGenerator;
99-
this.hostPort = dorisOptions.getFenodes();
99+
this.hostPort =
100+
(dorisOptions.getFenodes().startsWith("http") ? "" : "http://")
101+
+ dorisOptions.getFenodes();
100102
this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
101103
this.fileNum = new AtomicInteger();
102104
this.lineDelimiter =

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommittable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class DorisCopyCommittable implements DorisAbstractCommittable {
2626
private final String copySQL;
2727

2828
public DorisCopyCommittable(String hostPort, String copySQL) {
29-
this.hostPort = hostPort;
29+
this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort;
3030
this.copySQL = copySQL;
3131
}
3232

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444

4545
public class DorisCopyCommitter implements Committer<DorisCopyCommittable>, Closeable {
4646
private static final Logger LOG = LoggerFactory.getLogger(DorisCopyCommitter.class);
47-
private static final String commitPattern = "http://%s/copy/query";
47+
private static final String commitPattern = "%s/copy/query";
4848
private static final int SUCCESS = 0;
4949
private static final String FAIL = "1";
5050
private ObjectMapper objectMapper = new ObjectMapper();

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class SchemaChangeManager implements Serializable {
5858
private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
5959
private static final String CHECK_SCHEMA_CHANGE_API =
6060
"http://%s/api/enable_light_schema_change/%s/%s";
61-
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
61+
private static final String SCHEMA_CHANGE_API = "%s/api/query/default_cluster/%s";
6262
private ObjectMapper objectMapper = new ObjectMapper();
6363
private DorisOptions dorisOptions;
6464
private String charsetEncoding = "UTF-8";

0 commit comments

Comments
 (0)