Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-1009

Postgres Connector fails on none nullable MACADDR field during initial snapshot

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • 0.9.0.Beta2
    • 0.8.0.Final, 0.8.1.Final, 0.8.2.Final, 0.8.3.Final, 0.9.0.Beta1
    • postgresql-connector
    • None
    • Hide

      In a fresh database create the following tables

      BEGIN;

      SET ROLE 'postgres';

      CREATE TABLE actions (id SERIAL PRIMARY KEY, action INTEGER NOT NULL);
      INSERT INTO actions (action) VALUES (1);

      CREATE TABLE devices (id SERIAL PRIMARY KEY, device_uid MACADDR NOT NULL, active INTEGER);
      INSERT INTO devices (device_uid,active) VALUES ('11:22:33:44:55:66',1);

      COMMIT;

      Then configure a connect job and enable "include.unknown.datatypes": true

      During the initial snapshot debezium fails with this error:

      org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
      at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:168)
      at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:149)
      at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:146)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:186)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.util.concurrent.CompletionException: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "device_uid", schema type: BYTES
      at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
      at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
      at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1629)
      ... 3 more
      Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "device_uid", schema type: BYTES
      at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:218)
      at org.apache.kafka.connect.data.Struct.validate(Struct.java:233)
      at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
      at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
      at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
      at io.debezium.data.Envelope.read(Envelope.java:245)
      at io.debezium.connector.postgresql.RecordsSnapshotProducer.generateReadRecord(RecordsSnapshotProducer.java:328)
      at io.debezium.connector.postgresql.RecordsSnapshotProducer.readTable(RecordsSnapshotProducer.java:266)
      at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$takeSnapshot$6(RecordsSnapshotProducer.java:191)
      at io.debezium.jdbc.JdbcConnection.queryWithBlockingConsumer(JdbcConnection.java:408)
      at io.debezium.connector.postgresql.RecordsSnapshotProducer.takeSnapshot(RecordsSnapshotProducer.java:189)
      at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$start$0(RecordsSnapshotProducer.java:81)
      at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
      ... 3 more

      A workaround is to not insert any data to the none nullable MACADDR table (devices in this case) before the doing the initial snapshot. After the snapshot is done, data can be inserted and the changes are streamed to kafka just fine.

      Another workaround is to not apply the NOT NULL constraint at table creation. In this case data can be inserted before the initial snapshot and debezium will not error out during the initial snapshot. However, the constraint may often be a requirement for data models.

      Show
      In a fresh database create the following tables BEGIN; SET ROLE 'postgres'; CREATE TABLE actions (id SERIAL PRIMARY KEY, action INTEGER NOT NULL); INSERT INTO actions (action) VALUES (1); CREATE TABLE devices (id SERIAL PRIMARY KEY, device_uid MACADDR NOT NULL, active INTEGER); INSERT INTO devices (device_uid,active) VALUES ('11:22:33:44:55:66',1); COMMIT; Then configure a connect job and enable "include.unknown.datatypes": true During the initial snapshot debezium fails with this error: org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped. at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:168) at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:149) at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:146) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:186) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.CompletionException: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "device_uid", schema type: BYTES at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1629) ... 3 more Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "device_uid", schema type: BYTES at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:218) at org.apache.kafka.connect.data.Struct.validate(Struct.java:233) at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251) at org.apache.kafka.connect.data.Struct.put(Struct.java:216) at org.apache.kafka.connect.data.Struct.put(Struct.java:203) at io.debezium.data.Envelope.read(Envelope.java:245) at io.debezium.connector.postgresql.RecordsSnapshotProducer.generateReadRecord(RecordsSnapshotProducer.java:328) at io.debezium.connector.postgresql.RecordsSnapshotProducer.readTable(RecordsSnapshotProducer.java:266) at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$takeSnapshot$6(RecordsSnapshotProducer.java:191) at io.debezium.jdbc.JdbcConnection.queryWithBlockingConsumer(JdbcConnection.java:408) at io.debezium.connector.postgresql.RecordsSnapshotProducer.takeSnapshot(RecordsSnapshotProducer.java:189) at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$start$0(RecordsSnapshotProducer.java:81) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ... 3 more A workaround is to not insert any data to the none nullable MACADDR table (devices in this case) before the doing the initial snapshot. After the snapshot is done, data can be inserted and the changes are streamed to kafka just fine. Another workaround is to not apply the NOT NULL constraint at table creation. In this case data can be inserted before the initial snapshot and debezium will not error out during the initial snapshot. However, the constraint may often be a requirement for data models.

      During the initial snapshot, if table contains a MACADDR NOT NULL field and the table contains data, debezium will fail hard.

      I have been able to reproduce this bug every time with version 0.8.2, 0.8.3 and 0.9.0.Beta1. I have not tried other versions but I assume the bug is present in those as well.

              jpechane Jiri Pechanec
              jabbors Johan Abbors (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

                Created:
                Updated:
                Resolved: