Authentication and Certificate Authority

By default, the only supported connection mechanism for Managed Apache Kafka producers and consumers to reach a topic is SSL certificate validation.

CA

To retrieve the CA for a newly created cluster, run the following command:

exo dbaas ca-certificate > ca.pem

The file ca.pem now contains the CA for the Exoscale DBaaS.

To retrieve the access certificate and key, use the output templates of the CLI:

exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.ConnectionInfo.AccessCert}}'
-----BEGIN CERTIFICATE-----
MIIEPDCCAqSgAwIBAgIUf+hoGaT3XjwEbguv9LYqNiVUw54wDQYJKoZIhvcNAQEM
BQAwOjE4MDYGA1UEAwwvYjJmODcwNmUtMDBmNi00M2MzLTkyMjktYTU5ZTYzODhm
ZGQwIFByb2plY3QgQ0EwHhcNMjExMTAxMTUxMTM2WhcNMjQwMTMwMTUxMTM2WjA7
....
....
qI70kNpCdbiqVf9EXvlmEkGFbKkR5rGW+QvdGwXTLBs=
-----END CERTIFICATE-----

For further handling, we will put the certificate and key into files. First for the certificate:

exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.ConnectionInfo.AccessCert}}' > test-kafka_cert.pem

And then for the key:

exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.ConnectionInfo.AccessKey}}' > test-kafka_key.pem
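The three files created above are used by every kafka-python client in this guide. As a convenience, the shared SSL parameters can be collected in one dict (a sketch, assuming the file names used in this guide):

```python
# ssl_config.py
# Shared kafka-python SSL settings, reusing the files created above.
SSL_CONFIG = {
    "security_protocol": "SSL",
    "ssl_cafile": "ca.pem",
    "ssl_certfile": "test-kafka_cert.pem",
    "ssl_keyfile": "test-kafka_key.pem",
}
```

Clients can then be created with, for example, `KafkaProducer(bootstrap_servers=..., **SSL_CONFIG)`.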

To cycle the credentials

To cycle credentials, create a new service user and retrieve its certificate and key. Then delete the previous service user.

Connecting to a Kafka instance

At this point, you can create a topic using your programming language of choice. Here is an example with kafka-python.

Please note that you must use your own server name and port to establish a successful connection, not the ones shown in the following examples.

# topic-ssl.py
# This script creates a topic named demo_topic

from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

topic_list = []
topic_list.append(NewTopic(name="demo-topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
#EOF

At this point, you can produce and consume messages. Here is another example with kafka-python:

Producer

# producer-ssl.py
# This script connects to Kafka and sends a few messages

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

for i in range(1, 4):
    message = "message number {}".format(i)
    print("Sending: {}".format(message))
    producer.send("demo-topic", message.encode("utf-8"))

# Force sending of all messages

producer.flush()
#EOF
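Instead of calling `.encode("utf-8")` by hand for every message, kafka-python accepts a `value_serializer` callable that converts each value to bytes before sending. A sketch (the JSON serializer below is an illustration, not part of the guide):

```python
import json

# Convert a Python object to UTF-8 encoded JSON bytes before sending.
def serialize(value):
    return json.dumps(value).encode("utf-8")

# producer = KafkaProducer(value_serializer=serialize, ...)
# producer.send("demo-topic", {"n": 1})  # sent as b'{"n": 1}'
```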

Consumer

# consumer-ssl.py
# This script receives messages from a Kafka topic

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "demo-topic",
    auto_offset_reset="earliest",
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem"
)

# Call poll twice. First call will just assign partitions for our
# consumer without actually returning anything

for _ in range(2):
    raw_msgs = consumer.poll(timeout_ms=1000)
    for tp, msgs in raw_msgs.items():
        for msg in msgs:
            print("Received: {}".format(msg.value))

# Commit offsets so we won't get the same messages again

consumer.commit()
#EOF
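The `msg.value` printed above is raw bytes. A `value_deserializer` can decode messages on receipt instead, mirroring the producer side (a sketch; the function name is ours):

```python
# Decode message bytes back into a string as messages arrive.
def decode(raw):
    return raw.decode("utf-8")

# consumer = KafkaConsumer("demo-topic", value_deserializer=decode, ...)
```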

At this point, you can produce and consume messages using your programming language of choice.

Please note that if you haven’t created a topic already, you must create one. Otherwise, the connection will not be successful.

SASL connection

To check the SASL authentication status, query it specifically with:

exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.AuthenticationMethods.SASL}}'

SASL is less secure

The SASL authentication method for Apache Kafka clusters is less secure than SSL certificate authentication. However, it is easier to use for testing and demonstration purposes. Use it at your own risk.

To enable SASL, set the update flag with:

exo dbaas update -z de-muc-1 test-kafka --kafka-enable-sasl-auth true
Updating Database Service "test-kafka"...
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│    DATABASE SERVICE     │                                                                                                                                              │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│ Zone                    │ de-muc-1                                                                                                                                     │
│ Name                    │ test-kafka                                                                                                                                   │
│ Type                    │ kafka                                                                                                                                        │
│ Plan                    │ startup-2                                                                                                                                    │
│ Disk Size               │ 90 GiB                                                                                                                                       │
│ State                   │ running                                                                                                                                      │
│ Creation Date           │ 2021-11-01 15:11:37 +0000 UTC                                                                                                                │
│ Update Date             │ 2021-11-01 15:55:54 +0000 UTC                                                                                                                │
│ Nodes                   │ 3                                                                                                                                            │
│ Node CPUs               │ 2                                                                                                                                            │
│ Node Memory             │ 2.0 GiB                                                                                                                                      │
│ Termination Protected   │ true                                                                                                                                         │
│ Maintenance             │ wednesday (18:48:43)                                                                                                                         │
│ URI                     │ test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701                                                                │
│ IP Filter               │ 0.0.0.0/0                                                                                                                                    │
│ Authentication Methods  │ certificate, SASL                                                                                                                            │
│ Kafka Connect Enabled   │ false                                                                                                                                        │
│ Kafka REST Enabled      │ false                                                                                                                                        │
│ Schema Registry Enabled │ false                                                                                                                                        │
│ Components              │                                                                                                                                              │
│                         │   kafka   test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701   auth:certificate   route:dynamic   usage:primary   │
│                         │   kafka   test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21712   auth:sasl          route:dynamic   usage:primary   │
│                         │                                                                                                                                              │
│ ACL                     │                                                                                                                                              │
│                         │   default   username:avnadmin   topic:*   permission:admin                                                                                   │
│                         │                                                                                                                                              │
│ Users                   │ avnadmin (primary)                                                                                                                           │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼

At this point, the service offers a different port for the SASL connection, whereas the endpoint remains the same. You need to retrieve the SASL port.

From the example above, we now have SASL port 21712. The last things we need are the username and password, which you can retrieve from the full JSON output:

exo dbaas show test-kafka -z de-muc-1 -O json
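With the SASL port (21712 above) plus the username and password from the JSON output, a kafka-python client can connect over SASL_SSL. A sketch with placeholder credentials; the PLAIN mechanism is an assumption here, so check which mechanisms your service supports:

```python
# SASL_SSL settings for kafka-python. The broker still presents an SSL
# certificate, so ca.pem is needed, but no client certificate or key is.
SASL_CONFIG = {
    "bootstrap_servers": "test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21712",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "avnadmin",
    "sasl_plain_password": "<password from the JSON output>",
    "ssl_cafile": "ca.pem",
}

# producer = KafkaProducer(**SASL_CONFIG)
```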

Restricting connections from the internet

By default, Exoscale DBaaS services are accessible from the whole internet. However, data does not transit in clear text over the network: connections are SSL encrypted and authenticated by default.

To allow incoming connections to your database service, you can add a filter that allows:

  • just one IP address,
  • a network range,
  • or a combination of IP address and network range

To do this, update your service, or create it with the IP filter, which is a comma-separated list of CIDRs:

exo dbaas update -z de-muc-1 test-kafka --kafka-ip-filter=1.2.3.4/32,5.6.7.8/24
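The filter value can be sanity-checked before calling the CLI with Python's standard `ipaddress` module. A sketch (the helper name is ours); note that a CIDR with host bits set, such as 5.6.7.8/24, normalizes to its network address 5.6.7.0/24:

```python
import ipaddress

def validate_ip_filter(filter_str):
    """Parse a comma-separated CIDR list, normalizing host bits away."""
    return [
        str(ipaddress.ip_network(part.strip(), strict=False))
        for part in filter_str.split(",")
    ]

print(validate_ip_filter("1.2.3.4/32,5.6.7.8/24"))
# ['1.2.3.4/32', '5.6.7.0/24']
```

An invalid entry raises `ValueError`, which is easier to debug locally than a rejected CLI call.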

Specific options when creating or updating an Apache Kafka service

You can find all the specific options for Apache Kafka by using the CLI help:

exo dbaas create --help-kafka
  --kafka-connect-settings           Kafka Connect configuration settings (JSON format)
  --kafka-enable-cert-auth           enable certificate-based authentication method
  --kafka-enable-kafka-connect       enable Kafka Connect
  --kafka-enable-kafka-rest          enable Kafka REST
  --kafka-enable-sasl-auth           enable SASL-based authentication method
  --kafka-enable-schema-registry     enable Schema Registry
  --kafka-ip-filter                  allow incoming connections from CIDR address block
  --kafka-rest-settings              Kafka REST configuration settings (JSON format)
  --kafka-schema-registry-settings   Schema Registry configuration settings (JSON format)
  --kafka-settings                   Kafka configuration settings (JSON format)
  --kafka-version                    Kafka major version
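Several of these flags (such as --kafka-settings) take a JSON document. Building the payload programmatically avoids shell-quoting mistakes; the keys below are examples taken from the settings table later in this section:

```python
import json

# Example payload for --kafka-settings; the keys come from
# `exo dbaas type show kafka --settings kafka`.
settings = {
    "log_retention_hours": 168,
    "auto_create_topics_enable": True,
}
payload = json.dumps(settings)
print(payload)
# Pass the printed JSON string as the value of the --kafka-settings flag.
```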

Managed Apache Kafka additional configuration

A large number of service-specific configuration options are available:

  • Kafka Connect
  • Kafka REST
  • Schema Registry
  • Kafka general

The general settings for Managed Apache Kafka are:

exo dbaas type show kafka --settings kafka
┼────────────────────────────────────────────────────────────┼─────────┼──────────────────────────────────────────────────────┼
│                            KEY                             │  TYPE   │                     DESCRIPTION                      │
┼────────────────────────────────────────────────────────────┼─────────┼──────────────────────────────────────────────────────┼
│ log_segment_delete_delay_ms                                │ integer │ The amount of time to wait before deleting a file    │
│                                                            │         │ from the filesystem                                  │
│                                                            │         │   * Minimum: 0 / Maximum: 3.6e+06                    │
│                                                            │         │   * Example: 60000                                   │
│ log_roll_ms                                                │ integer │ The maximum time before a new log segment is         │
│                                                            │         │ rolled out (in milliseconds).                        │
│                                                            │         │   * Minimum: 1 / Maximum: 9.223372036854776e+18      │
│ log_cleaner_min_compaction_lag_ms                          │ integer │ The minimum time a message will remain uncompacted   │
│                                                            │         │ in the log. Only applicable for logs that are        │
│                                                            │         │ being compacted.                                     │
│                                                            │         │   * Minimum: 0 / Maximum: 9.223372036854776e+18      │
│ max_incremental_fetch_session_cache_slots                  │ integer │ The maximum number of incremental fetch sessions     │
│                                                            │         │ that the broker will maintain.                       │
│                                                            │         │   * Minimum: 1000 / Maximum: 10000                   │
│                                                            │         │   * Example: 1000                                    │
│ group_min_session_timeout_ms                               │ integer │ The minimum allowed session timeout for registered   │
│                                                            │         │ consumers. Longer timeouts give consumers more       │
│                                                            │         │ time to process messages in between heartbeats at    │
│                                                            │         │ the cost of a longer time to detect failures.        │
│                                                            │         │   * Minimum: 0 / Maximum: 60000                      │
│                                                            │         │   * Example: 6000                                    │
│ offsets_retention_minutes                                  │ integer │ Log retention window in minutes for offsets topic    │
│                                                            │         │   * Minimum: 1 / Maximum: 2.147483647e+09            │
│                                                            │         │   * Example: 10080                                   │
│ log_flush_interval_ms                                      │ integer │ The maximum time in ms that a message in any topic   │
│                                                            │         │ is kept in memory before flushed to disk. If not     │
│                                                            │         │ set, the value in log.flush.scheduler.interval.ms    │
│                                                            │         │ is used                                              │
│                                                            │         │   * Minimum: 0 / Maximum: 9.223372036854776e+18      │
│ log_retention_hours                                        │ integer │ The number of hours to keep a log file before        │
│                                                            │         │ deleting it                                          │
│                                                            │         │   * Minimum: -1 / Maximum: 2.147483647e+09           │
│ log_retention_ms                                           │ integer │ The number of milliseconds to keep a log file        │
│                                                            │         │ before deleting it (in milliseconds), If not set,    │
│                                                            │         │ the value in log.retention.minutes is used. If set   │
│                                                            │         │ to -1, no time limit is applied.                     │
│                                                            │         │   * Minimum: -1 / Maximum: 9.223372036854776e+18     │
│ min_insync_replicas                                        │ integer │ When a producer sets acks to 'all' (or '-1'),        │
│                                                            │         │ min.insync.replicas specifies the minimum number     │
│                                                            │         │ of replicas that must acknowledge a write for the    │
│                                                            │         │ write to be considered successful.                   │
│                                                            │         │   * Minimum: 1 / Maximum: 7                          │
│                                                            │         │   * Example: 1                                       │
│ log_preallocate                                            │ boolean │ Should pre allocate file when create new segment?    │
│                                                            │         │   * Example: false                                   │
│ group_initial_rebalance_delay_ms                           │ integer │ The amount of time, in milliseconds, the group       │
│                                                            │         │ coordinator will wait for more consumers to join a   │
│                                                            │         │ new group before performing the first rebalance. A   │
│                                                            │         │ longer delay means potentially fewer rebalances,     │
│                                                            │         │ but increases the time until processing begins.      │
│                                                            │         │ The default value for this is 3 seconds. During      │
│                                                            │         │ development and testing it might be desirable to     │
│                                                            │         │ set this to 0 in order to not delay test execution   │
│                                                            │         │ time.                                                │
│                                                            │         │   * Minimum: 0 / Maximum: 300000                     │
│                                                            │         │   * Example: 3000                                    │
│ log_message_timestamp_difference_max_ms                    │ integer │ The maximum difference allowed between the           │
│                                                            │         │ timestamp when a broker receives a message and the   │
│                                                            │         │ timestamp specified in the message                   │
│                                                            │         │   * Minimum: 0 / Maximum: 9.223372036854776e+18      │
│ log_index_size_max_bytes                                   │ integer │ The maximum size in bytes of the offset index        │
│                                                            │         │   * Minimum: 1.048576e+06 / Maximum: 1.048576e+08    │
│                                                            │         │   * Example: 1.048576e+07                            │
│ message_max_bytes                                          │ integer │ The maximum size of message that the server can      │
│                                                            │         │ receive.                                             │
│                                                            │         │   * Minimum: 0 / Maximum: 1.000012e+08               │
│                                                            │         │   * Example: 1.048588e+06                            │
│ log_roll_jitter_ms                                         │ integer │ The maximum jitter to subtract from                  │
│                                                            │         │ logRollTimeMillis (in milliseconds). If not set,     │
│                                                            │         │ the value in log.roll.jitter.hours is used           │
│                                                            │         │   * Minimum: 0 / Maximum: 9.223372036854776e+18      │
│ transaction_remove_expired_transaction_cleanup_interval_ms │ integer │ The interval at which to remove transactions that    │
│                                                            │         │ have expired due to transactional.id.expiration.ms   │
│                                                            │         │ passing (defaults to 3600000 (1 hour)).              │
│                                                            │         │   * Minimum: 600000 / Maximum: 3.6e+06               │
│                                                            │         │   * Example: 3.6e+06                                 │
│ log_message_downconversion_enable                          │ boolean │ This configuration controls whether                  │
│                                                            │         │ down-conversion of message formats is enabled to     │
│                                                            │         │ satisfy consume requests.                            │
│                                                            │         │   * Example: true                                    │
│ log_cleaner_max_compaction_lag_ms                          │ integer │ The maximum amount of time message will remain       │
│                                                            │         │ uncompacted. Only applicable for logs that are       │
│                                                            │         │ being compacted                                      │
│                                                            │         │   * Minimum: 30000 / Maximum: 9.223372036854776e+18  │
│ socket_request_max_bytes                                   │ integer │ The maximum number of bytes in a socket request      │
│                                                            │         │ (defaults to 104857600).                             │
│                                                            │         │   * Minimum: 1.048576e+07 / Maximum: 2.097152e+08    │
│ log_segment_bytes                                          │ integer │ The maximum size of a single log file                │
│                                                            │         │   * Minimum: 1.048576e+07 / Maximum: 1.073741824e+09 │
│ replica_fetch_max_bytes                                    │ integer │ The number of bytes of messages to attempt to        │
│                                                            │         │ fetch for each partition (defaults to 1048576).      │
│                                                            │         │ This is not an absolute maximum, if the first        │
│                                                            │         │ record batch in the first non-empty partition of     │
│                                                            │         │ the fetch is larger than this value, the record      │
│                                                            │         │ batch will still be returned to ensure that          │
│                                                            │         │ progress can be made.                                │
│                                                            │         │   * Minimum: 1.048576e+06 / Maximum: 1.048576e+08    │
│ log_flush_interval_messages                                │ integer │ The number of messages accumulated on a log          │
│                                                            │         │ partition before messages are flushed to disk        │
│                                                            │         │   * Minimum: 1 / Maximum: 9.223372036854776e+18      │
│                                                            │         │   * Example: 9.223372036854776e+18                   │
│ max_connections_per_ip                                     │ integer │ The maximum number of connections allowed from       │
│                                                            │         │ each ip address (defaults to 2147483647).            │
│                                                            │         │   * Minimum: 256 / Maximum: 2.147483647e+09          │
│ log_index_interval_bytes                                   │ integer │ The interval with which Kafka adds an entry to the   │
│                                                            │         │ offset index                                         │
│                                                            │         │   * Minimum: 0 / Maximum: 1.048576e+08               │
│                                                            │         │   * Example: 4096                                    │
│ transaction_state_log_segment_bytes                        │ integer │ The transaction topic segment bytes should be kept   │
│                                                            │         │ relatively small in order to facilitate faster log   │
│                                                            │         │ compaction and cache loads (defaults to 104857600    │
│                                                            │         │ (100 mebibytes)).                                    │
│                                                            │         │   * Minimum: 1.048576e+06 / Maximum: 2.147483647e+09 │
│                                                            │         │   * Example: 1.048576e+08                            │
│ log_cleanup_policy                                         │ string  │ The default cleanup policy for segments beyond the   │
│                                                            │         │ retention window                                     │
│                                                            │         │   * Supported values:                                │
│                                                            │         │     - delete                                         │
│                                                            │         │     - compact                                        │
│                                                            │         │     - compact,delete                                 │
│                                                            │         │   * Example: delete                                  │
│ default_replication_factor                                 │ integer │ Replication factor for autocreated topics            │
│                                                            │         │   * Minimum: 1 / Maximum: 10                         │
│ producer_purgatory_purge_interval_requests                 │ integer │ The purge interval (in number of requests) of the    │
│                                                            │         │ producer request purgatory(defaults to 1000).        │
│                                                            │         │   * Minimum: 10 / Maximum: 10000                     │
│ group_max_session_timeout_ms                               │ integer │ The maximum allowed session timeout for registered   │
│                                                            │         │ consumers. Longer timeouts give consumers more       │
│                                                            │         │ time to process messages in between heartbeats at    │
│                                                            │         │ the cost of a longer time to detect failures.        │
│                                                            │         │   * Minimum: 0 / Maximum: 1.8e+06                    │
│                                                            │         │   * Example: 1.8e+06                                 │
│ auto_create_topics_enable                                  │ boolean │ Enable auto creation of topics                       │
│                                                            │         │   * Example: true                                    │
│ connections_max_idle_ms                                    │ integer │ Idle connections timeout: the server socket          │
│                                                            │         │ processor threads close the connections that idle    │
│                                                            │         │ for longer than this.                                │
│                                                            │         │   * Minimum: 1000 / Maximum: 3.6e+06                 │
│                                                            │         │   * Example: 540000                                  │
│ log_cleaner_min_cleanable_ratio                            │ number  │ Controls log compactor frequency. Larger value       │
│                                                            │         │ means more frequent compactions but also more        │
│                                                            │         │ space wasted for logs. Consider setting              │
│                                                            │         │ log.cleaner.max.compaction.lag.ms to enforce         │
│                                                            │         │ compactions sooner, instead of setting a very high   │
│                                                            │         │ value for this option.                               │
│                                                            │         │   * Minimum: 0.2 / Maximum: 0.9                      │
│                                                            │         │   * Example: 0.5                                     │
│ log_retention_bytes                                        │ integer │ The maximum size of the log before deleting          │
│                                                            │         │ messages                                             │
│                                                            │         │   * Minimum: -1 / Maximum: 9.223372036854776e+18     │
│ compression_type                                           │ string  │ Specify the final compression type for a given       │
│                                                            │         │ topic. This configuration accepts the standard       │
│                                                            │         │ compression codecs ('gzip', 'snappy', 'lz4',         │
│                                                            │         │ 'zstd'). It additionally accepts 'uncompressed'      │
│                                                            │         │ which is equivalent to no compression; and           │
│                                                            │         │ 'producer' which means retain the original           │
│                                                            │         │ compression codec set by the producer.               │
│                                                            │         │   * Supported values:                                │
│                                                            │         │     - gzip                                           │
│                                                            │         │     - snappy                                         │
│                                                            │         │     - lz4                                            │
│                                                            │         │     - zstd                                           │
│                                                            │         │     - uncompressed                                   │
│                                                            │         │     - producer                                       │
│ log_cleaner_delete_retention_ms                            │ integer │ How long are delete records retained?                │
│                                                            │         │   * Minimum: 0 / Maximum: 3.1556926e+11              │
│                                                            │         │   * Example: 8.64e+07                                │
│ num_partitions                                             │ integer │ Number of partitions for autocreated topics          │
│                                                            │         │   * Minimum: 1 / Maximum: 1000                       │
│ log_message_timestamp_type                                 │ string  │ Define whether the timestamp in the message is       │
│                                                            │         │ message create time or log append time.              │
│                                                            │         │   * Supported values:                                │
│                                                            │         │     - CreateTime                                     │
│                                                            │         │     - LogAppendTime                                  │
│ replica_fetch_response_max_bytes                           │ integer │ Maximum bytes expected for the entire fetch          │
│                                                            │         │ response (defaults to 10485760). Records are         │
│                                                            │         │ fetched in batches, and if the first record batch    │
│                                                            │         │ in the first non-empty partition of the fetch is     │
│                                                            │         │ larger than this value, the record batch will        │
│                                                            │         │ still be returned to ensure that progress can be     │
│                                                            │         │ made. As such, this is not an absolute maximum.      │
│                                                            │         │   * Minimum: 1.048576e+07 / Maximum: 1.048576e+09    │
┼────────────────────────────────────────────────────────────┼─────────┼──────────────────────────────────────────────────────┼

You can also update the settings of your database service:

exo dbaas update --zone de-fra-1 target-kafka-service-name --kafka-settings '{"compression_type":"lz4"}'

Note

The value passed to --kafka-settings must be valid JSON.
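Because the settings string must be valid JSON and is passed through the shell, quoting mistakes (like a missing closing quote) are easy to make. One way to avoid them is to build the payload with a small script and paste the printed result into the command. This is a minimal sketch, not part of the CLI itself; the setting names and ranges are taken from the table above:

```python
import json

# Settings drawn from the table above; values must respect the documented ranges.
settings = {
    "compression_type": "lz4",   # gzip, snappy, lz4, zstd, uncompressed, or producer
    "num_partitions": 3,         # Minimum: 1 / Maximum: 1000
    "log_retention_bytes": -1,   # -1 means no size-based retention limit
}

# json.dumps guarantees a syntactically valid JSON document.
payload = json.dumps(settings)
print(payload)
```

The printed string can then be used directly as the argument to --kafka-settings.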