OAuth Configuration Reference for Confluent Cloud Clients¶
This section provides a comprehensive reference for OAuth configuration parameters across different client types and platforms.
Java client parameters¶
Parameter | Description | Default | Required |
---|---|---|---|
sasl.mechanism |
SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
sasl.jaas.config |
JAAS configuration for OAuth login module | None | Yes |
sasl.login.callback.handler.class |
OAuth callback handler class | io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler | Yes |
sasl.login.connect.timeout.ms |
Connection timeout for token requests | 10000 | No |
sasl.login.read.timeout.ms |
Read timeout for token requests | 10000 | No |
sasl.login.retry.backoff.ms |
Initial retry backoff for failed requests | 100 | No |
sasl.login.retry.backoff.max.ms |
Maximum retry backoff for failed requests | 10000 | No |
oauth.token.endpoint.uri |
OAuth token endpoint URL | None | Yes |
oauth.client.id |
OAuth client ID | None | Yes |
oauth.client.secret |
OAuth client secret | None | Yes |
oauth.scope |
OAuth scopes for token request | None | No |
oauth.logical.cluster |
Logical cluster ID for Confluent Cloud | None | Yes |
oauth.identity.pool.id |
Identity pool ID for Confluent Cloud | None | No |
JAAS configuration parameters¶
Parameter | Description | Default | Required |
---|---|---|---|
oauth.token.endpoint.uri |
OAuth token endpoint URL | None | Yes |
oauth.client.id |
OAuth client ID | None | Yes |
oauth.client.secret |
OAuth client secret | None | Yes |
oauth.scope |
OAuth scopes for token request | None | No |
oauth.logical.cluster |
Logical cluster ID for Confluent Cloud | None | Yes |
oauth.identity.pool.id |
Identity pool ID for Confluent Cloud | None | No |
Python client parameters¶
Parameter | Description | Default | Required |
---|---|---|---|
sasl.mechanism |
SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
sasl.oauth.token.endpoint.uri |
OAuth token endpoint URL | None | Yes |
sasl.oauth.client.id |
OAuth client ID | None | Yes |
sasl.oauth.client.secret |
OAuth client secret | None | Yes |
sasl.oauth.scope |
OAuth scopes for token request | None | No |
sasl.oauth.logical.cluster |
Logical cluster ID for Confluent Cloud | None | Yes |
sasl.oauth.identity.pool.id |
Identity pool ID for Confluent Cloud | None | No |
sasl.oauth.connect.timeout.ms |
Connection timeout for token requests | 10000 | No |
sasl.oauth.read.timeout.ms |
Read timeout for token requests | 10000 | No |
.NET client parameters¶
Parameter | Description | Default | Required |
---|---|---|---|
sasl.mechanism |
SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
sasl.oauth.token.endpoint.uri |
OAuth token endpoint URL | None | Yes |
sasl.oauth.client.id |
OAuth client ID | None | Yes |
sasl.oauth.client.secret |
OAuth client secret | None | Yes |
sasl.oauth.scope |
OAuth scopes for token request | None | No |
sasl.oauth.logical.cluster |
Logical cluster ID for Confluent Cloud | None | Yes |
sasl.oauth.identity.pool.id |
Identity pool ID for Confluent Cloud | None | No |
sasl.oauth.connect.timeout.ms |
Connection timeout for token requests | 10000 | No |
sasl.oauth.read.timeout.ms |
Read timeout for token requests | 10000 | No |
Go client parameters¶
Parameter | Description | Default | Required |
---|---|---|---|
sasl.mechanism |
SASL mechanism for OAuth authentication | OAUTHBEARER | Yes |
sasl.oauth.token.endpoint.uri |
OAuth token endpoint URL | None | Yes |
sasl.oauth.client.id |
OAuth client ID | None | Yes |
sasl.oauth.client.secret |
OAuth client secret | None | Yes |
sasl.oauth.scope |
OAuth scopes for token request | None | No |
sasl.oauth.logical.cluster |
Logical cluster ID for Confluent Cloud | None | Yes |
sasl.oauth.identity.pool.id |
Identity pool ID for Confluent Cloud | None | No |
sasl.oauth.connect.timeout.ms |
Connection timeout for token requests | 10000 | No |
sasl.oauth.read.timeout.ms |
Read timeout for token requests | 10000 | No |
Schema Registry client parameters¶
Parameter | Description | Default | Required |
---|---|---|---|
bearer.auth.credentials.source |
Authentication method. Use OAUTHBEARER for OAuth |
None | Yes |
bearer.auth.issuer.endpoint.url |
OAuth token endpoint URL | None | Yes |
bearer.auth.client.id |
OAuth client ID | None | Yes |
bearer.auth.client.secret |
OAuth client secret | None | Yes |
bearer.auth.scope |
OAuth scopes for token request | None | No |
bearer.auth.logical.cluster |
Schema Registry logical cluster ID (lsrc-xxxxx ) |
None | Yes |
bearer.auth.identity.pool.id |
Identity pool ID for Confluent Cloud | None | No |
Configuration validation¶
When configuring OAuth authentication, validate your configuration:
- Required parameters: Ensure all required parameters are provided.
- URL format: Verify token endpoint URLs are properly formatted
- Credentials: Validate client ID and secret are correct
- Network access: Confirm network connectivity to OAuth provider
- Permissions: Verify client has appropriate scopes and permissions
// Configuration validation example
public static void validateOAuthConfig(Properties props) {
// Check required parameters
String[] required = {
"sasl.mechanism",
"sasl.jaas.config",
"oauth.token.endpoint.uri",
"oauth.client.id",
"oauth.client.secret",
"oauth.logical.cluster"
};
for (String param : required) {
if (!props.containsKey(param)) {
throw new IllegalArgumentException("Missing required parameter: " + param);
}
}
// Validate timeout values
int connectTimeout = Integer.parseInt(props.getProperty("sasl.login.connect.timeout.ms", "10000"));
int readTimeout = Integer.parseInt(props.getProperty("sasl.login.read.timeout.ms", "10000"));
if (connectTimeout < 1000 || readTimeout < 1000) {
throw new IllegalArgumentException("Timeout values must be at least 1000ms");
}
}
Configuration templates¶
Use configuration templates for consistent setup:
# oauth-config-template.yaml
kafka:
oauth:
token_endpoint: ${OAUTH_TOKEN_ENDPOINT}
client_id: ${OAUTH_CLIENT_ID}
client_secret: ${OAUTH_CLIENT_SECRET}
scope: ${OAUTH_SCOPE}
logical_cluster: ${LOGICAL_CLUSTER_ID}
identity_pool: ${IDENTITY_POOL_ID}
timeouts:
connect: 10000
read: 10000
retry_backoff: 100
retry_max: 10000
Environment-specific configurations¶
Development environment¶
# Development OAuth configuration
sasl.oauthbearer.token.endpoint.url=http://dev-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=dev-kafka-client
sasl.oauthbearer.client.secret=${DEV_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:dev
Staging environment¶
# Staging OAuth configuration
sasl.oauthbearer.token.endpoint.url=http://staging-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=staging-kafka-client
sasl.oauthbearer.client.secret=${STAGING_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:staging
Production environment¶
# Production OAuth configuration
sasl.oauthbearer.token.endpoint.url=http://prod-auth.example.com/oauth2/token
sasl.oauthbearer.client.id=prod-kafka-client
sasl.oauthbearer.client.secret=${PROD_CLIENT_SECRET}
sasl.oauthbearer.scope=kafka:prod
Advanced configuration options¶
Custom token providers¶
For advanced use cases, implement custom token providers:
public class CustomTokenProvider implements BearerAuthCredentialProvider {
private final String tokenEndpoint;
private final String clientId;
private final String clientSecret;
private final Map<String, String> additionalHeaders;
public CustomTokenProvider(String tokenEndpoint, String clientId,
String clientSecret, Map<String, String> additionalHeaders) {
this.tokenEndpoint = tokenEndpoint;
this.clientId = clientId;
this.clientSecret = clientSecret;
this.additionalHeaders = additionalHeaders;
}
@Override
public String getBearerToken(URL url) throws IOException {
// Custom token retrieval logic
HttpURLConnection connection = (HttpURLConnection) new URL(tokenEndpoint).openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
// Add custom headers
if (additionalHeaders != null) {
for (Map.Entry<String, String> header : additionalHeaders.entrySet()) {
connection.setRequestProperty(header.getKey(), header.getValue());
}
}
// Set up authentication
String auth = clientId + ":" + clientSecret;
String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes());
connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
// Send request
String postData = "grant_type=client_credentials";
connection.setDoOutput(true);
try (OutputStream os = connection.getOutputStream()) {
os.write(postData.getBytes());
}
// Parse response
try (BufferedReader br = new BufferedReader(
new InputStreamReader(connection.getInputStream()))) {
StringBuilder response = new StringBuilder();
String line;
while ((line = br.readLine()) != null) {
response.append(line);
}
// Parse JSON response
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(response.toString());
return jsonNode.get("access_token").asText();
}
}
}
Retry and circuit breaker patterns¶
public class CircuitBreaker {
private final AtomicInteger failureCount = new AtomicInteger(0);
private final AtomicLong lastFailureTime = new AtomicLong(0);
private final int threshold;
private final long timeout;
public CircuitBreaker(int threshold, long timeout) {
this.threshold = threshold;
this.timeout = timeout;
}
public boolean isOpen() {
long now = System.currentTimeMillis();
if (failureCount.get() >= threshold) {
if (now - lastFailureTime.get() < timeout) {
return true; // Circuit is open
} else {
// Reset after timeout
failureCount.set(0);
return false;
}
}
return false;
}
public void recordFailure() {
failureCount.incrementAndGet();
lastFailureTime.set(System.currentTimeMillis());
}
public void recordSuccess() {
failureCount.set(0);
}
}
public class RetryTemplate {
private final int maxRetries;
private final long backoffMs;
public RetryTemplate(int maxRetries, long backoffMs) {
this.maxRetries = maxRetries;
this.backoffMs = backoffMs;
}
public <T> T execute(Supplier<T> operation) throws Exception {
int attempts = 0;
while (attempts < maxRetries) {
try {
return operation.get();
} catch (Exception e) {
attempts++;
if (attempts >= maxRetries) {
throw e;
}
Thread.sleep(backoffMs * attempts);
}
}
throw new RuntimeException("Max retries exceeded");
}
}
Deployment considerations¶
Container deployment¶
# Dockerfile for OAuth-enabled Kafka client
FROM openjdk:11-jre-slim
# Copy application
COPY target/oauth-kafka-client.jar /app/
# Set environment variables
ENV OAUTH_TOKEN_ENDPOINT=http://your-oauth-provider.com/oauth2/token
ENV OAUTH_CLIENT_ID=your-client-id
ENV OAUTH_CLIENT_SECRET=your-client-secret
ENV OAUTH_SCOPE=kafka:read kafka:write
ENV LOGICAL_CLUSTER_ID=lkc-xxxxx
ENV IDENTITY_POOL_ID=pool-xxxxx
# Run application
CMD ["java", "-jar", "/app/oauth-kafka-client.jar"]
Kubernetes deployment¶
# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: oauth-kafka-client
spec:
replicas: 3
selector:
matchLabels:
app: oauth-kafka-client
template:
metadata:
labels:
app: oauth-kafka-client
spec:
containers:
- name: kafka-client
image: your-registry/oauth-kafka-client:latest
env:
- name: OAUTH_TOKEN_ENDPOINT
valueFrom:
secretKeyRef:
name: oauth-secrets
key: token-endpoint
- name: OAUTH_CLIENT_ID
valueFrom:
secretKeyRef:
name: oauth-secrets
key: client-id
- name: OAUTH_CLIENT_SECRET
valueFrom:
secretKeyRef:
name: oauth-secrets
key: client-secret
- name: OAUTH_SCOPE
value: "kafka:read kafka:write"
- name: LOGICAL_CLUSTER_ID
value: "lkc-xxxxx"
- name: IDENTITY_POOL_ID
value: "pool-xxxxx"
CI/CD integration¶
# .github/workflows/deploy.yml
name: Deploy OAuth Kafka Client
on:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up JDK
uses: actions/setup-java@v2
with:
java-version: '11'
- name: Build with Maven
run: mvn clean package
- name: Build Docker image
run: docker build -t oauth-kafka-client .
- name: Deploy to Kubernetes
run: |
kubectl apply -f kubernetes-deployment.yaml
kubectl rollout restart deployment/oauth-kafka-client
Integration testing¶
Mock OAuth server
@Test
public void testOAuthIntegration() {
// Start mock OAuth server
MockOAuthServer mockServer = new MockOAuthServer();
mockServer.start();
try {
// Configure client with mock server
Properties props = new Properties();
props.put("sasl.mechanism", "OAUTHBEARER");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
"oauth.token.endpoint.uri=\"" + mockServer.getTokenEndpoint() + "\" " +
"oauth.client.id=\"test-client\" " +
"oauth.client.secret=\"test-secret\";");
// Test authentication
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// ... test operations
} finally {
mockServer.stop();
}
}
Performance testing¶
@Test
public void testOAuthPerformance() {
// Measure token retrieval performance
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
// Perform OAuth token request
String token = oauthProvider.getToken();
assertNotNull(token);
}
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
// Assert performance requirements
assertTrue("OAuth performance below threshold", duration < 5000);
}