Debezium / DBZ-5382

Support multiple tasks in vitess connector

    • Type: Enhancement
    • Resolution: Done
    • Priority: Critical
    • 2.0.0.Beta2
    • 1.9.4.Final
    • vitess-connector

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      The current Vitess Connector can be launched in two modes:

       

      1. Without Vitess shard info: the connector task subscribes to the Vitess VStream for all shards in the given keyspace;

      2. With a particular Vitess shard specified: the connector task subscribes to the VStream for that shard only.

       

      In both modes, the connector will only launch one source task (VitessConnectorTask) to do the actual VStream processing.

       

      For a larger Vitess installation with many shards, a single Vitess connector with only one source task (mode 1) cannot keep up with the event stream from Vitess.  In mode 2, the operator has to manually work out all the shards in the Vitess cluster and manually launch many Vitess connectors (one connector per Vitess shard).  This is very error-prone and might end up with 1000 connectors to launch and manage.

       

      In Kafka Connect's deployment model, the recommendation is to launch one connector, have that connector discover the Vitess shards, and let it launch an appropriate number of source tasks to handle the traffic load from Vitess.

       

      Clearly, the Debezium Vitess connector needs to support multiple tasks for large Vitess installations.

      Implementation ideas (optional)

      The current limitation of the Debezium Vitess connector is rooted in the fact that it is very hard to separate out the Vitess shard information in change data events.  In the Kafka Connect programming model, a connector emits SourceRecord objects, and each SourceRecord carries a SourcePartition and a SourceOffset; these are the basis on which the Kafka Connect runtime persists the state of the change data stream.  On the Vitess side, a single VStream contains records from multiple shards whenever the original VStream subscription covers multiple shards.  The following is an example of VStream events:

       

      [ type:BEGIN timestamp:1616749631 current_time:1616749631516372189

        type:ROW timestamp:1616749631 row_event:<table_name:"customer.customer" row_changes:<after:<lengths:1 lengths:22 values:"6sougou@planetscale.com" > > row_changes:<after:<lengths:1 lengths:23 values:"7deepthi@planetscale.com" > > > current_time:1616749631517779353

        type:VGTID vgtid:<shard_gtids:<keyspace:"customer" shard:"80-" gtid:"MySQL56/6a60d315-8e10-11eb-b894-04ed332e05c2:1-77" > shard_gtids:<keyspace:"customer" shard:"-80" gtid:"MySQL56/629442b7-8e10-11eb-a0bb-04ed332e05c2:1-76" > >

        type:COMMIT timestamp:1616749631 current_time:1616749631517789376 ]

       

      VGTID is Vitess's GTID (built on top of the original MySQL GTID); it contains a list of shard/GTID pairs.

       

      In the example above, the row change event is on the customer.customer table and its VGTID covers two shards: "80-" and "-80" (both are Vitess shard names).  Although we can parse the VGTID and extract the information for the two shards, it is very hard to make the Kafka Connect framework understand that this SourceRecord is associated with two SourcePartitions (with two distinct SourceOffsets) and that both need to be persisted to the Kafka offset topic for each SourceRecord.
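To make the shard/GTID pairs concrete, here is a minimal sketch that pulls them out of the text dump of the VGTID event shown above. It is only an illustration: the regular expression and the `parse_vgtid` helper are assumptions made for this example, and a real connector would read the protobuf message fields rather than parse text.

```python
import re

# Text-format VGTID copied from the VStream example in this issue.
VGTID_TEXT = (
    'vgtid:<shard_gtids:<keyspace:"customer" shard:"80-" '
    'gtid:"MySQL56/6a60d315-8e10-11eb-b894-04ed332e05c2:1-77" > '
    'shard_gtids:<keyspace:"customer" shard:"-80" '
    'gtid:"MySQL56/629442b7-8e10-11eb-a0bb-04ed332e05c2:1-76" > >'
)

# One match per shard_gtids entry, capturing keyspace, shard and gtid.
SHARD_GTID = re.compile(
    r'shard_gtids:<keyspace:"([^"]*)" shard:"([^"]*)" gtid:"([^"]*)"'
)


def parse_vgtid(text):
    """Return a list of (keyspace, shard, gtid) tuples found in a text-format VGTID."""
    return SHARD_GTID.findall(text)


shard_gtids = parse_vgtid(VGTID_TEXT)
for keyspace, shard, gtid in shard_gtids:
    print(keyspace, shard, gtid)
```

The single event yields two (shard, gtid) pairs, which is exactly why one SourceRecord does not map cleanly onto one shard-based SourcePartition.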

       

      To move forward, I can think of two approaches:

      1. Each source task subscribes to only one Vitess shard.  For a 1000-shard Vitess installation, the connector would then launch 1000 source tasks.  Although this approach makes the modeling easier, it has two major drawbacks:

          a. Vitess shards can be split further at any time.  Once a shard splits, the original source task needs to stop and signal the connector, and the connector needs to launch two source tasks to replace the original one.  This handshake between source task and connector is not trivial to implement in Kafka Connect, and there will also be issues with persisting the state of the old source task before it exits;

          b. On the performance side, launching 1000 VStream subscriptions puts a heavy load on Vitess's VtGate layer.  The VtGate service needs to hold 1000 downstream connections and filter and dispatch the event streams coming from the VtTablets into those 1000 mini-streams, which is less efficient than managing a single VStream;

      2. Give up the idea of mapping a Vitess shard to a SourcePartition and persisting state per Vitess shard.  Instead, simply use the source task ID as the SourcePartition and persist the state to the Kafka offset topic keyed by source task ID.  To illustrate:

          a. The connector configuration has a maxTasks config param; say the user specifies 2 tasks;

          b. The connector queries the VtGate service, discovers that there are 4 shards in the given keyspace, does a simple work assignment between shards and tasks, and returns the assignments from the taskConfigs() method.  Say the assignment is (task0: [shard0,shard1], task1: [shard2,shard3]);

          c. Kafka Connect launches two tasks based on the return value of connector.taskConfigs();

          d. task0 opens a VStream with VtGate for shard0 and shard1; task1 opens a VStream with VtGate for shard2 and shard3;

          e. task0 processes its VStream and emits each SourceRecord with SourcePartition=task0 and SourceOffset=VGTID from the VStream (e.g. <shard_gtids:<shard:shard0, gtid:123>, <shard:shard1, gtid:234>>), and Kafka Connect persists the offset state using key=task0, value=<shard_gtids:<shard:shard0, gtid:123>, <shard:shard1, gtid:234>>.  The same happens on task1;

          f. When task0 stops and resumes later, it retrieves the state from the Kafka offset topic using key=task0 and uses <shard_gtids:<shard:shard0, gtid:123>, <shard:shard1, gtid:234>> to re-subscribe to the VStream.
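The "simple work assignment" in step b can be sketched as follows. This is a minimal illustration that assumes contiguous, near-equal groups; the `assign_shards` function and the `taskN` key format are hypothetical names, not part of the connector, and the actual assignment strategy is left open by this proposal.

```python
def assign_shards(shards, max_tasks):
    """Split shards into contiguous, near-equal groups, one group per task."""
    # Never create more tasks than there are shards.
    num_tasks = min(max_tasks, len(shards))
    if num_tasks == 0:
        return {}
    base, extra = divmod(len(shards), num_tasks)
    assignments = {}
    start = 0
    for task_index in range(num_tasks):
        # The first `extra` tasks take one additional shard each.
        size = base + (1 if task_index < extra else 0)
        assignments[f"task{task_index}"] = shards[start:start + size]
        start += size
    return assignments


assignments = assign_shards(["shard0", "shard1", "shard2", "shard3"], max_tasks=2)
print(assignments)
```

With 4 shards and 2 tasks this reproduces the assignment used in the walkthrough (task0 gets shard0 and shard1, task1 gets shard2 and shard3). Any deterministic scheme would do, as long as the connector computes the same assignment on every restart.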

       

      Although approach 2 seems naive (it does not attempt to extract the Vitess shard information), it is very easy to implement in the current Debezium Vitess code base; very little change is needed to apply the algorithm.
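The offset handling of approach 2 (persist the whole VGTID under the task ID, then read it back on resume) can be sketched as follows. `OffsetStore` is a hypothetical in-memory stand-in for the Kafka Connect offset topic, `resume_position` is an illustrative helper, and the `"current"` starting value is a placeholder assumption for a task with no saved state.

```python
class OffsetStore:
    """In-memory stand-in for the Kafka Connect offset topic (hypothetical helper)."""

    def __init__(self):
        self._offsets = {}

    def commit(self, source_partition, source_offset):
        # A later commit under the same key replaces the earlier value.
        self._offsets[source_partition] = dict(source_offset)

    def load(self, source_partition):
        return self._offsets.get(source_partition)


def resume_position(store, task_id, assigned_shards):
    """Return the shard -> gtid map a task would use to (re)subscribe to the VStream."""
    saved = store.load(task_id)
    if saved is not None:
        return saved
    # No saved state yet: start every assigned shard from a placeholder position.
    return {shard: "current" for shard in assigned_shards}


store = OffsetStore()
first_start = resume_position(store, "task0", ["shard0", "shard1"])

# task0 emits records whose SourceOffset is the full VGTID for its shards.
store.commit("task0", {"shard0": "123", "shard1": "234"})
after_restart = resume_position(store, "task0", ["shard0", "shard1"])
print(first_start, after_restart)
```

The key point is that the stored value is the entire multi-shard VGTID, so the task never has to attribute an individual record to a single shard.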

       

      There are two follow-up questions people might have:

      1. What happens if one of the Vitess shards splits further, e.g. shard0 becomes shard00 and shard01?  This actually does not cause any problems.  Shard0 was assigned to task0, and once the split is done the VGTID that task0 receives on its VStream changes from <shard_gtids:<shard:shard0>,<shard:shard1>> to <shard_gtids:<shard:shard00>,<shard:shard01>,<shard:shard1>>.  The new shard_gtids with three shards overwrites the value stored under key=task0 in the Kafka offset topic (the key task0 does not change).

          a. This works even if task0 is later killed and resumed, since VtGate now understands the three shards.

          b. This scheme also works if task0 is killed before the shard split and resumed after it.  In that case, task0 uses shard_gtids<shard0,shard1> to subscribe to VtGate.  VtGate keeps the historical shard-to-VtTablet mapping, so it can route the request to the correct VtTablet, which still understands shard0 and the old GTID for shard0.  That VtTablet continues to issue all the missing events up to the split, and from then on the VStream issues shard_gtids with the three shards.

       

      2. What happens if the operator decides to scale up on the Debezium side and launch 4 tasks to match the increasing load coming from Vitess?  This requires pausing the original connector and relaunching it with maxTasks=4.  The problem is that the Kafka offset topic holds state keyed by 2 tasks, whereas it now needs to be keyed by 4 tasks.  A migration step is needed to read the old key/value pairs from the Kafka offset topic and regenerate the offset state under 4 task keys.  This migration step can be done either by a separate tool that reads and writes the Kafka offset topic, or inside the Debezium Vitess connector during restart, once it detects that the maxTasks config parameter has changed.
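Both follow-up scenarios can be sketched with plain dictionaries standing in for the offset topic. This is only an illustration under stated assumptions: `migrate_offsets` is a hypothetical helper, the GTID values are made up, and the contiguous regrouping is one possible choice that would have to match whatever assignment the connector itself computes for the new task count.

```python
def migrate_offsets(old_offsets, new_task_count):
    """Re-key per-task offsets ({task_id: {shard: gtid}}) for a new task count."""
    # Flatten the old state; each shard belongs to exactly one old task.
    merged = {}
    for task_id in sorted(old_offsets):
        merged.update(old_offsets[task_id])

    shards = sorted(merged)
    num_tasks = min(new_task_count, len(shards))
    new_offsets = {}
    if num_tasks == 0:
        return new_offsets

    # Regroup shards into contiguous, near-equal groups under the new task keys.
    base, extra = divmod(len(shards), num_tasks)
    start = 0
    for task_index in range(num_tasks):
        size = base + (1 if task_index < extra else 0)
        group = shards[start:start + size]
        new_offsets[f"task{task_index}"] = {shard: merged[shard] for shard in group}
        start += size
    return new_offsets


# State persisted while running with 2 tasks (illustrative GTID values).
offsets = {
    "task0": {"shard0": "123", "shard1": "234"},
    "task1": {"shard2": "345", "shard3": "456"},
}

# Question 1: after shard0 splits, task0 commits a three-shard VGTID under the same key.
offsets["task0"] = {"shard00": "130", "shard01": "131", "shard1": "240"}

# Question 2: relaunch with 4 tasks, regenerating the state under 4 task keys.
migrated = migrate_offsets(offsets, 4)
print(migrated)
```

No shard position is lost in the migration: every shard/GTID pair from the old keys reappears under exactly one new task key.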

              Assignee: Unassigned
              Reporter: Henry Haiying Cai (haiyingcai, inactive)