Debezium / DBZ-318

BigDecimal has mismatching scale value for given Decimal schema error.


Details

    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version/s: 0.6
    • Affects Version/s: 0.5.1
    • Component/s: postgresql-connector
    • Labels: None

    Description

      When the PostgreSQL connector starts to copy the following table, the BigDecimal scale error below occurs.

      # \d mytable1
                                               Table "public.mytable1"
             Column       |            Type             |                        Modifiers
      --------------------+-----------------------------+---------------------------------------------------------
       id                 | integer                     | not null default nextval('mytable1_id_seq'::regclass)
       name               | character varying(255)      | not null
       key                | character varying(255)      | not null
       price              | numeric                     |
       select_p           | integer                     | default 2
       myfield1           | integer                     | not null
       created_at         | timestamp without time zone |
       updated_at         | timestamp without time zone |
      Indexes:
          "mytable1_pkey" PRIMARY KEY, btree (id)
          "index_mytable1_on_myfield1" UNIQUE, btree (myfield1)
      
      [2017-08-02 08:14:19,815] ERROR Task mydebezium-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
      org.apache.kafka.connect.errors.DataException: BigDecimal has mismatching scale value for given Decimal schema
       at org.apache.kafka.connect.data.Decimal.fromLogical(Decimal.java:69)
       at io.confluent.connect.avro.AvroData$5.convert(AvroData.java:237)
       at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:341)
       at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:487)
       at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:487)
       at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
       at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
       at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:197)
       at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at java.lang.Thread.run(Thread.java:745)
      [2017-08-02 08:14:19,816] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
      [2017-08-02 08:14:19,816] INFO Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
      
      
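      The failure can be reproduced outside the connector. Kafka Connect's Decimal.fromLogical requires the value's scale to match the scale the Decimal schema was built with, and throws exactly this DataException otherwise. A minimal sketch, assuming connect-api on the classpath (scale 0 matches the numeric(131089,0) metadata shown in the DEBUG output below):

      import java.math.BigDecimal;

      import org.apache.kafka.connect.data.Decimal;
      import org.apache.kafka.connect.data.Schema;

      public class DecimalScaleRepro {
          public static void main(String[] args) {
              // Schema as derived for the unconstrained NUMERIC column: Decimal with scale 0
              Schema priceSchema = Decimal.builder(0).optional().build();

              // A value actually read during the snapshot carries scale 1
              BigDecimal price = new BigDecimal("199.0");

              // Throws org.apache.kafka.connect.errors.DataException:
              // "BigDecimal has mismatching scale value for given Decimal schema"
              Decimal.fromLogical(priceSchema, price);
          }
      }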

      Debezium connector config:

      {
              "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
              "database.dbname": "mydb_staging",
              "database.hostname": "mydb-test-db-1",
              "database.password": "mypass",
              "database.port": "6543",
              "database.server.name": "mydb",
              "database.user": "postgres",
              "key.converter": "io.confluent.connect.avro.AvroConverter",
              "key.converter.schema.registry.url": "http://my_schema_registry:8081",
              "value.converter": "io.confluent.connect.avro.AvroConverter",
              "value.converter.schema.registry.url": "http://my_schema_registry:8081"
       }
      

      A sample of logging data:

      [2017-08-03 11:34:27,038] DEBUG Mapped primary key for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}]} (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,038] DEBUG Mapped columns for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "name", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "key", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "price", "index" : "3", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "select_p", "index" : "4", "schema" : {"type" : "INT32", "optional" : "true"}}, {"name" : "myfield1", "index" : "5", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "created_at", "index" : "6", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}, {"name" : "updated_at", "index" : "7", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}]} (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,049] INFO Step 2: locking each of the database tables, waiting a maximum of '10.0' seconds for each lock (io.debezium.connector.postgresql.RecordsSnapshotProducer)
      [2017-08-03 11:34:27,080] DEBUG checking for more records... (io.debezium.connector.postgresql.PostgresConnectorTask)
      [2017-08-03 11:34:27,080] DEBUG no records available yet, sleeping a bit... (io.debezium.connector.postgresql.PostgresConnectorTask)
      [2017-08-03 11:34:27,126] DEBUG refreshing DB schema for table 'public.mytable1' (io.debezium.connector.postgresql.PostgresSchema)
      [2017-08-03 11:34:27,126] DEBUG Mapping table 'public.mytable1' to schemas under 'mydb.public.mytable1' (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,126] DEBUG - field 'id' (INT32) from column id serial(10,0) NOT NULL AUTO_INCREMENTED (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,126] DEBUG - field 'id' (INT32) from column id serial(10,0) NOT NULL AUTO_INCREMENTED (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,126] DEBUG - field 'name' (STRING) from column name varchar(255,0) NOT NULL (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,126] DEBUG - field 'key' (STRING) from column key varchar(255,0) NOT NULL (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,126] DEBUG - field 'price' (BYTES) from column price numeric(131089,0) (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,126] DEBUG - field 'select_p' (INT32) from column select_p int4(10,0) (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,126] DEBUG - field 'myfield1' (INT32) from column myfield1 int4(10,0) NOT NULL (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,127] DEBUG - field 'created_at' (INT64) from column created_at timestamp(29,6) (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,127] DEBUG - field 'updated_at' (INT64) from column updated_at timestamp(29,6) (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,127] DEBUG Mapped primary key for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}]} (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,127] DEBUG Mapped columns for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "name", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "key", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "price", "index" : "3", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "select_p", "index" : "4", "schema" : {"type" : "INT32", "optional" : "true"}}, {"name" : "myfield1", "index" : "5", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "created_at", "index" : "6", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}, {"name" : "updated_at", "index" : "7", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}]} (io.debezium.relational.TableSchemaBuilder)
      [2017-08-03 11:34:27,132] INFO   read xlogStart at '28/9807C450' from transaction '727746' (io.debezium.connector.postgresql.RecordsSnapshotProducer)
      [2017-08-03 11:34:27,132] INFO Step 3: reading and exporting the contents of each table (io.debezium.connector.postgresql.RecordsSnapshotProducer)
      [2017-08-03 11:34:27,136] INFO   exporting data from table 'public.mytable1' (io.debezium.connector.postgresql.RecordsSnapshotProducer)
      [2017-08-03 11:34:27,171] DEBUG sending read event 'SourceRecord{sourcePartition={server=mydb}, sourceOffset={last_snapshot_record=false, lsn=174349337680, txId=727746, ts_usec=1501760067167000, snapshot=true}} ConnectRecord{topic='mydb.public.mytable1', kafkaPartition=null, key=Struct{id=41}, value=Struct{after=Struct{id=41,name=testdata1,key=2134-e78c6c52-9fbe-4bdb-a909-2b37c0284ea7,price=199.0,select_p=1,myfield1=65,created_at=1478167039574000000,updated_at=1478167039574000000},source=Struct{name=mydb,ts_usec=1501760067167000,txId=727746,lsn=174349337680,snapshot=true,last_snapshot_record=false},op=r,ts_ms=1501760067170}, timestamp=null}' (io.debezium.connector.postgresql.RecordsSnapshotProducer)
      
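      The DEBUG line for the price field is the key detail: the unconstrained NUMERIC column is reported as numeric(131089,0), i.e. with scale 0, so the Connect Decimal schema is built with scale 0 even though the stored values carry scale 1. A small JDBC probe to see what the driver metadata reports for that column (hypothetical connection parameters, assuming the PostgreSQL JDBC driver on the classpath):

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.ResultSet;

      public class NumericMetadataProbe {
          public static void main(String[] args) throws Exception {
              // Hypothetical connection parameters; adjust to your environment
              try (Connection conn = DriverManager.getConnection(
                      "jdbc:postgresql://localhost:5432/mydb_staging", "postgres", "mypass");
                   ResultSet rs = conn.getMetaData()
                           .getColumns(null, "public", "mytable1", "price")) {
                  while (rs.next()) {
                      // For the unconstrained NUMERIC, the connector log above shows
                      // length 131089 and scale 0 coming from this metadata
                      System.out.printf("%s: COLUMN_SIZE=%d DECIMAL_DIGITS=%d%n",
                              rs.getString("COLUMN_NAME"),
                              rs.getInt("COLUMN_SIZE"),
                              rs.getInt("DECIMAL_DIGITS"));
                  }
              }
          }
      }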

      Actual data of the numeric field:

      # select distinct(price) from mytable1;
        price
      ---------
         223.0
         556.0
         500.0
        1000.0
         230.0
         110.0
         130.0
         340.0
         101.0
           1.0
       12345.0
         200.0
           8.0
       10000.0
         600.0
         150.0
          50.0
         199.0
         400.0
         100.0
          40.0
         123.0
         256.0
        1112.0
        1234.0
           2.0
         218.0
         378.0
          10.0
         300.0
      (30 rows)
      
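      Every observed value has a zero fractional part, so rescaling the values to the schema's declared scale of 0 would be lossless for this data. A hypothetical normalization sketch, purely illustrative of one possible fix direction (not necessarily what the connector actually does):

      import java.math.BigDecimal;
      import java.math.RoundingMode;

      public class RescaleSketch {
          // Hypothetical helper: force a value onto the schema's declared scale.
          // RoundingMode.UNNECESSARY throws ArithmeticException if the conversion
          // would drop non-zero digits, e.g. 199.5 at scale 0.
          static BigDecimal toSchemaScale(BigDecimal value, int schemaScale) {
              return value.setScale(schemaScale, RoundingMode.UNNECESSARY);
          }

          public static void main(String[] args) {
              System.out.println(toSchemaScale(new BigDecimal("199.0"), 0)); // prints 199
          }
      }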

      I thought the issue described at https://issues.jboss.org/browse/DBZ-287 might be related, but the error still happens in 0.5.2-SNAPSHOT, which resolved DBZ-287.

      I originally reported this issue at https://groups.google.com/forum/#!topic/debezium/7NzZtKBS-PA.

          People

            Assignee: Jiri Pechanec (jpechane)
            Reporter: Takahiro Hozumi (t_hozumi) (Inactive)
            Votes: 0
            Watchers: 4
