diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index c9abb3a3c0147..033d5308a2a96 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -20,6 +20,7 @@ import java.net.URL; import java.time.Clock; +import java.time.Duration; import org.apache.pulsar.client.api.Authentication; /** @@ -31,9 +32,9 @@ public final class AuthenticationFactoryOAuth2 { /** * Authenticate with client credentials. * - * @param issuerUrl the issuer URL + * @param issuerUrl the issuer URL * @param credentialsUrl the credentials URL - * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. + * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) { @@ -43,23 +44,144 @@ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl /** * Authenticate with client credentials. * - * @param issuerUrl the issuer URL + * @param issuerUrl the issuer URL * @param credentialsUrl the credentials URL - * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. - * @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited, - * case-sensitive strings. The strings are defined by the authorization server. - * If the value contains multiple space-delimited strings, their order does not matter, - * and each string adds an additional access range to the requested scope. - * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 + * @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0. + * @param scope An optional field. The value of the scope parameter is expressed as a list of + * space-delimited, + * case-sensitive strings. The strings are defined by the authorization server. + * If the value contains multiple space-delimited strings, their order does not matter, + * and each string adds an additional access range to the requested scope. + * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 * @return an Authentication object */ public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) { - ClientCredentialsFlow flow = ClientCredentialsFlow.builder() - .issuerUrl(issuerUrl) - .privateKey(credentialsUrl.toExternalForm()) - .audience(audience) - .scope(scope) - .build(); - return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); + return clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience) + .scope(scope).build(); } + + /** + * A builder to create an authentication with client credentials. + * + * @return the builder + */ + public static ClientCredentialsBuilder clientCredentialsBuilder() { + return new ClientCredentialsBuilder(); + } + + public static class ClientCredentialsBuilder { + + private URL issuerUrl; + private URL credentialsUrl; + private String audience; + private String scope; + private Duration connectTimeout; + private Duration readTimeout; + private String trustCertsFilePath; + + private ClientCredentialsBuilder() { + } + + /** + * Required issuer URL. + * + * @param issuerUrl the issuer URL + * @return the builder + */ + public ClientCredentialsBuilder issuerUrl(URL issuerUrl) { + this.issuerUrl = issuerUrl; + return this; + } + + /** + * Required credentials URL. + * + * @param credentialsUrl the credentials URL + * @return the builder + */ + public ClientCredentialsBuilder credentialsUrl(URL credentialsUrl) { + this.credentialsUrl = credentialsUrl; + return this; + } + + /** + * Optional audience identifier used by some Identity Providers, like Auth0. + * + * @param audience the audiance + * @return the builder + */ + public ClientCredentialsBuilder audience(String audience) { + this.audience = audience; + return this; + } + + /** + * Optional scope expressed as a list of space-delimited, case-sensitive strings. + * The strings are defined by the authorization server. + * If the value contains multiple space-delimited strings, their order does not matter, + * and each string adds an additional access range to the requested scope. + * From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 + * + * @param scope the scope + * @return the builder + */ + public ClientCredentialsBuilder scope(String scope) { + this.scope = scope; + return this; + } + + /** + * Optional HTTP connection timeout. + * + * @param connectTimeout the connect timeout + * @return the builder + */ + public ClientCredentialsBuilder connectTimeout(Duration connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } + + /** + * Optional HTTP read timeout. + * + * @param readTimeout the read timeout + * @return the builder + */ + public ClientCredentialsBuilder readTimeout(Duration readTimeout) { + this.readTimeout = readTimeout; + return this; + } + + /** + * Optional path to the file containing the trusted certificate(s) of the token issuer. + * + * @param trustCertsFilePath the path to the file containing the trusted certificate(s) + * @return the builder + */ + public ClientCredentialsBuilder trustCertsFilePath(String trustCertsFilePath) { + this.trustCertsFilePath = trustCertsFilePath; + return this; + } + + /** + * Authenticate with client credentials. + * + * @return an Authentication object + */ + public Authentication build() { + ClientCredentialsFlow flow = ClientCredentialsFlow.builder() + .issuerUrl(issuerUrl) + .privateKey(credentialsUrl == null ? null : credentialsUrl.toExternalForm()) + .audience(audience) + .scope(scope) + .connectTimeout(connectTimeout) + .readTimeout(readTimeout) + .trustCertsFilePath(trustCertsFilePath) + .build(); + return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); + } + + } + + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index ef10f1afdb63b..7f64c0b18ac73 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -26,6 +26,7 @@ import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Map; import lombok.Builder; import lombok.extern.slf4j.Slf4j; @@ -60,62 +61,17 @@ class ClientCredentialsFlow extends FlowBase { private boolean initialized = false; @Builder - public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope) { - super(issuerUrl); + public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope, + Duration connectTimeout, Duration readTimeout, String trustCertsFilePath) { + super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath); this.audience = audience; this.privateKey = privateKey; this.scope = scope; } - @Override - public void initialize() throws PulsarClientException { - super.initialize(); - assert this.metadata != null; - - URL tokenUrl = this.metadata.getTokenEndpoint(); - this.exchanger = new TokenClient(tokenUrl); - initialized = true; - } - - public TokenResult authenticate() throws PulsarClientException { - // read the private key from storage - KeyFile keyFile; - try { - keyFile = loadPrivateKey(this.privateKey); - } catch (IOException e) { - throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage()); - } - - // request an access token using client credentials - ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder() - .clientId(keyFile.getClientId()) - .clientSecret(keyFile.getClientSecret()) - .audience(this.audience) - .scope(this.scope) - .build(); - TokenResult tr; - if (!initialized) { - initialize(); - } - try { - tr = this.exchanger.exchangeClientCredentials(req); - } catch (TokenExchangeException | IOException e) { - throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: " - + e.getMessage()); - } - - return tr; - } - - @Override - public void close() throws Exception { - if (exchanger != null) { - exchanger.close(); - } - } - /** * Constructs a {@link ClientCredentialsFlow} from configuration parameters. + * * @param params * @return */ @@ -125,16 +81,24 @@ public static ClientCredentialsFlow fromParameters(Map params) { // These are optional parameters, so we only perform a get String scope = params.get(CONFIG_PARAM_SCOPE); String audience = params.get(CONFIG_PARAM_AUDIENCE); + Duration connectTimeout = parseParameterDuration(params, CONFIG_PARAM_CONNECT_TIMEOUT); + Duration readTimeout = parseParameterDuration(params, CONFIG_PARAM_READ_TIMEOUT); + String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); + return ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .audience(audience) .privateKey(privateKeyUrl) .scope(scope) + .connectTimeout(connectTimeout) + .readTimeout(readTimeout) + .trustCertsFilePath(trustCertsFilePath) .build(); } /** * Loads the private key from the given URL. + * * @param privateKeyURL * @return * @throws IOException @@ -162,4 +126,52 @@ private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException { throw new IOException("Invalid privateKey format", e); } } + + @Override + public void initialize() throws PulsarClientException { + super.initialize(); + assert this.metadata != null; + + URL tokenUrl = this.metadata.getTokenEndpoint(); + this.exchanger = new TokenClient(tokenUrl, httpClient); + initialized = true; + } + + public TokenResult authenticate() throws PulsarClientException { + // read the private key from storage + KeyFile keyFile; + try { + keyFile = loadPrivateKey(this.privateKey); + } catch (IOException e) { + throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage()); + } + + // request an access token using client credentials + ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder() + .clientId(keyFile.getClientId()) + .clientSecret(keyFile.getClientSecret()) + .audience(this.audience) + .scope(this.scope) + .build(); + TokenResult tr; + if (!initialized) { + initialize(); + } + try { + tr = this.exchanger.exchangeClientCredentials(req); + } catch (TokenExchangeException | IOException e) { + throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: " + + e.getMessage()); + } + + return tr; + } + + @Override + public void close() throws Exception { + super.close(); + if (exchanger != null) { + exchanger.close(); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index 125a880086297..6cc9f8e41b5e4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -18,16 +18,25 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import io.netty.handler.ssl.SslContextBuilder; +import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.time.Duration; +import java.time.format.DateTimeParseException; import java.util.Map; +import javax.net.ssl.SSLException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata; import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; /** * An abstract OAuth 2.0 authorization flow. @@ -35,14 +44,60 @@ @Slf4j abstract class FlowBase implements Flow { + public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout"; + public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; + public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; + + protected static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10); + protected static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30); + private static final long serialVersionUID = 1L; protected final URL issuerUrl; + protected final AsyncHttpClient httpClient; protected transient Metadata metadata; - protected FlowBase(URL issuerUrl) { + protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration readTimeout, String trustCertsFilePath) { this.issuerUrl = issuerUrl; + this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath); + } + + private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connectTimeout, + String trustCertsFilePath) { + DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + confBuilder.setCookieStore(null); + confBuilder.setUseProxyProperties(true); + confBuilder.setFollowRedirect(true); + confBuilder.setConnectTimeout( + getParameterDurationToMillis(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout, + DEFAULT_CONNECT_TIMEOUT)); + confBuilder.setReadTimeout( + getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT)); + confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); + if (StringUtils.isNotBlank(trustCertsFilePath)) { + try { + confBuilder.setSslContext(SslContextBuilder.forClient() + .trustManager(new File(trustCertsFilePath)) + .build()); + } catch (SSLException e) { + log.error("Could not set " + CONFIG_PARAM_TRUST_CERTS_FILE_PATH, e); + } + } + return new DefaultAsyncHttpClient(confBuilder.build()); + } + + private int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) { + Duration duration; + if (value == null) { + log.info("Configuration for [{}] is using the default value: [{}]", name, defaultValue); + duration = defaultValue; + } else { + log.info("Configuration for [{}] is: [{}]", name, value); + duration = value; + } + + return (int) duration.toMillis(); } public void initialize() throws PulsarClientException { @@ -55,7 +110,7 @@ public void initialize() throws PulsarClientException { } protected MetadataResolver createMetadataResolver() { - return DefaultMetadataResolver.fromIssuerUrl(issuerUrl); + return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient); } static String parseParameterString(Map params, String name) { @@ -77,4 +132,21 @@ static URL parseParameterUrl(Map params, String name) { throw new IllegalArgumentException("Malformed configuration parameter: " + name); } } + + static Duration parseParameterDuration(Map params, String name) { + String value = params.get(name); + if (StringUtils.isNotBlank(value)) { + try { + return Duration.parse(value); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Malformed configuration parameter: " + name, e); + } + } + return null; + } + + @Override + public void close() throws Exception { + httpClient.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java index be636145cb24b..19d0c1acadd15 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java @@ -19,88 +19,50 @@ package org.apache.pulsar.client.impl.auth.oauth2.protocol; import com.fasterxml.jackson.databind.ObjectReader; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; -import java.net.URLConnection; -import java.time.Duration; +import java.util.concurrent.ExecutionException; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Response; /** * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414. */ public class DefaultMetadataResolver implements MetadataResolver { - protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; - protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; - private final URL metadataUrl; private final ObjectReader objectReader; - private Duration connectTimeout; - private Duration readTimeout; + private final AsyncHttpClient httpClient; - public DefaultMetadataResolver(URL metadataUrl) { + public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient httpClient) { this.metadataUrl = metadataUrl; this.objectReader = ObjectMapperFactory.getMapper().reader().forType(Metadata.class); - // set a default timeout to ensure that this doesn't block - this.connectTimeout = Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS); - this.readTimeout = Duration.ofSeconds(DEFAULT_READ_TIMEOUT_IN_SECONDS); - } - - public DefaultMetadataResolver withConnectTimeout(Duration connectTimeout) { - this.connectTimeout = connectTimeout; - return this; - } - - public DefaultMetadataResolver withReadTimeout(Duration readTimeout) { - this.readTimeout = readTimeout; - return this; - } - - /** - * Resolves the authorization metadata. - * @return metadata - * @throws IOException if the metadata could not be resolved. - */ - public Metadata resolve() throws IOException { - try { - URLConnection c = this.metadataUrl.openConnection(); - if (connectTimeout != null) { - c.setConnectTimeout((int) connectTimeout.toMillis()); - } - if (readTimeout != null) { - c.setReadTimeout((int) readTimeout.toMillis()); - } - c.setRequestProperty("Accept", "application/json"); - - Metadata metadata; - try (InputStream inputStream = c.getInputStream()) { - metadata = this.objectReader.readValue(inputStream); - } - return metadata; - - } catch (IOException e) { - throw new IOException("Cannot obtain authorization metadata from " + metadataUrl.toString(), e); - } + this.httpClient = httpClient; } /** * Gets a well-known metadata URL for the given OAuth issuer URL. + * * @param issuerUrl The authorization server's issuer identifier * @return a resolver */ - public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl) { - return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl)); + public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl, AsyncHttpClient httpClient) { + return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl), httpClient); } /** * Gets a well-known metadata URL for the given OAuth issuer URL. - * @see - * OAuth Discovery: Obtaining Authorization Server Metadata + * * @param issuerUrl The authorization server's issuer identifier * @return a URL + * @see + * OAuth Discovery: Obtaining Authorization Server Metadata */ public static URL getWellKnownMetadataUrl(URL issuerUrl) { try { @@ -109,4 +71,33 @@ public static URL getWellKnownMetadataUrl(URL issuerUrl) { throw new IllegalArgumentException(e); } } + + /** + * Resolves the authorization metadata. + * + * @return metadata + * @throws IOException if the metadata could not be resolved. + */ + public Metadata resolve() throws IOException { + + try { + Response response = httpClient.prepareGet(metadataUrl.toString()) + .addHeader(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON) + .execute() + .toCompletableFuture() + .get(); + + Metadata metadata; + try (InputStream inputStream = response.getResponseBodyAsStream()) { + metadata = this.objectReader.readValue(inputStream); + } + return metadata; + + } catch (IOException | InterruptedException | ExecutionException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new IOException("Cannot obtain authorization metadata from " + metadataUrl, e); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java index f4e4c770e67fc..cb4c2a551d01e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java @@ -27,12 +27,8 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.AsyncHttpClientConfig; -import org.asynchttpclient.DefaultAsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClientConfig; import org.asynchttpclient.Response; /** @@ -40,30 +36,11 @@ */ public class TokenClient implements ClientCredentialsExchanger { - protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; - protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; - private final URL tokenUrl; private final AsyncHttpClient httpClient; - public TokenClient(URL tokenUrl) { - this(tokenUrl, null); - } - - TokenClient(URL tokenUrl, AsyncHttpClient httpClient) { - if (httpClient == null) { - DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); - confBuilder.setCookieStore(null); - confBuilder.setUseProxyProperties(true); - confBuilder.setFollowRedirect(true); - confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); - confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); - confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); - AsyncHttpClientConfig config = confBuilder.build(); - this.httpClient = new DefaultAsyncHttpClient(config); - } else { - this.httpClient = httpClient; - } + public TokenClient(URL tokenUrl, AsyncHttpClient httpClient) { + this.httpClient = httpClient; this.tokenUrl = tokenUrl; } @@ -74,6 +51,7 @@ public void close() throws Exception { /** * Constructing http request parameters. + * * @param req object with relevant request parameters * @return Generate the final request body from a map. */ @@ -97,6 +75,7 @@ String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) { /** * Performs a token exchange using client credentials. + * * @param req the client credentials request details. * @return a token result * @throws TokenExchangeException @@ -115,24 +94,26 @@ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest re .get(); switch (res.getStatusCode()) { - case 200: - return ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), - TokenResult.class); - - case 400: // Bad request - case 401: // Unauthorized - throw new TokenExchangeException( - ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), - TokenError.class)); - - default: - throw new IOException( - "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText()); + case 200: + return ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), + TokenResult.class); + + case 400: // Bad request + case 401: // Unauthorized + throw new TokenExchangeException( + ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), + TokenError.class)); + + default: + throw new IOException( + "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText()); } - } catch (InterruptedException | ExecutionException e1) { + if (e1 instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new IOException(e1); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java new file mode 100644 index 0000000000000..602aafa7b6c91 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import static org.testng.Assert.assertTrue; +import java.io.IOException; +import java.net.URL; +import java.time.Duration; +import org.apache.pulsar.client.api.Authentication; +import org.testng.annotations.Test; + +public class AuthenticationFactoryOAuth2Test { + + @Test + public void testBuilder() throws IOException { + URL issuerUrl = new URL("http://localhost"); + URL credentialsUrl = new URL("http://localhost"); + String audience = "audience"; + String scope = "scope"; + Duration connectTimeout = Duration.parse("PT11S"); + Duration readTimeout = Duration.ofSeconds(31); + String trustCertsFilePath = null; + try (Authentication authentication = + AuthenticationFactoryOAuth2.clientCredentialsBuilder().issuerUrl(issuerUrl) + .credentialsUrl(credentialsUrl).audience(audience).scope(scope) + .connectTimeout(connectTimeout).readTimeout(readTimeout) + .trustCertsFilePath(trustCertsFilePath).build()) { + assertTrue(authentication instanceof AuthenticationOAuth2); + } + } + + @Test + public void testClientCredentials() throws IOException { + URL issuerUrl = new URL("http://localhost"); + URL credentialsUrl = new URL("http://localhost"); + String audience = "audience"; + try (Authentication authentication = + AuthenticationFactoryOAuth2.clientCredentials(issuerUrl, credentialsUrl, audience)) { + assertTrue(authentication instanceof AuthenticationOAuth2); + } + } + +}