Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-2461

PostgresStreamingChangeEventSource's replicationStream flushLsn after closed

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Critical Critical
    • 1.3.0.Beta2
    • 1.2.1.Final, 1.2.2.Final
    • postgresql-connector
    • None
    • Hide

      Below is my code to reproduce this issue.

      Between the first 'engine.close();' and the last 'engine.close();', insert some records( I'm using jmeter, insert about 300 records) to the database and the engine will stop with the error on console.

       

      ~public void run() throws IOException, InterruptedException {~
       ~final Properties props = new Properties();~
       ~props.setProperty("name", "mizzle");~
       ~props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");~
       ~props.setProperty("offset.storage.file.filename", "/tmp/offsets1.dat");~
       ~/* begin connector properties */~
       ~props.setProperty("database.hostname", "");~
       ~props.setProperty("database.port", "");~
       ~props.setProperty("database.user", "");~
       ~props.setProperty("database.password", "");~
       ~props.setProperty("database.dbname", "");~
      
      ~props.setProperty("offset.flush.interval.ms", "3000");~
       ~props.setProperty("slot.name", "debezium_test_c");~
       ~props.setProperty("publication.name", "dbz_publication_test_c");~
      
      ~props.setProperty("plugin.name", "pgoutput");~
       ~props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");~
       ~props.setProperty("database.server.name", "my-app-connector");~
      
      ~// Create the engine with this configuration ...~
      
      ~System.out.println(props.toString());~
       ~engine = DebeziumEngine.create(Json.class).using(props)~
       ~.using(new DebeziumEngine.ConnectorCallback() {~
       ~@Override~
       ~public void connectorStarted() {~
       ~System.out.println("started");~
       ~}~
      
      ~@Override~
       ~public void connectorStopped() {~
       ~try {~
       ~System.out.println("stoped");~
       ~engine.close();~
       ~} catch (IOException e) {~
       ~e.printStackTrace();~
       ~}~
       ~}~
       ~}).using((success, message, error) -> {~
       ~System.out.println(message);~
       ~System.out.println(error);~
       ~})~
       ~.notifying((records, committer) -> {~
       ~try{~
      
      ~for(ChangeEvent<String, String> record : records){~
       ~System.out.println(record);~
       ~committer.markProcessed(record);~
       ~}~
       ~committer.markBatchFinished();~
       ~}catch (Exception e){~
       ~}~
       ~})~
       ~.build();~
      
      ~// Run the engine asynchronously ...~
       ~ExecutorService executor = Executors.newSingleThreadExecutor();~
       ~executor.execute(engine);~
      
      ~Thread.sleep(5000);~
       ~engine.close();~
      
      ~Thread.sleep(10000);~
       ~executor.execute(engine);~
      
      ~Thread.sleep(5000);~
       ~engine.close();~
      
      Show
      Below is my code to reproduce this issue. Between the first 'engine.close();' and the last 'engine.close();', insert some records( I'm using jmeter, insert about 300 records) to the database and the engine will stop with the error on console.   ~public void run() throws IOException, InterruptedException {~ ~final Properties props = new Properties();~ ~props.setProperty("name", "mizzle");~ ~props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");~ ~props.setProperty("offset.storage.file.filename", "/tmp/offsets1.dat");~ ~/* begin connector properties */~ ~props.setProperty("database.hostname", "");~ ~props.setProperty("database.port", "");~ ~props.setProperty("database.user", "");~ ~props.setProperty("database.password", "");~ ~props.setProperty("database.dbname", "");~ ~props.setProperty("offset.flush.interval.ms", "3000");~ ~props.setProperty("slot.name", "debezium_test_c");~ ~props.setProperty("publication.name", "dbz_publication_test_c");~ ~props.setProperty("plugin.name", "pgoutput");~ ~props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");~ ~props.setProperty("database.server.name", "my-app-connector");~ ~// Create the engine with this configuration ...~ ~System.out.println(props.toString());~ ~engine = DebeziumEngine.create(Json.class).using(props)~ ~.using(new DebeziumEngine.ConnectorCallback() {~ ~@Override~ ~public void connectorStarted() {~ ~System.out.println("started");~ ~}~ ~@Override~ ~public void connectorStopped() {~ ~try {~ ~System.out.println("stoped");~ ~engine.close();~ ~} catch (IOException e) {~ ~e.printStackTrace();~ ~}~ ~}~ ~}).using((success, message, error) -> {~ ~System.out.println(message);~ ~System.out.println(error);~ ~})~ ~.notifying((records, committer) -> {~ ~try{~ ~for(ChangeEvent<String, String> record : records){~ ~System.out.println(record);~ ~committer.markProcessed(record);~ ~}~ ~committer.markBatchFinished();~ ~}catch (Exception e){~ ~}~ ~})~ ~.build();~ ~// Run the engine asynchronously ...~ ~ExecutorService executor = Executors.newSingleThreadExecutor();~ ~executor.execute(engine);~ ~Thread.sleep(5000);~ ~engine.close();~ ~Thread.sleep(10000);~ ~executor.execute(engine);~ ~Thread.sleep(5000);~ ~engine.close();~

      See the class: io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.

      In the finnaly block of function execute(ChangeEventSourceContext context), the replicationStream is closed.

      In the function commitOffset(Map<String, ?> offset), {{replicationStream.flushLsn(lsn) is called.

      As the execute method is running in an additional thread, in my case replicationStream is always closed before the last time of replicationStream.flushLsn(lsn) is called when closing the debeziumEngine. And an error is reported on the console:

      Error while trying to stop the task and commit the offsets
      { {org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy}}

              jpechane Jiri Pechanec
              qiumeng1989 Meng Qiu (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated:
                Resolved: