  1. Debezium
  2. DBZ-8003

Postgres connector silently drops exceptions thrown in keepAliveExecutor


      Bug report

      What Debezium connector do you use and what version?

      1.9.8

      What is the connector configuration?

      Not relevant.

      What is the captured database version and mode of deployment?

      Not relevant.

      What behaviour do you expect?

      I expect the Postgres connector to throw or log an exception when something goes wrong in keepAliveExecutor.

      What behaviour do you see?

      Nothing is thrown or logged. Here is what keepAliveExecutor currently does:

       

      //io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#execute
      // for large dbs, the refresh of schema can take too much time
      // such that the connection times out. We must enable keep
      // alive to ensure that it doesn't time out
      ReplicationStream stream = this.replicationStream.get();
      stream.startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME));
      
      //io.debezium.connector.postgresql.connection.ReplicationStream#startKeepAlive
      public void startKeepAlive(ExecutorService service) {
          if (keepAliveExecutor == null) {
              keepAliveExecutor = service;
              keepAliveRunning = new AtomicBoolean(true);
              keepAliveExecutor.submit(() -> {
                  while (keepAliveRunning.get()) {
                      try {
                          LOGGER.trace("Forcing status update with replication stream");
                          stream.forceUpdateStatus();
                          metronome.pause();
                      }
                      catch (Exception exp) {
                          throw new RuntimeException("received unexpected exception will perform keep alive", exp);
                      }
                  }
              });
          }
      
      }

       

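      The keep-alive task throws a RuntimeException, but because it is passed to ExecutorService.submit(), the exception is captured in the returned Future and only surfaces if Future.get() is called. The Future is discarded here, so the error is silently ignored.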
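The behaviour described above can be checked in isolation. The following is a minimal sketch, not Debezium code: the class name, method name, and exception message are made up for illustration. It shows that the exception is not lost, only parked in the Future, and comes back wrapped in an ExecutionException once get() is called.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative class, not part of Debezium.
public class SubmitSwallowsException {

    // Submits a failing task, then asks the Future for its outcome.
    // Returns the message of the task's exception, or null if none was found.
    public static String recoverFailure() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit((Runnable) () -> {
                throw new RuntimeException("keep alive failed");
            });
            future.get(); // blocks until the task has finished
            return null;
        }
        catch (ExecutionException e) {
            // The task's original exception is exposed as the cause.
            return e.getCause().getMessage();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Recovered from Future: " + recoverFailure());
    }
}
```

Without the call to get(), nothing in this program would ever report the failure, which matches what happens in startKeepAlive.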

       

       

       

      Do you see the same behaviour using the latest released Debezium version?

      Yes. I checked the code and nothing has changed.

      How to reproduce the issue using our tutorial deployment?

      I have a simple test:

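      1. When the task is passed to submit() and throws an exception, nothing is printed.

      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;

      public class TrySource {
          public static void main(String[] args) throws Exception {
              ExecutorService keepAliveExecutor = Executors.newSingleThreadExecutor();

              keepAliveExecutor.submit(() -> { // the exception ends up in the discarded Future
                  throw new RuntimeException("received unexpected exception will perform keep alive");
              });

              // Keep the main thread alive so any output would be visible.
              for (;;) {
              }
          }
      }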

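      2. When the task is passed to execute() and throws an exception, the error is printed.

      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;

      public class TrySource {
          public static void main(String[] args) throws Exception {
              ExecutorService keepAliveExecutor = Executors.newSingleThreadExecutor();

              keepAliveExecutor.execute(() -> { // the exception escapes the worker thread
                  throw new RuntimeException("received unexpected exception will perform keep alive");
              });

              // Keep the main thread alive so the output is visible.
              for (;;) {
              }
          }
      }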

      The output looks like this (although the main thread still does not catch the exception):

      Exception in thread "pool-1-thread-1" java.lang.RuntimeException: received unexpected exception will perform keep alive
          at TrySource.lambda$main$0(TrySource.java:9)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:750)
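With execute(), the exception escapes the worker thread and is delivered to that thread's uncaught exception handler, which by default prints the stack trace to stderr. As a rough sketch of how it could be routed somewhere more useful, a ThreadFactory can install a custom handler. The class name, thread name, and message below are illustrative assumptions, not Debezium code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative class, not part of Debezium.
public class ExecuteWithHandler {

    // Runs a failing task via execute() and returns the message that the
    // custom uncaught exception handler observed, or null if it saw nothing.
    public static String runFailingTask() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);

        // Every worker thread created by this factory reports uncaught exceptions here.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "keep-alive-demo");
            thread.setUncaughtExceptionHandler((t, e) -> {
                seen.set(e);
                handled.countDown();
            });
            return thread;
        };

        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        executor.execute(() -> {
            throw new RuntimeException("keep alive failed");
        });

        try {
            // Wait for the handler to run, with a timeout so the demo cannot hang.
            handled.await(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();

        Throwable failure = seen.get();
        return failure == null ? null : failure.getMessage();
    }

    public static void main(String[] args) {
        System.out.println("Handler saw: " + runFailingTask());
    }
}
```

A handler like this makes the failure visible, but on its own it still does not stop or restart the connector.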
      
      
      

       

       

      Feature request or enhancement

      I think the exception should be passed to PostgresStreamingChangeEventSource, which can then either restart or stop the job.
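One possible way to do this, offered only as a sketch and not as the fix Debezium adopted, is for the keep-alive task to record its failure in a shared field that the streaming loop checks on each iteration. All names below (KeepAliveFailurePropagation, throwIfKeepAliveFailed, the statusUpdate parameter) are hypothetical stand-ins rather than Debezium APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the keep-alive part of ReplicationStream.
public class KeepAliveFailurePropagation {

    // Holds the first failure raised by the keep-alive task, if any.
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    // Runs the status update on the executor and records any exception
    // instead of letting it disappear inside a discarded Future.
    public void startKeepAlive(ExecutorService service, Runnable statusUpdate) {
        service.execute(() -> {
            try {
                statusUpdate.run();
            }
            catch (Throwable t) {
                failure.compareAndSet(null, t);
            }
        });
    }

    // Intended to be called from the streaming loop; rethrows a recorded failure
    // on the caller's thread so it can stop or restart the job.
    public void throwIfKeepAliveFailed() {
        Throwable t = failure.get();
        if (t != null) {
            throw new IllegalStateException("Keep-alive thread failed", t);
        }
    }

    // Small demonstration: returns the message of the propagated failure, or null.
    public static String demo() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        KeepAliveFailurePropagation stream = new KeepAliveFailurePropagation();
        stream.startKeepAlive(service, () -> {
            throw new RuntimeException("connection lost");
        });
        service.shutdown();
        try {
            // Wait for the keep-alive task to finish before checking.
            service.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            stream.throwIfKeepAliveFailed();
            return null;
        }
        catch (IllegalStateException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("Propagated to caller: " + demo());
    }
}
```

The design choice here is that the keep-alive thread never decides what to do about the failure; it only reports it, and the thread that owns the streaming loop chooses between restarting and stopping.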

       

       

              Unassigned Unassigned
              loserwang1024 Hongshun Wang (Inactive)