Configure SSL/TLS for a Confluent Kafka cluster
Over the lifetime of a Kafka deployment, it may store a great deal of sensitive information that is subject to strict data privacy requirements and regulations.
To keep the stored data compliant with these requirements, you will need to secure your Kafka deployment with TLS.
In this article, we will go through the steps needed to configure SSL/TLS for a Kafka cluster.
Note: this is a follow-up to my other articles about security configuration for Confluent Kafka (LDAP authentication and Role-based access control configurations).
TL;DR: Example Configuration File
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
group.initial.rebalance.delay.ms=0
confluent.balancer.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
confluent.metrics.reporter.topic.replicas=1
confluent.license.topic.replication.factor=1
confluent.metadata.topic.replication.factor=1
confluent.security.event.logger.exporter.kafka.topic.replicas=1
confluent.balancer.topic.replication.factor=1
default.replication.factor=1
min.insync.replicas=1
## LDAP configuration
# Kafka authenticates to the directory service with the bind user.
ldap.java.naming.provider.url=ldap://openldap.example.com:389
ldap.java.naming.security.authentication=simple
ldap.java.naming.security.credentials=examplePassword
ldap.java.naming.security.principal=cn=exampleUser,ou=KafkaUsers,dc=example,dc=com
# Locate groups
ldap.group.name.attribute=cn
ldap.group.object.class=groupOfNames
ldap.group.member.attribute=member
ldap.group.member.attribute.pattern=CN=(.*),ou=KafkaUsers,dc=example,dc=com
ldap.group.search.base=ou=Group,dc=example,dc=com
ldap.user.search.scope=2
# Locate users. Make sure that these attributes and object classes match what is in your directory service.
ldap.user.name.attribute=cn
ldap.user.object.class=person
ldap.user.search.base=ou=KafkaUsers,dc=example,dc=com
ldap.user.password.attribute=userPassword
## listeners configuration
listeners=BROKER://:9091,OAUTH://:9092
sasl.enabled.mechanisms=PLAIN,OAUTHBEARER
listener.security.protocol.map=BROKER:SASL_SSL,OAUTH:SASL_SSL
# inter-broker configuration
inter.broker.listener.name=BROKER
sasl.mechanism.inter.broker.protocol=PLAIN
listener.name.broker.sasl.enabled.mechanisms=PLAIN
listener.name.broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="exampleUser" password="examplePassword";
listener.name.broker.plain.sasl.server.callback.handler.class=io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler
# OAUTHBEARER authentication configuration
listener.name.oauth.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.oauth.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
listener.name.oauth.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=/opt/confluent/mds/mds.pub;
# MDS configuration
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
confluent.authorizer.access.rule.providers=CONFLUENT,ZK_ACL
confluent.metadata.server.advertised.listeners=http://0.0.0.0:8090
confluent.metadata.server.authentication.method=BEARER
confluent.metadata.server.listeners=http://0.0.0.0:8090
confluent.metadata.server.token.key.path=/opt/confluent/mds/mds.pem
super.users=User:exampleUser
# SSL configuration
ssl.truststore.location=/opt/confluent/ssl/broker.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/opt/confluent/ssl/broker.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
About SSL
Before we start configuring SSL/TLS for a Kafka cluster, let’s begin with a short refresher on SSL.
The Components of an SSL connection
There are 03 main concepts in SSL:
- a Private Key, which can be thought of as a password used to create an SSL connection
- a Public Key, a client identifier/username for an SSL connection
- one or multiple Certificate Authorities (CA), which act as Public Key validators
How SSL works
The process of creating an SSL session between Client and Server can be simplified to 05 steps:
01. Client connects to Server on an SSL port (e.g. 443)
02. Server sends its Public Key to Client
03. Client verifies the Server’s Public Key
04. Client generates a Session Key using Server’s Public Key
05. Server establishes an SSL-secured session
SSL architecture with Certificate Authority
Every client has a list of trusted certificates pre-installed in its system (this is called a truststore). For the Client to trust a server’s Public Key as a valid certificate, it must already have that Public Key in its built-in truststore.
However, the Client’s built-in truststore is not updated dynamically, meaning a service with a newly set up SSL configuration will not be trusted.
To make it possible to trust such services, the Client’s built-in truststore is configured with certificates from verified Certificate Authorities. A newly set up service can then be registered as trusted by any of those CAs, and the Client can mark that service’s Public Key as trusted.
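The trust chain described above can be reproduced locally with openssl. The sketch below is a throwaway demo and not part of the cluster setup: the names demo-ca and demo-server and the temporary directory are assumptions made purely for illustration.

```shell
# Work in a throwaway directory so no real key material is touched.
WORK_DIR="$(mktemp -d)"

# 1. Create a self-signed CA certificate (hypothetical name: demo-ca).
openssl req -x509 -newkey rsa:2048 -nodes -days 1 \
  -keyout "${WORK_DIR}/demo-ca.key" -out "${WORK_DIR}/demo-ca.crt" \
  -subj "/CN=demo-ca" 2>/dev/null

# 2. Create a private key and a signing request for a service
#    (hypothetical name: demo-server).
openssl req -new -newkey rsa:2048 -nodes \
  -keyout "${WORK_DIR}/demo-server.key" -out "${WORK_DIR}/demo-server.csr" \
  -subj "/CN=demo-server" 2>/dev/null

# 3. The CA signs the request, producing the service's certificate.
openssl x509 -req -in "${WORK_DIR}/demo-server.csr" \
  -CA "${WORK_DIR}/demo-ca.crt" -CAkey "${WORK_DIR}/demo-ca.key" \
  -CAcreateserial -days 1 -out "${WORK_DIR}/demo-server.crt" 2>/dev/null

# 4. A client that trusts only the CA can now validate the service's certificate.
VERIFY_RESULT="$(openssl verify -CAfile "${WORK_DIR}/demo-ca.crt" "${WORK_DIR}/demo-server.crt")"
echo "${VERIFY_RESULT}"
```

The last command should report the service certificate as OK even though the client was only ever given the CA's certificate, which is the point of having a CA in the middle.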
SSL configuration for Apache Kafka
Apache Kafka, as a system developed in Java, uses 02 concepts when creating an SSL connection:
- Keystore: contains the Private Keys and the Public Keys signed by CAs
- Truststore: contains the Public Keys that should be trusted (including the CAs' Public Keys)
Configure SSL/TLS for a Kafka cluster
Demo Architecture
In this example, we will have 03 servers:
- a Client Server, which hosts the Kafka Consumer
- a Broker Server, which hosts the Broker and the Metadata Service
- an LDAP Server, which hosts the LDAP Service and the Certificate Authority (CA)

High Level Steps
- Setup Certificate Authority
- Create Truststore and Keystore for Broker and Client
- Configure Broker to use SSL
- Test SSL connection
Step 01: Setup Certificate Authority
1. Environment Variables
CA_PRIVATE_KEY="ca.pem"
CA_PUBLIC_KEY="ca.pub"
CA_HOSTNAME="$(hostname -f)"
2. Generate Private key
openssl genrsa -out "${CA_PRIVATE_KEY}"
3. Generate Public Key
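# -days sets how long the CA certificate is valid; openssl defaults to 30 days when it is omitted
openssl req -new -x509 -days 365 -key "${CA_PRIVATE_KEY}" -out "${CA_PUBLIC_KEY}" -subj "/C=/ST=/L=/O=/CN=${CA_HOSTNAME}"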
4. Transfer CA’s Public Key to other Servers
for server_name in "broker.example.com" "client.example.com"
do
scp "${CA_PUBLIC_KEY}" "${server_name}":/tmp/
done
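Before relying on the CA's Public Key on other servers, it can be worth checking what was actually generated. The sketch below uses a hypothetical helper, describe_cert, and runs it on a throwaway self-signed certificate so that it is self-contained; the demo file names are assumptions.

```shell
# describe_cert is a hypothetical helper name, not part of openssl.
# It prints the subject, issuer and validity window of a PEM certificate.
describe_cert() {
  openssl x509 -in "$1" -noout -subject -issuer -dates
}

# Self-contained demo on a throwaway self-signed certificate.
DEMO_DIR="$(mktemp -d)"
openssl req -x509 -newkey rsa:2048 -nodes -days 1 \
  -keyout "${DEMO_DIR}/demo.key" -out "${DEMO_DIR}/demo.crt" \
  -subj "/CN=demo-ca" 2>/dev/null
describe_cert "${DEMO_DIR}/demo.crt"

# On the LDAP Server you would instead run:
#   describe_cert "${CA_PUBLIC_KEY}"
```

For a self-signed CA the subject and issuer lines name the same entity, and notAfter shows when the CA certificate expires, which bounds how long the certificates it signs will be accepted.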
Step 02: Configure Truststore and Keystore for Broker and Client
Perform each of these steps on both the Broker and Client servers.
1. Environment Variables
SSL_STORE_DIR="/opt/confluent/ssl"
TRUSTSTORE="${SSL_STORE_DIR}/$(hostname -s).truststore.jks"
KEYSTORE="${SSL_STORE_DIR}/$(hostname -s).keystore.jks"
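# exported because keytool reads the password from its environment via -storepass:env / -keypass:env
export KEYSTORE_PASSWORD="changeit"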
CSR_FILE="/tmp/$(hostname -s).csr"
SIGNED_CERT="/tmp/$(hostname -s).signed.crt"
CA_PUBLIC_KEY="/tmp/ca.pub"
# add this option if you are using JDK 11 or JDK 8
KEYTOOL_OPTS="-J-Dkeystore.pkcs12.legacy"
2. Create Directory containing Truststore and Keystore
mkdir -p "${SSL_STORE_DIR}"
3. Create Truststore containing CA’s Public Key
keytool "${KEYTOOL_OPTS}" -keystore "${TRUSTSTORE}" -alias CARoot -import -file "${CA_PUBLIC_KEY}" -storepass:env KEYSTORE_PASSWORD -noprompt
4. Create Keystore containing CA’s Public Key
keytool "${KEYTOOL_OPTS}" -keystore "${KEYSTORE}" -alias CARoot -import -file "${CA_PUBLIC_KEY}" -storepass:env KEYSTORE_PASSWORD -noprompt
5. Generate Private Key and Certificate Signing Request using Keystore
# generate private key
keytool "${KEYTOOL_OPTS}" -keystore "${KEYSTORE}" -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=$(hostname -f)" -ext "SAN=DNS:$(hostname -f)" -storepass:env KEYSTORE_PASSWORD -keypass:env KEYSTORE_PASSWORD
# generate CSR
keytool "${KEYTOOL_OPTS}" -keystore "${KEYSTORE}" -certreq -alias localhost -file "${CSR_FILE}" -storepass:env KEYSTORE_PASSWORD -noprompt
Note: the client must be able to resolve the Common Name used in the broker's certificate (which, in this example, is the output of the hostname -f command on the Broker server).
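The name-resolution requirement in the note can be checked from the Client server before going further. This is a minimal sketch: report_resolution is a hypothetical helper name, and it assumes getent is available, as it is on most Linux distributions.

```shell
# report_resolution is a hypothetical helper name. It relies on getent, which
# is assumed to be available (it ships with most Linux distributions).
report_resolution() {
  if getent hosts "$1" >/dev/null; then
    echo "OK: $1 resolves"
  else
    echo "FAIL: $1 does not resolve; add a DNS record or an /etc/hosts entry"
  fi
}

# Usage from the Client server, with the broker hostname used in this article:
#   report_resolution broker.example.com
```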
6. (on LDAP Server) Sign CSRs
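Transfer CSR files to LDAP Server (run on both Broker and Client)
scp "${CSR_FILE}" ldap.example.com:/tmp/
Sign CSRs (run on the LDAP Server, in the directory and shell where the Step 01 variables are set)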
for server_name in "broker" "client"
do
openssl x509 -req -CA "${CA_PUBLIC_KEY}" -CAkey "${CA_PRIVATE_KEY}" -in "/tmp/${server_name}.csr" -out "/tmp/${server_name}.signed.crt" -days 365 -CAcreateserial
done
Transfer signed Certificates to Broker and Client
for server_name in "broker" "client"
do
scp "/tmp/${server_name}.signed.crt" "${server_name}.example.com":/tmp/
done
7. Import Signed Certificate
keytool "${KEYTOOL_OPTS}" -keystore "${KEYSTORE}" -alias localhost -import -file "${SIGNED_CERT}" -storepass:env KEYSTORE_PASSWORD -noprompt
8. Change Ownership of Truststore and Keystore to Service User
chown -R cp-kafka:confluent "${SSL_STORE_DIR}"
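Once the stores are built, their contents can be inspected with keytool -list. The sketch below is one way to do that under the assumptions of this step: list_store is a hypothetical helper name, and the password is expected to sit in an exported environment variable so that keytool can read it through -storepass:env.

```shell
# list_store is a hypothetical helper name wrapping "keytool -list".
# $1 = path to the store, $2 = name of the exported environment variable
# that holds the store password (read by keytool via -storepass:env).
list_store() {
  keytool -list -v -keystore "$1" -storepass:env "$2" \
    | grep -E 'Alias name|Entry type|Certificate chain length'
}

# Usage, with the variables defined earlier in this step:
#   list_store "${KEYSTORE}"   KEYSTORE_PASSWORD
#   list_store "${TRUSTSTORE}" KEYSTORE_PASSWORD
```

In the Keystore, the localhost alias should show up as a PrivateKeyEntry with a certificate chain that includes the CA, while CARoot should be a trustedCertEntry in both stores.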
Step 03: Configure SSL for broker
1. Configure SSL
Add SSL configurations to broker’s config file
Default location of the config file: /etc/kafka/server.properties
echo "# SSL configuration
ssl.truststore.location=${TRUSTSTORE}
ssl.truststore.password=${KEYSTORE_PASSWORD}
ssl.keystore.location=${KEYSTORE}
ssl.keystore.password=${KEYSTORE_PASSWORD}
ssl.key.password=${KEYSTORE_PASSWORD}
" >> /etc/kafka/server.properties
Update broker listener(s) to use SASL_SSL
Use the following commands to update all listeners to SASL_SSL.
Alternatively, you can configure each listener individually to your liking.
UPDATED_PROTOCOLS=()
# split value of property "listener.security.protocol.map" to array using ',' as delimiter
IFS=',' read -ra PROTOCOLS <<< "$(grep listener.security.protocol.map /etc/kafka/server.properties |cut -d '=' -f 2)"
# update protocol of each listener to SASL_SSL
for i in "${PROTOCOLS[@]}"; do
UPDATED_PROTOCOLS+=("${i//:*/:SASL_SSL}")
done
# join UPDATED_PROTOCOLS array into string using ',' as delimiter
printf -v joined '%s,' "${UPDATED_PROTOCOLS[@]}"
# update config file (-E enables extended regular expressions, which the .+ in the pattern needs)
sed -i -E -e "s|listener.security.protocol.map=.+|listener.security.protocol.map=${joined%,}|g" /etc/kafka/server.properties
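To see what the rewrite does before touching the real file, the same transformation can be tried on a sample string. The sketch below uses a single sed expression as an alternative to the array loop; to_sasl_ssl_map is a hypothetical helper name and the sample value is an assumption.

```shell
# to_sasl_ssl_map is a hypothetical helper name. It rewrites the protocol of
# every LISTENER:PROTOCOL pair in a comma-separated map to SASL_SSL.
to_sasl_ssl_map() {
  printf '%s\n' "$1" | sed -E 's/:[^,]*/:SASL_SSL/g'
}

# Dry run on a sample value; nothing is written to disk.
to_sasl_ssl_map "BROKER:SASL_PLAINTEXT,OAUTH:SASL_PLAINTEXT"
# prints: BROKER:SASL_SSL,OAUTH:SASL_SSL
```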
2. Restart Service
systemctl restart confluent-server
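If the broker does not come back after the restart, one thing worth ruling out is an SSL property that was written without a value, for example because a shell variable was unset when the block was appended. A minimal sketch of such a check follows; missing_ssl_properties is a hypothetical helper name.

```shell
# missing_ssl_properties is a hypothetical helper name.
# It prints every expected SSL property that has no non-empty "key=value"
# line in the given properties file.
missing_ssl_properties() {
  for key in ssl.truststore.location ssl.truststore.password \
             ssl.keystore.location ssl.keystore.password ssl.key.password; do
    grep -q "^${key}=." "$1" || echo "${key}"
  done
}

# Usage against the broker's config file:
#   missing_ssl_properties /etc/kafka/server.properties
```

An empty output means all five properties are present with a value; it does not prove that the paths or passwords are correct.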
Step 04: Test Connection
1. Update Client’s Properties File
Default location: /etc/kafka/consumer.properties
bootstrap.servers=broker.example.com:9092
# consumer group id
group.id=test-consumer-group
security.protocol=SASL_SSL
ssl.truststore.location=/opt/confluent/ssl/client.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/opt/confluent/ssl/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
metadataServerUrls="http://broker.example.com:8090" \
username="administrator" password="changeit" \
publicKeyPath="/opt/mds/mds.pub";
2. Test Connection
Test SSL connection to broker by listing existing topics.
kafka-topics --bootstrap-server broker.example.com:9092 --command-config /etc/kafka/consumer.properties --list
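If listing topics fails, it can help to look at the TLS layer on its own, without SASL in the picture. The sketch below wraps openssl s_client for that purpose; show_broker_cert is a hypothetical helper name, and the host, port and CA path in the usage line are the ones assumed throughout this article.

```shell
# show_broker_cert is a hypothetical helper name.
# $1 = host:port of a TLS listener, $2 = PEM file with the CA's Public Key.
show_broker_cert() {
  if [ "$#" -ne 2 ]; then
    echo "usage: show_broker_cert HOST:PORT CA_FILE" >&2
    return 2
  fi
  # </dev/null closes stdin so s_client exits once the handshake is done.
  openssl s_client -connect "$1" -CAfile "$2" </dev/null 2>/dev/null \
    | grep -E 'subject=|issuer=|Verify return code'
}

# Usage from the Client server, with this article's hostnames and paths:
#   show_broker_cert broker.example.com:9092 /tmp/ca.pub
```

A verify return code of 0 (ok) indicates that the certificate presented by the broker chains back to the CA; any other code points at the truststore or the signing step rather than at the SASL configuration.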