Debezium with Single Message Transformation (SMT)

Hello everyone! In this article, together with my teammate Betül Çetinkaya, we are going to write about why we need Debezium, what kind of problems we faced, and how we solved them as the Trendyol Seller Finance Team. After a short introduction and an explanation of how we benefited from some simple but effective features of Debezium by tuning the connector configuration, we will focus specifically on the SMT we use for message filtering.


If you are unfamiliar with terms like Transactional Log Tailing and the Change Data Capture (CDC) pattern, or you do not know how to install Debezium and create a connector on it, we highly recommend reading the Transaction Log Tailing With Debezium series of posts by our teammate Abdullah YILDIRIM, because we will assume that you know these topics and how to create a simple connector on Debezium.

As the Seller Finance Team, we provide solutions to sellers' financial needs such as settlements, invoices, receipts, and payment transfers. Moreover, we facilitate the daily work of the finance department.

We can say that our core business is based on the settlement domain object. In the simplest terms, a settlement is the unit financial record of a seller's sale. Our settlement records are stored in a PostgreSQL database, and the settlement table has relationships with other tables.

Our business processes are mostly asynchronous, so we build our microservices on an event-driven architecture. We use Kafka as our message broker and rely on it frequently in our projects.

Let’s move on to our main story!

As the number of sellers and orders grew, we started to encounter performance problems while generating reporting documents, because these documents contain columns from several other tables related to the settlement table. We decided to migrate the settlement table and the related tables (only the needed columns) to another database as a single denormalized row per settlement. Thanks to Debezium, we not only completed the migration but also continue to feed the second database. This way, we get rid of join queries and generate document files faster.

System Design

We assume that you have a running Debezium instance. To create a connector, you send a POST request to it. Our base configuration for creating connectors is given below; if you are not familiar with this configuration, you can read the Debezium documentation.
With this base configuration, you can create a connector called settlement-connector that captures every change on the settlements table and sends all these changes as messages to the Kafka topic named settlement-debezium.public.settlements.

Base Connector Configuration
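A minimal sketch of such a base configuration, assuming local hostnames, placeholder credentials, and a database named settlement, could look like:

```json
{
  "name": "settlement-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgresql",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "settlement",
    "database.server.name": "settlement-debezium",
    "table.include.list": "public.settlements",
    "plugin.name": "pgoutput"
  }
}
```

With database.server.name set to settlement-debezium, changes on public.settlements end up in the topic settlement-debezium.public.settlements mentioned above.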

Now, we will tune the configurations according to our needs.

Case 1: Ignoring Unnecessary Columns

"public.settlements.created_date, public.settlements.status"
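The value above belongs to the connector's column.exclude.list option (called column.blacklist in releases before Debezium 1.3), which drops the listed columns from change event values. The relevant fragment of the connector configuration:

```json
"column.exclude.list": "public.settlements.created_date, public.settlements.status"
```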

Case 2: Ignoring Tombstone Messages

Using the connector option tombstones.on.delete, you can control whether a tombstone event is emitted upon record deletions.

"tombstones.on.delete": "false"

Case 3: Filtering Initial Snapshot

According to the Debezium documentation, you can specify which table rows are included in snapshots by using the snapshot.select.statement.overrides configuration with values in databaseName.tableName form. But be careful at this point: for PostgreSQL, Debezium expects you to define this parameter as schemaName.tableName.

"snapshot.select.statement.overrides": "public.settlements",
"snapshot.select.statement.overrides.public.settlements":
"SELECT * FROM public.settlements WHERE order_date > '2020-11-06'"

Case 4: Message Filtering — Single Message Transform (SMT)

SMTs transform inbound messages after a source connector has produced them, but before they are written to Kafka.

There are different types of transforms such as filter, reroute, unwrap, outbox, etc., but we will use the filter type in our case. The filter SMT supports scripting languages that integrate with JSR 223. The filter SMT and any implementation of the JSR 223 API are not included in Debezium by default, so we need to add the SMT and a JSR 223 API implementation by following the steps in the Debezium message filtering documentation.

First, we need to download debezium-scripting-1.4.0.Alpha2.tar.gz for the SMT, and we also need an implementation of the JSR 223 API. In our case, we used Groovy as the expression language; you can also use JavaScript (with Graal.js).

Once we have these jar files, we need to extract them into the Debezium plug-in directory of our Kafka Connect environment. For the PostgreSQL connector, extract them into ‘/kafka/connect/debezium-connector-postgres/’.

debezium:
  container_name: debezium
  image: debezium/connect:1.3
  ports:
    - 8083:8083
  links:
    - kafka
    - postgresql
  environment:
    - CONFIG_STORAGE_TOPIC=debezium_connect_config
    - OFFSET_STORAGE_TOPIC=debezium_connect_offset
    - STATUS_STORAGE_TOPIC=debezium_connect_status
    - GROUP_ID=settlement-debezium-connect
  volumes:
    - $PWD/jars:/kafka/connect/debezium-connector-postgres

After your Debezium container is ready, you can check whether the jar files were moved correctly.

Now that we have added the plugins, we are ready to use filtering. First of all, we need to configure the filter SMT in the connector with the configuration given below.

"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter"

We also need to specify the expression language with the configuration given below.

"transforms.filter.language": "jsr223.groovy"

Now we can define our filter conditions using transforms.filter.condition. We can filter messages by operation and fields. In our example, we need an event when there is an update operation that changes the sent field from false to true.

"transforms.filter.condition": "value.op == 'u' && value.before.sent == false && value.after.sent == true"

When we completed the configuration, we faced a problem saying the value has no ‘op’ field. Then we realized these messages come from the Debezium configuration topics: the expression is evaluated for every message handled by the connector, including our Debezium config, offset, and status topics. To resolve this issue, we need to specify a regular expression with transforms.filter.topic.regex, which is evaluated against the name of the destination topic of an event to determine whether to apply the filter logic. This way, the SMT ignores events that come from the Debezium configuration topics and filters only the messages belonging to the target topic.
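Assuming our target topic from the base configuration, the regex can be as specific as the topic name itself:

```json
"transforms.filter.topic.regex": "settlement-debezium.public.settlements"
```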


Our final curl is given below:
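As a sketch, assuming Debezium listens on localhost:8083 and using placeholder connection settings, such a request combining the options discussed above could look like:

```shell
curl -X POST -H "Content-Type: application/json" \
  http://localhost:8083/connectors -d @- <<'EOF'
{
  "name": "settlement-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgresql",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "settlement",
    "database.server.name": "settlement-debezium",
    "table.include.list": "public.settlements",
    "plugin.name": "pgoutput",
    "column.exclude.list": "public.settlements.created_date, public.settlements.status",
    "tombstones.on.delete": "false",
    "transforms": "filter",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.condition": "value.op == 'u' && value.before.sent == false && value.after.sent == true",
    "transforms.filter.topic.regex": "settlement-debezium.public.settlements"
  }
}
EOF
```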

Case 5: Heavy Message Payload

Environment variables that start with CONNECT_ are used to update the Kafka Connect worker configuration file. To raise the producer's request size limit, we can set CONNECT_PRODUCER_MAX_REQUEST_SIZE as an environment variable.

CONFIG_STORAGE_TOPIC: "settlement-debezium-config"
OFFSET_STORAGE_TOPIC: "settlement-debezium-offset"
STATUS_STORAGE_TOPIC: "settlement-debezium-status"
GROUP_ID: "settlement-debezium-connect"
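For example, the following line could be added next to the variables above (the 3 MB value is an assumption; it must be at least as large as the biggest record the connector produces):

```yaml
# Assumed value (3 MB) for the Connect producer's max.request.size
CONNECT_PRODUCER_MAX_REQUEST_SIZE: "3000000"
```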

This allows the worker to produce a large message, but it is not enough on its own: your topic's max.message.bytes configuration must be tuned as well. You can create such a topic as follows:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 12 --topic settlement-debezium.public.settlements --config max.message.bytes=3000000

Lastly, on the consumer side, don't forget to tune configurations such as max.partition.fetch.bytes and fetch.max.bytes, which allow you to consume these big messages. Moreover, beware of setting max.poll.records too high, because processing these messages may cost too much time and exceed max.poll.interval.ms.
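For illustration, a consumer properties fragment covering both concerns could look like this (the option names are standard Kafka consumer settings; all values are assumptions):

```properties
# Allow fetching large records (must be >= the topic's max.message.bytes)
max.partition.fetch.bytes=3000000
fetch.max.bytes=3000000
# Keep poll batches small so slow processing does not exceed max.poll.interval.ms
max.poll.records=50
max.poll.interval.ms=600000
```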


To sum up, by tuning the Debezium connector configuration and using the filter SMT, we were able to filter out:

  • Unnecessary Columns
  • Tombstone Messages
  • Initial Snapshot
  • CDC Messages

and handle huge payloads.

If you want to use up-and-coming technologies such as Debezium in production, contribute to work like this, and develop highly scalable applications, join the Trendyol family.
