Setup Debezium to Stream Postgres Changes to Kafka

backend · 7 min read · 05 Oct 21

Have you ever been in a situation where you had to send every DB update to Kafka? Maybe you are in a multi-team org where another team is interested in listening to events for any changes in your team's DB.

There are multiple approaches to sending DB updates to Kafka, e.g. writing application-level code or writing DB-level triggers, but using Debezium to solve this problem is simple yet powerful.

In this post, we will learn how to use Debezium to streamline the process of sending Postgres change events to Kafka.

What is Debezium

Debezium is an open source platform for change data capture (CDC). It connects to your database, captures all row-level changes, and sends them to Kafka topics. It supports many DBs like MySQL, Postgres, Oracle, etc.

Debezium Architecture

Debezium provides multiple ways to install and use it. Each has its own architecture.

  1. Kafka connect
  2. Standalone server
  3. Embedded Engine

Kafka Connect

Debezium can be deployed to an existing Kafka Connect instance. It works as a source connector that connects to the source DB and publishes the DB changes to Kafka. You just get Debezium's DB-specific connector jar and put it in Kafka Connect's classpath. Afterward, you can manage the Debezium connector using Kafka Connect's REST API. To read more about Kafka Connect, read this blog.
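For illustration, a minimal sketch of registering the Postgres connector through Kafka Connect's REST API could look like this (assuming Kafka Connect is listening on localhost:8083; the connector name mydb-connector is arbitrary and the DB details match the setup later in this post):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mydb-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "mydb",
    "database.server.name": "mydbserver",
    "table.include.list": "public.roles",
    "plugin.name": "pgoutput"
  }
}'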

Standalone server

Debezium also provides a standalone server so you can use it without Kafka Connect. Most of the functionality is the same as with Kafka Connect, but here you have to manage the Debezium server directly instead of managing it through the Kafka Connect API.

Embedded Engine

Another way to use Debezium is to embed its core Java library in your Java application. As of now, this is only for Java applications and can't be used with non-Java applications. Please note, this approach is deprecated.

Setup

Kafka setup

Install kafka

brew install kafka

Start zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

Start Kafka

kafka-server-start /usr/local/etc/kafka/server.properties

Create a Kafka consumer to monitor the topic. Please note the topic name: by default Debezium automatically creates a topic for each table using the rule <database.server.name>.<schemaName>.<tableName>.

kafka-console-consumer --bootstrap-server localhost:9092 --topic mydbserver.public.roles

List all the topics

kafka-topics --bootstrap-server 127.0.0.1:9092 --list

Postgres setup

Install Postgres, if you have not

brew install postgresql

Use your existing DB or create a new DB for the test setup like below

# login to postgres db with role postgres
psql postgres -U postgres

-- create db
create database mydb;

-- create roles table
CREATE TABLE roles (
  id INT PRIMARY KEY     NOT NULL,
  role_name VARCHAR(30) UNIQUE NOT NULL,
  created_at timestamp with time zone default NOW()
);

Update /usr/local/var/postgres/pg_hba.conf to allow the Debezium connector's host to replicate.

local   replication     all                                     trust
host    replication     all             127.0.0.1/32            trust
host    replication     all             ::1/128                 trust

Update /usr/local/var/postgres/postgresql.conf for replication and the logical decoding plugin, then restart the Postgres server.

# only needed if using a logical decoding plugin other than pgoutput
shared_preload_libraries = 'decoderbufs,wal2json'

wal_level = logical
max_wal_senders = 1
max_replication_slots = 1
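After the restart, you can verify that the settings took effect:

psql postgres -U postgres -c "select name, setting from pg_settings where name in ('wal_level', 'max_wal_senders', 'max_replication_slots');"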

Debezium setup

Download and extract the Debezium server from here. Open a terminal in the extracted folder.

Create a file offsets.dat inside the data folder. This file is used by Debezium to keep track of the last update read from the DB server. In case Debezium crashes or restarts, it will resume from the last offset stored in this file.

mkdir data
touch data/offsets.dat

Create the file conf/application.properties with the below configuration. You can check here for more configuration details for the Postgres connector.

debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=localhost:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer


debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=mydb
debezium.source.database.server.name=mydbserver
debezium.source.schema.include.list=public
debezium.source.table.include.list=public.roles,public.users
debezium.source.plugin.name=pgoutput

# to prevent complex and huge data/payload
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState


# this produces a concise payload; otherwise the payload will be huge, with the schema included
debezium.format.key.schemas.enable=false
debezium.format.value.schemas.enable=false

Start the Debezium server

./run.sh

Test setup

Now make changes to the DB table that you configured in the setup above and observe the console logs of the Kafka consumer.
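For example, inserting a row into the roles table should produce a message on the mydbserver.public.roles topic. With the unwrap transform and schemas disabled, the payload will look roughly like this (the exact timestamp formatting may differ):

mydb=# insert into roles (id, role_name) values (1, 'admin');

# output on the kafka-console-consumer
{"id":1,"role_name":"admin","created_at":"2021-10-05T10:15:30.123456Z"}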


How does it work?

The Debezium Postgres connector uses PostgreSQL's replication protocol to receive changes from the DB as they are committed to the transaction log, at positions called Log Sequence Numbers (LSNs).

When the server commits a transaction, a separate server process invokes a callback function from the logical decoding plugin. This function processes the transaction, converts it to a specific format (Protobuf or JSON), and writes it to an output stream.

When the connector receives changes, it transforms the events into Debezium create, update, or delete events that include the LSN of the event. The Kafka Connect process asynchronously writes the change event records, in the same order in which they were generated, to the appropriate Kafka topic.

Periodically, Kafka Connect records the most recent offset, which indicates the source-specific position information, i.e. the LSN.
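You can inspect the server's current LSN yourself (sample output shown):

mydb=# select pg_current_wal_lsn();
 pg_current_wal_lsn
---------------------
 0/C2661258
(1 row)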

Logical Decoding Plugin

Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.

Logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.

The output plug-ins transform the data from the write-ahead log's internal representation into the format the consumer of a replication slot desires. Plug-ins are written in C, compiled, and installed on the machine which runs the PostgreSQL server. Examples:

decoderbufs - converts changes to Protobuf

wal2json - converts changes to JSON

pgoutput - built into Postgres 10+, no need to install
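To see logical decoding in action without Debezium, you can experiment with the built-in test_decoding plugin (used here only for illustration; Debezium itself uses pgoutput as configured above):

mydb=# select pg_create_logical_replication_slot('demo_slot', 'test_decoding');
mydb=# insert into roles (id, role_name) values (2, 'viewer');
mydb=# select * from pg_logical_slot_get_changes('demo_slot', null, null);
mydb=# select pg_drop_replication_slot('demo_slot');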

Replication Slot

A replication slot is a feature in PostgreSQL that ensures that the master server will retain the Write-Ahead Log (WAL) files that are needed by the replicas even when they are disconnected from the master.

WAL ensures that when there is a crash in the system or loss of connection, the database can be recovered. When you make changes to the database, WAL files keep building up. WAL logs are stored in the pg_wal directory as a set of segment files.
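You can list the WAL segment files from SQL (Postgres 10+):

mydb=# select name, size from pg_ls_waldir() order by name limit 3;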

mydb=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+-----------
slot_name           | debezium
plugin              | pgoutput
slot_type           | logical
datoid              | 827180
database            | mydb
temporary           | f
active              | t
active_pid          | 61435
xmin                |
catalog_xmin        | 901702
restart_lsn         | 0/C2661220
confirmed_flush_lsn | 0/C2661258
wal_status          | reserved
safe_wal_size       |

Do not let inactive slots stay around, because the master will retain the WAL files needed by an inactive slot indefinitely and fill up the disk. Command to delete a slot:

mydb=# select pg_drop_replication_slot('debezium');
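To spot inactive slots that may be holding back WAL:

mydb=# select slot_name, active, restart_lsn from pg_replication_slots where not active;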

Transformation

Debezium generates a complex payload with lots of info for every CRUD operation, which might not be useful or understandable for the sink.

For example, part of the structure of an UPDATE change event looks like this:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
} 

However, other connectors or other parts of the Kafka ecosystem usually expect the data in a simple format like this.

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

Using transformation, you can make your message lightweight, i.e. if you add the below configuration to the Debezium server, you can achieve the above requirement. This uses Debezium's inbuilt transformer ExtractNewRecordState. See here for more detail on transformation.

debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

FAQ

How do I keep a single topic name for a partitioned table?

By default Debezium routes changes to a different topic for each partition. Using the topic routing transformation, you can reroute them to a single topic name, as shown below.
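A minimal sketch of such a routing rule for the Debezium server config above (the partition table names here are hypothetical):

debezium.transforms=unwrap,reroute
debezium.transforms.reroute.type=io.debezium.transforms.ByLogicalTableRouter
debezium.transforms.reroute.topic.regex=mydbserver.public.roles_part_(.*)
debezium.transforms.reroute.topic.replacement=mydbserver.public.roles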

Will Debezium send existing table rows to Kafka when I set it up for the first time?

By default yes; when Debezium starts for the first time, it performs an initial consistent snapshot of the database. You can change this behaviour by setting snapshot.mode. You can read more about this here.
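For example, to skip the initial snapshot entirely in the Debezium server setup above:

debezium.source.snapshot.mode=never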


If you loved this post, please share it on social media.