Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased]

- OIDC token request to not flow during explain

## [0.18.0] - 2025-01-15

- Ignore Eclipse files in .gitignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -17,16 +20,20 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.StringUtils;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST;

/**
* An implementation of {@link PollingClient} that uses Java 11's {@link HttpClient}.
Expand Down Expand Up @@ -96,12 +103,59 @@ private Collection<RowData> queryAndProcess(RowData lookupData) throws Exception

HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData);
HttpResponse<String> response = httpClient.send(
request.getHttpRequest(),
BodyHandlers.ofString()
);
updateHttpRequestIfRequired(request,
HttpHeaderUtils.createOIDCHeaderPreprocessor(options.getReadableConfig())),
BodyHandlers.ofString());
return processHttpResponse(response, request);
}

/**
* If using OIDC, update the http request using the oidc header pre processor to supply the
* authentication header, with a short lived bearer token.
* @param request http reauest to amend
* @param oidcHeaderPreProcessor OIDC header pre processor
* @return http request, which for OIDC will have the bearer token as the authentication header
*/
protected HttpRequest updateHttpRequestIfRequired(HttpLookupSourceRequestEntry request,
HeaderPreprocessor oidcHeaderPreProcessor) {
// We need to check the config and if required amend the value of the
// authentication header to the short lived bearer token
HttpRequest httpRequest = request.getHttpRequest();
ReadableConfig readableConfig = options.getReadableConfig();
if (oidcHeaderPreProcessor != null) {
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(httpRequest.uri());
if (httpRequest.timeout().isPresent()) {
builder.timeout(httpRequest.timeout().get());
}
if (httpRequest.method().endsWith("GET")) {
builder.GET();
} else {
builder.method(httpRequest.method(), httpRequest.bodyPublisher().get());
}
Map<String, String> headerMap = new HashMap<>();
if (httpRequest.headers() != null && !httpRequest.headers().map().isEmpty()) {
for (Map.Entry<String, List<String>> header
:httpRequest.headers().map().entrySet()) {
List<String> values = header.getValue();
if (values.size() == 1) {
headerMap.put(header.getKey(), header.getValue().get(0));
}
// the existing design does not handle multiple values for headers
}
}
Optional<String> oidcTokenRequest = readableConfig
.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST);
String bearerToken = oidcHeaderPreProcessor.preprocessValueForHeader(
HttpHeaderUtils.AUTHORIZATION, oidcTokenRequest.get());
headerMap.put(HttpHeaderUtils.AUTHORIZATION, bearerToken);
String[] headerAndValueArray = HttpHeaderUtils.toHeaderAndValueArray(headerMap);
builder.headers(headerAndValueArray);
httpRequest = builder.build();
}
return httpRequest;
}

private Collection<RowData> processHttpResponse(
HttpResponse<String> response,
HttpLookupSourceRequestEntry request) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import java.net.http.HttpRequest.Builder;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -18,7 +16,6 @@
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL;

/**
* Base class for {@link HttpRequest} factories.
Expand Down Expand Up @@ -51,21 +48,10 @@ public RequestFactoryBase(
this.baseUrl = options.getUrl();
this.lookupQueryCreator = lookupQueryCreator;
this.options = options;

Properties properties = options.getProperties();
/*
* For OIDC, the preprocessor will fully specify the Authentication header value,
* as a bearer token. But the preprocessors only amend existing headers, so in this case
* if there is no existing authorization header then we add a dummy one to the properties,
* so the preprocessor will be driven and will provide the value.
*/
Optional<String> oidcAuthURL = options.getReadableConfig()
.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL);
if (oidcAuthURL.isPresent()) {
properties.put(HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX
+ HttpHeaderUtils.AUTHORIZATION, "Dummy");
}

// note that the OIDC header preprocessor is not setup here, because it
// issues a network call to the authentication server. This code is driven for
// explain select. Explain should not issue network calls.
// We setup the OIDC authentication header at lookup query time.
var headerMap = HttpHeaderUtils
.prepareHeaderMap(
HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,17 @@ AUTHORIZATION, new OIDCAuthHeaderValuePreprocessor(oidcAuthURL,
}

public static HeaderPreprocessor createHeaderPreprocessor(ReadableConfig readableConfig) {
HeaderPreprocessor headerPreprocessor;
boolean useRawAuthHeader =
readableConfig.get(HttpLookupConnectorOptions.USE_RAW_AUTH_HEADER);
HeaderPreprocessor headerPreprocessor =
HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(
useRawAuthHeader);
log.info("created HeaderPreprocessor for basic useRawAuthHeader=" + useRawAuthHeader);
log.info("returning HeaderPreprocessor " + headerPreprocessor);
return headerPreprocessor;
}
public static HeaderPreprocessor createOIDCHeaderPreprocessor(ReadableConfig readableConfig) {
HeaderPreprocessor headerPreprocessor = null;
Optional<String> oidcAuthURL = readableConfig
.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL);

Expand All @@ -121,16 +131,7 @@ public static HeaderPreprocessor createHeaderPreprocessor(ReadableConfig readabl
+ " for OIDC oidcAuthURL=" + oidcAuthURL
+ ", oidcTokenRequest=" + oidcTokenRequest
+ ", oidcExpiryReduction=" + oidcExpiryReduction);
} else {
boolean useRawAuthHeader =
readableConfig.get(HttpLookupConnectorOptions.USE_RAW_AUTH_HEADER);

headerPreprocessor =
HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(
useRawAuthHeader);
log.info("created HeaderPreprocessor for basic useRawAuthHeader=" + useRawAuthHeader);
}
log.info("returning HeaderPreprocessor " + headerPreprocessor);
return headerPreprocessor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.mockito.junit.jupiter.MockitoExtension;
import static org.assertj.core.api.Assertions.assertThat;


import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreator;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.io.File;
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static org.assertj.core.api.Assertions.assertThat;

import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
import static com.getindata.connectors.http.TestHelper.readTestFile;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;

public class JavaNetHttpPollingClientWithWireTest {
private static final String BASE_URL = "http://localhost.com";

private static final String SAMPLES_FOLDER = "/auth/";
private static final int SERVER_PORT = 9090;

private static final int HTTPS_SERVER_PORT = 8443;

private static final String SERVER_KEYSTORE_PATH =
"src/test/resources/security/certs/serverKeyStore.jks";

private static final String SERVER_TRUSTSTORE_PATH =
"src/test/resources/security/certs/serverTrustStore.jks";

private static final String ENDPOINT = "/auth";
private static final String BEARER_REQUEST = "Bearer Dummy";

private WireMockServer wireMockServer;
@SuppressWarnings("unchecked")
@BeforeEach
public void setup() {

File keyStoreFile = new File(SERVER_KEYSTORE_PATH);
File trustStoreFile = new File(SERVER_TRUSTSTORE_PATH);

wireMockServer = new WireMockServer(
WireMockConfiguration.wireMockConfig()
.port(SERVER_PORT)
.httpsPort(HTTPS_SERVER_PORT)
.keystorePath(keyStoreFile.getAbsolutePath())
.keystorePassword("password")
.keyManagerPassword("password")
.needClientAuth(true)
.trustStorePath(trustStoreFile.getAbsolutePath())
.trustStorePassword("password")
.extensions(JsonTransform.class)
);
wireMockServer.start();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
Configuration config = new Configuration();
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
env.configure(config, getClass().getClassLoader());
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
}

@AfterEach
public void tearDown() {
wireMockServer.stop();
}


@Test
public void shouldUpdateHttpRequestIfRequiredGet() {
HttpRequest httpRequest = HttpRequest.newBuilder()
.GET()
.uri(URI.create(BASE_URL))
.timeout(Duration.ofSeconds(1))
.setHeader("Origin","*")
.setHeader("X-Content-Type-Options","nosniff")
.setHeader("Content-Type","application/json")
.build();
shouldUpdateHttpRequestIfRequired(httpRequest);
}
@Test
public void shouldUpdateHttpRequestIfRequiredPut() {
HttpRequest httpRequest = HttpRequest.newBuilder()
.PUT( HttpRequest.BodyPublishers.ofString("foo"))
.uri(URI.create(BASE_URL))
.timeout(Duration.ofSeconds(1))
.setHeader("Origin","*")
.setHeader("X-Content-Type-Options","nosniff")
.setHeader("Content-Type","application/json")
.build();
shouldUpdateHttpRequestIfRequired(httpRequest);
}
private void shouldUpdateHttpRequestIfRequired(HttpRequest httpRequest) {
setUpServerBodyStub();
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(null,
null,
HttpLookupConfig.builder().url(BASE_URL).build(),
null);
LookupQueryInfo lookupQueryInfo = null;
HttpLookupSourceRequestEntry request =
new HttpLookupSourceRequestEntry(httpRequest, lookupQueryInfo);

Configuration configuration = new Configuration();
HeaderPreprocessor oidcHeaderPreProcessor =
HttpHeaderUtils.createOIDCHeaderPreprocessor(configuration);
HttpRequest newHttpRequest = client.updateHttpRequestIfRequired(request,
oidcHeaderPreProcessor);
assertThat(httpRequest).isEqualTo(newHttpRequest);
configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(),"http://localhost:9090/auth");
configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, BEARER_REQUEST);
configuration.set(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION,
Duration.ofSeconds(1L));
client = new JavaNetHttpPollingClient(null,
null,
HttpLookupConfig.builder().url(BASE_URL).readableConfig(configuration).build(),
null);
oidcHeaderPreProcessor =
HttpHeaderUtils.createOIDCHeaderPreprocessor(configuration);
// change oidcHeaderPreProcessor to use the mock http client for the authentication flow
newHttpRequest = client.updateHttpRequestIfRequired(request,
oidcHeaderPreProcessor);
assertThat(httpRequest).isNotEqualTo(newHttpRequest);
assertThat(httpRequest.headers().map().keySet().size()).isEqualTo(3);
assertThat(newHttpRequest.headers().map().keySet().size()).isEqualTo(4);
assertThat(httpRequest.headers().map().get("Content-Type"))
.isEqualTo(newHttpRequest.headers().map().get("Content-Type"));
}

private StubMapping setUpServerBodyStub() {
return wireMockServer.stubFor(
post(urlEqualTo(ENDPOINT))
.withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
.withRequestBody(equalTo(BEARER_REQUEST))
.willReturn(
aResponse()
.withStatus(200)
.withBody(readTestFile(SAMPLES_FOLDER + "AuthResult.json"))
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.getindata.connectors.http.internal.utils;
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import com.getindata.connectors.http.internal.HeaderPreprocessor;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;



public class HttpHeaderUtilsTest {
@Test
void shouldCreateOIDCHeaderPreprocessorTest() {
Configuration configuration = new Configuration();
HeaderPreprocessor headerPreprocessor
= HttpHeaderUtils.createOIDCHeaderPreprocessor(configuration);
assertThat(headerPreprocessor).isNull();
configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(), "http://aaa");
configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key(), "ccc");
configuration.set(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION, Duration.ofSeconds(1));
headerPreprocessor
= HttpHeaderUtils.createOIDCHeaderPreprocessor(configuration);
assertThat(headerPreprocessor).isNotNull();
}
}
4 changes: 4 additions & 0 deletions src/test/resources/auth/AuthResult.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"access_token": "test",
"expires_in": 1
}
Loading