Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5719

cassandra connector first startup ever may fail

XMLWordPrintable

    • False
    • None
    • False

      Bug report

      First startup ever may fail because the ProcessorGroup starts its processors in parallel so processor.initialize() and processor.start() are executed in random order. The QueueProcessor is creating some filesystem folders in its initialize() method but CommitLogPostProcessor processor is using those in its start() method and mai fail if they do not exist.

      What behaviour do you expect?

      first startup should never fail

      What behaviour do you see?

      first startup sometimes fails

      Implementation ideas (optional)

      decouple processor.initialize() from processor.start()
      one solution is to change the start() method of the ProcessorGroup from:

      void start() {
          executorService = Executors.newFixedThreadPool(processors.size());
          for (AbstractProcessor processor : processors) {
              Runnable runnable = () -> {
                  try {
                      processor.initialize();
                      processor.start();
                  }
                  catch (Exception e) {
                      LOGGER.error("Encountered exception while running {}; stopping all processors.", processor.getName(), e);
                      try {
                          stopProcessors();
                      }
                      catch (Exception e2) {
                          LOGGER.error("Encountered exceptions while stopping all processors", e2);
                      }
                  }
              };
              executorService.submit(runnable);
          }
      } 

       

      to:

       

      void start() {
          for (AbstractProcessor processor : processors) {
              try {
                  processor.initialize();
              }
              catch (Exception e) {
                  throw new CassandraConnectorTaskException("Failed to initialize processors", e);
              }
          }
          executorService = Executors.newFixedThreadPool(processors.size());
          for (AbstractProcessor processor : processors) {
              Runnable runnable = () -> {
                  try {
                      processor.start();
                  }
                  catch (Exception e) {
                      LOGGER.error("Encountered exception while running {}; stopping all processors.", processor.getName(), e);
                      try {
                          stopProcessors();
                      }
                      catch (Exception e2) {
                          LOGGER.error("Encountered exceptions while stopping all processors", e2);
                       }
                  }
              };
              executorService.submit(runnable);
          }
      }

       

       

              Unassigned Unassigned
              ggaborg Gabor Andras (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

                Created:
                Updated:
                Resolved: