Debezium / DBZ-1017

Connector crashes with java.lang.NullPointerException when using multiple sinks to consume the messages



      Steps to reproduce:

      1. Set up the SQL Server source connector with multiple whitelisted tables.
      2. Set up two MySQL sink connectors.
      3. Observe the source connector crash.


      I'm trying to get the SQL Server connector to work, but there seem to be problems when sinks consume the topics produced by the source connector. The source connector crashes with the error message below.

      The error:

      [2018-11-29 15:57:33,288] ERROR Unexpected exception while processing record 'ConsumerRecord(topic = SQLSERVER01.history, partition = 0, offset = 25, CreateTime = 1543506974896, serialized key size = -1, serialized value size = 203, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {
        "source" : {
          "server" : "SQLSERVER01"
        },
        "position" : {
          "snapshot" : true,
          "snapshot_completed" : true
        },
        "databaseName" : "DATABASE01",
        "schemaName" : "dbo",
        "ddl" : "N/A"
      })' (io.debezium.relational.history.KafkaDatabaseHistory)
      java.lang.NullPointerException
              at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:92)
              at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:238)
              at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:73)
              at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:43)
              at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:106)
              at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2018-11-29 15:57:33,291] INFO WorkerSourceTask{id=sqlserver-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2018-11-29 15:57:33,291] INFO WorkerSourceTask{id=sqlserver-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask)
      java.lang.NullPointerException
              at io.debezium.connector.sqlserver.SqlServerConnectorTask.commit(SqlServerConnectorTask.java:167)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:506)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:447)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:238)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
      java.lang.NullPointerException
              at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:92)
              at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:238)
              at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:73)
              at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:43)
              at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:106)
              at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
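For what it's worth, the stack trace points at a lambda inside `AbstractDatabaseHistory.recover` dereferencing something that can be absent on history records written during snapshotting (note the record above carries `"ddl" : "N/A"`). The sketch below is a hypothetical, simplified stand-in, not Debezium's actual code: the `HistoryRecord` class and its `ddl` field are assumptions, used only to illustrate how an unguarded filter can throw the `NullPointerException` seen here, and what a defensive variant looks like.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class HistoryRecoverSketch {
    // Minimal stand-in for a database-history record; the class and field are assumptions.
    static final class HistoryRecord {
        final String ddl;
        HistoryRecord(String ddl) { this.ddl = ddl; }
    }

    // Naive recover-time filter: throws NullPointerException when a record has no DDL,
    // mirroring the failure mode in the log above.
    static List<String> recoverUnsafe(List<HistoryRecord> records) {
        return records.stream()
                .filter(r -> !r.ddl.equals("N/A")) // NPE if r.ddl == null
                .map(r -> r.ddl)
                .collect(Collectors.toList());
    }

    // Defensive variant: skip records without usable DDL instead of dereferencing null.
    static List<String> recoverSafe(List<HistoryRecord> records) {
        return records.stream()
                .map(r -> r.ddl)
                .filter(Objects::nonNull)
                .filter(ddl -> !ddl.equals("N/A"))
                .collect(Collectors.toList());
    }
}
```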
      

      My configuration

      My source connector:

      {
        "name": "sqlserver-source-connector",
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "database.hostname": "host.docker.internal",
        "database.port": "1433",
        "database.user": "dbusername",
        "database.password": "dbpassword",
        "database.dbname": "dbname",
        "database.history.kafka.bootstrap.servers": "kafka1:19092",
        "database.history.kafka.topic": "SQLSERVER01.history",
        "database.server.name": "SQLSERVER01",
        "table.whitelist": "^dbo\\.table1$,^dbo\\.table2$,^dbo\\.table3$"
      }
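As a sanity check on the `table.whitelist` entries above, here is a small Java snippet that exercises the three regexes directly (Debezium matches whitelist entries as regular expressions against the qualified table identifier, here assumed to be of the form `dbo.table1`):

```java
import java.util.List;
import java.util.regex.Pattern;

public class WhitelistCheck {
    // The three whitelist patterns from the source connector config above.
    static final List<Pattern> WHITELIST = List.of(
            Pattern.compile("^dbo\\.table1$"),
            Pattern.compile("^dbo\\.table2$"),
            Pattern.compile("^dbo\\.table3$"));

    // True if any whitelist pattern fully matches the given table identifier.
    static boolean isWhitelisted(String tableId) {
        return WHITELIST.stream().anyMatch(p -> p.matcher(tableId).matches());
    }
}
```

The anchors matter: without `^…$`, a pattern like `dbo\.table1` would also admit `dbo.table10`.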
      

      My sink connectors:

      {
        "name": "table1-sink-connector",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "transforms": "route,unwrap",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "tasks.max": "1",
        "topics": "SQLSERVER01.dbo.table1",
        "auto.create": "true",
        "connection.url": "jdbc:mysql://dbservice.db/dbname?user=dbuser&password=r4ndom",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "PK_table1"
      }
      {
        "name": "table2-sink-connector",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "transforms": "route,unwrap",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "tasks.max": "1",
        "topics": "SQLSERVER01.dbo.table2",
        "auto.create": "true",
        "connection.url": "jdbc:mysql://dbservice.db/dbname?user=dbuser&password=r4ndom",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "PK_table2"
      }
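To double-check the `route` transform in the sink configs, this snippet applies the same regex and replacement the way Kafka Connect's `RegexRouter` does (rename only on a full match of the topic name). Note that the two-segment history topic `SQLSERVER01.history` does not match the three-segment regex, so it would pass through unrenamed:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RouteCheck {
    // The route transform's regex from the sink configs above; replacement is "$3".
    static final Pattern ROUTE = Pattern.compile("([^.]+)\\.([^.]+)\\.([^.]+)");

    // Mimics RegexRouter: rename the topic only when the regex matches it entirely.
    static String route(String topic) {
        Matcher m = ROUTE.matcher(topic);
        return m.matches() ? m.replaceFirst("$3") : topic;
    }
}
```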
      

      Assignee: jpechane (Jiri Pechanec)
      Reporter: edpenglund (Victor Englund, inactive)