Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-7098

Empty object sent to GCP Pub/Sub after DELETE event

XMLWordPrintable

      Bug report

      For bug reports, provide this information, please:

      Context

      I want Debezium to send the captured database change events to GCP Pub/Sub topics. Create and Update events work fine but I have an error on Delete.

      What Debezium connector do you use and what version?

      I am using the docker image debezium/server:2.4.0.Final for Postgresql.

      What is the connector configuration?

       

      # Pubsub setup: https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_google_cloud_pubsub
      debezium.sink.type=pubsub
      debezium.sink.pubsub.project.id=project_id
      debezium.sink.pubsub.ordering.enabled=true
      
      # Offset storage
      debezium.source.offset.storage.file.filename=data/offsets.dat
      debezium.source.offset.flush.interval.ms=0
      
      # Postgres
      debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
      debezium.source.database.hostname=hostname
      debezium.source.database.port=5432
      debezium.source.database.user=user
      debezium.source.database.password=password
      debezium.source.database.dbname=dbname
      debezium.source.database.server.name=server_name
      debezium.source.schema.include.list=public
      debezium.source.topic.prefix=staging
      debezium.source.table.include.list=public.tablename
      debezium.source.events.exclude=read
      
      #Error handling
      debezium.event.processing.failure.handling.mode=warn
      
      debezium.snapshot.mode=never
      debezium.source.plugin.name=pgoutput
      pk.mode=record_key
      
      debezium.transforms=Reroute
      debezium.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
      debezium.transforms.Reroute.topic.regex=(.*)name(.*)
      debezium.transforms.Reroute.topic.replacement=$1name.all
      

      What is the captured database version and mode of deployment?

      The captured database is a self-hosted PostgreSQL 16.

      What behaviour do you expect?

      I am expecting a delete event to be sent to GCP Pub/Sub containing a before object in the payload.

      What behaviour do you see?

      I am getting an error when Debezium sends the delete event to Pub/Sub. The error is 'INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.'

      Do you see the same behaviour using the latest relesead Debezium version?

      I have tried version 2.3.4.Final, 2.4.0.Final and 2.5.0.Alpha1. I get the same behavior.

      Do you have the connector logs, ideally from start till finish?

      Logs from when the delete event is received :

      {"timestamp":"2023-10-31T08:57:07.583Z","sequence":161,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.postgresql.connection.WalPositionLocator","level":"INFO","message":"Message with LSN 'LSN\{17/CF80F430}' arrived, switching off the filtering","threadName":"debezium-postgresconnector-staging-change-event-source-coordinator","threadId":39,"mdc":\{"dbz.taskId":"0","dbz.databaseName":"orthop_formation","dbz.connectorName":"staging","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:25.945Z","sequence":162,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"1 records sent during previous 00:01:37.997, last recorded offset of \{server=staging} partition is \{transaction_id=null, lsn_proc=102265593656, messageType=INSERT, lsn_commit=102265591432, lsn=102265593656, txId=91392, ts_usec=1698742705367688}","threadName":"pool-7-thread-1","threadId":23,"mdc":{},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.218Z","sequence":163,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.EmbeddedEngine","level":"INFO","message":"Stopping the task and engine","threadName":"pool-7-thread-1","threadId":23,"mdc":{},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.219Z","sequence":164,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"Stopping down connector","threadName":"pool-7-thread-1","threadId":23,"mdc":{},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.328Z","sequence":165,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-16-thread-1","threadId":85,"mdc":\{"dbz.taskId":"0","dbz.databaseName":"orthop_formation","dbz.connectorName":"staging","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.339Z","sequence":166,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-17-thread-1","threadId":86,"mdc":\{"dbz.taskId":"0","dbz.databaseName":"orthop_formation","dbz.connectorName":"staging","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.34Z","sequence":167,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Finished streaming","threadName":"debezium-postgresconnector-staging-change-event-source-coordinator","threadId":39,"mdc":\{"dbz.taskId":"0","dbz.databaseName":"orthop_formation","dbz.connectorName":"staging","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.341Z","sequence":168,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Connected metrics set to 'false'","threadName":"debezium-postgresconnector-staging-change-event-source-coordinator","threadId":39,"mdc":\{"dbz.taskId":"0","dbz.databaseName":"orthop_formation","dbz.connectorName":"staging","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.345Z","sequence":169,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.signal.SignalProcessor","level":"INFO","message":"SignalProcessor stopped","threadName":"pool-7-thread-1","threadId":23,"mdc":{},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.354Z","sequence":170,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":23,"mdc":{},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.355Z","sequence":171,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"ERROR","message":"Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.', error = 'io.debezium.DebeziumException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.'","threadName":"pool-7-thread-1","threadId":23,"mdc":{},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1,"exception":\{"refId":1,"exceptionType":"io.debezium.DebeziumException","message":"java.util.concurrent.ExecutionException: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.","frames":[{"class":"io.debezium.server.pubsub.PubSubChangeConsumer","method":"handleBatch","line":244},\{"class":"io.debezium.embedded.ConvertingEngineBuilder","method":"lambda$notifying$2","line":101},\{"class":"io.debezium.embedded.EmbeddedEngine","method":"pollRecords","line":1008},\{"class":"io.debezium.embedded.EmbeddedEngine","method":"run","line":755},\{"class":"io.debezium.embedded.ConvertingEngineBuilder$2","method":"run","line":229},\{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":170},\{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},\{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},\{"class":"java.lang.Thread","method":"run","line":829}],"causedBy":\{"exception":{"refId":2,"exceptionType":"java.util.concurrent.ExecutionException","message":"com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.","frames":[{"class":"com.google.common.util.concurrent.AbstractFuture","method":"getDoneValue","line":592},\{"class":"com.google.common.util.concurrent.AbstractFuture","method":"get","line":467},\{"class":"com.google.common.util.concurrent.AbstractFuture$TrustedFuture","method":"get","line":119},\{"class":"com.google.common.util.concurrent.ForwardingFuture","method":"get","line":75},\{"class":"io.debezium.server.pubsub.PubSubChangeConsumer","method":"handleBatch","line":241},\{"class":"io.debezium.embedded.ConvertingEngineBuilder","method":"lambda$notifying$2","line":101},\{"class":"io.debezium.embedded.EmbeddedEngine","method":"pollRecords","line":1008},\{"class":"io.debezium.embedded.EmbeddedEngine","method":"run","line":755},\{"class":"io.debezium.embedded.ConvertingEngineBuilder$2","method":"run","line":229},\{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":170},\{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},\{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},\{"class":"java.lang.Thread","method":"run","line":829}],"causedBy":\{"exception":{"refId":3,"exceptionType":"com.google.api.gax.rpc.InvalidArgumentException","message":"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.","frames":[{"class":"com.google.api.gax.rpc.ApiExceptionFactory","method":"createException","line":92},\{"class":"com.google.api.gax.rpc.ApiExceptionFactory","method":"createException","line":41},\{"class":"com.google.api.gax.grpc.GrpcApiExceptionFactory","method":"create","line":86},\{"class":"com.google.api.gax.grpc.GrpcApiExceptionFactory","method":"create","line":66},\{"class":"com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture","method":"onFailure","line":97},\{"class":"com.google.api.core.ApiFutures$1","method":"onFailure","line":84},\{"class":"com.google.common.util.concurrent.Futures$CallbackListener","method":"run","line":1127},\{"class":"com.google.common.util.concurrent.DirectExecutor","method":"execute","line":31},\{"class":"com.google.common.util.concurrent.AbstractFuture","method":"executeListener","line":1286},\{"class":"com.google.common.util.concurrent.AbstractFuture","method":"complete","line":1055},\{"class":"com.google.common.util.concurrent.AbstractFuture","method":"setException","line":807},\{"class":"io.grpc.stub.ClientCalls$GrpcFuture","method":"setException","line":578},\{"class":"io.grpc.stub.ClientCalls$UnaryStreamToFuture","method":"onClose","line":548},\{"class":"io.grpc.PartialForwardingClientCallListener","method":"onClose","line":39},\{"class":"io.grpc.ForwardingClientCallListener","method":"onClose","line":23},\{"class":"io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener","method":"onClose","line":40},\{"class":"com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1","method":"onClose","line":541},\{"class":"io.grpc.internal.DelayedClientCall$DelayedListener$3","method":"run","line":489},\{"class":"io.grpc.internal.DelayedClientCall$DelayedListener","method":"delayOrExecute","line":453},\{"class":"io.grpc.internal.DelayedClientCall$DelayedListener","method":"onClose","line":486},\{"class":"io.grpc.internal.ClientCallImpl","method":"closeObserver","line":567},\{"class":"io.grpc.internal.ClientCallImpl","method":"access$300","line":71},\{"class":"io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed","method":"runInternal","line":735},\{"class":"io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed","method":"runInContext","line":716},\{"class":"io.grpc.internal.ContextRunnable","method":"run","line":37},\{"class":"io.grpc.internal.SerializingExecutor","method":"run","line":133},\{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},\{"class":"java.util.concurrent.FutureTask","method":"run","line":264},\{"class":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask","method":"run","line":304},\{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},\{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},\{"class":"java.lang.Thread","method":"run","line":829}],"causedBy":\{"exception":{"refId":4,"exceptionType":"io.grpc.StatusRuntimeException","message":"INVALID_ARGUMENT: One or more messages in the publish request is empty. Each message must contain either non-empty data, or at least one attribute.","frames":[{"class":"io.grpc.Status","method":"asRuntimeException","line":539},\{"class":"io.grpc.stub.ClientCalls$UnaryStreamToFuture","method":"onClose","line":548},\{"class":"io.grpc.PartialForwardingClientCallListener","method":"onClose","line":39},\{"class":"io.grpc.ForwardingClientCallListener","method":"onClose","line":23},\{"class":"io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener","method":"onClose","line":40},\{"class":"com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1","method":"onClose","line":541},\{"class":"io.grpc.internal.DelayedClientCall$DelayedListener$3","method":"run","line":489},\{"class":"io.grpc.internal.DelayedClientCall$DelayedListener","method":"delayOrExecute","line":453},\{"class":"io.grpc.internal.DelayedClientCall$DelayedListener","method":"onClose","line":486},\{"class":"io.grpc.internal.ClientCallImpl","method":"closeObserver","line":567},\{"class":"io.grpc.internal.ClientCallImpl","method":"access$300","line":71},\{"class":"io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed","method":"runInternal","line":735},\{"class":"io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed","method":"runInContext","line":716},\{"class":"io.grpc.internal.ContextRunnable","method":"run","line":37},\{"class":"io.grpc.internal.SerializingExecutor","method":"run","line":133},\{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},\{"class":"java.util.concurrent.FutureTask","method":"run","line":264},\{"class":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask","method":"run","line":304},\{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},\{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},\{"class":"java.lang.Thread","method":"run","line":829}]}}}}}}}}
      {"timestamp":"2023-10-31T08:58:52.381Z","sequence":172,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.DebeziumServer","level":"INFO","message":"Received request to stop the engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.381Z","sequence":173,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.EmbeddedEngine","level":"INFO","message":"Stopping the embedded engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      {"timestamp":"2023-10-31T08:58:52.405Z","sequence":174,"loggerClassName":"org.jboss.logging.Logger","loggerName":"io.quarkus","level":"INFO","message":"debezium-server-dist stopped in 0.045s","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"502bb8029a34","processName":"io.debezium.server.Main","processId":1}
      

            Unassigned Unassigned
            arthurlr Arthur LE RAY (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

              Created:
              Updated:
              Resolved: