Debezium / DBZ-4667

Fix null transaction id caused by VEvents without VGTID

    • Type: Bug
    • Resolution: Unresolved
    • Priority: Major
    • 1.9-backlog
    • vitess-connector

      I identified a bug in the Vitess connector that causes it to crash with the error message 'Invalid value: null used for required field: "id", schema type: STRING'.

      	at io.debezium.connector.vitess.VitessStreamingChangeEventSource.execute(VitessStreamingChangeEventSource.java:78)
      	at io.debezium.connector.vitess.VitessStreamingChangeEventSource.execute(VitessStreamingChangeEventSource.java:29)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:139)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
      	at io.grpc.Status.asRuntimeException(Status.java:533)
      	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478)
      	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
      	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
      	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
      	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:616)
      	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:69)
      	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:802)
      	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:781)
      	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
      	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
      	... 3 more
      Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "id", schema type: STRING
      	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
      	at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
      	at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
      	at io.debezium.pipeline.txmetadata.TransactionMonitor.beginTransaction(TransactionMonitor.java:166)
      	at io.debezium.pipeline.txmetadata.TransactionMonitor.transactionStartedEvent(TransactionMonitor.java:148)
      	at io.debezium.pipeline.EventDispatcher.dispatchTransactionStartedEvent(EventDispatcher.java:282)
      	at io.debezium.connector.vitess.VitessStreamingChangeEventSource.lambda$newReplicationMessageProcessor$0(VitessStreamingChangeEventSource.java:99)
      	at io.debezium.connector.vitess.connection.VStreamOutputMessageDecoder.handleBeginMessage(VStreamOutputMessageDecoder.java:114)
      	at io.debezium.connector.vitess.connection.VStreamOutputMessageDecoder.processMessage(VStreamOutputMessageDecoder.java:58)
      	at io.debezium.connector.vitess.connection.VitessReplicationConnection$1.onNext(VitessReplicationConnection.java:99)
      	at io.debezium.connector.vitess.connection.VitessReplicationConnection$1.onNext(VitessReplicationConnection.java:77)
      	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:465)
      	at io.grpc.internal.DelayedClientCall$DelayedListener.onMessage(DelayedClientCall.java:447)
      	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:715)
      	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:700)
      	... 5 more
      [2022-02-03 03:03:56,883] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask)
      

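      The cause is that a VEvents batch can contain BEGIN and COMMIT events without any VGTID event, yet the connector still dispatches transaction events for them. With no VGTID in the batch, the transaction id is null, and the required "id" field fails schema validation. The VStream output below shows two such batches: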

      $ grpcurl -d '{ "vgtid": { "shard_gtids": [ { "keyspace": "test_keyspace", "shard": "80-", "gtid": "MySQL56/f84895a8-4eed-42ba-b500-469fd71cc08b:1-2499683:3415911-3415942:4415911-4415931" } ] } }' -plaintext xxxxxx:1234 vtgateservice.Vitess.VStream
      {
        "events": [
          {
            "type": "BEGIN",
            "timestamp": "1637704151",
            "currentTime": "1643857685432455570"
          },
          {
            "type": "COMMIT",
            "timestamp": "1637704151",
            "currentTime": "1643857685432479353"
          }
        ]
      }
      {
        "events": [
          {
            "type": "BEGIN",
            "timestamp": "1643838860",
            "currentTime": "1643857685432505735"
          },
          {
            "type": "COMMIT",
            "timestamp": "1643838860",
            "currentTime": "1643857685432535479"
          }
        ]
      }
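
One possible guard, sketched here under stated assumptions and not taken from the connector's code base: before dispatching transaction events for a batch, check whether the batch carries a VGTID at all. The `VgtidBatchCheck` class, the `VEvent` stand-in type, and both method names are hypothetical. The real connector works with Vitess protobuf messages, and the actual fix may look different.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: decide whether a batch of VEvents can safely
// produce transaction events. Not the connector's actual implementation.
final class VgtidBatchCheck {

    // Simplified stand-in for a Vitess VEvent (assumption: only the event
    // type and an optional VGTID string matter for this check).
    static final class VEvent {
        final String type;
        final String vgtid; // null unless type is "VGTID"

        VEvent(String type, String vgtid) {
            this.type = type;
            this.vgtid = vgtid;
        }
    }

    // Returns the first non-null VGTID found in the batch, if any.
    static Optional<String> findVgtid(List<VEvent> batch) {
        return batch.stream()
                .filter(e -> "VGTID".equals(e.type) && e.vgtid != null)
                .map(e -> e.vgtid)
                .findFirst();
    }

    // Transaction events need a non-null id; without a VGTID in the batch
    // there is nothing to derive it from, so dispatching should be skipped.
    static boolean shouldDispatchTransactionEvents(List<VEvent> batch) {
        return findVgtid(batch).isPresent();
    }

    public static void main(String[] args) {
        // Mirrors the batches in the VStream output: BEGIN and COMMIT only.
        List<VEvent> emptyTransaction = List.of(
                new VEvent("BEGIN", null),
                new VEvent("COMMIT", null));

        // A batch that also carries a VGTID (placeholder value).
        List<VEvent> normalTransaction = List.of(
                new VEvent("BEGIN", null),
                new VEvent("VGTID", "example-vgtid"),
                new VEvent("COMMIT", null));

        System.out.println(shouldDispatchTransactionEvents(emptyTransaction));
        System.out.println(shouldDispatchTransactionEvents(normalTransaction));
    }
}
```

With a check like this, the BEGIN/COMMIT-only batches shown above would be skipped rather than dispatched with a null transaction id. Whether to skip such batches or reuse the last known VGTID is a design choice this sketch does not settle.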
      

       

              Assignee: Unassigned
              Reporter: Shichao An (shichaoan, inactive)