Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-4441

Redis Sink - Change the format of the message sent to the stream

XMLWordPrintable

    • Icon: Enhancement Enhancement
    • Resolution: Done
    • Icon: Major Major
    • 2.0.0.Beta2
    • 1.7.1.Final
    • debezium-server
    • None
    • False
    • False

      Current format is:

       

      1) 1) "1639304527499-0"
         2) 1) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
            2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": \"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}" 

       

      While it should be in this format of key-value pairs to be consistent with Redis documentation [1] (and to other integrations such as Grafana):

      1) 1) "1639304527499-0"
         2) 1) "key"
            2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
            3) "value"
            4) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": \"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}"
        

       

       [1] https://redis.io/topics/streams-intro

            Unassigned Unassigned
            yossish Yossi Shirizli (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

              Created:
              Updated:
              Resolved: