  1. Debezium
  2. DBZ-4111

projectId not being set when injecting a custom PublisherBuilder


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version: 1.8.0.Alpha1
    • Affects Version: 1.7.0.Final
    • Component: debezium-server

      Provide a custom publisher to PubSubChangeConsumer like this:

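      import java.io.IOException;
      import java.io.UncheckedIOException;
      import javax.enterprise.context.Dependent;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.threeten.bp.Duration; // Duration type accepted by gax BatchingSettings
      import com.google.api.gax.batching.BatchingSettings;
      import com.google.cloud.pubsub.v1.Publisher;
      import com.google.pubsub.v1.ProjectTopicName;
      import io.debezium.server.CustomConsumerBuilder;
      import io.debezium.server.pubsub.PubSubChangeConsumer;

      @Dependent
      @CustomConsumerBuilder
      public class PubSubPublisher implements PubSubChangeConsumer.PublisherBuilder {
          private static final Logger LOGGER = LoggerFactory.getLogger(PubSubPublisher.class);

          @Override
          public Publisher get(ProjectTopicName topicName) {
              LOGGER.info("topic name {}", topicName);
              // Batch up to the API limits, flushing at least every 50 ms.
              BatchingSettings batchingSettings = BatchingSettings.newBuilder()
                      .setRequestByteThreshold(Publisher.getApiMaxRequestBytes()) // 10MB
                      .setElementCountThreshold(Publisher.getApiMaxRequestElementCount()) // 1000
                      .setDelayThreshold(Duration.ofMillis(50))
                      .build();

              try {
                  return Publisher.newBuilder(topicName)
                          .setEnableMessageOrdering(true)
                          .setBatchingSettings(batchingSettings)
                          .build();
              } catch (IOException e) {
                  // Fail loudly instead of returning a null publisher.
                  LOGGER.error("Could not create publisher for {}", topicName, e);
                  throw new UncheckedIOException(e);
              }
          }
      }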
      

      Run Debezium and confirm that the custom builder is picked up. You should see this in the logs:

      Obtained custom configured PublisherBuilder 'io.quarkus.arc.impl.InstanceImpl@7db0565c'
      

      When Debezium then tries to publish events to Pub/Sub, it crashes with:

      2021-10-05 11:01:39,724 INFO  [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: null', error = '{}': java.lang.NullPointerException
      	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:878)
      	at com.google.pubsub.v1.ProjectTopicName.<init>(ProjectTopicName.java:55)
      	at com.google.pubsub.v1.ProjectTopicName.<init>(ProjectTopicName.java:28)
      	at com.google.pubsub.v1.ProjectTopicName$Builder.build(ProjectTopicName.java:156)
      	at com.google.pubsub.v1.ProjectTopicName.of(ProjectTopicName.java:60)
      	at io.debezium.server.pubsub.PubSubChangeConsumer.lambda$handleBatch$2(PubSubChangeConsumer.java:120)
      	at java.base/java.util.HashMap.computeIfAbsent(Unknown Source)
      	at io.debezium.server.pubsub.PubSubChangeConsumer.handleBatch(PubSubChangeConsumer.java:120)
      	at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
      	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:821)
      	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
      	at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:145)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
      	at java.base/java.lang.Thread.run(Unknown Source)
      

      When a custom Pub/Sub Publisher is provided to PubSubChangeConsumer via the @CustomConsumerBuilder injection point, the projectId property of the change consumer is never set. connect() returns early once a custom publisher builder is found, and projectId is only assigned further down in that method, so it stays null.

      The actual failure occurs later, in handleBatch, where ProjectTopicName.of is called to create a topic name for a null project.

      One way to fix this is to move the lines that set the project ID above the if statement that checks customPublisherBuilder.isResolvable(), like this:

      @PostConstruct
      void connect() {
          final Config config = ConfigProvider.getConfig();
          projectId = config.getOptionalValue(PROP_PROJECT_ID, String.class).orElse(ServiceOptions.getDefaultProjectId());

          ... // if statement

      }
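The ordering problem can be reproduced in isolation. The following is a minimal sketch, not Debezium's code: the `Consumer` class, its method names, and the `"default-project"` fallback are illustrative stand-ins. It only models the early return described in this report and the proposed reordering.

```java
import java.util.Optional;

public class ProjectIdInitOrder {

    /** Simplified stand-in for the change consumer; not Debezium's actual class. */
    public static class Consumer {
        private final boolean customBuilderResolvable;
        private final Optional<String> configuredProjectId;
        String projectId;

        public Consumer(boolean customBuilderResolvable, Optional<String> configuredProjectId) {
            this.customBuilderResolvable = customBuilderResolvable;
            this.configuredProjectId = configuredProjectId;
        }

        /** Order described in the report: the early return skips the assignment. */
        public void connectOriginalOrder() {
            if (customBuilderResolvable) {
                return;
            }
            projectId = resolveProjectId();
        }

        /** Proposed order: resolve the project ID before the early return. */
        public void connectProposedOrder() {
            projectId = resolveProjectId();
            if (customBuilderResolvable) {
                return;
            }
            // Default publisher setup would follow here.
        }

        private String resolveProjectId() {
            // Placeholder fallback; the real code asks ServiceOptions for the default.
            return configuredProjectId.orElse("default-project");
        }

        /** Mirrors the null check that fails when the topic name is built. */
        public String topicFor(String destination) {
            if (projectId == null) {
                throw new NullPointerException("projectId is null");
            }
            return "projects/" + projectId + "/topics/" + destination;
        }
    }

    public static void main(String[] args) {
        Consumer original = new Consumer(true, Optional.of("my-project"));
        original.connectOriginalOrder();
        try {
            original.topicFor("inventory.customers");
        } catch (NullPointerException e) {
            System.out.println("original order: " + e.getMessage());
        }

        Consumer proposed = new Consumer(true, Optional.of("my-project"));
        proposed.connectProposedOrder();
        System.out.println("proposed order: " + proposed.topicFor("inventory.customers"));
    }
}
```

With a resolvable custom builder, the original order leaves `projectId` unset and the topic lookup throws, while the proposed order resolves the topic name as expected.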
      

       

       

      Another way is to add new properties for configuring the default Publisher, such as the batch size or the number of messages per batch, but that would be harder to implement.
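As a rough illustration of the second option, the sketch below reads batching values from configuration properties. Everything in it is an assumption made for illustration: the property names, the `BatchConfig` holder, and the fallback values (taken from the example publisher in this report, not from library defaults). A real implementation would pass these values to BatchingSettings when building the default Publisher.

```java
import java.time.Duration;
import java.util.Map;

public class BatchSettingsFromConfig {

    // Hypothetical property names, chosen only for this sketch.
    static final String PROP_REQUEST_BYTES = "debezium.sink.pubsub.batch.request.byte.threshold";
    static final String PROP_ELEMENT_COUNT = "debezium.sink.pubsub.batch.element.count.threshold";
    static final String PROP_DELAY_MS = "debezium.sink.pubsub.batch.delay.threshold.ms";

    /** Plain value holder for the three batching thresholds. */
    public static final class BatchConfig {
        public final long requestByteThreshold;
        public final long elementCountThreshold;
        public final Duration delayThreshold;

        BatchConfig(long requestByteThreshold, long elementCountThreshold, Duration delayThreshold) {
            this.requestByteThreshold = requestByteThreshold;
            this.elementCountThreshold = elementCountThreshold;
            this.delayThreshold = delayThreshold;
        }
    }

    /** Reads the thresholds from the given properties, falling back to the example values. */
    public static BatchConfig fromProperties(Map<String, String> props) {
        long bytes = positiveLong(props, PROP_REQUEST_BYTES, 10_000_000L);
        long count = positiveLong(props, PROP_ELEMENT_COUNT, 1_000L);
        long delayMs = positiveLong(props, PROP_DELAY_MS, 50L);
        return new BatchConfig(bytes, count, Duration.ofMillis(delayMs));
    }

    private static long positiveLong(Map<String, String> props, String key, long fallback) {
        String raw = props.get(key);
        long value = raw == null ? fallback : Long.parseLong(raw.trim());
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be positive, got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        BatchConfig config = fromProperties(Map.of(PROP_DELAY_MS, "100"));
        System.out.println(config.requestByteThreshold + " bytes, "
                + config.elementCountThreshold + " messages, "
                + config.delayThreshold.toMillis() + " ms");
    }
}
```

Validating the values up front keeps a misconfigured threshold from surfacing only when the first batch is published.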

              Assignee: Unassigned
              Reporter: Tudor Plugaru (plugarut)