  1. Debezium
  2. DBZ-2461

PostgresStreamingChangeEventSource calls replicationStream.flushLsn after the stream is closed


Details

    • Type: Bug
    • Resolution: Done
    • Priority: Critical
    • Fix Version: 1.3.0.Beta2
    • Affects Versions: 1.2.1.Final, 1.2.2.Final
    • Component: postgresql-connector
    • Labels: None
    • Steps to Reproduce:

      Below is my code to reproduce this issue.

      Between the first engine.close() and the last engine.close(), insert some records into the database (I used JMeter to insert about 300 records). The engine then stops and the error is printed on the console.

       

      import java.io.IOException;
      import java.util.Properties;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;

      import io.debezium.engine.ChangeEvent;
      import io.debezium.engine.DebeziumEngine;
      import io.debezium.engine.format.Json;

      // The class name is arbitrary; only run() matters for the reproduction.
      public class Reproducer {

          // A field, so that the connectorStopped callback can reference the engine.
          private DebeziumEngine<ChangeEvent<String, String>> engine;

          public void run() throws IOException, InterruptedException {
              final Properties props = new Properties();
              props.setProperty("name", "mizzle");
              props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
              props.setProperty("offset.storage.file.filename", "/tmp/offsets1.dat");
              /* begin connector properties (fill in the connection details) */
              props.setProperty("database.hostname", "");
              props.setProperty("database.port", "");
              props.setProperty("database.user", "");
              props.setProperty("database.password", "");
              props.setProperty("database.dbname", "");

              props.setProperty("offset.flush.interval.ms", "3000");
              props.setProperty("slot.name", "debezium_test_c");
              props.setProperty("publication.name", "dbz_publication_test_c");

              props.setProperty("plugin.name", "pgoutput");
              props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
              props.setProperty("database.server.name", "my-app-connector");

              // Create the engine with this configuration ...
              System.out.println(props.toString());
              engine = DebeziumEngine.create(Json.class).using(props)
                      .using(new DebeziumEngine.ConnectorCallback() {
                          @Override
                          public void connectorStarted() {
                              System.out.println("started");
                          }

                          @Override
                          public void connectorStopped() {
                              try {
                                  System.out.println("stopped");
                                  engine.close();
                              } catch (IOException e) {
                                  e.printStackTrace();
                              }
                          }
                      })
                      // Completion callback: print the final message and any error.
                      .using((success, message, error) -> {
                          System.out.println(message);
                          System.out.println(error);
                      })
                      .notifying((records, committer) -> {
                          try {
                              for (ChangeEvent<String, String> record : records) {
                                  System.out.println(record);
                                  committer.markProcessed(record);
                              }
                              committer.markBatchFinished();
                          } catch (Exception e) {
                              // Print instead of silently swallowing the exception.
                              e.printStackTrace();
                          }
                      })
                      .build();

              // Run the engine asynchronously ...
              ExecutorService executor = Executors.newSingleThreadExecutor();
              executor.execute(engine);

              Thread.sleep(5000);
              engine.close();

              // Insert the records into the database during this pause.
              Thread.sleep(10000);
              executor.execute(engine);

              Thread.sleep(5000);
              engine.close();
          }
      }
      

    Description

      See the class: io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.

      In the finally block of execute(ChangeEventSourceContext context), the replicationStream is closed.

      In commitOffset(Map<String, ?> offset), replicationStream.flushLsn(lsn) is called.

      Because execute runs in a separate thread, in my case replicationStream is always closed before the final replicationStream.flushLsn(lsn) call that is made while the DebeziumEngine is closing. The following error is then reported on the console:

      Error while trying to stop the task and commit the offsets
      org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy
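
The ordering problem described above can be sketched in isolation. The code below is only an illustration under stated assumptions, not the Debezium implementation and not necessarily the fix that was applied: FakeStream and GuardedSource are hypothetical stand-ins, and the guard (one lock shared by the close path and the commit path) is just one possible way to avoid flushing a closed stream.

```java
public class FlushAfterCloseSketch {

    /** Hypothetical stand-in for a replication stream; not the Debezium or pgjdbc API. */
    static final class FakeStream implements AutoCloseable {
        private boolean closed;
        private long flushedLsn = -1;

        void flushLsn(long lsn) {
            if (closed) {
                throw new IllegalStateException("flushLsn(" + lsn + ") called on a closed stream");
            }
            flushedLsn = lsn;
        }

        @Override
        public void close() {
            closed = true;
        }

        long flushedLsn() {
            return flushedLsn;
        }
    }

    /** Hypothetical source that serializes the close path and the commit path on one lock. */
    static final class GuardedSource {
        private final Object lock = new Object();
        private FakeStream stream = new FakeStream();

        /** Plays the role of the finally block in execute(): release the stream. */
        void stopStreaming() {
            synchronized (lock) {
                if (stream != null) {
                    stream.close();
                    stream = null;
                }
            }
        }

        /** Plays the role of commitOffset(): flush only while the stream is still open. */
        boolean commitOffset(long lsn) {
            synchronized (lock) {
                if (stream == null) {
                    return false; // stream already closed, skip the flush
                }
                stream.flushLsn(lsn);
                return true;
            }
        }
    }

    /** The failing order from the report: close first, then flush with no guard. */
    static boolean unguardedFlushFails() {
        FakeStream stream = new FakeStream();
        stream.close();
        try {
            stream.flushLsn(42L);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** The same order through the guarded source: the late commit is skipped, not thrown. */
    static boolean guardedFlushIsSkipped() {
        GuardedSource source = new GuardedSource();
        Thread streaming = new Thread(source::stopStreaming);
        streaming.start();
        try {
            streaming.join(); // the streaming thread closes the stream first
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !source.commitOffset(42L);
    }

    public static void main(String[] args) {
        System.out.println("unguarded flush after close fails: " + unguardedFlushFails());
        System.out.println("guarded flush after close is skipped: " + guardedFlushIsSkipped());
    }
}
```

Skipping the late flush has a cost: in this sketch the final LSN is simply not acknowledged, so a real connector would presumably have to confirm it again after a restart. Flushing before the stream is closed would be another option.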


          People

            Assignee: Jiri Pechanec (jpechane)
            Reporter: Meng Qiu (qiumeng1989, Inactive)
            Votes: 0
            Watchers: 2
