Connecting dockerized Golang services via Kafka
Sometimes, when we need to connect a number of services, an API alone is not enough: for example, when we need to stream messages to many services and keep the ability to scale without worry, even when some of those services are down.
Apache Kafka is an open-source distributed event streaming platform that lets us publish and subscribe to streams of events using producers and consumers.
Let’s see how to run Kafka using docker-compose.
First, we need to add Apache ZooKeeper, a must-have for Kafka setups: it maintains configuration information, provides synchronization, group services and more.
zookeeper:
  image: 'bitnami/zookeeper:latest'
  ports:
    - '2181:2181'
  environment:
    - ZOO_ENABLE_AUTH=yes
    - ZOO_SERVER_USERS=user
    - ZOO_SERVER_PASSWORDS=bitnami
    - ZOO_CLIENT_USER=user
    - ZOO_CLIENT_PASSWORD=bitnami
Next, let’s look at the Kafka setup:
kafka:
  image: 'docker.io/bitnami/kafka:2-debian-10'
  hostname: kafka
  # Expose a port for each listener that should accept external connections
  ports:
    - "9092:9092"
    - "9091:9091"
  volumes:
    # Script to generate the JKS certificates:
    # https://raw.githubusercontent.com/confluentinc/confluent-platform-security-tools/master/kafka-generate-ssl.sh
    # More details in the repo: https://github.com/bitnami/bitnami-docker-kafka
    - ../../keys/kafka/jks/kafka.server.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks
    - ../../keys/kafka/jks/kafka.server.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks
  environment:
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    # Here we describe all enabled auth mechanisms
    - KAFKA_CFG_SASL_ENABLED_MECHANISMS=SCRAM-SHA-512,SASL_SSL,PLAINTEXT,SASL
    - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-512
    # Here we describe all listeners we want, with their ports and security protocols:
    # INTERNAL - used for Kafka's internal purposes and not exposed, so plaintext auth is fine here.
    # CLIENT   - primary listener for external connections, works for Amazon MSK.
    # LCLIENT  - local client, useful when you don't want to mess with certificates in your local env.
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_SSL,LCLIENT:SASL_PLAINTEXT
    - KAFKA_CFG_LISTENERS=INTERNAL://kafka:9093,CLIENT://kafka:9092,LCLIENT://kafka:9091
    - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9093,CLIENT://kafka:9092,LCLIENT://localhost:9091
    - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
    - KAFKA_CFG_ADVERTISED_HOST_NAME=kafka
    - KAFKA_CFG_ADVERTISED_PORT=9092
    # Unless we pin this, the broker id is randomized on every Kafka restart,
    # the topic leader changes and we lose the ability to write to the same topic.
    # https://stackoverflow.com/a/46970971/4346580 (comment below the answer)
    - KAFKA_BROKER_ID=1
    # Credentials
    - KAFKA_CLIENT_USER=user
    - KAFKA_CLIENT_PASSWORD=bitnami
    # ZooKeeper credentials
    - KAFKA_ZOOKEEPER_USER=user
    - KAFKA_ZOOKEEPER_PASSWORD=bitnami
    - KAFKA_ZOOKEEPER_PROTOCOL=SASL
    - KAFKA_CERTIFICATE_PASSWORD=storepass
  depends_on:
    - zookeeper
Now let’s make a simple Golang app that sends a single message to Kafka. It will use the SASL_PLAINTEXT protocol that we defined as the LCLIENT listener in docker-compose, so no certificates are needed.
Also, there is a nice tool called kowl (https://github.com/cloudhut/kowl), which provides a UI for Kafka, so if you want to check what’s going on inside, you can add it to docker-compose.
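For reference, a kowl service could be declared roughly like the sketch below. The image tag and the KAFKA_* environment variable names are assumptions to verify against kowl’s documentation; it connects through the internal SASL_PLAINTEXT listener defined above.

kowl:
  image: 'quay.io/cloudhut/kowl:master'
  ports:
    - '8080:8080'
  environment:
    # Connect through the internal SASL_PLAINTEXT listener inside the compose network
    - KAFKA_BROKERS=kafka:9093
    - KAFKA_SASL_ENABLED=true
    - KAFKA_SASL_USERNAME=user
    - KAFKA_SASL_PASSWORD=bitnami
    - KAFKA_SASL_MECHANISM=SCRAM-SHA-512
  depends_on:
    - kafka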
Let’s see the service’s code:
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	fmt.Println("Starting")

	topicName := "my-topic"

	// Here we use the SASL_PLAINTEXT listener, which doesn't need certificates
	kafkaConfig := &kafka.ConfigMap{
		"bootstrap.servers":     "localhost:9091",
		"security.protocol":     "SASL_PLAINTEXT",
		"sasl.mechanism":        "SCRAM-SHA-512",
		"sasl.username":         "user",
		"sasl.password":         "bitnami",
		"group.id":              "kafka-writer",
		"auto.offset.reset":     "smallest",
		"enable.auto.commit":    false,
		"request.required.acks": 1,
	}

	// Make a new producer
	producer, err := kafka.NewProducer(kafkaConfig)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	var headers []kafka.Header

	// Create a new message
	msg := kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topicName,
			Partition: kafka.PartitionAny,
		},
		Value:   []byte("Hello"),
		Key:     []byte("Message key"),
		Headers: headers,
	}

	// Produce enqueues the message asynchronously
	err = producer.Produce(&msg, nil)
	if err != nil {
		panic(err)
	}

	// Wait up to 20 seconds for outstanding deliveries before shutting down
	producer.Flush(20 * 1000)

	fmt.Println("Finishing")
}
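Produce is asynchronous, so the call above only enqueues the message and Flush waits for the outstanding deliveries. If you want an explicit confirmation per message, you can pass a delivery channel to Produce instead of nil. A minimal sketch building on the variables above (deliveryChan is just an illustrative name):

	// Ask for an explicit delivery report instead of fire-and-forget
	deliveryChan := make(chan kafka.Event, 1)
	if err := producer.Produce(&msg, deliveryChan); err != nil {
		panic(err)
	}

	// The report tells us whether the broker actually accepted the message
	report := <-deliveryChan
	if m, ok := report.(*kafka.Message); ok && m.TopicPartition.Error != nil {
		panic(m.TopicPartition.Error)
	}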
Now let’s make another service that will read messages from Kafka. For this one, let’s specify certificates and use the SASL_SSL security protocol.
First, you need to copy your CA root certificate into the service’s project dir so that it can end up at /app/ca-cert.pem inside the container (the path referenced in the config below).
You can do this with the following bash script, run before the build.
rm -f ca-cert.pem
cp /etc/ssl/certs/ca-cert.pem ./
chmod +x ca-cert.pem
And the consumer service’s code:
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	topicName := "my-topic" // must match the topic the producer writes to

	kafkaConfig := &kafka.ConfigMap{
		"bootstrap.servers":  "kafka:9092",
		"security.protocol":  "SASL_SSL",
		"sasl.mechanism":     "SCRAM-SHA-512",
		"sasl.username":      "user",
		"sasl.password":      "bitnami",
		"group.id":           "my_group",
		"auto.offset.reset":  "smallest",
		"enable.auto.commit": false,
		// Here we specify the copied cert
		"ssl.ca.location": "/app/ca-cert.pem",
	}

	kafkaConsumer, err := kafka.NewConsumer(kafkaConfig)
	if err != nil {
		panic(err)
	}

	fmt.Println("Created Kafka consumer")

	err = kafkaConsumer.SubscribeTopics([]string{topicName}, nil)
	if err != nil {
		panic(err)
	}

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// Run the polling loop until a stop signal is received
	run := true
	for run {
		select {
		case <-sigchan:
			fmt.Println("Caught signal, stopping polling")
			run = false
		default:
			ev := kafkaConsumer.Poll(1000)
			switch event := ev.(type) {
			case *kafka.Message:
				key := event.Key
				value := event.Value
				fmt.Printf("Got message: %s %s\n", key, value)

				// Commit the offset manually since auto-commit is disabled
				_, err := kafkaConsumer.CommitMessage(event)
				if err != nil {
					panic(err)
				}
			case kafka.Error:
				fmt.Println("Kafka error received:", event)
			case nil:
				// Poll timed out with no new event
			default:
				fmt.Println("Unknown Kafka event")
			}
		}
	}

	fmt.Println("Closing consumer")

	err = kafkaConsumer.Close()
	if err != nil {
		panic(err)
	}
}
Now, because we used SASL_SSL here, this setup can also be used for Amazon MSK, though “ssl.ca.location” should then be removed or made optional, depending on the environment.
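One way to do that is to set “ssl.ca.location” only when a CA path is actually provided, for example through an environment variable. A minimal sketch, assuming a KAFKA_CA_LOCATION variable (the name is just illustrative):

	// Base consumer config shared by all environments
	kafkaConfig := &kafka.ConfigMap{
		"bootstrap.servers": "kafka:9092",
		"security.protocol": "SASL_SSL",
		"sasl.mechanism":    "SCRAM-SHA-512",
		"sasl.username":     "user",
		"sasl.password":     "bitnami",
		"group.id":          "my_group",
	}

	// Only point librdkafka at a custom CA when the environment provides one
	// (e.g. the local docker-compose setup); on Amazon MSK this stays unset.
	if caPath := os.Getenv("KAFKA_CA_LOCATION"); caPath != "" {
		if err := kafkaConfig.SetKey("ssl.ca.location", caPath); err != nil {
			panic(err)
		}
	}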
For a local environment, both SASL_PLAINTEXT and SASL_SSL can be used, but to make the code production-ready, SSL is recommended, even though it requires extra work locally, such as copying certificates and some additional configuration.
All of this can be run from the same docker-compose file, sending a message from one service and receiving it in the other.
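For example, the two apps could sit next to kafka and zookeeper roughly as in the sketch below. The service names and build paths are assumptions about your project layout; also note that inside the compose network the producer can no longer use the localhost:9091 listener from the example above and would need to connect to kafka:9092 with SASL_SSL, like the consumer does.

producer:
  build: ./producer
  depends_on:
    - kafka

consumer:
  build: ./consumer
  volumes:
    # If your Dockerfile doesn't already COPY the certificate,
    # it can be mounted so that ssl.ca.location=/app/ca-cert.pem resolves
    - ./consumer/ca-cert.pem:/app/ca-cert.pem
  depends_on:
    - kafka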