Connecting to an Instance

At this point, you can create a topic using your programming language of choice. Here is an example with kafka-python.

Please note that if you follow the kafka-python examples, you must replace the sample address with the servername:port of your own service. The address shown in the examples will not work for your connection.
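One way to avoid copying the sample address by accident is to read your own servername:port from the environment. This is a minimal sketch: the variable name KAFKA_BOOTSTRAP and the helper bootstrap_from_env are hypothetical names chosen for illustration and are not defined by the exo CLI or by kafka-python.

```python
import os


def bootstrap_from_env(var="KAFKA_BOOTSTRAP"):
    """Return the servername:port stored in the environment variable `var`.

    KAFKA_BOOTSTRAP is a hypothetical name used only in this sketch.
    """
    value = os.environ.get(var, "").strip()
    # Split on the last colon so the port is whatever follows it
    host, sep, port = value.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(
            "{} must look like servername:port, got {!r}".format(var, value)
        )
    return value
```

You could then pass bootstrap_servers=bootstrap_from_env() to the clients instead of a hard-coded string.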

# topic-ssl.py
# This script creates a topic named demo-topic

from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

topic_list = [NewTopic(name="demo-topic", num_partitions=1, replication_factor=1)]
admin_client.create_topics(new_topics=topic_list, validate_only=False)
#EOF
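Running the topic script a second time asks the broker to create a topic that already exists. A possible way to make the step repeatable is to check the existing topic names first. The helper name ensure_topic is hypothetical; it relies only on the list_topics and create_topics methods of KafkaAdminClient and on the name attribute of NewTopic.

```python
def ensure_topic(admin_client, topic):
    """Create `topic` unless a topic with the same name already exists.

    Returns True if the topic was created, False if it was already there.
    """
    if topic.name in admin_client.list_topics():
        return False
    admin_client.create_topics(new_topics=[topic], validate_only=False)
    return True
```

With the admin_client from the script, a call could look like ensure_topic(admin_client, NewTopic(name="demo-topic", num_partitions=1, replication_factor=1)). The check and the creation are two separate requests, so this sketch does not protect against two clients creating the same topic at the same moment.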

At this point, you can produce and consume messages. Here is another example with kafka-python:

Producer

# producer-ssl.py
# This script connects to Kafka and sends a few messages

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

for i in range(1, 4):
    message = "message number {}".format(i)
    print("Sending: {}".format(message))
    producer.send("demo-topic", message.encode("utf-8"))

# Force sending of all messages

producer.flush()
#EOF
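The producer script sends messages without checking whether each one was delivered. In kafka-python, send returns a future, and calling get on it blocks until the broker acknowledges the record or raises an error. The sketch below wraps that pattern; send_and_confirm is a hypothetical helper name, and the 10-second timeout is an arbitrary choice.

```python
def send_and_confirm(producer, topic, messages, timeout=10):
    """Send each message and wait for the broker to acknowledge it.

    Returns a list of (partition, offset) tuples, one per message.
    """
    results = []
    for message in messages:
        future = producer.send(topic, message.encode("utf-8"))
        # get() blocks until the record is acknowledged and raises on failure
        metadata = future.get(timeout=timeout)
        results.append((metadata.partition, metadata.offset))
    return results
```

Waiting on every message is slower than sending a batch and calling flush once, so this form is better suited to tests and demonstrations than to high-throughput producers.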

Consumer

# consumer-ssl.py
# This script receives messages from a Kafka topic

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "demo-topic",
    auto_offset_reset="earliest",
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem"
)

# Call poll twice. First call will just assign partitions for our
# consumer without actually returning anything

for _ in range(2):
    raw_msgs = consumer.poll(timeout_ms=1000)
    for tp, msgs in raw_msgs.items():
        for msg in msgs:
            print("Received: {}".format(msg.value))

# Commit offsets so we won't get the same messages again

consumer.commit()
#EOF
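Because the producer encodes each message as UTF-8 bytes, msg.value in the consumer is a bytes object and prints with a b'...' prefix. The following sketch flattens the dictionary returned by poll into decoded strings; decode_batches is a hypothetical helper name, and UTF-8 is assumed only because the producer example uses it.

```python
def decode_batches(raw_msgs, encoding="utf-8"):
    """Flatten the dict returned by poll() into a list of decoded strings."""
    decoded = []
    for _tp, msgs in raw_msgs.items():
        for msg in msgs:
            # msg.value holds the raw bytes written by the producer
            decoded.append(msg.value.decode(encoding))
    return decoded
```

An alternative is to pass value_deserializer=lambda v: v.decode("utf-8") when constructing KafkaConsumer, so that msg.value is already a string.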

At this point, you can produce and consume messages using your programming language of choice.

Please note that if you haven’t created a topic already, you must create one first. Otherwise, the connection will not succeed.

SASL Connection

To check whether SASL authentication is enabled, query that field directly with:

$ exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.AuthenticationMethods.SASL}}'

NOTE
SASL is less secure

The SASL authentication method for Kafka clusters is less secure than SSL certificate authentication. However, it is easier to work with for tests and demonstrations. Use at your own risk.

To enable SASL, set the update flag with:

$ exo dbaas update -z de-muc-1 test-kafka --kafka-enable-sasl-auth true
Updating Database Service "test-kafka"...
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│    DATABASE SERVICE     │                                                                                                                                              │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│ Zone                    │ de-muc-1                                                                                                                                     │
│ Name                    │ test-kafka                                                                                                                                   │
│ Type                    │ kafka                                                                                                                                        │
│ Plan                    │ startup-2                                                                                                                                    │
│ Disk Size               │ 90 GiB                                                                                                                                       │
│ State                   │ running                                                                                                                                      │
│ Creation Date           │ 2021-11-01 15:11:37 +0000 UTC                                                                                                                │
│ Update Date             │ 2021-11-01 15:55:54 +0000 UTC                                                                                                                │
│ Nodes                   │ 3                                                                                                                                            │
│ Node CPUs               │ 2                                                                                                                                            │
│ Node Memory             │ 2.0 GiB                                                                                                                                      │
│ Termination Protected   │ true                                                                                                                                         │
│ Maintenance             │ wednesday (18:48:43)                                                                                                                         │
│ URI                     │ test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701                                                                │
│ IP Filter               │ 0.0.0.0/0                                                                                                                                    │
│ Authentication Methods  │ certificate, SASL                                                                                                                            │
│ Kafka Connect Enabled   │ false                                                                                                                                        │
│ Kafka REST Enabled      │ false                                                                                                                                        │
│ Schema Registry Enabled │ false                                                                                                                                        │
│ Components              │                                                                                                                                              │
│                         │   kafka   test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701   auth:certificate   route:dynamic   usage:primary   │
│                         │   kafka   test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21712   auth:sasl          route:dynamic   usage:primary   │
│                         │                                                                                                                                              │
│ ACL                     │                                                                                                                                              │
│                         │   default   username:avnadmin   topic:*   permission:admin                                                                                   │
│                         │                                                                                                                                              │
│ Users                   │ avnadmin (primary)                                                                                                                           │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼

At this point, the service offers a separate port for SASL connections, while the hostname remains the same. You need to retrieve the SASL port.
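If you want to pick the SASL port out of the Components rows programmatically, a rough sketch follows. It assumes the rows look exactly like the sample output above (a host:port field plus an auth:sasl field); sasl_endpoint is a hypothetical helper name, and scraping table output is fragile compared with reading structured output.

```python
def sasl_endpoint(component_lines):
    """Return (host, port) from the first component row marked auth:sasl.

    Returns None if no such row is found.
    """
    for line in component_lines:
        # Drop the table borders, then split the row on whitespace
        fields = line.replace("│", " ").split()
        if "auth:sasl" not in fields:
            continue
        for field in fields:
            host, sep, port = field.rpartition(":")
            if sep and port.isdigit():
                return host, int(port)
    return None
```

Applied to the two kafka rows in the sample output, this returns the hostname together with port 21712.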

In the example above, the SASL port is 21712. The last thing you need is the username and password, which you can retrieve from the full JSON output:

$ exo dbaas show test-kafka -z de-muc-1 -O json
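Once you have the SASL port, username, and password, the kafka-python client settings could be assembled as in the sketch below. Two things here are assumptions rather than facts from this guide: that the service accepts the PLAIN mechanism over SASL_SSL, and that the ca.pem file from the SSL examples is still needed to verify the server. The helper name sasl_client_config is hypothetical.

```python
def sasl_client_config(host, sasl_port, username, password, cafile="ca.pem"):
    """Build keyword arguments for a kafka-python client using SASL.

    Assumes the PLAIN mechanism over TLS; adjust if your service differs.
    """
    return {
        "bootstrap_servers": "{}:{}".format(host, sasl_port),
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",  # assumption, not confirmed by this guide
        "sasl_plain_username": username,
        "sasl_plain_password": password,
        "ssl_cafile": cafile,  # assumption: same CA file as the SSL examples
    }
```

The resulting dictionary can be unpacked into a client, for example KafkaProducer(**sasl_client_config(host, 21712, username, password)), with the client certificate and key arguments from the SSL examples left out.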