-
Bug
-
Resolution: Done
-
Major
-
None
-
None
-
False
-
None
-
False
Bug report
First startup ever may fail because the ProcessorGroup starts its processors in parallel so processor.initialize() and processor.start() are executed in random order. The QueueProcessor is creating some filesystem folders in its initialize() method but CommitLogPostProcessor processor is using those in its start() method and mai fail if they do not exist.
What behaviour do you expect?
first startup should never fail
What behaviour do you see?
first startup sometimes fails
Implementation ideas (optional)
decouple processor.initialize() from processor.start()
one solution is to change the start() method of the ProcessorGroup from:
void start() { executorService = Executors.newFixedThreadPool(processors.size()); for (AbstractProcessor processor : processors) { Runnable runnable = () -> { try { processor.initialize(); processor.start(); } catch (Exception e) { LOGGER.error("Encountered exception while running {}; stopping all processors.", processor.getName(), e); try { stopProcessors(); } catch (Exception e2) { LOGGER.error("Encountered exceptions while stopping all processors", e2); } } }; executorService.submit(runnable); } }
to:
void start() { for (AbstractProcessor processor : processors) { try { processor.initialize(); } catch (Exception e) { throw new CassandraConnectorTaskException("Failed to initialize processors", e); } } executorService = Executors.newFixedThreadPool(processors.size()); for (AbstractProcessor processor : processors) { Runnable runnable = () -> { try { processor.start(); } catch (Exception e) { LOGGER.error("Encountered exception while running {}; stopping all processors.", processor.getName(), e); try { stopProcessors(); } catch (Exception e2) { LOGGER.error("Encountered exceptions while stopping all processors", e2); } } }; executorService.submit(runnable); } }