Skip to content

Commit 9485f27

Browse files
exposing retry metrics
1 parent 680723b commit 9485f27

File tree

4 files changed

+37
-9
lines changed

4 files changed

+37
-9
lines changed

src/main/java/com/getindata/connectors/http/internal/PollingClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.Collection;
44

55
import org.apache.flink.table.data.RowData;
6+
import org.apache.flink.table.functions.FunctionContext;
67

78
/**
89
* A client that is used to get enrichment data from external component.
@@ -15,4 +16,10 @@ public interface PollingClient<T> {
1516
* @return an optional result of data lookup.
1617
*/
1718
Collection<T> pull(RowData lookupRow);
19+
20+
/**
21+
* Initialize the client.
22+
* @param ctx function context
23+
*/
24+
void open(FunctionContext ctx);
1825
}

src/main/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetry.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import com.getindata.connectors.http.internal.status.HttpResponseChecker;
55
import io.github.resilience4j.retry.Retry;
66
import io.github.resilience4j.retry.RetryConfig;
7+
import io.github.resilience4j.retry.RetryRegistry;
78
import lombok.Builder;
89
import lombok.Getter;
910
import lombok.RequiredArgsConstructor;
1011
import lombok.extern.slf4j.Slf4j;
12+
import org.apache.flink.metrics.MetricGroup;
1113

1214
import java.io.IOException;
1315
import java.net.http.HttpClient;
@@ -19,26 +21,38 @@
1921
public class HttpClientWithRetry {
2022

2123
private final HttpClient httpClient;
22-
private final RetryConfig retryConfig;
2324
@Getter
2425
private final HttpResponseChecker responseChecker;
26+
private final Retry retry;
2527

2628
@Builder
2729
HttpClientWithRetry(HttpClient httpClient,
2830
RetryConfig retryConfig,
2931
HttpResponseChecker responseChecker) {
3032
this.httpClient = httpClient;
3133
this.responseChecker = responseChecker;
32-
this.retryConfig = RetryConfig.from(retryConfig)
34+
retryConfig = RetryConfig.from(retryConfig)
3335
.retryExceptions(IOException.class, RetryHttpRequestException.class)
3436
.build();
37+
this.retry = RetryRegistry.ofDefaults().retry("http-lookup-connector", retryConfig);
38+
}
39+
40+
public void registerMetrics(MetricGroup metrics){
41+
var group = metrics.addGroup("http_lookup_connector");
42+
group.gauge("failed_calls_with_retry_attempt_count",
43+
() -> retry.getMetrics().getNumberOfFailedCallsWithRetryAttempt());
44+
group.gauge("failed_calls_without_retry_attempt_count",
45+
() -> retry.getMetrics().getNumberOfFailedCallsWithoutRetryAttempt());
46+
group.gauge("success_calls_with_retry_attempt",
47+
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithRetryAttempt());
48+
group.gauge("success_calls_without_retry_attempt",
49+
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithoutRetryAttempt());
3550
}
3651

3752
public <T> HttpResponse<T> send(
3853
Supplier<HttpRequest> requestSupplier,
3954
HttpResponse.BodyHandler<T> responseBodyHandler
4055
) throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
41-
var retry = Retry.of("http-lookup-connector", retryConfig);
4256
try {
4357
try {
4458
return Retry.decorateCheckedSupplier(retry, () -> task(requestSupplier, responseBodyHandler)).apply();

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

3-
import java.util.Collection;
4-
import java.util.concurrent.atomic.AtomicInteger;
5-
3+
import com.getindata.connectors.http.internal.PollingClient;
4+
import com.getindata.connectors.http.internal.PollingClientFactory;
5+
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
66
import lombok.AccessLevel;
77
import lombok.Getter;
88
import lombok.extern.slf4j.Slf4j;
@@ -12,9 +12,8 @@
1212
import org.apache.flink.table.functions.FunctionContext;
1313
import org.apache.flink.table.functions.LookupFunction;
1414

15-
import com.getindata.connectors.http.internal.PollingClient;
16-
import com.getindata.connectors.http.internal.PollingClientFactory;
17-
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
15+
import java.util.Collection;
16+
import java.util.concurrent.atomic.AtomicInteger;
1817

1918
@Slf4j
2019
public class HttpTableLookupFunction extends LookupFunction {
@@ -59,6 +58,8 @@ public void open(FunctionContext context) throws Exception {
5958

6059
context.getMetricGroup()
6160
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
61+
62+
client.open(context);
6263
}
6364

6465
@Override

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.flink.api.common.serialization.DeserializationSchema;
1818
import org.apache.flink.configuration.ReadableConfig;
1919
import org.apache.flink.table.data.RowData;
20+
import org.apache.flink.table.functions.FunctionContext;
2021
import org.apache.flink.util.ConfigurationException;
2122
import org.apache.flink.util.StringUtils;
2223

@@ -84,6 +85,11 @@ public JavaNetHttpPollingClient(
8485
validateIgnoredResponseCodes(this.httpClient.getResponseChecker());
8586
}
8687

88+
public void open(FunctionContext context) {
89+
httpClient.registerMetrics(context.getMetricGroup());
90+
}
91+
92+
8793
@Override
8894
public Collection<RowData> pull(RowData lookupRow) {
8995
try {

0 commit comments

Comments
 (0)