Oracle to Kafka — Playing with Confluent’s new Oracle CDC Source Connector in Docker

Oracle to Kafka — Playing with Confluent’s new Oracle CDC Source Connector in Docker

It’s really exciting to have a new option for streaming Oracle data into Kafka. With Confluent releasing their “Oracle CDC Source Premium Connector” there’s a new way to capture data that has been added to, updated, or deleted from Oracle RDBMS with Change Data Capture (CDC). Think of this as a low touch way to stream both the Oracle data and schema changes into Kafka and Schema Registry.

I put together some examples so you can play with this entire pipeline in Docker. Docker is an easy-way to run a an end to end pipeline on your machine. I hope this demo gives you a glimpse of how Oracle CDC to Kafka works.

⚠️ Some of these components are paid offerings 💰 for production use. Both the Oracle and Confluent license grant you free licence to play with this stuff as a developer for 30 days (disclaimer: I’m not a lawyer; nor do I play one on TV)

🛠 Code available at https://github.com/saubury/kafka-connect-oracle-cdc

Kafka Connect, CDC— What be this?

Kafka Connect is an open source framework for streaming data between Apache Kafka and other data systems (such as databases). Confluent’s Oracle CDC Source Connector is a plug-in for Kafka Connect, which (surprise) connects Oracle as a source into Kafka as a destination.

This connector uses the Oracle LogMiner interface to query online and archived redo log files. That’s an Oracle specific way to stream both the DML and DDL directly from the redo of the database. This is similar to how the Debezium project reads the Postgres write-ahead log or the MySQL binlog.

Why not just use the Oracle JDBC Source Connector?

Robin Moffatt has written a great article on How to Integrate Your Databases with Apache Kafka which provides a great discussion on the differences between a JDBC connector reading data from a database (such as Oracle) vs a log-based Change Data Capture (CDC) tool to extract changes.

The Oracle JDBC Connector is fine for getting a snapshot of a table. However, to identify changed or deleted rows with a JDBC connector is painful. Let’s just say you’ll quickly fall into a murky world of putting triggers on tables to determine what has changed … and no-one wants to do that.

The Oracle CDC Source Connector on the other hand is essentially listening to the recovery redo segment of the database. This allows for most DML (insert, update, delete) and some DDL (alter table) database events to be captured an Oracle database and produced into into Kafka and the Schema Registry.

Oracle Versions

Worth highlighting that when this blog was written, the connector works with Oracle 11g, 12c and 18c but not Oracle 19c. For this demonstration I’m using Oracle 12c with pluggable databases.

Let’s Play

Clone this repo

git clone [https://github.com/saubury/kafka-connect-oracle-cdc](https://github.com/saubury/kafka-connect-oracle-cdc)

cd kafka-connect-oracle-cdc

Get Oracle CDC Source Connector

Be sure to review the license and download the zip file for the Confluent Oracle CDC Source Connector

Unzip to confluentinc-kafka-connect-oracle-cdc (and remove any trailing version numbers)

unzip ~/Downloads/confluentinc-kafka-connect-oracle-cdc-1.0.3.zip

mv confluentinc-kafka-connect-oracle-cdc-1.0.3 confluentinc-kafka-connect-oracle-cdc

Get Oracle Docker

Credit to Stackoverflow for explaining how to get Oracle working in Docker (well, at least in February 2021)

You should now be able to download an Oracle Docker image

docker login --username YourDockerUserName --password-stdin
<<Enter your password>>

docker pull store/oracle/database-enterprise:12.2.0.1

Docker Startup

You’re all set to startup the platform (which will start Oracle, Kafka, Kafka Connect, Schema registry etc.,)

docker-compose up -d

Setup Oracle Docker

Once the Oracle database is running, we need run a script to perform some setup. This SQL will turn on ARCHIVELOG mode, create some users, and establish permissions

First, ensure the database looks like it’s finished starting up (docker-compose logs -f oracle) and then run the following (for the curious, the SQL script is here )

docker-compose exec oracle /scripts/go_sqlplus.sh /scripts/oracle_setup_docker

Sample Data

This SQL script also creates an EMP table, and adds four employees.

The EMP tableThe EMP table

Connector Configuration

Check the OracleCdcSourceConnector source plug-in is available

curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connector-plugins | jq '.'

And look for an occurrence of “class”: “io.confluent.connect.oracle.cdc.OracleCdcSourceConnector”

All good — now it’s time to establish the SimpleOracleCDC connector. This configuration will initially snapshot the EMP table, and then listen to all redo activity (DML and DDL) against this table.

curl -s -X POST -H 'Content-Type: application/json' --data @SimpleOracleCDC.json http://localhost:8083/connectors | jq

Check the status of the connector. You may need to wait a minute or so for the status to show up

curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC/status | jq

Check topic

If you have Kafka tools installed locally, you can look at the de-serialised AVRO like this

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic ORCLCDB.C__MYUSER.EMP --from-beginning

Or if you don’t have the Kafka tools installed, you can launch kafka-avro-console-consumer via a container like (yes; this is all one line)

docker-compose exec kafka-connect kafka-avro-console-consumer --bootstrap-server kafka:29092 --property schema.registry.url="http://schema-registry:8081" --topic ORCLCDB.C__MYUSER.EMP --from-beginning

The (simplified) output of kafka-avro-console-consumer should look something like. Four rows initially from the table are four records in Kafka

{"I":"\u0001","NAME":{"string":"Bob"}}
{"I":"\u0002","NAME":{"string":"Jane"}}
{"I":"\u0003","NAME":{"string":"Mary"}}
{"I":"\u0004","NAME":{"string":"Alice"}}

Schema

Let’s see what schemas we have registered for the EMP table

curl -s -X GET http://localhost:8081/subjects/ORCLCDB.C__MYUSER.EMP-value/versions/1 | jq -r .schema | jq .

Amongst other things, you’ll see version 1 of the schema has been registered like this

"fields": [
    {
      "name": "I",
      "type": {
        "type": "bytes"
    },
    {
      "name": "NAME",
      "type": [
        "string"
      ]
    }

Insert, update and delete some data

Let’s now insert more data, and try some updates and a delete.

Run docker-compose exec oracle /scripts/go_sqlplus.sh followed by this SQL

insert into C##MYUSER.emp (name) values ('Dale');
insert into C##MYUSER.emp (name) values ('Emma');
update C##MYUSER.emp set name = 'Robert' where name = 'Bob';
delete C##MYUSER.emp where name = 'Jane';
commit;
exit

Updated Sample Data

This adds 2 rows to EMP table, updates 1 row, and deletes 1 row.

Updated EMP tableUpdated EMP table

The (simplified) output of kafka-avro-console-consumer should look something like

{"I":"\u0005","NAME":{"string":"Dale"},"op_type":{"string":"I"}}
{"I":"\u0006","NAME":{"string":"Emma"},"op_type":{"string":"I"}}
{"I":"\u0001","NAME":{"string":"Robert"},"op_type":{"string":"U"}}
{"I":"\u0002","NAME":{"string":"Jane"},"op_type":{"string":"D"}

DDL

So far we’ve seen DML applied to a table. Now let’s try some DDL to modify a table.

Run docker-compose exec oracle /scripts/go_sqlplus.sh followed by this SQL

ALTER TABLE C##MYUSER.EMP ADD (SURNAME VARCHAR2(100));

insert into C##MYUSER.emp (name, surname) values ('Mickey', 'Mouse');
commit;

Updated Sample Data

Our new row looks like this (note the new surname column)

DDL applied to EMP tableDDL applied to EMP table

Schema mutation

Let’s see what schemas we have registered now.

curl -s -X GET http://localhost:8081/subjects/ORCLCDB.C__MYUSER.EMP-value/versions

Will show [1,2] — indicating two versions of the schema are available. We have data registered against version 1 and version 2 of the schema

curl -s -X GET http://localhost:8081/subjects/ORCLCDB.C__MYUSER.EMP-value/versions/2 | jq -r .schema | jq .

Note, schema version 2 has this addition

{
      "name": "SURNAME",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }

Tear Down

To tear down the containers

docker-compose down

Conclusion

Capturing data from an Oracle database into Kafka is a really common request. Having a Confluent supported CDC Source Connector is a great option to consider when you require a low touch way to stream both the Oracle data and schema changes into Kafka and Schema Registry.

Oh, and Docker is awesome !

References