# Connect to Kafka

At this point, you can create a topic using your programming language
of choice. Here is an example with [kafka-python](https://pypi.org/project/kafka-python/).

Please note that if you use the kafka-python guide, you must use your servername:port to establish a
successful connection, **not** the one in the following examples.

```python
# topic-ssl.py
# This script creates a topic named demo_topic

from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

topic_list = []
topic_list.append(NewTopic(name="demo-topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
#EOF
```

At this point, you can produce and consume messages. Here is another example with [kafka-python](https://pypi.org/project/kafka-python/):

## Producer

```python
# producer-ssl.py
# This script connects to Kafka and send a few messages

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

for i in range(1, 4):
    message = "message number {}".format(i)
    print("Sending: {}".format(message))
    producer.send("demo-topic", message.encode("utf-8"))

# Force sending of all messages

producer.flush()
#EOF
```

## Consumer

```python
# consumer-ssl.py
# This script receives messages from a Kafka topic

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "demo-topic",
    auto_offset_reset="earliest",
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem"
)

# Call poll twice. First call will just assign partitions for our
# consumer without actually returning anything

for _ in range(2):
    raw_msgs = consumer.poll(timeout_ms=1000)
    for tp, msgs in raw_msgs.items():
        for msg in msgs:
            print("Received: {}".format(msg.value))

# Commit offsets so we won't get the same messages again

consumer.commit()
#EOF
```

At this point, can produce and consume messages using your programming
language of choice.

Please note that if you haven't created a topic already, you must create one. Otherwise,
the connection will not be successful.

## SASL Connection

If you want to check for the SASL authentication status, you
can query it specifically with:

```bash 
exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.AuthenticationMethods.SASL}}'
```

> [!NOTE]
> SASL is less secure.

The SASL authentication method for Kafka clusters is less secure than SSL certificate authentication. However, it is easier to manipulate for tests and demonstration purposes. Use at your own risk.


To enable SASL, set the update flag with:

```bash
exo dbaas update -z de-muc-1 test-kafka --kafka-enable-sasl-auth true
```

Output:

```bash
Updating Database Service "test-kafka"...
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│    DATABASE SERVICE     │                                                                                                                                              │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│ Zone                    │ de-muc-1                                                                                                                                     │
│ Name                    │ test-kafka                                                                                                                                   │
│ Type                    │ kafka                                                                                                                                        │
│ Plan                    │ startup-2                                                                                                                                    │
│ Disk Size               │ 90 GiB                                                                                                                                       │
│ State                   │ running                                                                                                                                      │
│ Creation Date           │ 2021-11-01 15:11:37 +0000 UTC                                                                                                                │
│ Update Date             │ 2021-11-01 15:55:54 +0000 UTC                                                                                                                │
│ Nodes                   │ 3                                                                                                                                            │
│ Node CPUs               │ 2                                                                                                                                            │
│ Node Memory             │ 2.0 GiB                                                                                                                                      │
│ Termination Protected   │ true                                                                                                                                         │
│ Maintenance             │ wednesday (18:48:43)                                                                                                                         │
│ URI                     │ test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701                                                                │
│ IP Filter               │ 0.0.0.0/0                                                                                                                                    │
│ Authentication Methods  │ certificate, SASL                                                                                                                            │
│ Kafka Connect Enabled   │ false                                                                                                                                        │
│ Kafka REST Enabled      │ false                                                                                                                                        │
│ Schema Registry Enabled │ false                                                                                                                                        │
│ Components              │                                                                                                                                              │
│                         │   kafka   test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701   auth:certificate   route:dynamic   usage:primary   │
│                         │   kafka   test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21712   auth:sasl          route:dynamic   usage:primary   │
│                         │                                                                                                                                              │
│ ACL                     │                                                                                                                                              │
│                         │   default   username:avnadmin   topic:*   permission:admin                                                                                   │
│                         │                                                                                                                                              │
│ Users                   │ avnadmin (primary)                                                                                                                           │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
```

At this point, the service offers a
different port for the SASL connection, whereas the endpoint remains the same. You need to retrieve the SASL port.

From the example above, we now have SASL port 21712. The last thing we need is the username and password, which you can use the following full JSON output to retrieve:

```bash
exo dbaas show test-kafka -z de-muc-1 -O json
```
