Story
Resolution: Unresolved
Product / Portfolio Work
The Consumer is built to consume from a Kafka topic of outbox events and update the relationship tuple store so that its state is in sync with inventory. There are a few issues with the current implementation:
- The Consumer violates repository encapsulation: it makes direct DB calls using the DB data model.
- The Consumer contains some business logic, but that logic is crowded out by infrastructure concerns.
The Consumer also does not fit into the layered architecture: it lives in its own peer package, which violates the dependency rules, since only "main" may depend on an infrastructure package.
Proposal:
- Add an EventSource() method to the Store/Repository.
  - It returns a stream of some "Delivery" abstraction that contains both the event and Ack/Nack methods.
- Move the consumer logic to a RelationReplicationUsecase (application layer) and a RelationReplicationService (domain layer), which together accept this event stream and handle the replication logic.
  - Inside these, use ResourceRepository for the Resource token update.
As with repositories, Config types should be moved with the implementations they configure.
This has been prototyped and completed already in a separate branch. See: RelationReplicationUsecase, Store, Delivery
This divides the logic among three layers:
- Infrastructure: implementation of EventSource
- RelationReplicationUsecase: handles transaction boundary, interacts with repository to register event handler, invokes next layer. The event handler return value acks or nacks the event.
- RelationReplicationService (in domain)
RelationReplicationService is a domain service that takes a Delivery (containing an OutboxEvent + fencing Lock) and replicates the corresponding inventory changes to the relations service (e.g., SpiceDB). Its Replicate method does four things in sequence:
- Fetches representations – Based on the operation type (created/updated/deleted), it retrieves the current and/or previous versioned representations of the resource from the repository.
- Calculates tuples – Delegates to SchemaService.CalculateTuplesForResource to diff the current vs. previous representations and determine which relation tuples need to be created or deleted. Short-circuits if there are no tuple changes.
- Executes the relations operation – Calls RelationshipRepository (or equivalent) with the appropriate creates/deletes (varying by operation type) and the fencing Lock from the delivery. Gets back a ConsistencyToken.
- Updates the consistency token – For non-delete operations, persists the returned ConsistencyToken on the resource so downstream reads can use it for consistency guarantees.
The service is purely domain logic – it knows nothing about Kafka, offset management, or transactions. Those concerns live in the KafkaEventSource (infrastructure) and RelationReplicationUsecase (orchestration) respectively.