Debezium / DBZ-3558

Processing of new commit log files seems to be sequential

    Description

      Via stefan.miklosovic:

      Hi,

      this question came to mind while I was reading a googlegroups discussion around PR#25. The problem we have in production is that commit log files are created faster than they are processed (we do not know why yet), so data is piling up in the cdc_raw dir and processing is falling well behind.

      We are playing with various configuration properties, one of them being the number of event queues, which is 1 by default, so we aim to increase it to get higher throughput.

      However, let's say that we have increased this figure to, for example, 4. In CommitLogReadHandlerImpl, in the handleRowModifications method, there is this case for INSERT (just to take one example):

      switch (rowType) {
          case INSERT:
              recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                      Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET,
                      queues.get(Math.abs(offsetPosition.fileName.hashCode() % queues.size()))::enqueue);
              break;
      

      Here we see that an individual event is put into a queue chosen by the hash of the name of the commit log file the modification comes from.
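
To make the routing concrete, here is a minimal sketch that isolates the index expression from the INSERT case above. The class name, method name, and commit log file names are hypothetical and only for illustration; the only part taken from the snippet is the `Math.abs(fileName.hashCode() % queues.size())` expression.

```java
public class QueueSelectionSketch {

    // Same index expression as in the INSERT case: the chosen queue depends
    // only on the commit log file name and the number of queues.
    static int queueIndex(String commitLogFileName, int queueCount) {
        return Math.abs(commitLogFileName.hashCode() % queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // the increased figure used as an example in the text
        // Hypothetical file names, not taken from a real cdc_raw directory.
        String[] files = {"CommitLog-6-1.log", "CommitLog-6-2.log", "CommitLog-6-3.log"};
        for (String file : files) {
            // Every event read from the same file gets the same index.
            System.out.println(file + " -> queue " + queueIndex(file, queueCount));
        }
    }
}
```

Because the file name is the only input, every event from a given commit log lands in the same queue, and two different files can also map to the same queue.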

      However, looking closely at AbstractDirectoryWatcher, I see this:

      public void poll() throws InterruptedException, IOException {
          LOGGER.info("Polling commitLog files from {} ...", directory);
          WatchKey key = watchService.poll(pollInterval.toMillis(), TimeUnit.MILLISECONDS);
      
          if (key != null) {
              LOGGER.info("Detected new commitLog files in {}.", directory);
              for (WatchEvent<?> event : key.pollEvents()) {
                  Path relativePath = (Path) event.context();
                  Path absolutePath = directory.resolve(relativePath);
      
                  if (kinds.contains(event.kind())) {
                      handleEvent(event, absolutePath);
                  }
              }
              key.reset();
          }
          else {
              LOGGER.info("No commitLogFile is detected in {}.", directory);
          }
      }
      

      So what it does, if I am not mistaken, is poll the directory, and key.pollEvents() returns all the new files that have been created in the meantime.

      But this iteration / processing is sequential. From my point of view, it literally goes over that list and invokes handleEvent for each file, which is implemented like this in CommitLogProcessor:

              watcher = new AbstractDirectoryWatcher(cdcDir.toPath(), context.getCassandraConnectorConfig().cdcDirPollInterval(), Collections.singleton(ENTRY_CREATE)) {
                  @Override
                  void handleEvent(WatchEvent<?> event, Path path) {
                      if (isRunning()) {
                          processCommitLog(path.toFile());
                      }
                  }
              };
      

      In other words, from that point on, I just do not see how the processing of files could be parallel. This means that at any particular time, events are only ever being added to one and the same queue; only after the whole log is processed does it move on to the queue for the next file (chosen by the hash of that file name and the total number of queues) and start to fill that queue instead.
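
The effect can be illustrated with a small simulation. This is only a sketch under stated assumptions: it does not use any Debezium classes, the file names and event counts are made up, and it models the watcher loop as a plain `for` loop over file names. It records which queue indexes receive events while each file is being processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SequentialFillSketch {

    // Processes files one after another, as the poll() loop does, and returns
    // for each file the set of queue indexes that received its events.
    static List<Set<Integer>> queuesTouchedPerFile(List<String> fileNames, int eventsPerFile, int queueCount) {
        List<Set<Integer>> touched = new ArrayList<>();
        for (String fileName : fileNames) {              // one file at a time
            Set<Integer> queuesForThisFile = new HashSet<>();
            for (int i = 0; i < eventsPerFile; i++) {    // every event of that file
                queuesForThisFile.add(Math.abs(fileName.hashCode() % queueCount));
            }
            touched.add(queuesForThisFile);
        }
        return touched;
    }

    public static void main(String[] args) {
        // Hypothetical file names and event count.
        List<String> files = Arrays.asList("CommitLog-6-1.log", "CommitLog-6-2.log", "CommitLog-6-3.log");
        List<Set<Integer>> touched = queuesTouchedPerFile(files, 1000, 4);
        for (int i = 0; i < files.size(); i++) {
            System.out.println(files.get(i) + " fed queues " + touched.get(i));
        }
    }
}
```

Each per-file set contains exactly one index, so with sequential file processing only one queue is being filled at any moment, regardless of how many queues are configured.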

      If we "gave up" on event ordering in general, why is it done like that? Why can't the processing of newly created commit log files be parallelised as well? I just do not see much benefit in having multiple event queues when we can fill them with data from at most one commit log file at a time.

      One implication of doing it the way it is now is that a commit log is deleted only after it is processed. So by working on a "log-by-log" basis, each log occupies disk space until its turn comes and it is processed. We could delete them sooner if their processing were parallelised, so the disk space needed for this whole processing might be lower.

      I think there should be as many commit log processors as there are event queues. There would be a queue to which new commit log files are appended, and each commit log processor would repeatedly take a log from it. There would be "a bus of commit logs", and processors would process them eventually and in parallel.
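
A rough sketch of what such a "bus of commit logs" could look like, using only standard `java.util.concurrent` classes. This is not Debezium code: the class `CommitLogBusSketch`, its methods, and the `processCommitLog` callback are hypothetical stand-ins, and offset handling, metrics, and deletion of processed files are left out.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CommitLogBusSketch {

    // The "bus": newly detected commit log files wait here until a worker is free.
    private final BlockingQueue<Path> bus = new LinkedBlockingQueue<>();
    private final ExecutorService workers;
    private volatile boolean running = true;

    // workerCount would match the number of event queues in the proposal.
    CommitLogBusSketch(int workerCount, Consumer<Path> processCommitLog) {
        this.workers = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            workers.execute(() -> {
                try {
                    // Keep draining until shutdown is requested and the bus is empty.
                    while (running || !bus.isEmpty()) {
                        Path log = bus.poll(100, TimeUnit.MILLISECONDS);
                        if (log == null) {
                            continue;
                        }
                        try {
                            processCommitLog.accept(log); // one whole file per worker at a time
                        } catch (RuntimeException e) {
                            // A failing file must not stop this worker from taking the next one.
                            System.err.println("Failed to process " + log + ": " + e);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    // Would be called from handleEvent instead of processing the file inline.
    void submit(Path commitLog) {
        bus.add(commitLog);
    }

    // Stops accepting work, lets workers drain the bus, and waits for them to finish.
    boolean shutdownAndAwait(long timeoutMillis) {
        running = false;
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CommitLogBusSketch sketch = new CommitLogBusSketch(4,
                log -> System.out.println(Thread.currentThread().getName() + " processed " + log));
        for (int i = 1; i <= 8; i++) {
            sketch.submit(Paths.get("CommitLog-6-" + i + ".log")); // hypothetical file names
        }
        sketch.shutdownAndAwait(5_000);
    }
}
```

In this arrangement a single file is still read by exactly one worker, so events within one commit log keep their relative order, while different files are processed concurrently and in no guaranteed order.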

      Metrics would have to be changed, as well as the logic around offsets, but that is just an implementation detail, I would say.

            People

              Assignee: Unassigned
              Reporter: Stefan Miklosovic (stefan.miklosovic, Inactive)