
Configure SSL/TLS for a Confluent Kafka cluster

Throughout its lifecycle, a Kafka deployment might store a lot of sensitive information that is subject to demanding data privacy concerns and regulations.

To ensure the stored data complies with these requirements, you will need to secure your Kafka deployment with TLS.

In this article, we will go through the steps needed to configure SSL/TLS for a Kafka cluster.

Note: this is a follow-up to my other articles about security configuration for Confluent Kafka (LDAP authentication and Role-based access control configurations).

TL;DR: Example Configuration File

zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
group.initial.rebalance.delay.ms=0
confluent.balancer.enable=true
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
confluent.metrics.reporter.topic.replicas=1
confluent.license.topic.replication.factor=1
confluent.metadata.topic.replication.factor=1
confluent.security.event.logger.exporter.kafka.topic.replicas=1
confluent.balancer.topic.replication.factor=1
default.replication.factor=1
min.insync.replicas=1
## LDAP configuration
# Kafka authenticates to the directory service with the bind user.
ldap.java.naming.provider.url=ldap://openldap.example.com:389
ldap.java.naming.security.authentication=simple
ldap.java.naming.security.credentials=examplePassword
ldap.java.naming.security.principal=cn=exampleUser,ou=KafkaUsers,dc=example,dc=com
# Locate groups
ldap.group.name.attribute=cn
ldap.group.object.class=groupOfNames
ldap.group.member.attribute=member
ldap.group.member.attribute.pattern=CN=(.*),ou=KafkaUsers,dc=example,dc=com
ldap.group.search.base=ou=Group,dc=example,dc=com
ldap.user.search.scope=2
# Locate users. Make sure that these attributes and object classes match what is in your directory service.
ldap.user.name.attribute=cn
ldap.user.object.class=person
ldap.user.search.base=ou=KafkaUsers,dc=example,dc=com
ldap.user.password.attribute=userPassword
## listeners configuration
listeners=BROKER://:9091,OAUTH://:9092
sasl.enabled.mechanisms=PLAIN,OAUTHBEARER
listener.security.protocol.map=BROKER:SASL_SSL,OAUTH:SASL_SSL
# inter-broker configuration
inter.broker.listener.name=BROKER
sasl.mechanism.inter.broker.protocol=PLAIN
listener.name.broker.sasl.enabled.mechanisms=PLAIN
listener.name.broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="exampleUser" password="examplePassword";
listener.name.broker.plain.sasl.server.callback.handler.class=io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler
# OAUTHBEARER authentication configuration
listener.name.oauth.sasl.enabled.mechanisms=OAUTHBEARER
listener.name.oauth.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler
listener.name.oauth.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required publicKeyPath=/opt/confluent/mds/mds.pub;
# MDS configuration
authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
confluent.authorizer.access.rule.providers=CONFLUENT,ZK_ACL
confluent.metadata.server.advertised.listeners=http://0.0.0.0:8090
confluent.metadata.server.authentication.method=BEARER
confluent.metadata.server.listeners=http://0.0.0.0:8090
confluent.metadata.server.token.key.path=/opt/confluent/mds/mds.pem
super.users=User:exampleUser

# SSL configuration
ssl.truststore.location=/opt/confluent/ssl/client.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/opt/confluent/ssl/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit

About SSL

Before we start configuring SSL/TLS for a Kafka cluster, let’s go through a quick refresher on SSL.

The Components of an SSL connection

There are three main concepts in SSL:

  1. a Private Key, which can be thought of as the password used to create an SSL connection
  2. a Public Key, which acts as the identifier/username presented over an SSL connection
  3. one or more Certificate Authorities (CAs), which validate Public Keys
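
As a quick illustration of the first two concepts, you can generate an RSA key pair with openssl; the file names below are examples only and are not reused later in this article.

# generate a private key (keep this secret)
openssl genrsa -out example-key.pem 2048

# derive the matching public key, which can be shared freely
openssl rsa -in example-key.pem -pubout -out example-key.pub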

How SSL works

The process of creating an SSL session between Client and Server can be simplified to five steps:

01. Client connects to Server on an SSL port (e.g. 443)
02. Server sends its Public Key to Client
03. Client verifies the Server’s Public Key against its truststore
04. Client generates a Session Key and sends it to Server, encrypted with the Server’s Public Key
05. Both sides use the Session Key to establish an SSL-secured session
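
If you want to watch these steps happen against a real server, openssl’s s_client utility prints the certificate the Server presents and the negotiated session details; any public HTTPS endpoint works as a target.

# connect to an HTTPS endpoint and print the handshake details;
# look for the certificate chain and the negotiated protocol/cipher
openssl s_client -connect example.com:443 -servername example.com </dev/null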

SSL architecture with Certificate Authority

Every client has a list of trusted certificates pre-installed on its system (this is called a truststore). For the Client to trust a server’s Public Key as a valid certificate, it must already have that Public Key in its built-in truststore.

However, the Client’s built-in truststore is not updated dynamically, meaning a service with a newly set up SSL configuration will not be trusted.

In order to trust these services, the Client’s built-in truststore is configured with certificates from multiple verified Certificate Authorities. A newly set up service can then have its Public Key signed (registered as trusted) by one of those CAs, and the Client will mark that service’s Public Key as trusted.
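
As an illustration, on a machine with a JDK installed you can list the CAs bundled in the default Java truststore (the -cacerts flag is available from JDK 9 onwards, and changeit is the default password).

# list the CA certificates shipped with the JDK's default truststore
keytool -list -cacerts -storepass changeit | head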

SSL configuration for Apache Kafka

Apache Kafka, as a system developed in Java, has two concepts used in creating an SSL connection:

  • Keystore: contains the Private Keys and the Public Keys signed by CAs
  • Truststore: contains the Public Keys that should be trusted (including CAs’ public keys)

Configure SSL/TLS for a Kafka cluster

Demo Architecture

In this example, we will have three servers:
- a Client Server, which hosts the Kafka Consumer
- a Broker Server, which hosts the Broker and the Metadata Service
- an LDAP Server, which hosts the LDAP Service and the Certificate Authority (CA)

Example Deployment of a Kafka cluster secured by SSL and RBAC

High Level Steps

  1. Setup Certificate Authority
  2. Create Truststore and Keystore for Broker and Client
  3. Configure Broker to use SSL
  4. Test SSL connection

Step 01: Setup Certificate Authority

  1. Environment Variables
CA_PRIVATE_KEY="ca.pem"
CA_PUBLIC_KEY="ca.pub"
CA_HOSTNAME="$(hostname -f)"

2. Generate Private key

openssl genrsa -out "${CA_PRIVATE_KEY}"

3. Generate Public Key

openssl req -new -x509 -days 365 -key "${CA_PRIVATE_KEY}" -out "${CA_PUBLIC_KEY}" -subj "/C=/ST=/L=/O=/CN=${CA_HOSTNAME}"
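
You can optionally inspect the CA certificate you just created to confirm its subject and validity period.

# optional: inspect the CA certificate
openssl x509 -in "${CA_PUBLIC_KEY}" -noout -subject -issuer -dates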

4. Transfer CA’s Public Key to other Servers

for server_name in "broker.example.com" "client.example.com"
do
scp "${CA_PUBLIC_KEY}" "${server_name}":/tmp/
done

Step 02: Configure Truststore and Keystore for Broker and Client

Do each of these steps on the Broker and Client servers

  1. Environment Variables
SSL_STORE_DIR="/opt/confluent/ssl"
TRUSTSTORE="${SSL_STORE_DIR}/$(hostname -s).truststore.jks"
KEYSTORE="${SSL_STORE_DIR}/$(hostname -s).keystore.jks"
# exported so keytool's -storepass:env / -keypass:env options can read it
export KEYSTORE_PASSWORD="changeit"
CSR_FILE="/tmp/$(hostname -s).csr"
SIGNED_CERT="/tmp/$(hostname -s).signed.crt"
CA_PUBLIC_KEY="/tmp/ca.pub"

# add this option if the generated keystores must remain readable by JDK 8 or JDK 11
KEYTOOL_OPTS="-J-Dkeystore.pkcs12.legacy"

2. Create Directory containing Truststore and Keystore

mkdir -p "${SSL_STORE_DIR}"

3. Create Truststore containing CA’s Public Key

keytool "${KEYTOOL_OPTS}" -keystore "${TRUSTSTORE}" -alias CARoot -import -file "${CA_PUBLIC_KEY}" -storepass:env KEYSTORE_PASSWORD -noprompt

4. Create Keystore containing CA’s Public Key

keytool "${KEYTOOL_OPTS}" -keystore "${KEYSTORE}" -alias CARoot -import -file "${CA_PUBLIC_KEY}" -storepass:env KEYSTORE_PASSWORD -noprompt

5. Generate Private Key and Certificate Signing Request using Keystore

# generate private key
keytool "${KEYTOOL_OPTS}" -keystore "${KEYSTORE}" -alias localhost -validity 365 -genkey -keyalg RSA -dname "CN=$(hostname -f)" -ext "SAN=DNS:$(hostname -f)" -storepass:env KEYSTORE_PASSWORD -keypass:env KEYSTORE_PASSWORD

# generate CSR
keytool "${KEYTOOL_OPTS}" -keystore "${KEYSTORE}" -certreq -alias localhost -file "${CSR_FILE}" -storepass:env KEYSTORE_PASSWORD -noprompt

note: the Client must be able to resolve the Common Name used in the Broker's certificate (which, in this example, is the output of the hostname -f command on the Broker server). If you do not have DNS entries for these hosts, see the /etc/hosts sketch below.
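
If you are working in a lab without DNS, one option is to map the demo hostnames in /etc/hosts on each server. This is only a sketch: the 192.0.2.x addresses are documentation placeholders, so replace them with your servers' real IPs.

# example only: map the demo hostnames when DNS is not available
cat >> /etc/hosts <<'EOF'
192.0.2.10  broker.example.com
192.0.2.11  client.example.com
192.0.2.12  ldap.example.com openldap.example.com
EOF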

6. (on LDAP Server) Sign CSRs

Transfer CSR files to LDAP Server

scp "${CSR_FILE}" ldap.example.com:/tmp/

Sign CSRs

for server_name in "broker" "client"
do
openssl x509 -req -CA "${CA_PUBLIC_KEY}" -CAkey "${CA_PRIVATE_KEY}" -in "/tmp/${server_name}.csr" -out "/tmp/${server_name}.signed.crt" -days 365 -CAcreateserial
done
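
Optionally, before sending them back, confirm that each signed certificate validates against the CA.

# optional: each command should print "OK"
for server_name in "broker" "client"
do
openssl verify -CAfile "${CA_PUBLIC_KEY}" "/tmp/${server_name}.signed.crt"
done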

Transfer signed Certificates to Broker and Client

for server_name in "broker" "client"
do
scp "/tmp/${server_name}.signed.crt" "${server_name}.example.com":/tmp/
done

7. Import Signed Certificate

keytool "${KEYTOOL_OPTS}" -keystore "${KEYSTORE}" -alias localhost -import -file "${SIGNED_CERT}" -storepass:env KEYSTORE_PASSWORD -noprompt

8. Change Ownership of Truststore and Keystore to Service User

chown -R cp-kafka:confluent "${SSL_STORE_DIR}"

Step 03: Configure SSL for broker

1. Configure SSL

Add SSL configurations to broker’s config file

Default location of the config file: /etc/kafka/server.properties

echo "# SSL configuration
ssl.truststore.location=${TRUSTSTORE}
ssl.truststore.password=${KEYSTORE_PASSWORD}
ssl.keystore.location=${KEYSTORE}
ssl.keystore.password=${KEYSTORE_PASSWORD}
ssl.key.password=${KEYSTORE_PASSWORD}
"
>> /etc/kafka/server.properties

Update broker listener(s) to use SASL_SSL

Use the following commands to update all listeners to SASL_SSL

Alternatively, you can configure each listener individually to your liking

UPDATED_PROTOCOLS=()
# split value of property "listener.security.protocol.map" to array using ',' as delimiter
IFS=',' read -ra PROTOCOLS <<< "$(grep listener.security.protocol.map /etc/kafka/server.properties |cut -d '=' -f 2)"

# update protocol of each listener to SASL_SSL
for i in "${PROTOCOLS[@]}"; do
UPDATED_PROTOCOLS+=("${i//:*/:SASL_SSL}")
done

# join UPDATED_PROTOCOLS array into string using ',' as delimiter
printf -v joined '%s,' "${UPDATED_PROTOCOLS[@]}"

# update config file
sed -i -e "s|listener.security.protocol.map=.+|listener.security.protocol.map=${joined%,}|g" /etc/kafka/server.properties

2. Restart Service

systemctl restart confluent-server
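
Once the broker is back up, you can check from the Client server that it presents a certificate signed by our CA (this assumes the CA's Public Key is still at /tmp/ca.pub from Step 01). Look for "Verify return code: 0 (ok)" in the output.

# the TLS handshake should succeed even though no SASL credentials are sent
openssl s_client -connect broker.example.com:9092 -CAfile /tmp/ca.pub </dev/null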

Step 04: Test Connection

  1. Update Client’s Properties File

Default location: /etc/kafka/consumer.properties

bootstrap.servers=localhost:9092
# consumer group id
group.id=test-consumer-group
security.protocol=SASL_SSL
ssl.truststore.location=/opt/confluent/ssl/client.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/opt/confluent/ssl/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
metadataServerUrls="http://broker.example.com:8090" \
username="administrator" password="changeit" \
publicKeyPath="/opt/mds/mds.pub";

2. Test Connection

Test SSL connection to broker by listing existing topics.

kafka-topics --bootstrap-server broker.example.com:9092 --command-config /etc/kafka/consumer.properties --list
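
If listing topics works, you can also try consuming over the secured listener. This sketch assumes a topic named test-topic already exists on the cluster.

# consume from an existing topic (test-topic is a placeholder name)
kafka-console-consumer --bootstrap-server broker.example.com:9092 --consumer.config /etc/kafka/consumer.properties --topic test-topic --from-beginning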

Written by Hieu Nguyen

DevOps turned Data Engineer. Currently working on data streaming from Core Banking System to DWH. Reach me at hieund2102@gmail.com
