Connecting to an Instance
At this point, you can create a topic using your programming language of choice. Here is an example with kafka-python.
Please note that to establish a successful connection, you must use your own servername:port, not the one shown in the following examples.
# topic-ssl.py
# This script creates a topic named demo_topic
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)
topic_list = []
topic_list.append(NewTopic(name="demo-topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
#EOF
At this point, you can produce and consume messages. Here is another example with kafka-python:
Producer
# producer-ssl.py
# This script connects to Kafka and sends a few messages
from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)
for i in range(1, 4):
    message = "message number {}".format(i)
    print("Sending: {}".format(message))
    producer.send("demo-topic", message.encode("utf-8"))

# Force sending of all messages
producer.flush()
#EOF
Consumer
# consumer-ssl.py
# This script receives messages from a Kafka topic
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "demo-topic",
    auto_offset_reset="earliest",
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)
# Call poll twice. First call will just assign partitions for our
# consumer without actually returning anything
for _ in range(2):
    raw_msgs = consumer.poll(timeout_ms=1000)
    for tp, msgs in raw_msgs.items():
        for msg in msgs:
            print("Received: {}".format(msg.value))

    # Commit offsets so we won't get the same messages again
    consumer.commit()
#EOF
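One detail worth noting: `msg.value` in the consumer above is raw bytes, not a string, because the producer encoded each message as UTF-8 before sending it. To recover the original text, decode the payload:

```python
# The producer encoded each message as UTF-8, so the consumer
# receives raw bytes and must decode them to get the string back.
raw = b"message number 1"  # example payload, as sent by the producer above
text = raw.decode("utf-8")
print("Received: {}".format(text))  # Received: message number 1
```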
At this point, you can produce and consume messages using your programming language of choice.
Please note that if you haven't created a topic already, you must create one. Otherwise, the connection will not be successful.
SASL Connection
If you want to check the SASL authentication status, you can query it specifically with:
$ exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.AuthenticationMethods.SASL}}'
NOTE
SASL is less secure
The SASL authentication method for Kafka clusters is less secure than SSL certificate authentication, but it is easier to use for tests and demonstrations. Use at your own risk.
To enable SASL, set the update flag with:
$ exo dbaas update -z de-muc-1 test-kafka --kafka-enable-sasl-auth true
Updating Database Service "test-kafka"...
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│ DATABASE SERVICE │ │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│ Zone │ de-muc-1 │
│ Name │ test-kafka │
│ Type │ kafka │
│ Plan │ startup-2 │
│ Disk Size │ 90 GiB │
│ State │ running │
│ Creation Date │ 2021-11-01 15:11:37 +0000 UTC │
│ Update Date │ 2021-11-01 15:55:54 +0000 UTC │
│ Nodes │ 3 │
│ Node CPUs │ 2 │
│ Node Memory │ 2.0 GiB │
│ Termination Protected │ true │
│ Maintenance │ wednesday (18:48:43) │
│ URI │ test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701 │
│ IP Filter │ 0.0.0.0/0 │
│ Authentication Methods │ certificate, SASL │
│ Kafka Connect Enabled │ false │
│ Kafka REST Enabled │ false │
│ Schema Registry Enabled │ false │
│ Components │ │
│ │ kafka test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701 auth:certificate route:dynamic usage:primary │
│ │ kafka test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21712 auth:sasl route:dynamic usage:primary │
│ │ │
│ ACL │ │
│ │ default username:avnadmin topic:* permission:admin │
│ │ │
│ Users │ avnadmin (primary) │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
At this point, the service exposes a separate port for the SASL connection, while the endpoint remains the same. You need to retrieve the SASL port.
From the example above, the SASL port is 21712. The last things we need are the username and password, which you can retrieve from the full JSON output:
$ exo dbaas show test-kafka -z de-muc-1 -O json
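With the SASL endpoint, port, username, and password in hand, the client-side settings for kafka-python look like the following sketch. The SCRAM-SHA-256 mechanism, the avnadmin username, and the placeholder password are assumptions based on the output above — substitute the values reported for your own service.

```python
# sasl-config.py
# Connection settings for the SASL listener (port 21712 from the output above).
# NOTE: the mechanism and credentials below are assumptions -- replace them
# with the values from your own service's JSON output.
sasl_config = {
    "bootstrap_servers": "test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21712",
    "security_protocol": "SASL_SSL",    # SASL over a TLS-encrypted connection
    "sasl_mechanism": "SCRAM-SHA-256",  # assumed mechanism; check your service
    "sasl_plain_username": "avnadmin",  # default user from the ACL above
    "sasl_plain_password": "<password from the JSON output>",
    "ssl_cafile": "ca.pem",             # CA certificate, same file as before
}

# Passing these settings to a producer or consumer replaces the
# client-certificate options (ssl_certfile/ssl_keyfile) used earlier, e.g.:
# from kafka import KafkaProducer
# producer = KafkaProducer(**sasl_config)
print(sasl_config["security_protocol"])
```

Note that with SASL the client certificate and key files are no longer needed; only the CA file is kept so the client can verify the server.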