Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-6839

PostgresConnectorIT#shouldAddNewFieldToSourceInfo fails randomly

XMLWordPrintable

      PostgresConnectorIT#shouldAddNewFieldToSourceInfo fail randomly with

      2023-08-24 14:21:17,142 INFO   Postgres|test_server|0|streaming|postgres  REPLICA IDENTITY for 's1.a' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns   [io.debezium.connector.postgresql.PostgresSchema]
      2023-08-24 14:21:17,142 INFO   Postgres|test_server|0|streaming|postgres  REPLICA IDENTITY for 's2.a' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns   [io.debezium.connector.postgresql.PostgresSchema]
      2023-08-24 14:21:17,144 INFO   Postgres|test_server|0|streaming|postgres  Searching for WAL resume position   [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
      Consumed record 1 / 2 (1 more)
      {"sourcePartition" : {"server" : "test_server"}, "sourceOffset" : {"last_snapshot_record" : "false", "lsn" : "149063088", "txId" : "6848", "ts_usec" : "1692879677013467", "snapshot" : "true"}, "topic" : "test_server.s1.a", "kafkaPartition" : null, "key" : {"pk" : "1"}, "value" : {"before" : null, "after" : {"pk" : "1", "aa" : "1"}, "source" : {"version" : "2.4.0-SNAPSHOT", "connector" : "postgresql", "name" : "test_server", "ts_ms" : "1692879677013", "snapshot" : "first", "db" : "postgres", "sequence" : "[null,"149063088"]", "schema" : "s1", "table" : "a", "txId" : "6848", "lsn" : "149063088", "xmin" : null, "newField" : "newFieldValue"}, "op" : "r", "ts_ms" : "1692879677076", "transaction" : null}}
      Problem with message on topic 'test_server.s1.a':
      org.junit.ComparisonFailure: [field name: null] expected:<..., "default" : null}}[, {"name" : "newField", "index" : "12", "schema" : {"type" : "STRING", "optional" : "false", "default" : null}}]]}}, {"name" : "op",...> but was:<..., "default" : null}}[]]}}, {"name" : "op",...>
              at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
              at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
              at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
              at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
              at io.debezium.data.VerifyRecord.assertConnectSchemasAreEqual(VerifyRecord.java:532)
              at io.debezium.data.VerifyRecord.assertEquals(VerifyRecord.java:961)
              at io.debezium.data.VerifyRecord.isValid(VerifyRecord.java:839)
              at io.debezium.embedded.AbstractConnectorTest.consumeRecordsUntil(AbstractConnectorTest.java:562)
              at io.debezium.embedded.AbstractConnectorTest.consumeRecords(AbstractConnectorTest.java:517)
              at io.debezium.embedded.AbstractConnectorTest.consumeRecords(AbstractConnectorTest.java:588)
              at io.debezium.embedded.AbstractConnectorTest.consumeRecordsByTopic(AbstractConnectorTest.java:627)
              at io.debezium.connector.postgresql.PostgresConnectorIT.shouldAddNewFieldToSourceInfo(PostgresConnectorIT.java:3489)
              at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
              at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
              at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.base/java.lang.reflect.Method.invoke(Method.java:568)
              at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
              at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
              at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
              at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
              at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
              at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
              at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
              at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
              at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
              at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
              at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
              at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
              at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
              at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
              at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
              at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
              at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
              at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
              at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
              at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
              at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:377)
              at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:284)
              at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:248)
              at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:167)
              at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:456)
              at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:169)
              at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:595)
              at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:581)
      error comparing value schema to that serialized/deserialized with Avro converter
        key: {"pk" : "1"}
        key deserialized from JSON: {
        "schema" : {
          "type" : "struct",
          "fields" : [ {
            "type" : "int32",
            "optional" : false,
            "default" : 0,
            "field" : "pk"
          } ],
          "optional" : false,
          "name" : "test_server.s1.a.Key"
        },
        "payload" : {
          "pk" : 1
        }
      }
        key to/from JSON: {"pk" : "1"}
        key to/from Avro: {"pk" : "1"}
        value: {"before" : null, "after" : {"pk" : "1", "aa" : "1"}, "source" : {"version" : "2.4.0-SNAPSHOT", "connector" : "postgresql", "name" : "test_server", "ts_ms" : "1692879677013", "snapshot" : "first", "db" : "postgres", "sequence" : "[null,"149063088"]", "schema" : "s1", "table" : "a", "txId" : "6848", "lsn" : "149063088", "xmin" : null, "newField" : "newFieldValue"}, "op" : "r", "ts_ms" : "1692879677076", "transaction" : null}
        value deserialized from JSON: {
        "schema" : {
          "type" : "struct",
          "fields" : [ {
            "type" : "struct",
            "fields" : [ {
              "type" : "int32",
              "optional" : false,
              "default" : 0,
              "field" : "pk"
            }, {
              "type" : "int32",
              "optional" : true,
              "field" : "aa"
            } ],
            "optional" : true,
            "name" : "test_server.s1.a.Value",
            "field" : "before"
          }, {
            "type" : "struct",
            "fields" : [ {
              "type" : "int32",
              "optional" : false,
              "default" : 0,
              "field" : "pk"
            }, {
              "type" : "int32",
              "optional" : true,
              "field" : "aa"
            } ],
            "optional" : true,
            "name" : "test_server.s1.a.Value",
            "field" : "after"
          }, {
            "type" : "struct",
            "fields" : [ {
              "type" : "string",
              "optional" : false,
              "field" : "version"
            }, {
              "type" : "string",
              "optional" : false,
              "field" : "connector"
            }, {
              "type" : "string",
              "optional" : false,
              "field" : "name"
            }, {
              "type" : "int64",
              "optional" : false,
              "field" : "ts_ms"
            }, {
              "type" : "string",
              "optional" : true,
              "name" : "io.debezium.data.Enum",
              "version" : 1,
              "parameters" : {
                "allowed" : "true,last,false,incremental"
              },
              "default" : "false",
              "field" : "snapshot"
            }, {
              "type" : "string",
              "optional" : false,
              "field" : "db"
            }, {
              "type" : "string",
              "optional" : true,
              "field" : "sequence"
            }, {
              "type" : "string",
              "optional" : false,
              "field" : "schema"
            }, {
              "type" : "string",
              "optional" : false,
              "field" : "table"
            }, {
              "type" : "int64",
              "optional" : true,
              "field" : "txId"
            }, {
              "type" : "int64",
              "optional" : true,
              "field" : "lsn"
            }, {
              "type" : "int64",
              "optional" : true,
              "field" : "xmin"
            }, {
              "type" : "string",
              "optional" : false,
              "field" : "newField"
            } ],
            "optional" : false,
            "name" : "io.debezium.connector.postgresql.Source",
            "field" : "source"
          }, {
            "type" : "string",
            "optional" : false,
            "field" : "op"
          }, {
            "type" : "int64",
            "optional" : true,
            "field" : "ts_ms"
          }, {
            "type" : "struct",
            "fields" : [ {
              "type" : "string",
              "optional" : false,
              "field" : "id"
            }, {
              "type" : "int64",
              "optional" : false,
              "field" : "total_order"
            }, {
              "type" : "int64",
              "optional" : false,
              "field" : "data_collection_order"
            } ],
            "optional" : true,
            "name" : "event.block",
            "version" : 1,
            "field" : "transaction"
          } ],
          "optional" : false,
          "name" : "test_server.s1.a.Envelope",
          "version" : 1
        },
        "payload" : {
          "before" : null,
          "after" : {
            "pk" : 1,
            "aa" : 1
          },
          "source" : {
            "version" : "2.4.0-SNAPSHOT",
            "connector" : "postgresql",
            "name" : "test_server",
            "ts_ms" : 1692879677013,
            "snapshot" : "first",
            "db" : "postgres",
            "sequence" : "[null,\"149063088\"]",
            "schema" : "s1",
            "table" : "a",
            "txId" : 6848,
            "lsn" : 149063088,
            "xmin" : null,
            "newField" : "newFieldValue"
          },
          "op" : "r",
          "ts_ms" : 1692879677076,
          "transaction" : null
        }
      }
        value to/from JSON: {"before" : null, "after" : {"pk" : "1", "aa" : "1"}, "source" : {"version" : "2.4.0-SNAPSHOT", "connector" : "postgresql", "name" : "test_server", "ts_ms" : "1692879677013", "snapshot" : "first", "db" : "postgres", "sequence" : "[null,"149063088"]", "schema" : "s1", "table" : "a", "txId" : "6848", "lsn" : "149063088", "xmin" : null, "newField" : "newFieldValue"}, "op" : "r", "ts_ms" : "1692879677076", "transaction" : null}
        value to/from Avro: {"before" : null, "after" : {"pk" : "1", "aa" : "1"}, "source" : {"version" : "2.4.0-SNAPSHOT", "connector" : "postgresql", "name" : "test_server", "ts_ms" : "1692879677013", "snapshot" : "first", "db" : "postgres", "sequence" : "[null,"149063088"]", "schema" : "s1", "table" : "a", "txId" : "6848", "lsn" : "149063088", "xmin" : null}, "op" : "r", "ts_ms" : "1692879677076", "transaction" : null}
      

      The issue is present probably only when run with Apicurio.

            vjuranek@redhat.com Vojtech Juranek
            vjuranek@redhat.com Vojtech Juranek
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: