  1. Debezium
  2. DBZ-4272

Signal-based incremental snapshot fails when launched right after a schema change


      1. Configure the connector:

      signal.data.collection: public.debezium_signal 
      table.include.list: public.test_table,public.debezium_signal 
      plugin.name: pgoutput
      

      2. Create tables:

      create table test_table (column_1 serial constraint test_table_pk primary key, column_2 text); 
      
      create table debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
      

      3. Insert a few records into test_table and make sure they are processed by the connector:

      insert into test_table values (default, default);
      insert into test_table values (default, default);  
      

      4. Alter the table schema and trigger an incremental snapshot:

      alter table test_table add column column_3 int default 0;
      insert into debezium_signal values('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["public.test_table"]}');
      

      5. You should now see the error in the Debezium logs.
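
For reference, a fuller connector configuration for step 1 might look like the sketch below. Only the last three properties come from this report. The connection values are placeholders, and the property names assume a Debezium 1.x PostgreSQL connector.

```yaml
# Sketch only: the connection values below are placeholders, not taken from this report.
connector.class: io.debezium.connector.postgresql.PostgresConnector
database.hostname: localhost        # assumption
database.port: 5432                 # assumption
database.user: postgres             # assumption
database.password: postgres         # assumption
database.dbname: postgres           # assumption
database.server.name: test_server   # assumption: logical server name used as topic prefix
# Settings from step 1 of this report:
plugin.name: pgoutput
table.include.list: public.test_table,public.debezium_signal
signal.data.collection: public.debezium_signal
```

The signal table is listed in table.include.list as well as in signal.data.collection, as in step 1, so that inserts into it reach the connector through the change stream.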

       

       


      Hi,

      I'm using the recently added ad-hoc incremental snapshots for Postgres.
      When a table's schema changes, I trigger an incremental snapshot by inserting a row into the signal table. I do this right after the schema change, before the connector has received any change events that carry the new schema.

      Expectation:
      The incremental snapshot recognises the updated schema and completes without errors.

      Error:
      The incremental snapshot does not recognise the updated schema and fails, both when a column is added and when one is removed. In about 2 out of 10 cases the connector completes the snapshot after a restart, but usually it cannot: the error recurs after the restart and the connector does not recover. When a column is added, the error message is as follows:

      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.                                                              
           at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)                                                                                                                
           at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:170)                                                                    
           at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:40)                                                                     
           at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:166)                                                                                       
           at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:127)                                                                                     
           at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)                                                                                                           
           at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)                                                                                                                          
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)                                                                                                   
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)                                                                                                   
           at java.base/java.lang.Thread.run(Thread.java:829)                                                                                                                                             
       Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, lsn_proc=33786640, lsn_commit=33786544, lsn=33786640, incremental_snapsh 
           at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:252)                                                                                                      
           at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:246)                                                   
           at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeInsert(PgOutputMessageDecoder.java:395)                                                                   
           at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:179)                                                         
           at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)                                                                           
           at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:493)                                                     
           at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:485)                                                             
           at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:205)                                                            
           at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:167)                                                                    
           ... 8 more                                                                                                                                                                                     
       Caused by: java.lang.NullPointerException                                                                                                                                                          
           at io.debezium.util.ColumnUtils.toArray(ColumnUtils.java:41)                                                                                                                                   
           at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.lambda$readChunk$0(AbstractIncrementalSnapshotChangeEventSource.java:273)                     
           at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:645)                                                                                                                        
           at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:513)                                                                                                                        
           at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:268)                              
           at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.addDataCollectionNamesToSnapshot(AbstractIncrementalSnapshotChangeEventSource.java:327)       
           at io.debezium.pipeline.signal.ExecuteSnapshot.arrived(ExecuteSnapshot.java:56)                                                                                                                
           at io.debezium.pipeline.signal.Signal.process(Signal.java:140)                                                                                                                                 
           at io.debezium.pipeline.signal.Signal.process(Signal.java:184)                                                                                                                                 
           at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:226)                                                                                                               
           at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:78)                                                                                
           at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:46)                                                                               
           at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:93)                                                                         
           at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:215)                                                                                                      
           ... 16 more  

       

              ccranfor@redhat.com Chris Cranford
              armadillo123 A E (Inactive)