Report resources
This guide provides service providers with general recommendations for reliably replicating data into Kessel. Successful integration requires navigating the complexities of distributed systems by selecting strategies that address consistency, resiliency, and durability. Depending on the guarantees your system needs, you can expect to face challenges such as the dual-write problem and write skew. This guide should help you understand how to address these challenges and implement a replication strategy that suits your application.
Using an Outbox
Writing to your database and publishing events to a message broker are two distinct operations on external systems that cannot happen in an atomic manner (also known as the dual-write problem). This means that if your application crashes after writing to the database but before publishing an asynchronous event, your database and any message consumers may become permanently out of sync. The outbox pattern helps solve this problem by writing resource aggregates and messaging events within the same database transaction using an outbox table. When coupled with something like Debezium, the outbox table can be used to publish events to a message broker, ensuring that the database and messages are always (eventually) in sync.
The Outbox Table
This example uses the same structure as the Debezium Outbox Event Router example.
CREATE TABLE outbox (
    id UUID NOT NULL,
    aggregatetype VARCHAR(255) NOT NULL,
    aggregateid VARCHAR(255) NOT NULL,
    payload JSONB,
    PRIMARY KEY (id)
);
Column Breakdown:
- id: A unique identifier for each event, your primary key
- aggregatetype: Describes the type of business entity that an event relates to, for example, customer or order. This is used by Debezium to determine the destination topic.
- aggregateid: The unique identifier of the specific entity instance, like the customer's ID or order number.
- payload: The actual content of the event message, typically stored as JSON or JSONB for flexibility. This is what message consumers will receive.
Additional columns can be added as needed, such as a timestamp for event creation or an operation column to indicate the type of operation (e.g., create, update, delete). Additional configuration must be added to the Debezium configuration if you'd like these fields to be mapped to headers or the payload when producing messages.
Writing to the Outbox
With the outbox table in place, you'll need to modify your application's business logic to write to it within the same transaction as your primary data changes.
For example, if you're creating a new customer and want to publish an event about it, your transaction might look something like this:
BEGIN;

INSERT INTO customers (id, name) VALUES ('123', 'John Doe');

INSERT INTO outbox (id, aggregatetype, aggregateid, payload)
VALUES ('456', 'customer', '123', '{"event": "customer_created", "data": {"id": "123", "name": "John Doe"}}');

COMMIT;
This ensures that both the customer record and the outbox event are created atomically. If the transaction fails, neither the customer nor the outbox entry will be written, maintaining consistency between your business entity data and downstream consumers.
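In application code, the same pattern is simply a single transaction that performs both inserts. Below is a minimal sketch using Go's database/sql package; the function name, the Postgres driver assumed to be registered, and the hard-coded values are illustrative, not part of any Kessel API.

package outbox

import (
	"context"
	"database/sql"
)

// createCustomerWithEvent writes the business record and its outbox event in a
// single transaction so that both succeed or both roll back together.
func createCustomerWithEvent(ctx context.Context, db *sql.DB) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once the transaction has been committed

	if _, err := tx.ExecContext(ctx,
		`INSERT INTO customers (id, name) VALUES ($1, $2)`,
		"123", "John Doe"); err != nil {
		return err
	}

	payload := `{"event": "customer_created", "data": {"id": "123", "name": "John Doe"}}`
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO outbox (id, aggregatetype, aggregateid, payload) VALUES ($1, $2, $3, $4)`,
		"456", "customer", "123", payload); err != nil {
		return err
	}

	return tx.Commit()
}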
Pruning the Outbox
To prevent the outbox table from growing indefinitely, you will need to implement a cleanup strategy.
With Debezium, since it reads from the write-ahead log, you can safely delete entries from the outbox table immediately, even within the same transaction that created them.
BEGIN;

INSERT INTO customers (id, name) VALUES ('123', 'John Doe');

INSERT INTO outbox (id, aggregatetype, aggregateid, payload)
VALUES ('456', 'customer', '123', '{"event": "customer_created", "data": {"id": "123", "name": "John Doe"}}');

DELETE FROM outbox WHERE id = '456';

COMMIT;
If you need to retain history for auditing, debugging, or recovery purposes, consider implementing a retention policy or reconciler that archives old events to a separate table, long-term data store, or even deletes them entirely. It's important to note that any retained events will not be ordered by transaction commit order inherently. You may need additional mechanisms in your application or database to ensure outbox ordering, such as using a serial primary key.
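If you opt for retention rather than immediate deletion, a small background job can prune old rows on a schedule. The sketch below is illustrative only: it assumes an additional created_at timestamp column, which is not part of the minimal table definition above, and the function name is hypothetical.

package outbox

import (
	"context"
	"database/sql"
	"log"
	"time"
)

// pruneOutbox deletes outbox rows older than the retention window. It assumes
// a created_at timestamp column has been added to the outbox table.
func pruneOutbox(ctx context.Context, db *sql.DB, retention time.Duration) error {
	cutoff := time.Now().Add(-retention)
	res, err := db.ExecContext(ctx, `DELETE FROM outbox WHERE created_at < $1`, cutoff)
	if err != nil {
		return err
	}
	if n, err := res.RowsAffected(); err == nil {
		log.Printf("pruned %d outbox rows created before %s", n, cutoff.Format(time.RFC3339))
	}
	return nil
}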
Monitoring
Outbox Event Creation: Tracking the rate of event creation in combination with the rate of event consumption can help identify end-to-end lag in your system. If the outbox is filling up faster than it can be processed, you may need to scale your consumers or optimize your event processing logic.
See our Monitoring Data Replication guide for more details on how to monitor your replication processes.
Beware: Write Skew
Interleaved database writes can lead to write skew, where concurrent application read-modify-write cycles operate on stale data and silently corrupt state. It's advisable to take this into consideration when designing your outbox and event processing logic.
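One common mitigation (a sketch, not a prescription) is to run the read-modify-write cycle in a SERIALIZABLE transaction and retry on serialization failure, or to lock the rows you read with SELECT ... FOR UPDATE. The example below uses Go's database/sql; the retry count, backoff, and the string match on Postgres SQLSTATE 40001 are simplifications for illustration.

package outbox

import (
	"context"
	"database/sql"
	"errors"
	"strings"
	"time"
)

// withSerializableRetry runs fn in a SERIALIZABLE transaction and retries when
// Postgres aborts the transaction with a serialization failure (SQLSTATE 40001),
// which is how conflicting concurrent read-modify-write cycles surface.
func withSerializableRetry(ctx context.Context, db *sql.DB, fn func(*sql.Tx) error) error {
	for attempt := 0; attempt < 5; attempt++ {
		tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
		if err != nil {
			return err
		}
		if err := fn(tx); err != nil {
			tx.Rollback()
			return err
		}
		if err := tx.Commit(); err == nil {
			return nil
		} else if !strings.Contains(err.Error(), "40001") {
			// Matching on the error string keeps the sketch driver-agnostic;
			// a real implementation would inspect the driver's error type.
			return err
		}
		time.Sleep(time.Duration(attempt+1) * 50 * time.Millisecond) // back off and retry
	}
	return errors.New("transaction repeatedly failed with serialization conflicts")
}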
Immediate Write Visibility with Asynchronous Messaging
For use cases where immediate visibility of data changes is required but you are bound by the nature of asynchronous event processing, you can consider using the Postgres LISTEN and NOTIFY features. This allows your request handler to wait for a notification about the outcome of event consumption (e.g., replicating to Kessel) before proceeding or returning to the client.
How it works
The process is fairly straightforward:
- Produce & Listen: The request handler produces an event to a Kafka topic (directly or via an outbox table) and then begins LISTENing on a Postgres channel.
- Consume & Notify: A separate consumer process parses the event. Upon completion, it issues a NOTIFY command to the same channel the request handler is listening on.
- Synchronize: The LISTENing request handler receives this notification, confirming that the event has been successfully consumed. This allows the handler to then continue its execution, for example, by returning a response to the client.
This approach effectively creates a synchronous workflow over an asynchronous workflow, ensuring that request handlers can await the completion of downstream event processing.
Examples
Listening on a Postgres channel
To listen for notifications on a specific Postgres channel, you can use the LISTEN command. The example below listens on the host-replication channel. Channels are arbitrarily defined strings that you can use to group related notifications.
LISTEN "host-replication";
Notifying a Postgres channel
The SQL below sends a notification to all listeners on the host-replication channel. The payload can be any string, but in this example it is a UUID that could be used to identify the specific event or request.
NOTIFY "host-replication", 'c0740fcd-c2a3-4767-b2d2-fd21cd12e31a';
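From application code, the consumer can issue the same notification after it finishes processing. The NOTIFY statement does not accept bind parameters, so the built-in pg_notify() function is a convenient alternative; the function name below and the use of database/sql are illustrative.

package outbox

import (
	"context"
	"database/sql"
)

// notifyListeners signals any request handlers waiting on the host-replication
// channel that the event identified by eventID has been processed.
func notifyListeners(ctx context.Context, db *sql.DB, eventID string) error {
	// pg_notify takes the channel and payload as normal arguments, unlike the
	// NOTIFY statement, which requires literals.
	_, err := db.ExecContext(ctx, `SELECT pg_notify('host-replication', $1)`, eventID)
	return err
}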
Considerations
- LISTEN/NOTIFY may not be supported by all database drivers; double-check that your Postgres driver can handle it.
- Postgres channels are not durable, meaning that if the request handler is not actively listening when the NOTIFY is sent, it will miss the notification. Listening should be one of the first things you do in your request handler.
- You may run into limitations if your Postgres channels are ephemeral, so creating and dropping channels frequently is not advised. Instead, use a consistent channel name for each type of event you want to listen for. This means you may have many listeners on the same channel, so your event/notification payloads should carry an identifier that lets each listener pick out the event it is interested in.
- LISTEN/NOTIFY should use its own Postgres connection, separate from the one used for your application logic, because listening is a long-lived operation that can block other work on the same connection. You should also aim to minimize connections so that each new LISTEN reuses an existing listener connection rather than opening a new one. A sketch of a listening request handler follows this list.
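Putting those considerations together, a request handler might hold a listening connection and wait with a timeout before responding. The sketch below uses the jackc/pgx driver, which supports LISTEN/NOTIFY directly; the channel name matches the examples above, while the function name and timeout handling are assumptions. For brevity it opens its own connection per call; in practice you would reuse a long-lived listener connection shared across requests, and you would produce the event only after LISTEN has been issued.

package outbox

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5"
)

// waitForReplication blocks until a notification on the host-replication channel
// carries a payload matching eventID, or until the timeout expires.
func waitForReplication(ctx context.Context, connString, eventID string, timeout time.Duration) error {
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Start listening before producing the event so the notification cannot be
	// missed (Postgres channels are not durable).
	if _, err := conn.Exec(ctx, `LISTEN "host-replication"`); err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	for {
		n, err := conn.WaitForNotification(ctx)
		if err != nil {
			return err // includes context deadline exceeded on timeout
		}
		// Many handlers share this channel, so match on the payload identifier.
		if n.Payload == eventID {
			return nil
		}
	}
}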
Kafka Consumer Strategies
As a service provider, you need a reliable way to replicate data changes from your systems to the Fabric. A Kafka consumer offers a robust and scalable pattern to solve this problem. A consumer reads messages from Kafka topics by subscribing to topic partitions. The consuming application then processes the messages to apply those changes.
This guide outlines the key challenges your consumer should address and provides some high-level strategies for building a robust solution.
We built our consumer in Go using the confluent-kafka-go client library.
Why Use a Kafka Consumer for Replication?
When replicating data, a Kafka consumer solves the core problem of reliably processing asynchronous messages. It decouples your primary service from the replication process, meaning an issue in the replication pipeline won't directly impact your main application's performance.
Key Guarantees for Data Integrity
To maintain data consistency between your service and Kessel, your consumer implementation should prioritize data integrity. Below are three guarantees that we aim to enforce with our own consumer and that you should enforce as well.
- Messages must be processed in order: If messages are not processed in the order in which they are received, we risk applying changes in a manner that causes unexpected consequences. For example, processing an update event before a create event for the same record would fail and cause data issues.
- Guarantee at-least-once processing: A consumer must never "lose" a message. In other words, we should not move on to the next message until the current one has been confirmed as processed. Without this guarantee we risk losing data and will have permanent inconsistency for the "skipped" event.
- Processing should be idempotent: If a message is processed more than once, it should not cause any side effects or data corruption. This means that if a message is reprocessed due to a failure or retry, the outcome should be the same as if it were processed only once. This comes with a caveat: if a "historical" message is replayed, all messages after it must also be reprocessed to ensure no data loss.
A minimal consume loop aimed at these guarantees is sketched after this list.
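The sketch below shows one way to work toward these guarantees with confluent-kafka-go: auto-commit is disabled, messages are processed one at a time in order, the offset is stored only after processing succeeds, and a processing failure forces a restart from the last committed offset. The topic, group ID, broker address, v2 import path, and the processResource helper are placeholders, not the Kessel team's actual implementation.

package replication

import (
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// processResource is a placeholder for your own idempotent business logic.
func processResource(msg *kafka.Message) error { return nil }

func run() error {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092",
		"group.id":                 "my-replication-consumer",
		"auto.offset.reset":        "earliest",
		"enable.auto.commit":       false, // commit only what has been processed
		"enable.auto.offset.store": false, // store offsets explicitly after success
	})
	if err != nil {
		return err
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"outbox.event.customer"}, nil); err != nil {
		return err
	}

	for {
		msg, err := c.ReadMessage(time.Second)
		if err != nil {
			if kerr, ok := err.(kafka.Error); ok && kerr.IsTimeout() {
				continue // an idle topic is not an error
			}
			log.Printf("read error: %v", err)
			continue
		}

		// Block on processing this message: in-order, at-least-once.
		if err := processResource(msg); err != nil {
			// Returning (rather than skipping ahead) forces a restart from the
			// last committed offset, so the message is never silently lost.
			return err
		}

		// Mark the message processed, then commit the stored offset. Committing
		// per message keeps the example simple; batching commits is common.
		if _, err := c.StoreMessage(msg); err != nil {
			return err
		}
		if _, err := c.Commit(); err != nil {
			log.Printf("commit failed: %v", err)
		}
	}
}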
Authentication
To read from a secured Kafka cluster, a consumer must first authenticate itself with the brokers. This is configured in the consumer's properties. For our setup we use SASL with the SCRAM-SHA-512 mechanism.
A typical configuration would look like this:
security-protocol: sasl_plaintext
sasl-mechanism: SCRAM-SHA-512
sasl-username: my-generic-consumer
sasl-password: <PASSWORD>
Your consumer configuration must include properties like security.protocol, sasl.mechanism, and other related settings to provide the necessary credentials for a successful connection to Kafka.
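With confluent-kafka-go, those settings translate directly into librdkafka properties on the consumer's ConfigMap. The broker address and credentials below are placeholders.

package replication

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// newAuthenticatedConsumer creates a consumer configured for SASL/SCRAM.
// The broker address, group ID, and credentials are placeholders.
func newAuthenticatedConsumer() (*kafka.Consumer, error) {
	return kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "kafka.example.com:9092",
		"group.id":          "my-generic-consumer",
		"security.protocol": "sasl_plaintext",
		"sasl.mechanism":    "SCRAM-SHA-512",
		"sasl.username":     "my-generic-consumer",
		"sasl.password":     "<PASSWORD>",
	})
}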
Retry Logic
Message processing can fail for many reasons. Some failures are transient (e.g. a temporary network failure), while others are permanent (e.g. a malformed message). A robust consumer must handle these failures without halting all processing.
Our implementation uses two retry systems to handle message failures, prioritizing message durability over pure throughput.
- Operation-level Retry: For transient failures within a specific task, a blocking Retry function is used.
// Retry executes the given function and will retry on failure with backoff until max retries is reached
func (i *InventoryConsumer) Retry(operation func() (string, error)) (string, error) {
	// ...
	for i.RetryOptions.OperationMaxRetries == -1 || attempts < i.RetryOptions.OperationMaxRetries {
		resp, err = operation()
		if err != nil {
			// ... logs error, increments metric
			time.Sleep(backoff) // Blocks here
			continue
		}
		return fmt.Sprintf("%s", resp), nil
	}
	// ...
}
This function synchronously retries a failing operation with an exponential backoff up to a configured maximum. Because it uses time.Sleep, it blocks the consumer's main processing loop.
- Consumer-Level Retry: If a message fails processing in a way that the Consume loop cannot recover from (indicated by consumer.ErrClosed), a more drastic retry strategy is engaged.
// Outer consumer loop
for consumerOptions.RetryOptions.ConsumerMaxRetries == -1 || retries < consumerOptions.RetryOptions.ConsumerMaxRetries {
	// ... recreates the entire consumer connection
	inventoryConsumer, err = consumer.New(...)
	err = inventoryConsumer.Consume()
	if e.Is(err, consumer.ErrClosed) {
		// ... backs off and retries by re-entering the loop
		continue
	}
	// ...
}
This strategy ensures a failed message is re-attempted from scratch by forcing a re-read from the last committed offset, preventing potential message loss.
Rebalances
Rebalances are critical to handle: if your consumer loses a partition without first saving its progress, the next consumer to own that partition will start processing from the last saved point, leading to duplicate processing.
Our consumer handles partition reassignments from the Kafka cluster by registering a RebalanceCallback. This ensures its processing state is committed before partitions are lost.
func (i *InventoryConsumer) RebalanceCallback(consumer *kafka.Consumer, event kafka.Event) error {
	switch ev := event.(type) {
	// ...
	case kafka.RevokedPartitions:
		i.Logger.Warnf("consumer rebalance event: %d partition(s) revoked: %v\n", len(ev.Partitions), ev.Partitions)

		// Commits stored offsets before partition is lost
		err := i.commitStoredOffsets()
		// ...
	}
	return nil
}
When the consumer is notified that it is about to lose ownership of its partitions, it calls commitStoredOffsets(). This is essential to guarantee that all successfully processed messages before this point have been marked as complete. This prevents the next consumer that receives the partition from reprocessing messages, maintaining the at-least-once processing guarantee.
Leveraging Client Statistics
In addition to application-level metrics (e.g. "events processed"), most Kafka client libraries can be configured to emit internal performance statistics, often as JSON. These "stats messages" provide a deep, real-time view into the health and performance of the consumer client itself.
To enable this you typically set a configuration property like statistics.interval.ms to a non-zero value (e.g. 60000 for every 60 seconds). The client will then periodically provide a detailed report containing metrics such as:
- Round-trip time (rtt)
- Total messages consumed (rxmsgs)
These statistics are invaluable for debugging. You can use them in the dashboards and alerts we offer, or build your own to track the health of the consumer.
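With confluent-kafka-go, enabling and consuming these reports might look like the sketch below: set statistics.interval.ms on the ConfigMap and handle the *kafka.Stats events the client emits. The broker address and group ID are placeholders, and the rxmsgs field pulled out of the JSON follows librdkafka's statistics schema.

package replication

import (
	"encoding/json"
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// pollStats enables periodic statistics reports and logs a field from each one.
func pollStats() error {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":      "localhost:9092",
		"group.id":               "my-generic-consumer",
		"statistics.interval.ms": 60000, // emit a stats report every 60 seconds
	})
	if err != nil {
		return err
	}
	defer c.Close()

	for {
		switch e := c.Poll(1000).(type) {
		case *kafka.Stats:
			// e.String() is the raw JSON report emitted by the client.
			var stats map[string]interface{}
			if err := json.Unmarshal([]byte(e.String()), &stats); err == nil {
				log.Printf("rxmsgs=%v", stats["rxmsgs"]) // total messages consumed
			}
		case *kafka.Message:
			// Normal message handling would go here.
		case kafka.Error:
			log.Printf("kafka error: %v", e)
		}
	}
}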
Deployment
How you deploy your consumer comes with a few key considerations. There are three main ways to do so.
- In-process thread (used by the Kessel team)
- Consumer logic runs as a thread within the main application process.
- Simple to deploy and manage without any network overhead for communication.
- Tightly coupled resources. A CPU and memory intensive task in the main application can starve the consumer thread and vice versa.
- Cannot scale the consumer independently from the main application.
- Sidecar container
- Consumer logic runs in its own container within the same pod as the main application.
- Decouples resources. The consumer and application have their own CPU and memory limits.
- Allows for independent scaling and focused monitoring and logging.
- Standalone pod
- Consumer logic runs as a completely separate deployment/service.
- Maximum decoupling of resources, scaling, and lifecycle.
- Highest operational complexity. Requires a separate deployment definition, monitoring, and alerting setup.
- Can be overkill if the consumer's logic is tightly coupled to the domain of a single application.