What is Kafka Connect?

backend · 4 min read · 05 Oct 21

You might have heard of producers, consumers, and Kafka, but what is Kafka Connect? Kafka Connect is a framework and runtime for implementing and operating:

  • Source connectors that take data from a source system and send it to Kafka, e.g. debezium-postgres-connector
  • Sink connectors that take data from Kafka and send it to another system, e.g. elasticsearch-connector

Under the hood, Kafka Connect is itself a producer/consumer for Kafka. Kafka messages are just bytes, so we need to make sure that anyone reading from a topic uses the same serialization format as those writing to it. Otherwise, confusion and errors will ensue!
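To make that concrete, here is a minimal Python sketch of what a JSON converter effectively does on each side (illustrative only, assuming JSON on both the writing and reading side):

```python
import json

# Source side: a converter turns a structured record into bytes
# before it is written to Kafka.
record = {"id": 42, "name": "alice"}
value_bytes = json.dumps(record).encode("utf-8")

# Sink side: the converter turns the bytes back into a structured record.
# This only works because both sides agreed on JSON; a reader expecting
# Avro bytes here would fail with a deserialization error.
decoded = json.loads(value_bytes.decode("utf-8"))
print(decoded["name"])  # prints "alice"
```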

[Diagram: Kafka Connect architecture]

Kafka Connect has two parts:

  • Connector: defines how to connect to the data store; takes data from the source and passes it to the converter.
  • Converter: handles serialization and deserialization of the data.

Most well-written connectors allow you to configure the converter. You can use separate converters for the key and the value, e.g.

"key.converter": "org.apache.kafka.connect.json.JsonConverter"
"value.converter": "io.confluent.connect.avro.AvroConverter"

Kafka Connect flow on the source side

[Diagram: Kafka Connect source-side flow]

Kafka Connect flow on the sink side

[Diagram: Kafka Connect sink-side flow]

Kafka Connect Configs

  • bootstrap.servers: list of Kafka broker hosts
  • key.converter: converter class used to convert record keys between Kafka Connect's internal format and the serialized form written to Kafka
  • value.converter: converter class used to convert record values between Kafka Connect's internal format and the serialized form written to Kafka
  • offset.storage.topic: the name of the Kafka topic where connector offsets are stored (used in distributed mode)
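Putting these together, a minimal worker configuration might look like the following (values are illustrative; note that standalone mode stores offsets in a local file via offset.storage.file.filename instead of a topic):

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# distributed mode stores connector offsets in this Kafka topic
offset.storage.topic=connect-offsets
```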

Installing & Starting Kafka Connect

When you install Kafka using brew, Kafka Connect is installed along with it:

brew install kafka

Kafka Connect can be started in either standalone mode or distributed mode (distributed mode is started with connect-distributed instead). Below are the commands for starting it in standalone mode. Make sure you start ZooKeeper and Kafka as well.

# start zookeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

# start kafka
kafka-server-start /usr/local/etc/kafka/server.properties

# start kafka connect in standalone mode
connect-standalone /usr/local/etc/kafka/connect-standalone.properties

Installing and Configuring Multiple Connectors

Get all the jars of your required connectors and put them on the CLASSPATH. Create a configuration properties file for each connector, then start Kafka Connect in standalone mode. For example, the command below starts Kafka Connect with two connectors, local-file-source and local-file-sink.

local-file-source reads lines appended to the file test-source.txt and sends them to the Kafka topic connect-test, while local-file-sink reads from the topic connect-test and appends the data to the file test-sink.txt.

export CLASSPATH=/path/to/my/connectors/*
connect-standalone /usr/local/etc/kafka/connect-standalone.properties file-source-connector.properties file-sink-connector.properties

file-source-connector.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test-source.txt
topic=connect-test

file-sink-connector.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test-sink.txt
topics=connect-test
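Conceptually, this connector pair behaves like the following in-memory Python sketch (a toy stand-in, not the real FileStream connector code; the list plays the role of the connect-test topic):

```python
import io

# A toy stand-in for the Kafka topic connect-test.
topic = []

def file_source(fobj):
    """Like FileStreamSource: send each line of the file to the topic."""
    for line in fobj:
        topic.append(line.rstrip("\n"))

def file_sink(fobj):
    """Like FileStreamSink: append each record from the topic to the file."""
    for record in topic:
        fobj.write(record + "\n")

source = io.StringIO("hello\nworld\n")  # stands in for test-source.txt
sink = io.StringIO()                    # stands in for test-sink.txt
file_source(source)
file_sink(sink)
print(sink.getvalue())
```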

Managing Kafka Connect Using the REST API

Kafka Connect provides a REST API to manage connectors, e.g.:

  • Get Kafka Connect status
curl localhost:8083/
  • Get list of connectors
curl localhost:8083/connectors

["local-file-source", "local-file-sink"]
  • Create new connector

    curl -X POST -H "Content-Type: application/json" localhost:8083/connectors -d '{
      "name": "hdfs-sink-connector",
      "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
      }
    }'
  • Get status of connector

    curl localhost:8083/connectors/local-file-source/status
  • Restart connector

    curl -X POST localhost:8083/connectors/local-file-source/restart
  • Pause connector

    curl -X PUT localhost:8083/connectors/local-file-source/pause
  • Reset the set of topic names that the connector has been using since its creation

    curl -X PUT localhost:8083/connectors/local-file-source/topics/reset
    curl -X PUT localhost:8083/connectors/local-file-sink/topics/reset
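The same endpoints can also be scripted. Below is a small Python sketch using only the standard library; it assumes Connect is running on the default port 8083, and the helper names are hypothetical:

```python
import json
import urllib.request

BASE = "http://localhost:8083"  # assumed default Connect REST port

def endpoint(connector: str, action: str = "") -> str:
    """Build a Connect REST URL, e.g. /connectors/<name>/status."""
    url = f"{BASE}/connectors/{connector}"
    return f"{url}/{action}" if action else url

def list_connectors() -> list:
    # GET /connectors returns a JSON array of connector names
    with urllib.request.urlopen(f"{BASE}/connectors") as resp:
        return json.loads(resp.read())

if __name__ == "__main__":
    # e.g. ["local-file-source", "local-file-sink"] when the file
    # connectors from the earlier example are running
    print(list_connectors())
    print(endpoint("local-file-source", "status"))
```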
