Tutorial: Streaming CDC using Debezium on OpenShift 4

Erfin Feluzy
Jun 18, 2020

Live-stream your database updates to Kafka (Strimzi.io) using Debezium.

Strimzi provides a way to run an Apache Kafka cluster on Kubernetes in various deployment configurations. Combined with Debezium as the CDC connector and OpenShift 4, you can set up this stack in minutes.

This tutorial builds the following scenario: changes in a MySQL database are captured by Debezium running on Kafka Connect, streamed to a Kafka topic, and read by a consumer application.

Preparation: Install MySQL from the Debezium Example

Log in with the OpenShift CLI and create a new project

$ oc login
$ oc new-project erfin-debezium

Deploy the MySQL database from Debezium's example image

$ oc new-app docker.io/debezium/example-mysql:0.5 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw

Try the database from inside the pod

$ oc get pods | grep example-mysql
$ oc rsh example-mysql-1-vg2rb
$ mysql -u mysqluser -p inventory
Enter password: mysqlpw
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | erfinf     | Walker    | ed@walker.com         |
| 1004 | erfin      | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

Step 1: Install the Strimzi Kafka Cluster using the OpenShift web console

This step needs the cluster-admin role to install the Strimzi Operator into your namespace (erfin-debezium).

Note: use the default (ephemeral) configuration for this tutorial.
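
For readers who prefer the CLI over the web console, the default ephemeral cluster can be sketched as a Kafka custom resource. The cluster name my-cluster is inferred from the bootstrap address my-cluster-kafka-brokers:9092 used later in this tutorial. The apiVersion, replica counts, and listener layout are assumptions modelled on Strimzi's sample from around the time of writing; newer Strimzi releases use a different schema, so check the sample shipped with your operator.

```shell
# Write a minimal ephemeral Kafka cluster definition to a local file.
# Assumption: apiVersion and listener syntax depend on the installed Strimzi version.
cat > kafka-ephemeral.yaml <<'EOF'
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    storage:
      type: ephemeral   # data is lost when pods restart; fine for a tutorial
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF

# Apply it to the tutorial project once the operator is running:
#   oc apply -f kafka-ephemeral.yaml -n erfin-debezium
```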

Step 2: Install Strimzi Kafka Connect Source-to-Image (S2I) using the OpenShift web console

Install Strimzi Kafka Connect S2I

Expose a route to the Kafka Connect API; it is used later to register the database's CDC connector

$ oc expose svc/my-connect-cluster-connect-api
$ oc get route

Result:

NAME                             HOST/PORT
my-connect-cluster-connect-api   my-connect-cluster-connect-api-erfin-debezium.apps.erfin-cluster.sandbox1459.opentlc.com
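
Because the route host differs per cluster, it can help to read it from the route rather than copy it by hand. A minimal sketch, assuming the route keeps the name of the service it exposes; connect_url is a hypothetical helper name, not part of oc.

```shell
# Hypothetical helper: print the base URL of the Kafka Connect REST API.
# Assumes the route is named after the service it exposes.
connect_url() {
  route_name="${1:-my-connect-cluster-connect-api}"
  host="$(oc get route "$route_name" -o jsonpath='{.spec.host}')" || return 1
  printf 'http://%s\n' "$host"
}

# Usage (requires an active oc login):
#   CONNECT_URL="$(connect_url)"
```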

Step 3: Create a plugins folder and download the Debezium MySQL connector on your local computer

$ mkdir -p $HOME/plugins
$ cd $HOME/plugins
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.1.2.Final/debezium-connector-mysql-1.1.2.Final-plugin.tar.gz
$ tar -xzf debezium-connector-mysql-1.1.2.Final-plugin.tar.gz

The resulting structure looks like this:

$HOME/plugins
|- debezium-connector-mysql
   |- ...
   |- debezium-connector-mysql-1.1.2.Final.jar
   |- ...

Step 4: Upload the Debezium plugin to Kafka Connect

$ oc start-build my-connect-cluster-connect --from-dir $HOME/plugins/
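
After the build finishes and the Connect pod restarts, one way to confirm the plugin was picked up is to list the installed plugins through the Kafka Connect REST API's /connector-plugins endpoint. A sketch, where has_mysql_plugin is a hypothetical helper and the base URL is the route exposed in Step 2:

```shell
# Hypothetical helper: succeed if the Debezium MySQL connector class is installed.
# $1 is the base URL of the Kafka Connect REST API (the route from Step 2).
has_mysql_plugin() {
  curl -sS "$1/connector-plugins" \
    | grep -q 'io.debezium.connector.mysql.MySqlConnector'
}

# Usage:
#   has_mysql_plugin "http://<your-connect-route-host>" && echo "plugin loaded"
```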

Step 5: Register the Debezium connector for the target DB (example-mysql) using the REST API

$ oc get route
$ curl --location --request POST 'http://my-connect-cluster-connect-api-erfin-debezium.apps.erfin-cluster.sandbox1459.opentlc.com/connectors' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "example-mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "my-cluster-kafka-brokers:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}'

On success, the API responds with:

201 Created
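
A 201 only means the connector was accepted. To see whether it is actually running, the Kafka Connect REST API also exposes a status endpoint per connector. A sketch; connector_status is a hypothetical helper name:

```shell
# Hypothetical helper: fetch the status document of a connector.
# $1 = base URL of the Kafka Connect REST API, $2 = connector name.
connector_status() {
  curl -sS -H 'Accept: application/json' "$1/connectors/$2/status"
}

# Usage:
#   connector_status "http://<your-connect-route-host>" inventory-connector
# A healthy connector reports "state":"RUNNING" for the connector and its task.
```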

Or use Postman if you prefer a GUI :)

Try your application!

Deploy a Kafka client app using the OpenShift CLI (oc). You can also deploy it as a serverless application with Knative; the steps are in the optional section at the end.

Note: this application uses Quarkus with the SmallRye Kafka client, and the image is on my Quay.io registry :)

$ oc new-app \
quay.io/efeluzy/quarkus-kafka-consumer:latest \
-e mp.messaging.incoming.mytopic-subscriber.topic=dbserver1.inventory.customers
$ oc expose svc/quarkus-kafka-consumer
$ curl http://quarkus-kafka-consumer-erfin-debezium.apps.erfin-cluster.sandbox1459.opentlc.com/stream

Simulate database update:

mysql> update customers set first_name='erfin' where id = 1003;

Check the stream in curl or in a browser

CDC data when updating a customer row

Message from the Kafka topic dbserver1.inventory.customers:

....
"payload": {
  "before": {
    "id": 1003,
    "first_name": "Edward",
    "last_name": "Walker",
    "email": "ed@walker.com"
  },
  "after": {
    "id": 1003,
    "first_name": "Erfin",
    "last_name": "Walker",
    "email": "ed@walker.com"
  },
....
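
The topic name above is not arbitrary: the Debezium MySQL connector writes each table's changes to a topic named after the logical server name (database.server.name), the database, and the table. A small sketch of that convention, with cdc_topic as a hypothetical helper:

```shell
# Hypothetical helper: build the change-event topic name for a table.
# $1 = database.server.name, $2 = database name, $3 = table name.
cdc_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

cdc_topic dbserver1 inventory customers   # prints dbserver1.inventory.customers
```

To follow a different table, the consumer's topic setting would change only in the last segment.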

on OpenShift Topology:

Optional: deploy your app as a Knative serverless application.

$ kn service update quarkus-kafka-consumer \
--image quay.io/efeluzy/quarkus-kafka-consumer:latest \
--revision-name quarkus-kafka-consumer-v4 \
--env mp.messaging.incoming.mytopic-subscriber.topic=dbserver1.inventory.customers
$ kn route list
$ curl http://quarkus-kafka-consumer-erfin-debezium.apps.erfin-cluster.sandbox1459.opentlc.com/stream

Written by Erfin Feluzy

Keyboard laborer and AppDev Solution Architect at Red Hat. Eat, Code, Sleep, repeat.