Kafka Connect in Action: JDBC Sink



This is a walkthrough of configuring #ApacheKafka #KafkaConnect to stream data from #ApacheKafka to a #database such as #MySQL.

It discusses common errors, how to place the #JDBC driver JAR correctly, how to deal with deserialisation, and how to use ksqlDB to apply a schema to schema-less data.
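
To give a feel for where the walkthrough ends up, here is a minimal sketch of a JDBC Sink connector configuration created through the Kafka Connect REST API. The connector name, topic, database URL and credentials are placeholders for illustration, not necessarily the ones used in the video:

curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
     -H "Content-Type: application/json" \
     -d '{
       "connector.class"    : "io.confluent.connect.jdbc.JdbcSinkConnector",
       "connection.url"     : "jdbc:mysql://mysql:3306/demo",
       "connection.user"    : "connect_user",
       "connection.password": "asgard",
       "topics"             : "test_topic",
       "auto.create"        : "true",
       "insert.mode"        : "upsert",
       "pk.mode"            : "record_key",
       "pk.fields"          : "id"
     }'

With auto.create set to true the connector creates the target table if it does not already exist, and insert.mode=upsert (rather than insert) makes re-delivered records update existing rows instead of failing on duplicate keys.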

See for code and details.

🚨 *NOTE* How ksqlDB deals with keys changed in v0.10 (this video shows 0.8). See for details
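
As a rough sketch of that change (stream, topic, and column names here are invented for illustration): in the 0.8-era syntax shown in this video the key is referenced via the WITH (KEY='...') property and must also be present as a column in the message value, whereas from 0.10 onwards the key column is declared directly in the schema with the KEY keyword:

-- ksqlDB 0.8-style key declaration (as in this video)
CREATE STREAM users_old (id VARCHAR, name VARCHAR)
  WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON', KEY='id');

-- ksqlDB 0.10+ style key declaration
CREATE STREAM users_new (id VARCHAR KEY, name VARCHAR)
  WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');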

Table of contents:
* 00:00 Introduction
* 0:44 Populating some data into a test topic
* 2:55 Creating the JDBC Sink
* 4:47 Putting the JDBC driver in the correct place
* 7:45 JDBC Sink connector in action
* 8:52 Debugging the JDBC Sink connector
* 10:27 INSERT vs UPSERT
* 12:26 Dropping fields, adding metadata
* 14:32 Evolving the target table schema
* 16:21 JDBC Sink and schemas
* 18:18 Working with JSON data and the JDBC Sink
* 28:03 Applying a schema to JSON data with ksqlDB (see the sketch after this list)
* 34:01 Working with CSV data and the JDBC Sink
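
For the "Applying a schema to JSON data with ksqlDB" step, the general pattern is to declare the columns of the schema-less JSON topic explicitly in a ksqlDB stream, then re-serialise it to a format that carries a schema (such as Avro) so that the JDBC Sink can build the target table from it. A minimal sketch, with made-up topic and column names:

CREATE STREAM some_json_src (id INT, name VARCHAR, amount DOUBLE)
  WITH (KAFKA_TOPIC='json_source_topic', VALUE_FORMAT='JSON');

CREATE STREAM some_json_as_avro
  WITH (VALUE_FORMAT='AVRO') AS
  SELECT * FROM some_json_src;

The sink connector is then pointed at the Avro topic, where the schema it needs is available via the Schema Registry.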

References:
* Confluent Hub:
* JDBC Sink connector docs:
* Learn more about Kafka Connect in this talk:
* Kafka Connect docs:

☁️ Confluent Cloud ☁️
Confluent Cloud is a managed Apache Kafka and Confluent Platform service. It scales to zero and lets you get started with Apache Kafka at the click of a mouse. You can sign up at and use code 60DEVADV for $60 towards your bill (small print:


35 thoughts on “Kafka Connect in Action: JDBC Sink”
  1. When creating the sink connector I get
    "Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector …"

    So after just 3 minutes I can't even follow your guide. Maybe there is something wrong with the Docker containers?

  2. Hi Robin, regarding 'blacklist: "COL2"': it loads NULL in the case of an insert. But if I need to load the value when inserting and do NOT want it updated afterwards, what should the configuration be?

  3. Isn't there a way to make the JDBC Sink perform an INSERT IGNORE? If I use insert mode I always get duplicate-key errors. Can it really be that this case has not been considered? (MySQL dialect)

  4. Hi, I have a question regarding automatic table creation in the sink connector: how can we define custom column names in the sink connector configuration?

    For example, by default the column names are after_userId and after_firstName. I want to change them to UserId and FirstName. How can we do this in the connector configuration?

  5. Hi! I'm having trouble working with timestamp (date) values. Changes aren't reflected in the sink table if the source table has a timestamp column (with a current_timestamp default). Is there any specific transformation to set in the sink/source connectors to solve this problem? Thanks!

  6. @Robin Moffatt Is it possible to have a MySQL database table as the source, with a topic here, say "sourceTopic", and then use "sourceTopic" in our sink curl config so that the sink writes to another MySQL table? Basically trying to set up a table and audit-table kind of scenario here.

  7. So when converting plain JSON to JSON with a schema using a ksqlDB stream, can we specify only certain fields? In my case, I have nested JSON with more than 100 field values, so do I have to explicitly mention each of them while creating the ksqlDB stream?

  8. It's a nice video, I appreciate your efforts.
    Can we create a key schema as well for SOME_JSON_AS_AVRO? I have a similar requirement where I need to do an upsert using the JDBC Sink connector, which reads from a topic created out of streams.

  9. Is there a good way to diagnose why a connector becomes degraded? Is it ok to use a symlink to put the jar under the jdbc drivers folder?

  10. It is possible in Apache Kafka.
    I am working on Apache Kafka to move data from Kafka to MySQL using the Kafka JDBC connector and the MySQL connector, with both JAR files placed in my config folder.
    How do I run the JDBC sink connector from a Windows cmd prompt?

  11. Hi Robin,
    I am a great fan of your Kafka JDBC source and sink connectors. We are currently facing a challenge where, using the Kafka JDBC connectors, we are unable to connect to an Oracle database that sits in a Kerberized cloud environment. Any video or details would be a great help.

  12. Hi Robin, thanks for sharing. Can you use the Protobuf format from 5.5 instead of Avro? Also, for MySQL I notice you are using version 8. Will this also work on MySQL 5.6 or 5.7? Many thanks, Aidan

  13. Thanks for sharing, and I am following your videos a lot for learning Kafka. I have tried setting up the JDBC sink connector to insert into SQL Server with a batch size of 500, but it inserts into SQL Server one row at a time rather than in batches, which has a negative impact on SQL Server I/O. Is there something you can suggest to get batch insertion working? Will look forward to your response. Thanks
