-
Bug
-
Resolution: Not a Bug
-
Minor
-
None
-
None
-
None
-
None
-
False
-
False
-
hi there
when i use postgresql source connector, it try to registry schema with out non nullable columns
external_invoice_urls is not nullable but it try to registry schema with out external_invoice_urls
my env is
kafka connect : 5.5.1
debezium : 1.7.1
postgresql(RDS 10.18) with wal2json
here is error logs
---------------------------
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:297) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:323) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:247) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic rt.postgresql.mrt-service.public.reservations : at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:91) at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63) at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:297) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 11 more Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
{ "type": "record", "name": "Envelope", "namespace": "rt.postgresql.mrt_service.public.reservations", "fields": [ { "name": "before", "type": [ "null", { "type": "record", "name": "Value", "fields": [ { "name": "id", "type": { "type": "int", "connect.default": 0 }, "default": 0 }, { "name": "begin_at", "type": [ "null", { "type": "int", "connect.version": 1, "connect.name": "io.debezium.time.Date" } ], "default": null }, { "name": "number_of_people", "type": [ "null", "int" ], "default": null }, { "name": "message", "type": [ "null", "string" ], "default": null }, { "name": "offer_id", "type": [ "null", "int" ], "default": null }, { "name": "created_at", "type": { "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.MicroTimestamp" } }, { "name": "updated_at", "type": { "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.MicroTimestamp" } }, { "name": "phone_number", "type": [ { "type": "string", "connect.default": "NULL::character varying" }, "null" ], "default": "NULL::character varying" }, { "name": "user_id", "type": [ "null", "int" ], "default": null }, { "name": "price_amount", "type": [ "null", "double" ], "default": null }, { "name": "price_currency_code", "type": [ "null", "string" ], "default": null }, { "name": "commission_price_amount", "type": [ "null", "double" ], "default": null }, { "name": "commission_price_currency_code", "type": [ "null", "string" ], "default": null }, { "name": "guide_price_amount", "type": [ "null", "double" ], "default": null }, { "name": "guide_price_currency_code", "type": [ "null", "string" ], "default": null }, { "name": "deleted_at", "type": [ "null", { "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.MicroTimestamp" } ], "default": null }, { "name": "purpose", "type": [ "null", "string" ], "default": null }, { "name": "status", "type": [ "null", "string" ], "default": null }, { "name": "status_updated_at", "type": [ "null", { "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.MicroTimestamp" } ], "default": null }, { "name": "type", "type": [ "null", "string" ], "default": null }, { "name": "phone_icc", "type": [ "null", "int" ], "default": null }, { "name": "meeting_point", "type": [ "null", "string" ], "default": null }, { "name": "meeting_time", "type": [ "null", { "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.MicroTime" } ], "default": null }, { "name": "including_service", "type": [ "null", "string" ], "default": null }, { "name": "excluding_service", "type": [ "null", "string" ], "default": null }, { "name": "course_description", "type": [ "null", "string" ], "default": null }, { "name": "duration_unit", "type": [ "null", "string" ], "default": null }, { "name": "duration_size", "type": [ "null", "int" ], "default": null }, { "name": "pickup", "type": { "type": "boolean", "connect.default": false }, "default": false }, { "name": "custom", "type": { "type": "boolean", "connect.default": false }, "default": false }, { "name": "access_secret_key", "type": [ "null", "string" ], "default": null }, { "name": "created_country", "type": [ "null", "string" ], "default": null }, { "name": "created_platform", "type": [ "null", "string" ], "default": null }, { "name": "extra_choices", "type": { "type": "string", "connect.version": 1, "connect.default": "{}", "connect.name": "io.debezium.data.Json" }, "default": "{}" }, { "name": "istanbul_invoice_url", "type": [ "null", "string" ], "default": null }, { "name": "istanbul_sync_result_message", "type": [ "null", "string" ], "default": null }, { "name": "istanbul_sync_status", "type": [ "null", "string" ], "default": null }, { "name": "istanbul_sync_additional_message", "type": [ "null", "string" ], "default": null }, { "name": "istanbul_reservation_key", "type": [ "null", "string" ], "default": null }, { "name": "meeting_time_extra", "type": [ "null", "string" ], "default": null }, { "name": "price_guarantee", "type": { "type": "boolean", "connect.default": false }, "default": false }, { "name": "guide_commission_rate", "type": [ "null", "double" ], "default": null }, { "name": "offer_commission_rate", "type": [ "null", "double" ], "default": null }, { "name": "extra_required_encrypt", "type": { "type": "string", "connect.version": 1, "connect.default": "{}", "connect.name": "io.debezium.data.Json" }, "default": "{}" }, { "name": "extra_choices_description", "type": [ { "type": "string", "connect.version": 1, "connect.default": "\"{}\"", "connect.name": "io.debezium.data.Json" }, "null" ], "default": "\"{}\"" }, { "name": "user_resting", "type": { "type": "boolean", "connect.default": false }, "default": false }, { "name": "user_withdraw", "type": { "type": "boolean", "connect.default": false }, "default": false } ], "connect.name": "rt.postgresql.mrt_service.public.reservations.Value" } ], "default": null }, { "name": "after", "type": [ "null", "Value" ], "default": null }, { "name": "source", "type": { "type": "record", "name": "Source", "namespace": "io.debezium.connector.postgresql", "fields": [ { "name": "version", "type": "string" }, { "name": "connector", "type": "string" }, { "name": "name", "type": "string" }, { "name": "ts_ms", "type": "long" }, { "name": "snapshot", "type": [ { "type": "string", "connect.version": 1, "connect.parameters": { "allowed": "true,last,false" }, "connect.default": "false", "connect.name": "io.debezium.data.Enum" }, "null" ], "default": "false" }, { "name": "db", "type": "string" }, { "name": "sequence", "type": [ "null", "string" ], "default": null }, { "name": "schema", "type": "string" }, { "name": "table", "type": "string" }, { "name": "txId", "type": [ "null", "long" ], "default": null }, { "name": "lsn", "type": [ "null", "long" ], "default": null }, { "name": "xmin", "type": [ "null", "long" ], "default": null } ], "connect.name": "io.debezium.connector.postgresql.Source" } }, { "name": "op", "type": "string" }, { "name": "ts_ms", "type": [ "null", "long" ], "default": null }, { "name": "transaction", "type": [ "null", { "type": "record", "name": "ConnectDefault", "namespace": "io.confluent.connect.avro", "fields": [ { "name": "id", "type": "string" }, { "name": "total_order", "type": "long" }, { "name": "data_collection_order", "type": "long" } ] } ], "default": null } ], "connect.name": "rt.postgresql.mrt_service.public.reservations.Envelope" }