Managed Apache Kafka Specifics
Authentication and Certificate Authority
For Managed Apache Kafka producers and consumers connecting to a topic, the only connection mechanism supported by default is SSL certificate authentication.
CA
To retrieve the CA for a newly created cluster, run the following command:
exo dbaas ca-certificate > ca.pem
The file ca.pem now contains the CA certificate for the Exoscale DBaaS.
To retrieve the access certificate and key, we need to use the output templates of the CLI.
exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.ConnectionInfo.AccessCert}}'
-----BEGIN CERTIFICATE-----
MIIEPDCCAqSgAwIBAgIUf+hoGaT3XjwEbguv9LYqNiVUw54wDQYJKoZIhvcNAQEM
BQAwOjE4MDYGA1UEAwwvYjJmODcwNmUtMDBmNi00M2MzLTkyMjktYTU5ZTYzODhm
ZGQwIFByb2plY3QgQ0EwHhcNMjExMTAxMTUxMTM2WhcNMjQwMTMwMTUxMTM2WjA7
....
....
qI70kNpCdbiqVf9EXvlmEkGFbKkR5rGW+QvdGwXTLBs=
-----END CERTIFICATE-----
For further handling, we will put the certificate and key into files. First for the certificate:
exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.ConnectionInfo.AccessCert}}' > test-kafka_cert.pem
And then for the key:
exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.ConnectionInfo.AccessKey}}' > test-kafka_key.pem
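As an optional sanity check, you can verify locally that the CA, certificate, and key files fit together before configuring any client. This is a minimal sketch using Python's standard ssl module with the file names from the commands above; load_cert_chain raises an error if the certificate and key do not match.
# check-certs.py
# Optional sanity check: load the CA, access certificate, and access key
# retrieved above. load_cert_chain() raises an SSLError if the certificate
# and key do not match.
import ssl

context = ssl.create_default_context(cafile="ca.pem")
context.load_cert_chain(certfile="test-kafka_cert.pem", keyfile="test-kafka_key.pem")
print("ca.pem, test-kafka_cert.pem and test-kafka_key.pem loaded successfully")
#EOF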
To cycle the credentials
To cycle credentials, create a new service user and retrieve its certificate and key. Then delete the previous service user.
Connecting to a Kafka instance
At this point, you can create a topic using your programming language of choice. Here is an example with kafka-python.
Please note that if you follow the kafka-python examples, you must use your own servername:port to establish a successful connection, not the one shown in the following examples.
# topic-ssl.py
# This script creates a topic named demo-topic
from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

topic_list = []
topic_list.append(NewTopic(name="demo-topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
#EOF
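If you want to confirm that the topic was actually created, a minimal follow-up sketch such as the one below lists the topics known to the cluster. It reuses the same connection parameters; the server name is, as above, a placeholder for your own.
# list-topics-ssl.py
# Optional check: list the topics on the cluster to confirm "demo-topic" exists
from kafka.admin import KafkaAdminClient

admin_client = KafkaAdminClient(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

print(admin_client.list_topics())
#EOF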
At this point, you can produce and consume messages. Here is another example with kafka-python:
Producer
# producer-ssl.py
# This script connects to Kafka and sends a few messages
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

for i in range(1, 4):
    message = "message number {}".format(i)
    print("Sending: {}".format(message))
    producer.send("demo-topic", message.encode("utf-8"))

# Force sending of all messages
producer.flush()
#EOF
Consumer
# consumer-ssl.py
# This script receives messages from a Kafka topic
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "demo-topic",
    auto_offset_reset="earliest",
    bootstrap_servers="test-kafka-exoscale-a334664a-3015-4b86-8725-edf585d6cfb4.aivencloud.com:21701",
    client_id="test",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="test-kafka_cert.pem",
    ssl_keyfile="test-kafka_key.pem",
)

# Call poll twice. The first call will just assign partitions for our
# consumer without actually returning anything
for _ in range(2):
    raw_msgs = consumer.poll(timeout_ms=1000)
    for tp, msgs in raw_msgs.items():
        for msg in msgs:
            print("Received: {}".format(msg.value))

# Commit offsets so we won't get the same messages again
consumer.commit()
#EOF
At this point, you can produce and consume messages using your programming language of choice.
Please note that if you haven’t created a topic already, you must create one. Otherwise, the connection will not be successful.
SASL connection
To check whether SASL authentication is enabled, you can query it specifically with:
exo dbaas show test-kafka -z de-muc-1 --output-template '{{.Kafka.AuthenticationMethods.SASL}}'
SASL is less secure
The SASL authentication method for Apache Kafka clusters is less secure than SSL certificate authentication. However, it is easier to use for tests and demonstration purposes. Use at your own risk.
To enable SASL, update the service with the corresponding flag:
exo dbaas update -z de-muc-1 test-kafka --kafka-enable-sasl-auth true
Updating Database Service "test-kafka"...
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│ DATABASE SERVICE │ │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
│ Zone │ de-muc-1 │
│ Name │ test-kafka │
│ Type │ kafka │
│ Plan │ startup-2 │
│ Disk Size │ 90 GiB │
│ State │ running │
│ Creation Date │ 2021-11-01 15:11:37 +0000 UTC │
│ Update Date │ 2021-11-01 15:55:54 +0000 UTC │
│ Nodes │ 3 │
│ Node CPUs │ 2 │
│ Node Memory │ 2.0 GiB │
│ Termination Protected │ true │
│ Maintenance │ wednesday (18:48:43) │
│ URI │ test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701 │
│ IP Filter │ 0.0.0.0/0 │
│ Authentication Methods │ certificate, SASL │
│ Kafka Connect Enabled │ false │
│ Kafka REST Enabled │ false │
│ Schema Registry Enabled │ false │
│ Components │ │
│ │ kafka test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21701 auth:certificate route:dynamic usage:primary │
│ │ kafka test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21712 auth:sasl route:dynamic usage:primary │
│ │ │
│ ACL │ │
│ │ default username:avnadmin topic:* permission:admin │
│ │ │
│ Users │ avnadmin (primary) │
┼─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼
At this point, the service exposes a separate port for SASL connections, while the hostname remains the same. You need to retrieve the SASL port.
From the example above, the SASL port is 21712. The last things we need are the username and password, which you can retrieve from the full JSON output:
exo dbaas show test-kafka -z de-muc-1 -O json
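With the hostname, SASL port, username, and password, you can then connect without client certificates. The following is a minimal kafka-python sketch; the password is a placeholder and the SCRAM-SHA-256 mechanism is an assumption to adjust to what your service reports. Note that the CA file is still required, because the SASL listener is also TLS encrypted.
# producer-sasl.py
# Minimal SASL_SSL producer sketch. The password is a placeholder and the
# SASL mechanism is an assumption -- replace both with the values reported
# by your own service.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="test-kafka-exoscale-08b0165e-ef03-47ec-926f-f01163d557ed.aivencloud.com:21712",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="avnadmin",
    sasl_plain_password="your-password",
    ssl_cafile="ca.pem",
)

producer.send("demo-topic", b"hello over SASL")
producer.flush()
#EOF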
Restricting connections from the internet
By default, Exoscale DBaaS services are accessible from the whole internet. Data does not transit in clear text over the network, as connections are SSL encrypted and authenticated by default.
To restrict incoming connections to your database service, you can add a filter that allows only:
- a single IP address,
- a network range,
- or a combination of IP addresses and network ranges.
To do this, update your service (or create it) with the IP filter, which is a comma-separated list of CIDRs:
exo dbaas update -z de-muc-1 test-kafka --kafka-ip-filter=1.2.3.4/32,5.6.7.8/24
Specific options when creating or updating an Apache Kafka service
You can find all the specific options for Apache Kafka by using the CLI help:
exo dbaas create --help-kafka
--kafka-connect-settings Kafka Connect configuration settings (JSON format)
--kafka-enable-cert-auth enable certificate-based authentication method
--kafka-enable-kafka-connect enable Kafka Connect
--kafka-enable-kafka-rest enable Kafka REST
--kafka-enable-sasl-auth enable SASL-based authentication method
--kafka-enable-schema-registry enable Schema Registry
--kafka-ip-filter allow incoming connections from CIDR address block
--kafka-rest-settings Kafka REST configuration settings (JSON format)
--kafka-schema-registry-settings Schema Registry configuration settings (JSON format)
--kafka-settings Kafka configuration settings (JSON format)
--kafka-version Kafka major version
Managed Apache Kafka additional configuration
A large number of service-specific configuration options are available:
- Kafka Connect
- Kafka REST
- Kafka Schema Registry
- Kafka general
The general settings for Managed Apache Kafka are:
exo dbaas type show kafka --settings kafka
┼────────────────────────────────────────────────────────────┼─────────┼──────────────────────────────────────────────────────┼
│ KEY │ TYPE │ DESCRIPTION │
┼────────────────────────────────────────────────────────────┼─────────┼──────────────────────────────────────────────────────┼
│ log_segment_delete_delay_ms │ integer │ The amount of time to wait before deleting a file │
│ │ │ from the filesystem │
│ │ │ * Minimum: 0 / Maximum: 3.6e+06 │
│ │ │ * Example: 60000 │
│ log_roll_ms │ integer │ The maximum time before a new log segment is │
│ │ │ rolled out (in milliseconds). │
│ │ │ * Minimum: 1 / Maximum: 9.223372036854776e+18 │
│ log_cleaner_min_compaction_lag_ms │ integer │ The minimum time a message will remain uncompacted │
│ │ │ in the log. Only applicable for logs that are │
│ │ │ being compacted. │
│ │ │ * Minimum: 0 / Maximum: 9.223372036854776e+18 │
│ max_incremental_fetch_session_cache_slots │ integer │ The maximum number of incremental fetch sessions │
│ │ │ that the broker will maintain. │
│ │ │ * Minimum: 1000 / Maximum: 10000 │
│ │ │ * Example: 1000 │
│ group_min_session_timeout_ms │ integer │ The minimum allowed session timeout for registered │
│ │ │ consumers. Longer timeouts give consumers more │
│ │ │ time to process messages in between heartbeats at │
│ │ │ the cost of a longer time to detect failures. │
│ │ │ * Minimum: 0 / Maximum: 60000 │
│ │ │ * Example: 6000 │
│ offsets_retention_minutes │ integer │ Log retention window in minutes for offsets topic │
│ │ │ * Minimum: 1 / Maximum: 2.147483647e+09 │
│ │ │ * Example: 10080 │
│ log_flush_interval_ms │ integer │ The maximum time in ms that a message in any topic │
│ │ │ is kept in memory before flushed to disk. If not │
│ │ │ set, the value in log.flush.scheduler.interval.ms │
│ │ │ is used │
│ │ │ * Minimum: 0 / Maximum: 9.223372036854776e+18 │
│ log_retention_hours │ integer │ The number of hours to keep a log file before │
│ │ │ deleting it │
│ │ │ * Minimum: -1 / Maximum: 2.147483647e+09 │
│ log_retention_ms │ integer │ The number of milliseconds to keep a log file │
│ │ │ before deleting it (in milliseconds), If not set, │
│ │ │ the value in log.retention.minutes is used. If set │
│ │ │ to -1, no time limit is applied. │
│ │ │ * Minimum: -1 / Maximum: 9.223372036854776e+18 │
│ min_insync_replicas │ integer │ When a producer sets acks to 'all' (or '-1'), │
│ │ │ min.insync.replicas specifies the minimum number │
│ │ │ of replicas that must acknowledge a write for the │
│ │ │ write to be considered successful. │
│ │ │ * Minimum: 1 / Maximum: 7 │
│ │ │ * Example: 1 │
│ log_preallocate │ boolean │ Should pre allocate file when create new segment? │
│ │ │ * Example: false │
│ group_initial_rebalance_delay_ms │ integer │ The amount of time, in milliseconds, the group │
│ │ │ coordinator will wait for more consumers to join a │
│ │ │ new group before performing the first rebalance. A │
│ │ │ longer delay means potentially fewer rebalances, │
│ │ │ but increases the time until processing begins. │
│ │ │ The default value for this is 3 seconds. During │
│ │ │ development and testing it might be desirable to │
│ │ │ set this to 0 in order to not delay test execution │
│ │ │ time. │
│ │ │ * Minimum: 0 / Maximum: 300000 │
│ │ │ * Example: 3000 │
│ log_message_timestamp_difference_max_ms │ integer │ The maximum difference allowed between the │
│ │ │ timestamp when a broker receives a message and the │
│ │ │ timestamp specified in the message │
│ │ │ * Minimum: 0 / Maximum: 9.223372036854776e+18 │
│ log_index_size_max_bytes │ integer │ The maximum size in bytes of the offset index │
│ │ │ * Minimum: 1.048576e+06 / Maximum: 1.048576e+08 │
│ │ │ * Example: 1.048576e+07 │
│ message_max_bytes │ integer │ The maximum size of message that the server can │
│ │ │ receive. │
│ │ │ * Minimum: 0 / Maximum: 1.000012e+08 │
│ │ │ * Example: 1.048588e+06 │
│ log_roll_jitter_ms │ integer │ The maximum jitter to subtract from │
│ │ │ logRollTimeMillis (in milliseconds). If not set, │
│ │ │ the value in log.roll.jitter.hours is used │
│ │ │ * Minimum: 0 / Maximum: 9.223372036854776e+18 │
│ transaction_remove_expired_transaction_cleanup_interval_ms │ integer │ The interval at which to remove transactions that │
│ │ │ have expired due to transactional.id.expiration.ms │
│ │ │ passing (defaults to 3600000 (1 hour)). │
│ │ │ * Minimum: 600000 / Maximum: 3.6e+06 │
│ │ │ * Example: 3.6e+06 │
│ log_message_downconversion_enable │ boolean │ This configuration controls whether │
│ │ │ down-conversion of message formats is enabled to │
│ │ │ satisfy consume requests. │
│ │ │ * Example: true │
│ log_cleaner_max_compaction_lag_ms │ integer │ The maximum amount of time message will remain │
│ │ │ uncompacted. Only applicable for logs that are │
│ │ │ being compacted │
│ │ │ * Minimum: 30000 / Maximum: 9.223372036854776e+18 │
│ socket_request_max_bytes │ integer │ The maximum number of bytes in a socket request │
│ │ │ (defaults to 104857600). │
│ │ │ * Minimum: 1.048576e+07 / Maximum: 2.097152e+08 │
│ log_segment_bytes │ integer │ The maximum size of a single log file │
│ │ │ * Minimum: 1.048576e+07 / Maximum: 1.073741824e+09 │
│ replica_fetch_max_bytes │ integer │ The number of bytes of messages to attempt to │
│ │ │ fetch for each partition (defaults to 1048576). │
│ │ │ This is not an absolute maximum, if the first │
│ │ │ record batch in the first non-empty partition of │
│ │ │ the fetch is larger than this value, the record │
│ │ │ batch will still be returned to ensure that │
│ │ │ progress can be made. │
│ │ │ * Minimum: 1.048576e+06 / Maximum: 1.048576e+08 │
│ log_flush_interval_messages │ integer │ The number of messages accumulated on a log │
│ │ │ partition before messages are flushed to disk │
│ │ │ * Minimum: 1 / Maximum: 9.223372036854776e+18 │
│ │ │ * Example: 9.223372036854776e+18 │
│ max_connections_per_ip │ integer │ The maximum number of connections allowed from │
│ │ │ each ip address (defaults to 2147483647). │
│ │ │ * Minimum: 256 / Maximum: 2.147483647e+09 │
│ log_index_interval_bytes │ integer │ The interval with which Kafka adds an entry to the │
│ │ │ offset index │
│ │ │ * Minimum: 0 / Maximum: 1.048576e+08 │
│ │ │ * Example: 4096 │
│ transaction_state_log_segment_bytes │ integer │ The transaction topic segment bytes should be kept │
│ │ │ relatively small in order to facilitate faster log │
│ │ │ compaction and cache loads (defaults to 104857600 │
│ │ │ (100 mebibytes)). │
│ │ │ * Minimum: 1.048576e+06 / Maximum: 2.147483647e+09 │
│ │ │ * Example: 1.048576e+08 │
│ log_cleanup_policy │ string │ The default cleanup policy for segments beyond the │
│ │ │ retention window │
│ │ │ * Supported values: │
│ │ │ - delete │
│ │ │ - compact │
│ │ │ - compact,delete │
│ │ │ * Example: delete │
│ default_replication_factor │ integer │ Replication factor for autocreated topics │
│ │ │ * Minimum: 1 / Maximum: 10 │
│ producer_purgatory_purge_interval_requests │ integer │ The purge interval (in number of requests) of the │
│ │ │ producer request purgatory(defaults to 1000). │
│ │ │ * Minimum: 10 / Maximum: 10000 │
│ group_max_session_timeout_ms │ integer │ The maximum allowed session timeout for registered │
│ │ │ consumers. Longer timeouts give consumers more │
│ │ │ time to process messages in between heartbeats at │
│ │ │ the cost of a longer time to detect failures. │
│ │ │ * Minimum: 0 / Maximum: 1.8e+06 │
│ │ │ * Example: 1.8e+06 │
│ auto_create_topics_enable │ boolean │ Enable auto creation of topics │
│ │ │ * Example: true │
│ connections_max_idle_ms │ integer │ Idle connections timeout: the server socket │
│ │ │ processor threads close the connections that idle │
│ │ │ for longer than this. │
│ │ │ * Minimum: 1000 / Maximum: 3.6e+06 │
│ │ │ * Example: 540000 │
│ log_cleaner_min_cleanable_ratio │ number │ Controls log compactor frequency. Larger value │
│ │ │ means more frequent compactions but also more │
│ │ │ space wasted for logs. Consider setting │
│ │ │ log.cleaner.max.compaction.lag.ms to enforce │
│ │ │ compactions sooner, instead of setting a very high │
│ │ │ value for this option. │
│ │ │ * Minimum: 0.2 / Maximum: 0.9 │
│ │ │ * Example: 0.5 │
│ log_retention_bytes │ integer │ The maximum size of the log before deleting │
│ │ │ messages │
│ │ │ * Minimum: -1 / Maximum: 9.223372036854776e+18 │
│ compression_type │ string │ Specify the final compression type for a given │
│ │ │ topic. This configuration accepts the standard │
│ │ │ compression codecs ('gzip', 'snappy', 'lz4', │
│ │ │ 'zstd'). It additionally accepts 'uncompressed' │
│ │ │ which is equivalent to no compression; and │
│ │ │ 'producer' which means retain the original │
│ │ │ compression codec set by the producer. │
│ │ │ * Supported values: │
│ │ │ - gzip │
│ │ │ - snappy │
│ │ │ - lz4 │
│ │ │ - zstd │
│ │ │ - uncompressed │
│ │ │ - producer │
│ log_cleaner_delete_retention_ms │ integer │ How long are delete records retained? │
│ │ │ * Minimum: 0 / Maximum: 3.1556926e+11 │
│ │ │ * Example: 8.64e+07 │
│ num_partitions │ integer │ Number of partitions for autocreated topics │
│ │ │ * Minimum: 1 / Maximum: 1000 │
│ log_message_timestamp_type │ string │ Define whether the timestamp in the message is │
│ │ │ message create time or log append time. │
│ │ │ * Supported values: │
│ │ │ - CreateTime │
│ │ │ - LogAppendTime │
│ replica_fetch_response_max_bytes │ integer │ Maximum bytes expected for the entire fetch │
│ │ │ response (defaults to 10485760). Records are │
│ │ │ fetched in batches, and if the first record batch │
│ │ │ in the first non-empty partition of the fetch is │
│ │ │ larger than this value, the record batch will │
│ │ │ still be returned to ensure that progress can be │
│ │ │ made. As such, this is not an absolute maximum. │
│ │ │ * Minimum: 1.048576e+07 / Maximum: 1.048576e+09 │
┼────────────────────────────────────────────────────────────┼─────────┼──────────────────────────────────────────────────────┼
You can also update the settings of your database service:
exo dbaas update --zone de-fra-1 target-kafka-service-name --kafka-settings '{"compression_type":"lz4"}'
Note
The parameter passed to --kafka-settings has to be in JSON format.