- Bug
- Resolution: Done
- Major
- 0.5.1
- None
When the PostgreSQL connector starts copying the following table, a BigDecimal scale error occurs.
# \d mytable1
                                 Table "public.mytable1"
       Column       |            Type             |                       Modifiers
--------------------+-----------------------------+---------------------------------------------------------
 id                 | integer                     | not null default nextval('mytable1_id_seq'::regclass)
 name               | character varying(255)      | not null
 key                | character varying(255)      | not null
 price              | numeric                     |
 select_p           | integer                     | default 2
 myfield1           | integer                     | not null
 created_at         | timestamp without time zone |
 updated_at         | timestamp without time zone |
Indexes:
    "mytable1_pkey" PRIMARY KEY, btree (id)
    "index_mytable1_on_myfield1" UNIQUE, btree (myfield1)
[2017-08-02 08:14:19,815] ERROR Task mydebezium-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: BigDecimal has mismatching scale value for given Decimal schema
        at org.apache.kafka.connect.data.Decimal.fromLogical(Decimal.java:69)
        at io.confluent.connect.avro.AvroData$5.convert(AvroData.java:237)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:341)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:487)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:487)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:197)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2017-08-02 08:14:19,816] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2017-08-02 08:14:19,816] INFO Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
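The failure can be reproduced outside the connector with just the Kafka Connect data API. A minimal sketch (the class name is mine; the two Decimal calls are the ones named in the stack trace):

import java.math.BigDecimal;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;

public class ScaleMismatchRepro {
    public static void main(String[] args) {
        // A Connect Decimal schema carries one fixed scale; Debezium derives
        // scale 0 for the unconstrained NUMERIC column (see the
        // numeric(131089,0) entries in the DEBUG log below).
        Schema priceSchema = Decimal.schema(0);

        // A stored value such as 199.0 has scale 1, so the conversion throws
        // DataException: BigDecimal has mismatching scale value for given
        // Decimal schema -- the same error as in the trace above.
        Decimal.fromLogical(priceSchema, new BigDecimal("199.0"));
    }
}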
Debezium connector config:
{ "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.dbname": "mydb_staging", "database.hostname": "mydb-test-db-1", "database.password": "mypass", "database.port": "6543", "database.server.name": "mydb", "database.user": "postgres", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://my_schema_registry:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://my_schema_registry:8081" } }
A sample of the log output:
[2017-08-03 11:34:27,038] DEBUG Mapped primary key for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}]} (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,038] DEBUG Mapped columns for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "name", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "key", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "price", "index" : "3", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "select_p", "index" : "4", "schema" : {"type" : "INT32", "optional" : "true"}}, {"name" : "myfield1", "index" : "5", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "created_at", "index" : "6", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}, {"name" : "updated_at", "index" : "7", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}]} (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,049] INFO Step 2: locking each of the database tables, waiting a maximum of '10.0' seconds for each lock (io.debezium.connector.postgresql.RecordsSnapshotProducer)
[2017-08-03 11:34:27,080] DEBUG checking for more records... (io.debezium.connector.postgresql.PostgresConnectorTask)
[2017-08-03 11:34:27,080] DEBUG no records available yet, sleeping a bit... (io.debezium.connector.postgresql.PostgresConnectorTask)
[2017-08-03 11:34:27,126] DEBUG refreshing DB schema for table 'public.mytable1' (io.debezium.connector.postgresql.PostgresSchema)
[2017-08-03 11:34:27,126] DEBUG Mapping table 'public.mytable1' to schemas under 'mydb.public.mytable1' (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'id' (INT32) from column id serial(10,0) NOT NULL AUTO_INCREMENTED (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'id' (INT32) from column id serial(10,0) NOT NULL AUTO_INCREMENTED (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'name' (STRING) from column name varchar(255,0) NOT NULL (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'key' (STRING) from column key varchar(255,0) NOT NULL (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'price' (BYTES) from column price numeric(131089,0) (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'select_p' (INT32) from column select_p int4(10,0) (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'myfield1' (INT32) from column myfield1 int4(10,0) NOT NULL (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,127] DEBUG - field 'created_at' (INT64) from column created_at timestamp(29,6) (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,127] DEBUG - field 'updated_at' (INT64) from column updated_at timestamp(29,6) (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,127] DEBUG Mapped primary key for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}]} (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,127] DEBUG Mapped columns for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "name", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "key", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "price", "index" : "3", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "select_p", "index" : "4", "schema" : {"type" : "INT32", "optional" : "true"}}, {"name" : "myfield1", "index" : "5", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "created_at", "index" : "6", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}, {"name" : "updated_at", "index" : "7", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}]} (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,132] INFO read xlogStart at '28/9807C450' from transaction '727746' (io.debezium.connector.postgresql.RecordsSnapshotProducer)
[2017-08-03 11:34:27,132] INFO Step 3: reading and exporting the contents of each table (io.debezium.connector.postgresql.RecordsSnapshotProducer)
[2017-08-03 11:34:27,136] INFO exporting data from table 'public.mytable1' (io.debezium.connector.postgresql.RecordsSnapshotProducer)
[2017-08-03 11:34:27,171] DEBUG sending read event 'SourceRecord{sourcePartition={server=mydb}, sourceOffset={last_snapshot_record=false, lsn=174349337680, txId=727746, ts_usec=1501760067167000, snapshot=true}} ConnectRecord{topic='mydb.public.mytable1', kafkaPartition=null, key=Struct{id=41}, value=Struct{after=Struct{id=41,name=testdata1,key=2134-e78c6c52-9fbe-4bdb-a909-2b37c0284ea7,price=199.0,select_p=1,myfield1=65,created_at=1478167039574000000,updated_at=1478167039574000000},source=Struct{name=mydb,ts_usec=1501760067167000,txId=727746,lsn=174349337680,snapshot=true,last_snapshot_record=false},op=r,ts_ms=1501760067170}, timestamp=null}' (io.debezium.connector.postgresql.RecordsSnapshotProducer)
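Note that the mapped schema for 'price' above prints only the logical name org.apache.kafka.connect.data.Decimal; the scale that later fails validation travels as a schema parameter. A small sketch showing where it lives (the class name is mine):

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;

public class InspectDecimalScale {
    public static void main(String[] args) {
        // Build an optional Decimal schema like the one mapped for 'price'.
        Schema price = Decimal.builder(0).optional().build();
        // The logical name matches the log output above...
        System.out.println(price.name()); // org.apache.kafka.connect.data.Decimal
        // ...and the pinned scale is a schema parameter keyed "scale".
        System.out.println(price.parameters().get(Decimal.SCALE_FIELD)); // 0
    }
}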
Actual data in the numeric column:
# select distinct(price) from mytable1;
  price
---------
   223.0
   556.0
   500.0
  1000.0
   230.0
   110.0
   130.0
   340.0
   101.0
     1.0
 12345.0
   200.0
     8.0
 10000.0
   600.0
   150.0
    50.0
   199.0
   400.0
   100.0
    40.0
   123.0
   256.0
  1112.0
  1234.0
     2.0
   218.0
   378.0
    10.0
   300.0
(30 rows)
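Every stored value has exactly one fractional digit, i.e. scale 1, while the mapped schema pins scale 0. With an unconstrained numeric column, PostgreSQL keeps a per-value scale, which can be checked directly (a sketch; scale() requires PostgreSQL 9.6+, and the table name is made up):

-- Unconstrained numeric keeps whatever scale each value was written with.
CREATE TABLE scale_demo (price numeric);
INSERT INTO scale_demo VALUES (199.0), (199), (199.00);
SELECT price, scale(price) FROM scale_demo;
--  199.0  | 1
--  199    | 0
--  199.00 | 2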
I thought the issue described at https://issues.jboss.org/browse/DBZ-287 might be related, but the error still happens in 0.5.2-SNAPSHOT, which includes the fix for DBZ-287.
I originally reported this issue at https://groups.google.com/forum/#!topic/debezium/7NzZtKBS-PA.