  1. Debezium
  2. DBZ-8296

Signal-based snapshot is configured in the Debezium connector but is not working


    • Type: Bug
    • Resolution: Not a Bug
    • Priority: Major
    • postgresql-connector
    • Critical

      We are running Debezium in Azure Container Apps with a connector that connects to a PostgreSQL server. The connector is already configured to receive signals from a specific table and to process all the records in it. The signal data is as follows:

      {"data-collections": ["core-apis.department"]}

      Type: execute-snapshot

      Problem statement: The schema name contains a hyphen, which causes the Debezium connector to fail with a syntax error. The connector works correctly when the schema name has no hyphen or uses an underscore (_) instead.

      I have tried every option I could find to escape the hyphen, but none of them works.
      Thanks in advance to anyone who can help.
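One way to rule out hand-escaping mistakes is to let a JSON library produce the value of the signal's data field. The sketch below is only an illustration: `build_signal_data` is a hypothetical helper, not part of Debezium, and the double-quoted identifier form is an assumption about what might be tried for a hyphenated schema. This report does not establish which form the connector accepts, only that the payload must be valid JSON.

```python
import json


def build_signal_data(collections):
    """Serialize the data payload for an execute-snapshot signal.

    Hypothetical helper: the serializer handles the escaping, so any
    double quotes inside a collection name are escaped exactly once.
    """
    return json.dumps({"data-collections": list(collections)})


# Plain schema.table name, as given in the report.
plain = build_signal_data(["core-apis.department"])

# Double-quoted identifiers (an assumption, see the note above).
quoted = build_signal_data(['"core-apis"."department"'])

print(plain)
print(quoted)
```

Whatever string ends up in the signal table should round-trip through a JSON parser unchanged. In the quoted variant each inner double quote appears as a single backslash followed by a quote, and only inside the JSON string.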

      Error message:

      2024-09-25T13:51:47.165455404Z 2024-09-25 13:51:47,164 WARN   Postgres|source|streaming  Signal 'signal-58' has arrived but the data '{"data-collections": [\"\\\"core-api\\\".\\\"department__s\\\"\"]}' cannot be parsed   [io.debezium.pipeline.signal.Signal]

       

      2024-09-25T13:51:47.165523150Z com.fasterxml.jackson.core.JsonParseException: Unexpected character ('\' (code 92)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
      2024-09-25T13:51:47.165533320Z  at [Source: (String)"{"data-collections": [\"\\\"core-api\\\".\\\"department__s\\\"\"]}"; line: 1, column: 24]
      2024-09-25T13:51:47.165541335Z   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
      2024-09-25T13:51:47.165548809Z   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
      2024-09-25T13:51:47.165556403Z   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:637)
      2024-09-25T13:51:47.165583373Z   at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1922)
      2024-09-25T13:51:47.165591969Z   at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:772)
      2024-09-25T13:51:47.165599112Z   at io.debezium.document.JacksonReader.parseArray(JacksonReader.java:209)
      2024-09-25T13:51:47.165605735Z   at io.debezium.document.JacksonReader.parseDocument(JacksonReader.java:131)
      2024-09-25T13:51:47.165612969Z   at io.debezium.document.JacksonReader.parse(JacksonReader.java:102)
      2024-09-25T13:51:47.165630110Z   at io.debezium.document.JacksonReader.read(JacksonReader.java:57)
      2024-09-25T13:51:47.165637224Z   at io.debezium.pipeline.signal.Signal.process(Signal.java:135)
      2024-09-25T13:51:47.165643806Z   at io.debezium.pipeline.signal.Signal.process(Signal.java:180)
      2024-09-25T13:51:47.165669544Z   at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:227)
      2024-09-25T13:51:47.165676928Z   at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:77)
      2024-09-25T13:51:47.165685214Z   at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
      2024-09-25T13:51:47.165692026Z   at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:91)
      2024-09-25T13:51:47.165698589Z   at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
      2024-09-25T13:51:47.165705602Z   at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:244)
      2024-09-25T13:51:47.165712024Z   at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeInsert(PgOutputMessageDecoder.java:395)
      2024-09-25T13:51:47.165718566Z   at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:179)
      2024-09-25T13:51:47.165725389Z   at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)
      2024-09-25T13:51:47.165732101Z   at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:493)
      2024-09-25T13:51:47.165762649Z   at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:485)
      2024-09-25T13:51:47.165769782Z   at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:203)
      2024-09-25T13:51:47.165776975Z   at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:165)
      2024-09-25T13:51:47.165784199Z   at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:40)
      2024-09-25T13:51:47.165791162Z   at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:160)
      2024-09-25T13:51:47.165798355Z   at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)
      2024-09-25T13:51:47.165805088Z   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      2024-09-25T13:51:47.165812792Z   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      2024-09-25T13:51:47.165820046Z   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      2024-09-25T13:51:47.165853328Z   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      2024-09-25T13:51:47.165863307Z   at java.base/java.lang.Thread.run(Thread.java:834)
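The parse failure above can be reproduced outside the connector by feeding the logged payload to any strict JSON parser. The sketch below uses Python's `json` module rather than Jackson, so the message and position format differ from the trace. `first_json_error` is a hypothetical helper, and the payload is copied from the WARN line on the assumption that the log shows the stored string verbatim.

```python
import json

# Payload as it appears in the WARN line; a raw string keeps the
# backslashes exactly as logged.
logged = r'{"data-collections": [\"\\\"core-api\\\".\\\"department__s\\\"\"]}'


def first_json_error(text):
    """Return (message, zero-based offset) of the first parse error, or None."""
    try:
        json.loads(text)
    except json.JSONDecodeError as exc:
        return exc.msg, exc.pos
    return None


error = first_json_error(logged)
print(error)

# The unescaped form from the description parses cleanly.
print(first_json_error('{"data-collections": ["core-apis.department"]}'))
```

The parser stops at the first backslash after the opening bracket: the array element begins with a backslash instead of a double quote, so no JSON string has started yet. This looks like the same character Jackson flags in the trace, although Jackson counts columns differently.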

       

      2024-09-25T13:51:47.455244531Z 2024-09-25 13:51:47,455 INFO   ||  1 records sent during previous 00:00:44.117, last recorded offset: {transaction_id=null, lsn_proc=354234151504, lsn_commit=354234143080, lsn=354234151504, txId=4715010, ts_usec=1727272306966439}   [io.debezium.connector.common.BaseSourceTask]
      2024-09-25T13:51:47.456031373Z 2024-09-25 13:51:47,455 INFO   ||  The task will send records to topic 'celitodev' for the first time. Checking whether topic exists   [org.apache.kafka.connect.runtime.WorkerSourceTask]

       

      2024-09-25T13:51:47.479318187Z 2024-09-25 13:51:47,479 INFO   ||  Topic 'celitodev' already exists.   [org.apache.kafka.connect.runtime.WorkerSourceTask]

       

      2024-09-25T13:52:03.344738386Z 2024-09-25 13:52:03,344 INFO   ||  WorkerSourceTask{id=celito-search-connector-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
      2024-09-25T13:52:03.344910586Z 2024-09-25 13:52:03,344 INFO   ||  WorkerSourceTask{id=celito-search-connector-0} flushing 0 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]

       

      2024-09-25T13:52:03.366722885Z 2024-09-25 13:52:03,366 INFO   ||  WorkerSourceTask{id=celito-search-connector-0} Finished commitOffsets successfully in 22 ms   [org.apache.kafka.connect.runtime.WorkerSourceTask]

       

      2024-09-25T13:52:35.446895190Z 2024-09-25 13:52:35,446 INFO   ||  [Worker clientId=connect-1, groupId=1] Session key updated   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]

              Assignee: Unassigned
              Reporter: Pal Sandip (palsandip, Inactive)