Debezium / DBZ-9192

OpenLineage - Postgres connector emits Lineage Event with incorrect dataset name


    • Important

      Bug report

      What Debezium connector do you use and what version?

      Debezium 3.2 (PostgreSQL connector)

      What is the connector configuration?

       

      {
        "name": "postgres-connector",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "database.hostname": "postgres",
          "database.port": "5432",
          "database.user": "postgres",
          "database.password": "postgres",
          "database.dbname": "debezium_poc_db",
          "database.server.name": "dbserver1",
          "plugin.name": "pgoutput",
          "slot.name": "debezium",
          "table.include.list": "public.product",
          "topic.prefix": "debezium",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "openlineage.integration.enabled": "true",
          "openlineage.integration.config.file.path": "/debezium/openlineage.yml",
          "openlineage.integration.job.namespace": "dbz-jobs",
          "openlineage.integration.job.description": "Debezium Postgres Connector Job",
          "openlineage.integration.job.tags": "env=test",
          "openlineage.integration.job.owners": "Data Team=Jan Doe",
          "transforms": "openlineage",
          "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage",
          "schema.history.internal.kafka.bootstrap.servers": "kafka:9094"
        }
      }
       

      What is the captured database version and mode of deployment?

      Docker Compose. Here's my setup:

      debezium:
          image: quay.io/debezium/connect:3.2
          container_name: debezium
          ports:
            - "8083:8083"
          environment:
            - BOOTSTRAP_SERVERS=kafka:9094
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=dbz_configs
            - OFFSET_STORAGE_TOPIC=dbz_offsets
            - STATUS_STORAGE_TOPIC=dbz_statuses
            - CONNECT_REST_ADVERTISED_HOST_NAME=debezium
            - CONNECT_REST_PORT=8083
            - CONNECT_PLUGIN_PATH=/kafka/connect
            - KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
            - VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
          depends_on:
            - kafka
            - postgres
            - schema-registry
          volumes:
            - debezium_data:/kafka/connect
            - ./debezium/openlineage.yml:/debezium/openlineage.yml
            - ./debezium/jars/openlineage-java-1.31.0-all-deps.jar:/kafka/connect/debezium-connector-postgres/openlineage-java-1.31.0-all-deps.jar
       

      Here's my openlineage.yml:

      transport:
        type: http
        url: http://request-logger-proxy:80/openapi/openlineage/api/v1/lineage
      facets:
        symlinks:
          disabled: true
       

      The request logger proxy is an nginx proxy that logs the payload and forwards it to the OpenLineage server (DataHub, but that's not relevant).

      What behavior do you expect?

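      The input dataset name should include the database name, i.e. "debezium_poc_db.public.product" rather than "public.product", as the OpenLineage naming convention for Postgres requires.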

      What behavior do you see?

      The proxy logs the following request for the published Lineage Event; the relevant part is the "inputs" array:

      {
        "time": "2025-06-26T18:33:49+00:00",
        "remote_addr": "172.19.0.7",
        "request": "POST /openapi/openlineage/api/v1/lineage HTTP/1.1",
        "status": 500,
        "body_bytes_sent": 0,
        "request_time": 0.005,
        "http_referer": "",
        "http_user_agent": "Apache-HttpClient/5.4.3 (Java/21.0.7)",
        "request_body": {
          "eventTime": "2025-06-26T18:33:49.987599294Z",
          "producer": "https://github.com/debezium/debezium/v3.2.0.CR1",
          "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
          "eventType": "RUNNING",
          "run": {
            "runId": "0197ad84-7a0d-7527-9f8a-a08602319b4d",
            "facets": {
              "processing_engine": {
                "_producer": "https://github.com/debezium/debezium/v3.2.0.CR1",
                "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
                "version": "3.2.0.CR1",
                "name": "Debezium",
                "openlineageAdapterVersion": "1.31.0"
              },
              "debezium_config": {
                "configs": [
                  "connector.class=io.debezium.connector.postgresql.PostgresConnector",
                  "database.user=postgres",
                  "database.dbname=debezium_poc_db",
                  "transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage",
                  "slot.name=debezium",
                  "openlineage.integration.job.namespace=dbz-jobs",
                  "openlineage.integration.job.owners=Data Team=Jan Doe",
                  "openlineage.integration.config.file.path=/debezium/openlineage.yml",
                  "transforms=openlineage",
                  "database.server.name=dbserver1",
                  "schema.history.internal.kafka.bootstrap.servers=kafka:9094",
                  "database.port=5432",
                  "plugin.name=pgoutput",
                  "topic.prefix=debezium",
                  "task.class=io.debezium.connector.postgresql.PostgresConnectorTask",
                  "database.hostname=postgres",
                  "database.password=postgres",
                  "name=postgres-connector-2",
                  "table.include.list=public.product",
                  "value.converter=org.apache.kafka.connect.json.JsonConverter",
                  "openlineage.integration.job.description=Debezium Postgres Connector Job",
                  "openlineage.integration.enabled=true",
                  "openlineage.integration.job.tags=env=test"
                ],
                "additionalProperties": {},
                "_producer": "https://github.com/debezium/debezium/v3.2.0.CR1",
                "_schemaURL": "https://github.com/debezium/debezium/tree/main/debezium-core/src/main/java/io/debezium/openlineage/facets/spec/DebeziumRunFacet.json"
              }
            }
          },
          "job": {
            "namespace": "dbz-jobs",
            "name": "debezium.0",
            "facets": {
              "jobType": {
                "_producer": "https://github.com/debezium/debezium/v3.2.0.CR1",
                "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
                "processingType": "STREAMING",
                "integration": "DEBEZIUM",
                "jobType": "TASK"
              },
              "ownership": {
                "_producer": "https://github.com/debezium/debezium/v3.2.0.CR1",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/OwnershipJobFacet.json#/$defs/OwnershipJobFacet",
                "owners": [
                  {
                    "name": "Data Team",
                    "type": "Jan Doe"
                  }
                ]
              },
              "tags": {
                "_producer": "https://github.com/debezium/debezium/v3.2.0.CR1",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/TagsJobFacet.json#/$defs/TagsJobFacet",
                "tags": [
                  {
                    "key": "env",
                    "value": "test",
                    "source": "CONFIG"
                  }
                ]
              },
              "documentation": {
                "_producer": "https://github.com/debezium/debezium/v3.2.0.CR1",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DocumentationJobFacet.json#/$defs/DocumentationJobFacet",
                "description": "Debezium Postgres Connector Job"
              }
            }
          },
          "inputs": [
            {
              "namespace": "postgres://postgres:5432",
              "name": "public.product",
              "facets": {
                "datasetType": {
                  "_producer": "https://github.com/debezium/debezium/v3.2.0.CR1",
                  "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DatasetTypeDatasetFacet.json#/$defs/DatasetTypeDatasetFacet",
                  "datasetType": "TABLE",
                  "subType": ""
                },
                "schema": {
                  "_producer": "https://github.com/debezium/debezium/v3.2.0.CR1",
                  "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
                  "fields": [
                    {
                      "name": "id",
                      "type": "serial"
                    },
                    {
                      "name": "name",
                      "type": "varchar"
                    },
                    {
                      "name": "price",
                      "type": "int4"
                    },
                    {
                      "name": "creation_date",
                      "type": "timestamp"
                    }
                  ]
                }
              }
            }
          ],
          "outputs": []
        }
      } 

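      The input table's name in the event is "public.product". According to the OpenLineage naming specification, a Postgres dataset name should also contain the database name:

      https://openlineage.io/docs/spec/naming

      {database}.{schema}.{table}

      The namespace ("postgres://postgres:5432") already follows the expected postgres://{host}:{port} format; only the dataset name is missing the database ("debezium_poc_db").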
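      To make the expected naming concrete, here is a minimal Python sketch of the Postgres rule from the OpenLineage naming page (namespace postgres://{host}:{port}, name {database}.{schema}.{table}). The helper postgres_dataset is hypothetical and is not part of Debezium or the OpenLineage client; it is simply fed the values from the connector configuration above and compared with what the connector emitted.

```python
from typing import Tuple


def postgres_dataset(host: str, port: int, database: str, schema: str, table: str) -> Tuple[str, str]:
    """Return the (namespace, name) pair that the OpenLineage naming convention
    prescribes for a Postgres table. Hypothetical helper, for illustration only."""
    namespace = f"postgres://{host}:{port}"
    name = f"{database}.{schema}.{table}"
    return namespace, name


# Values taken from the connector configuration in this report
# (database.hostname, database.port, database.dbname, table.include.list).
expected = postgres_dataset("postgres", 5432, "debezium_poc_db", "public", "product")

# The (namespace, name) pair the connector actually emitted in "inputs".
emitted = ("postgres://postgres:5432", "public.product")

print("expected:", expected)  # expected: ('postgres://postgres:5432', 'debezium_poc_db.public.product')
print("emitted: ", emitted)   # emitted:  ('postgres://postgres:5432', 'public.product')
print("namespace matches:", expected[0] == emitted[0])  # namespace matches: True
print("name matches:", expected[1] == emitted[1])       # name matches: False
```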

       

      Do you see the same behaviour using the latest released Debezium version?

      As far as I know, 3.2 is the latest stable version and contains all the OpenLineage-related commits.

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      I can recreate the issue if necessary, but I don't think that will be needed: it's pretty straightforward.

      How to reproduce the issue using our tutorial deployment?

      I'm still working on a repo and plan to implement a workaround (correcting the HTTP request in my proxy). I'll make the repo available, so reproducing the issue shouldn't be a problem. I'm in contact with Mario, who implemented the OpenLineage integration.
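      The proxy-side workaround could look roughly like the following Python sketch. It is not nginx configuration and not part of Debezium: fix_input_names, database_name and rewrite_body are hypothetical helpers. The sketch assumes the database name can be read from the "database.dbname=" entry of the debezium_config run facet (present in the event above) and that a correctly named Postgres dataset has three dot-separated parts.

```python
import json
from typing import Any, Dict, Optional


def database_name(event: Dict[str, Any]) -> Optional[str]:
    """Read database.dbname from the debezium_config run facet, if present."""
    facets = event.get("run", {}).get("facets", {})
    for entry in facets.get("debezium_config", {}).get("configs", []):
        # Each entry is "key=value"; split on the first "=" only.
        key, _, value = entry.partition("=")
        if key == "database.dbname":
            return value
    return None


def fix_input_names(event: Dict[str, Any]) -> Dict[str, Any]:
    """Prefix Postgres input names of the form "schema.table" with the database name.

    Hypothetical workaround; the event is modified in place and returned.
    """
    dbname = database_name(event)
    if dbname is None:
        return event  # nothing to prefix with, leave the event untouched
    for dataset in event.get("inputs", []):
        name = dataset.get("name", "")
        is_postgres = dataset.get("namespace", "").startswith("postgres://")
        # Only rewrite two-part names; already-qualified names are left alone.
        if is_postgres and name.count(".") == 1:
            dataset["name"] = f"{dbname}.{name}"
    return event


def rewrite_body(raw: bytes) -> bytes:
    """Parse an OpenLineage request body, fix the input names, and re-serialize it."""
    return json.dumps(fix_input_names(json.loads(raw))).encode("utf-8")


if __name__ == "__main__":
    # Trimmed-down version of the event logged in this report.
    sample = {
        "run": {"facets": {"debezium_config": {"configs": [
            "database.dbname=debezium_poc_db",
            "table.include.list=public.product",
        ]}}},
        "inputs": [{"namespace": "postgres://postgres:5432", "name": "public.product"}],
        "outputs": [],
    }
    fixed = json.loads(rewrite_body(json.dumps(sample).encode("utf-8")))
    print(fixed["inputs"][0]["name"])  # debezium_poc_db.public.product
```

      Counting dots is only a heuristic: it would mis-handle schema or table names that themselves contain a dot, so this is a stopgap rather than a substitute for fixing the name in the connector.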

       

              rh-ee-mvitale Mario Fiore Vitale
              jan.siekierski Jan Siekierski (Inactive)