Configure Go Clients for OAuth/OIDC on Confluent Cloud¶
Prerequisites¶
Before you begin, ensure you have the following:
Language and tooling¶
- Go 1.18+
- confluent-kafka-go: v2.0.0 or later
- Confluent Platform: 7.2.1 or later; 7.1.3 or later
Confluent Cloud environment¶
- OAuth setup in Confluent Cloud - Configure OAuth authentication
Credentials and identifiers¶
- Client ID - Your application’s identifier (like a username)
- Client secret - Your application’s password for OAuth
- Token endpoint URL - Where to request access tokens
- Scopes - Permissions your application needs (for example,
kafka:read kafka:write
) - Cluster ID (
lkc-xxxxx
) - Your Kafka cluster identifier - Identity pool ID (
pool-xxxxx
) - If using identity pools
Client library¶
Install the latest version with OAuth support:
go get github.com/confluentinc/confluent-kafka-go/v2/kafka
Configure Kafka Go clients¶
Go Kafka clients can authenticate to Confluent Cloud clusters using the OAuth 2.0 protocol. The Go client uses the confluent-kafka-go library which is based on librdkafka.
Configuration approach¶
Required parameters¶
Parameter | Description | Required Value |
---|---|---|
sasl.mechanism |
Sets the authentication method for OAuth. | OAUTHBEARER |
sasl.oauth.token.endpoint.uri |
OAuth token endpoint URL from your identity provider. | (Provided by IdP) |
sasl.oauth.client.id |
OAuth client ID from your identity provider. | (Provided by IdP) |
sasl.oauth.client.secret |
OAuth client secret from your identity provider. | (Provided by IdP) |
sasl.oauth.scope |
OAuth scopes for token request (optional). | (Optional) |
sasl.oauth.logical.cluster |
Logical cluster ID for Confluent Cloud. | lkc-xxxxx |
sasl.oauth.identity.pool.id |
Identity pool ID for Confluent Cloud (optional). | (Optional) |
sasl.oauth.connect.timeout.ms |
Connection timeout for token requests (optional). | 10000 (default) |
sasl.oauth.read.timeout.ms |
Read timeout for token requests (optional). | 10000 (default) |
oauth_cb |
Go function for OAuth token retrieval. | Your callback function |
OAuth callback function¶
The Go client requires an OAuth callback function with the following signature:
func oauthCallback(oauthConfig string) (string, error) {
// Implementation here
return "", nil
}
The callback function receives a JSON string containing the OAuth configuration parameters
from librdkafka. This includes all the sasl.oauth.*
parameters that were set in the
client configuration. The function should parse this JSON, make the OAuth token request,
and return the access token as a string.
Configuration example¶
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
type OAuthConfig struct {
TokenEndpoint string `json:"token_endpoint"`
ClientID string `json:"client_id"`
ClientSecret string `json:"client_secret"`
Scope string `json:"scope"`
}
type TokenResponse struct {
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
ExpiresIn int `json:"expires_in"`
}
func oauthCallback(oauthConfig string) (string, error) {
// Parse OAuth configuration from JSON string
var config OAuthConfig
if err := json.Unmarshal([]byte(oauthConfig), &config); err != nil {
return "", fmt.Errorf("failed to parse OAuth config: %v", err)
}
// Prepare the token request
data := url.Values{}
data.Set("grant_type", "client_credentials")
data.Set("client_id", config.ClientID)
data.Set("client_secret", config.ClientSecret)
if config.Scope != "" {
data.Set("scope", config.Scope)
}
// Create HTTP request
req, err := http.NewRequest("POST", config.TokenEndpoint, strings.NewReader(data.Encode()))
if err != nil {
return "", fmt.Errorf("failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// Send request with timeout
client := &http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
// Read the body ONCE
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response body: %v", err)
}
if resp.StatusCode != http.StatusOK {
// Use the body we already read for the error message
return "", fmt.Errorf("OAuth request failed with status %d: %s", resp.StatusCode, string(body))
}
// Parse the body we already read
var tokenResp TokenResponse
if err := json.Unmarshal(body, &tokenResp); err != nil {
return "", fmt.Errorf("failed to parse token response: %v", err)
}
return tokenResp.AccessToken, nil
}
// Enhanced OAuth callback with retry logic
func oauthCallbackWithRetry(oauthConfig string) (string, error) {
maxRetries := 3
baseDelay := time.Second
for attempt := 1; attempt <= maxRetries; attempt++ {
token, err := oauthCallback(oauthConfig)
if err == nil {
return token, nil
}
fmt.Printf("OAuth token request failed (attempt %d/%d): %v\n", attempt, maxRetries, err)
if attempt < maxRetries {
delay := time.Duration(float64(baseDelay) * float64(1<<(attempt-1))) // Exponential backoff
time.Sleep(delay)
}
}
return "", fmt.Errorf("failed to obtain OAuth token after %d attempts", maxRetries)
}
func main() {
// Producer configuration with integrated retry logic
producerConfig := &kafka.ConfigMap{
"bootstrap.servers": "your-cluster-endpoint:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.oauth.token.endpoint.uri": "http://your-oauth-provider.com/oauth2/token",
"sasl.oauth.client.id": "your-client-id",
"sasl.oauth.client.secret": "your-client-secret",
"sasl.oauth.scope": "kafka:read kafka:write",
"sasl.oauth.logical.cluster": "lkc-xxxxx",
"sasl.oauth.identity.pool.id": "pool-xxxxx",
"sasl.oauth.connect.timeout.ms": 10000,
"sasl.oauth.read.timeout.ms": 10000,
"oauth_cb": oauthCallbackWithRetry, // Using enhanced callback with retry
}
// Create producer
producer, err := kafka.NewProducer(producerConfig)
if err != nil {
panic(fmt.Sprintf("Failed to create producer: %v", err))
}
defer producer.Close()
// Consumer configuration with integrated retry logic
consumerConfig := &kafka.ConfigMap{
"bootstrap.servers": "your-cluster-endpoint:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.oauth.token.endpoint.uri": "http://your-oauth-provider.com/oauth2/token",
"sasl.oauth.client.id": "your-client-id",
"sasl.oauth.client.secret": "your-client-secret",
"sasl.oauth.scope": "kafka:read kafka:write",
"sasl.oauth.logical.cluster": "lkc-xxxxx",
"sasl.oauth.identity.pool.id": "pool-xxxxx",
"oauth_cb": oauthCallbackWithRetry, // Using enhanced callback with retry
"group.id": "your-consumer-group",
"auto.offset.reset": "earliest",
}
// Create consumer
consumer, err := kafka.NewConsumer(consumerConfig)
if err != nil {
panic(fmt.Sprintf("Failed to create consumer: %v", err))
}
defer consumer.Close()
}
Test your configuration¶
Enable debug logging to troubleshoot OAuth issues:
// Add to your configuration config := &kafka.ConfigMap{ // ... your OAuth config "debug": "security,protocol,broker", }
Test with a simple producer:
// Test producer producer, err := kafka.NewProducer(config) if err != nil { log.Fatal("Failed to create producer: ", err) } defer producer.Close() topic := "test-topic" message := "test-message" producer.ProduceChannel() <- &kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(message), } // Wait for delivery for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error) } else { fmt.Printf("Message delivered to %s [%d] - OAuth is working!\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition) } return } }
Check for common errors: - “SASL authentication failed” - Check your OAuth credentials and endpoint - “Invalid token” - Verify your callback function is returning a valid token - “Connection timeout” - Check your bootstrap servers and network connectivity
Verify token refresh - The client should automatically refresh tokens when they expire
Custom OAuth implementations¶
For advanced use cases, implement custom token providers:
import (
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"time"
"github.com/golang-jwt/jwt/v4"
)
type JWTCustomTokenProvider struct {
privateKey *rsa.PrivateKey
clientID string
tokenEndpoint string
}
func NewJWTCustomTokenProvider(privateKeyPath, clientID, tokenEndpoint string) (*JWTCustomTokenProvider, error) {
// Load private key
keyBytes, err := os.ReadFile(privateKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to read private key: %v", err)
}
block, _ := pem.Decode(keyBytes)
if block == nil {
return nil, fmt.Errorf("failed to decode PEM block")
}
privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
return nil, fmt.Errorf("failed to parse private key: %v", err)
}
return &JWTCustomTokenProvider{
privateKey: privateKey,
clientID: clientID,
tokenEndpoint: tokenEndpoint,
}, nil
}
func (p *JWTCustomTokenProvider) CreateJwtAssertion(audience, issuer string) (string, error) {
now := time.Now()
claims := jwt.MapClaims{
"iss": issuer,
"sub": p.clientID,
"aud": audience,
"iat": now.Unix(),
"exp": now.Add(time.Hour).Unix(),
"jti": fmt.Sprintf("jwt-%d", now.Unix()),
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
return token.SignedString(p.privateKey)
}
func (p *JWTCustomTokenProvider) GetToken(audience, issuer string) (string, error) {
assertion, err := p.CreateJwtAssertion(audience, issuer)
if err != nil {
return "", fmt.Errorf("failed to create JWT assertion: %v", err)
}
data := url.Values{}
data.Set("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer")
data.Set("assertion", assertion)
req, err := http.NewRequest("POST", p.tokenEndpoint, strings.NewReader(data.Encode()))
if err != nil {
return "", fmt.Errorf("failed to create request: %v", err)
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
// Read the body ONCE
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response body: %v", err)
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("token request failed with status %d: %s", resp.StatusCode, string(body))
}
var tokenResp TokenResponse
if err := json.Unmarshal(body, &tokenResp); err != nil {
return "", fmt.Errorf("failed to parse token response: %v", err)
}
return tokenResp.AccessToken, nil
}
// NewCustomOAuthCallback creates and returns a Kafka OAuth callback function
// that uses a JWT provider for token retrieval.
func NewCustomOAuthCallback(privateKeyPath, clientID, tokenEndpoint string) (func(string) (string, error), error) {
jwtProvider, err := NewJWTCustomTokenProvider(
privateKeyPath,
clientID,
tokenEndpoint,
)
if err != nil {
return nil, fmt.Errorf("failed to create JWT provider: %v", err)
}
// Return a closure that has access to the jwtProvider
return func(oauthConfig string) (string, error) {
return jwtProvider.GetToken(
"http://your-oauth-provider.com", // audience
"your-client-id", // issuer
)
}, nil
}
// --- In your main application setup ---
// 1. Create the callback function once
customCallback, err := NewCustomOAuthCallback(
"/path/to/private_key.pem",
"your-client-id",
"http://your-oauth-provider.com/oauth2/token",
)
if err != nil {
panic(err)
}
// 2. Use it in the Kafka configuration
producerConfig := &kafka.ConfigMap{
// ... other properties
"oauth_cb": customCallback,
}
Google OIDC integration¶
For Google OIDC integration with Go clients:
import (
"context"
"fmt"
"time"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
)
func googleOAuthCallback(oauthConfig string) (string, error) {
// Load service account credentials
credentials, err := google.CredentialsFromFile(
"path/to/service-account-key.json",
"http://www.googleapis.com/auth/cloud-platform",
)
if err != nil {
return "", fmt.Errorf("failed to load credentials: %v", err)
}
// Get token
token, err := credentials.TokenSource.Token()
if err != nil {
return "", fmt.Errorf("failed to get token: %v", err)
}
return token.AccessToken, nil
}
// Google OIDC configuration
googleConfig := &kafka.ConfigMap{
"bootstrap.servers": "your-cluster-endpoint:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.oauth.logical.cluster": "lkc-xxxxx",
"sasl.oauth.identity.pool.id": "pool-xxxxx",
"oauth_cb": googleOAuthCallback,
}
Note
The googleOAuthCallback
function loads credentials directly from the
service account JSON file and ignores the oauthConfig
string passed
by the client. For this reason, properties like
sasl.oauth.token.endpoint.uri
are not needed in the ConfigMap
when using this callback.
Configure Schema Registry Go clients¶
The Schema Registry Go client authenticates using a built-in OAuth mechanism.
You only need to provide the correct configuration parameters, and the client
handles the token request automatically. It does not use the oauth_cb
callback.
Required parameters¶
The following parameters must be included in the Schema Registry client configuration:
Parameter | Description |
---|---|
bearer.auth.credentials.source |
Set to OAUTHBEARER for OAuth authentication |
bearer.auth.issuer.endpoint.url |
OAuth token endpoint URL from your identity provider |
bearer.auth.client.id |
OAuth client ID from your identity provider |
bearer.auth.client.secret |
OAuth client secret from your identity provider |
bearer.auth.scope |
OAuth scopes for token request (optional) |
bearer.auth.logical.cluster |
Logical cluster ID for Schema Registry (lsrc-xxxxx) |
bearer.auth.identity.pool.id |
Identity pool ID for Confluent Cloud (optional) |
Configuration example¶
import (
"github.com/confluentinc/confluent-kafka-go/schemaregistry"
)
// Schema Registry configuration with OAuth
schemaRegistryConfig := map[string]string{
"url": "http://<your-schema-registry-endpoint>",
"bearer.auth.credentials.source": "OAUTHBEARER",
"bearer.auth.issuer.endpoint.url": "http://<your-idp.com>/oauth2/token",
"bearer.auth.client.id": "<your-client-id>",
"bearer.auth.client.secret": "<your-client-secret>",
"bearer.auth.scope": "schema:read",
"bearer.auth.logical.cluster": "<lsrc-xxxxx>",
"bearer.auth.identity.pool.id": "<pool-yyyyy>",
}
// Create Schema Registry client
schemaRegistryClient, err := schemaregistry.NewClient(schemaRegistryConfig)
if err != nil {
panic(fmt.Sprintf("Failed to create Schema Registry client: %v", err))
}
Inherit OAuth configuration from Kafka client¶
You can also inherit the OAuth configuration from your Kafka client:
// Schema Registry configuration inheriting from Kafka client
schemaRegistryConfig := map[string]string{
"url": "http://<your-schema-registry-endpoint>",
"bearer.auth.credentials.source": "SASL_OAUTHBEARER_INHERIT",
"bearer.auth.logical.cluster": "<lsrc-xxxxx>",
"bearer.auth.identity.pool.id": "<pool-yyyyy>",
}
Custom token provider¶
For custom OAuth implementations, use the following configuration:
schemaRegistryConfig := map[string]string{
"url": "http://<your-schema-registry-endpoint>",
"bearer.auth.credentials.source": "CUSTOM",
"bearer.auth.custom.provider.class": "<fully-qualified-class-name>",
"bearer.auth.logical.cluster": "<lsrc-resource-id>",
"bearer.auth.identity.pool.id": "<identity-pool-id>",
}
Troubleshoot Go OAuth clients¶
Common issues and solutions for Go OAuth clients:
Authentication failures¶
- Verify client ID and secret are correct
- Check token endpoint URL is accessible
- Ensure logical cluster ID is valid
- Validate identity pool ID if used
Network issues¶
- Confirm network connectivity to OAuth provider
- Check firewall rules allow OAuth traffic
- Verify SSL certificate validation
Configuration issues¶
- Ensure all required parameters are provided
- Validate OAuth callback function signature
- Check timeout values are reasonable
Debug logging¶
Enable librdkafka’s internal debug logs for troubleshooting OAuth issues
by setting the debug
property in your ConfigMap
. This provides
far more detail than standard Go logging for client-broker communication.
producerConfig := &kafka.ConfigMap{
// ... other properties
"debug": "security,protocol,broker",
}
Common Go-specific issues¶
- Import errors: Ensure
confluent-kafka-go
is properly installed - Callback function: Verify the OAuth callback function signature matches requirements
- Token refresh: Implement proper token refresh logic with error handling
- Thread safety: Ensure OAuth callback is thread-safe for multi-threaded applications
Performance considerations¶
- Implement token caching to avoid repeated OAuth requests
- Use connection pooling for HTTP requests to OAuth provider
- Consider implementing token refresh before expiration