What is Kafka Connect
You might have heard of producers, consumers, and Kafka itself, but what is Kafka Connect? Kafka Connect is a framework and runtime for implementing and operating:
- Source connectors, which take data from a source system and send it to Kafka, e.g. debezium-postgres-connector
- Sink connectors, which take data from Kafka and send it to another system, e.g. elasticsearch-connector
Kafka Connect itself acts as a producer (for source connectors) and a consumer (for sink connectors) of Kafka. Kafka messages are just bytes, so anyone reading from a topic must use the same serialization format as whoever wrote to it. Otherwise, confusion and errors will ensue!
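To make the point about matching formats concrete, here is a minimal Python sketch using only the standard library, with no Kafka involved. The record contents and the fixed binary layout are invented for illustration; `payload` simply stands in for the bytes stored in a topic.

```python
import json
import struct

# A writer serializes a record as JSON; these bytes are what would land in the topic.
record = {"id": 42, "name": "alice"}
payload = json.dumps(record).encode("utf-8")

# A reader that assumes the same format recovers the original record.
decoded = json.loads(payload.decode("utf-8"))

# A reader that assumes a different format (a hypothetical layout of one
# big-endian 64-bit integer) cannot interpret the same bytes.
try:
    struct.unpack(">q", payload)
    mismatch_error = None
except struct.error as exc:
    mismatch_error = str(exc)

print("same format:", decoded)
print("different format failed:", mismatch_error is not None)
```

The bytes are identical in both cases; only the reader's assumption about the format differs, which is why both sides must agree on it.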
Kafka Connect has two parts:
- Connector: defines how to connect to the data store. On the source side, it takes data from the source and passes it to the converter.
- Converter: handles serialization and deserialization of the data.
Most well-written connectors let you configure the converter. You can use separate converters for the key and the value, e.g.
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
"value.converter": "io.confluent.connect.avro.AvroConverter"
Kafka Connect flow at source side
Kafka Connect flow at sink side
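The two flows can be sketched in a few lines of Python. This is a toy model under stated assumptions, not Kafka Connect's actual API: `ToyJsonConverter`, `source_connector_poll`, and the in-memory `topic` list are hypothetical stand-ins for a converter, a source connector, and a Kafka topic.

```python
import json


def source_connector_poll():
    """Stand-in for a source connector: returns records read from an external system."""
    return [{"key": {"id": 1}, "value": {"id": 1, "name": "alice"}}]


class ToyJsonConverter:
    """Toy converter: turns Connect-side data into bytes and back again."""

    def from_connect_data(self, data):
        return json.dumps(data, sort_keys=True).encode("utf-8")

    def to_connect_data(self, raw):
        return json.loads(raw.decode("utf-8"))


topic = []  # in-memory stand-in for a Kafka topic: (key_bytes, value_bytes) pairs
key_converter = ToyJsonConverter()
value_converter = ToyJsonConverter()

# Source side: source system -> connector -> converter -> bytes in the topic.
for rec in source_connector_poll():
    topic.append((
        key_converter.from_connect_data(rec["key"]),
        value_converter.from_connect_data(rec["value"]),
    ))

# Sink side: bytes in the topic -> converter -> connector -> target system.
sink_store = []  # stand-in for the external system a sink connector writes to
for key_bytes, value_bytes in topic:
    sink_store.append((
        key_converter.to_connect_data(key_bytes),
        value_converter.to_connect_data(value_bytes),
    ))

print(sink_store)
```

Note that the key and the value each go through their own converter instance, mirroring the separate key.converter and value.converter settings.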
Kafka Connect Configs
bootstrap.servers
: List of Kafka broker hosts
key.converter
: Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka, applied to message keys
value.converter
: Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka, applied to message values
offset.storage.topic
: The name of the Kafka topic where connector offsets are stored
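As a quick sanity check, a worker properties file can be scanned for these settings before starting Kafka Connect. The sketch below is a minimal Python example; the property values (such as `localhost:9092` and `connect-offsets`) are placeholders, and `parse_properties` is a hypothetical helper that handles only simple `key=value` lines. Note that offset.storage.topic applies to distributed mode; a standalone worker keeps its offsets in a local file instead.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


# Placeholder worker settings, for illustration only.
WORKER_PROPERTIES = """\
# example worker settings
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
"""

# The four settings described above.
DESCRIBED_KEYS = [
    "bootstrap.servers",
    "key.converter",
    "value.converter",
    "offset.storage.topic",
]

props = parse_properties(WORKER_PROPERTIES)
missing = [key for key in DESCRIBED_KEYS if key not in props]
print("missing settings:", missing)
```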
Installing & Starting Kafka Connect
When you install Kafka using brew, kafka-connect is also installed:
brew install kafka
Kafka Connect can be started in either standalone mode or distributed mode. The commands below start it in standalone mode. Make sure you start ZooKeeper and Kafka first.
# start zookeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
# start kafka
kafka-server-start /usr/local/etc/kafka/server.properties
# start kafka connect in standalone mode
connect-standalone /usr/local/etc/kafka/connect-standalone.properties
Installing and Configuring Multiple Connectors
Get all the JARs of the connectors you need and put them on the CLASSPATH. Create a configuration properties file for each connector, then start Kafka Connect in standalone mode. For example, the command below starts Kafka Connect with two connectors, local-file-source and local-file-sink. local-file-source reads lines appended to the file test-source.txt and sends them to the Kafka topic connect-test, while local-file-sink reads from the Kafka topic connect-test and appends the data to the file test-sink.txt.
export CLASSPATH=/path/to/my/connectors/*
connect-standalone /usr/local/etc/kafka/connect-standalone.properties file-source-connector.properties file-sink-connector.properties
file-source-connector.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test-source.txt
topic=connect-test
file-sink-connector.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test-sink.txt
topics=connect-test
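To see what this pair of connectors does end to end, the following Python sketch imitates the data flow with plain files and an in-memory list in place of the connect-test topic. It is a toy simulation and not how FileStreamSource or FileStreamSink are actually implemented; `file_source`, `file_sink`, and the byte-offset bookkeeping are assumptions made for illustration.

```python
import os
import tempfile


def file_source(path, offset):
    """Return the lines appended to `path` after byte `offset`, plus the new offset."""
    with open(path, "rb") as f:
        f.seek(offset)
        data = f.read()
    return data.decode("utf-8").splitlines(), offset + len(data)


def file_sink(path, records):
    """Append each record to `path` as one line."""
    with open(path, "a", encoding="utf-8") as f:
        for record in records:
            f.write(record + "\n")


workdir = tempfile.mkdtemp()
source_path = os.path.join(workdir, "test-source.txt")
sink_path = os.path.join(workdir, "test-sink.txt")
connect_test = []  # in-memory stand-in for the Kafka topic connect-test
offset = 0         # how far into the source file we have already read

# Append to the source file in two batches; only new lines are picked up each time.
for batch in (["hello", "world"], ["again"]):
    with open(source_path, "a", encoding="utf-8") as f:
        f.write("\n".join(batch) + "\n")
    new_lines, offset = file_source(source_path, offset)
    connect_test.extend(new_lines)

file_sink(sink_path, connect_test)
with open(sink_path, encoding="utf-8") as f:
    sink_lines = f.read().splitlines()
print(sink_lines)
```

Tracking the read position is what lets the source side pick up only newly appended lines; in Kafka Connect, that position is the kind of information stored as a connector offset.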
Managing Kafka Connect Using REST API
Kafka Connect provides a REST API to manage connectors, e.g.
- Get Kafka Connect worker info (version and Kafka cluster ID)
curl localhost:8083/
- Get the list of connectors
curl localhost:8083/connectors
["local-file-source", "local-file-sink"]
- Create a new connector
curl -X POST -H "Content-Type: application/json" localhost:8083/connectors --data '{
  "name": "hdfs-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "hdfs.url": "hdfs://fakehost:9000",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "hadoop.home": "/opt/hadoop",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
  }
}'
- Get the status of a connector
curl localhost:8083/connectors/local-file-source/status
- Restart a connector
curl -X POST localhost:8083/connectors/local-file-source/restart
- Pause a connector
curl -X PUT localhost:8083/connectors/local-file-source/pause
- Reset the set of topic names that a connector has been using since its creation
curl -X PUT localhost:8083/connectors/local-file-source/topics/reset
curl -X PUT localhost:8083/connectors/local-file-sink/topics/reset
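The same calls can be scripted. Below is a minimal Python sketch using only the standard library, assuming a worker listening on localhost:8083 as in the curl examples. `build_request` and `send` are hypothetical helper names, and the connector definition reuses the file sink settings from earlier. Building a request does not touch the network; only `send` does.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8083"  # same host and port as the curl examples


def build_request(method, path, body=None):
    """Build (but do not send) a request against the Connect REST API."""
    headers = {"Accept": "application/json"}
    data = None
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        BASE_URL + path, data=data, headers=headers, method=method
    )


def send(request):
    """Send a request and decode the JSON response body, if there is one."""
    with urllib.request.urlopen(request) as response:
        raw = response.read()
    return json.loads(raw) if raw else None


list_connectors = build_request("GET", "/connectors")
create_sink = build_request("POST", "/connectors", {
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": "1",
        "file": "test-sink.txt",
        "topics": "connect-test",
    },
})
pause_source = build_request("PUT", "/connectors/local-file-source/pause")

# With a worker running, a request is sent with, for example:
#   print(send(list_connectors))
```

Keeping request construction separate from sending makes it easy to inspect or log a call before it reaches the worker.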