Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,101 @@ public final class AuthenticationFactoryOAuth2 {
/**
* Authenticate with client credentials.
*
* @param issuerUrl the issuer URL
* @param issuerUrl the issuer URL
* @param credentialsUrl the credentials URL
* @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
* @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
* @return an Authentication object
*/
public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
return clientCredentials(issuerUrl, credentialsUrl, audience, null);
return clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience)
.build();
}

/**
* Authenticate with client credentials.
*
* @param issuerUrl the issuer URL
* @param issuerUrl the issuer URL
* @param credentialsUrl the credentials URL
* @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
* @param scope An optional field. The value of the scope parameter is expressed as a list of space-delimited,
* case-sensitive strings. The strings are defined by the authorization server.
* If the value contains multiple space-delimited strings, their order does not matter,
* and each string adds an additional access range to the requested scope.
* From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
* @param audience An optional field. The audience identifier used by some Identity Providers, like Auth0.
* @param scope An optional field. The value of the scope parameter is expressed as a list of
* space-delimited,
* case-sensitive strings. The strings are defined by the authorization server.
* If the value contains multiple space-delimited strings, their order does not matter,
* and each string adds an additional access range to the requested scope.
* From here: https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
* @return an Authentication object
*/
public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience, String scope) {
ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.privateKey(credentialsUrl.toExternalForm())
.audience(audience)
.scope(scope)
.build();
return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
return clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience)
.scope(scope).build();
}

public static ClientCredentialsBuilder clientCredentialsBuilder() {
return new ClientCredentialsBuilder();
}

public static class ClientCredentialsBuilder {

private URL issuerUrl;
private URL credentialsUrl;
private String audience;
private String scope;
private Integer connectTimeout;
private Integer readTimeout;
private String trustCertsFilePath;

private ClientCredentialsBuilder() {
}

public ClientCredentialsBuilder issuerUrl(URL issuerUrl) {
this.issuerUrl = issuerUrl;
return this;
}

public ClientCredentialsBuilder credentialsUrl(URL credentialsUrl) {
this.credentialsUrl = credentialsUrl;
return this;
}

public ClientCredentialsBuilder audience(String audience) {
this.audience = audience;
return this;
}

public ClientCredentialsBuilder scope(String scope) {
this.scope = scope;
return this;
}

public ClientCredentialsBuilder connectTimeout(Integer connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}

public ClientCredentialsBuilder readTimeout(Integer readTimeout) {
this.readTimeout = readTimeout;
return this;
}

public ClientCredentialsBuilder trustCertsFilePath(String trustCertsFilePath) {
this.trustCertsFilePath = trustCertsFilePath;
return this;
}

public Authentication build() {
ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.privateKey(credentialsUrl.toExternalForm())
.audience(audience)
.scope(scope)
.connectTimeout(connectTimeout)
.readTimeout(readTimeout)
.trustCertsFilePath(trustCertsFilePath)
.build();
return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
}

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -60,62 +60,17 @@ class ClientCredentialsFlow extends FlowBase {
private boolean initialized = false;

@Builder
public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope) {
super(issuerUrl);
public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope,
Integer connectTimeout, Integer readTimeout, String trustCertsFilePath) {
super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath);
this.audience = audience;
this.privateKey = privateKey;
this.scope = scope;
}

@Override
public void initialize() throws PulsarClientException {
super.initialize();
assert this.metadata != null;

URL tokenUrl = this.metadata.getTokenEndpoint();
this.exchanger = new TokenClient(tokenUrl);
initialized = true;
}

public TokenResult authenticate() throws PulsarClientException {
// read the private key from storage
KeyFile keyFile;
try {
keyFile = loadPrivateKey(this.privateKey);
} catch (IOException e) {
throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage());
}

// request an access token using client credentials
ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder()
.clientId(keyFile.getClientId())
.clientSecret(keyFile.getClientSecret())
.audience(this.audience)
.scope(this.scope)
.build();
TokenResult tr;
if (!initialized) {
initialize();
}
try {
tr = this.exchanger.exchangeClientCredentials(req);
} catch (TokenExchangeException | IOException e) {
throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: "
+ e.getMessage());
}

return tr;
}

@Override
public void close() throws Exception {
if (exchanger != null) {
exchanger.close();
}
}

/**
* Constructs a {@link ClientCredentialsFlow} from configuration parameters.
*
* @param params
* @return
*/
Expand All @@ -125,16 +80,24 @@ public static ClientCredentialsFlow fromParameters(Map<String, String> params) {
// These are optional parameters, so we only perform a get
String scope = params.get(CONFIG_PARAM_SCOPE);
String audience = params.get(CONFIG_PARAM_AUDIENCE);
Integer connectTimeout = parseParameterInt(params, CONFIG_PARAM_CONNECT_TIMEOUT);
Integer readTimeout = parseParameterInt(params, CONFIG_PARAM_READ_TIMEOUT);
String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH);

return ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.audience(audience)
.privateKey(privateKeyUrl)
.scope(scope)
.connectTimeout(connectTimeout)
.readTimeout(readTimeout)
.trustCertsFilePath(trustCertsFilePath)
.build();
}

/**
* Loads the private key from the given URL.
*
* @param privateKeyURL
* @return
* @throws IOException
Expand Down Expand Up @@ -162,4 +125,52 @@ private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException {
throw new IOException("Invalid privateKey format", e);
}
}

@Override
public void initialize() throws PulsarClientException {
super.initialize();
assert this.metadata != null;

URL tokenUrl = this.metadata.getTokenEndpoint();
this.exchanger = new TokenClient(tokenUrl, httpClient);
initialized = true;
}

public TokenResult authenticate() throws PulsarClientException {
// read the private key from storage
KeyFile keyFile;
try {
keyFile = loadPrivateKey(this.privateKey);
} catch (IOException e) {
throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage());
}

// request an access token using client credentials
ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder()
.clientId(keyFile.getClientId())
.clientSecret(keyFile.getClientSecret())
.audience(this.audience)
.scope(this.scope)
.build();
TokenResult tr;
if (!initialized) {
initialize();
}
try {
tr = this.exchanger.exchangeClientCredentials(req);
} catch (TokenExchangeException | IOException e) {
throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: "
+ e.getMessage());
}

return tr;
}

@Override
public void close() throws Exception {
super.close();
if (exchanger != null) {
exchanger.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,80 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2;

import io.netty.handler.ssl.SslContextBuilder;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;

/**
* An abstract OAuth 2.0 authorization flow.
*/
@Slf4j
abstract class FlowBase implements Flow {

public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout";
public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout";
public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath";

protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;

private static final long serialVersionUID = 1L;

protected final URL issuerUrl;
protected final AsyncHttpClient httpClient;

protected transient Metadata metadata;

protected FlowBase(URL issuerUrl) {
protected FlowBase(URL issuerUrl, Integer connectTimeout, Integer readTimeout, String trustCertsFilePath) {
this.issuerUrl = issuerUrl;
this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath);
}

private AsyncHttpClient defaultHttpClient(Integer readTimeout, Integer connectTimeout, String trustCertsFilePath) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
confBuilder.setConnectTimeout(
getParameterInt(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout,
DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000));
confBuilder.setReadTimeout(
getParameterInt(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000));
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
if (StringUtils.isNotBlank(trustCertsFilePath)) {
try {
confBuilder.setSslContext(SslContextBuilder.forClient()
.trustManager(new File(trustCertsFilePath))
.build());
} catch (SSLException e) {
log.error("Could not set " + CONFIG_PARAM_TRUST_CERTS_FILE_PATH, e);
}
}
return new DefaultAsyncHttpClient(confBuilder.build());
}

private int getParameterInt(String name, Integer value, int defaultValue) {
if (value == null) {
log.info("Configuration for [{}] is using the default value: [{}]", name, defaultValue);
return defaultValue;
} else {
log.info("Configuration for [{}] is: [{}]", name, value);
return value;
}
}

public void initialize() throws PulsarClientException {
Expand All @@ -55,7 +104,7 @@ public void initialize() throws PulsarClientException {
}

protected MetadataResolver createMetadataResolver() {
return DefaultMetadataResolver.fromIssuerUrl(issuerUrl);
return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient);
}

static String parseParameterString(Map<String, String> params, String name) {
Expand All @@ -77,4 +126,23 @@ static URL parseParameterUrl(Map<String, String> params, String name) {
throw new IllegalArgumentException("Malformed configuration parameter: " + name);
}
}

static Integer parseParameterInt(Map<String, String> params, String name) {
String value = params.get(name);
if (StringUtils.isNotBlank(value)) {
try {
return Integer.parseInt(value);
} catch (NumberFormatException numberFormatException) {
throw new IllegalArgumentException("Malformed configuration parameter: " + name);
}
}
return null;
}

@Override
public void close() throws Exception {
if (httpClient != null) {
httpClient.close();
}
}
}
Loading
Loading