-
Notifications
You must be signed in to change notification settings - Fork 18
Comparison of harvesting methods
This document compares the methods used for harvesting an outbox, focusing mainly on the scraping approach.
Broadly speaking, there are two ways one can harvest an outbox:
- Tailing — using a separate Change Data Capture (CDC) facility to identify new outbox records as they come in.
-
Scraping — inferring the changes by analysing the contents of the outbox table.
goharvest
lies in this category.
It must be acknowledged that, in some ways, tailing is ideally suited to the problem. CDC presents data in the order they were committed, rather than the order in which the sequence numbers were assigned. There are no gaps and the audit records and queries are straightforward.
While it appears straightforward on the surface, there are several challenges inherent in CDC:
- CDC is not portable. Not all databases support CDC, and those that do might not offer CDC capability on managed offerings. For example, AWS offers CDC on conventional RDS, but not Aurora. Google Cloud SQL does not offer CDC at all. (Log-based CDC requires parsing of the database Write-Ahead Log, which means additional software must be running on each database server node.)
- Due to the asynchronous nature of publishing and the prospect of failures, we need to store a delivery state for each outbox record. Depending on how CRC is implemented, this may not be trivial.
- For example, log-based CRC in Postgres does not present data in a table; instead, it uses a stream that may be queried like a table, using a SQL-like syntax. The state data may need to be persisted in a separate table, or alternatively, the tailer may limit itself to peeking into the stream, and only retrieve records when acknowledgements have been received. This makes it harder to implement pipelining, as in-flight records will appear when peeking into a CDC stream.
- Trigger-based CDC is easier to work with, as it writes to a dedicated, long-lived table. On the flip side, trigger-based CDC is inefficient, impacting the write transaction.
- Log-based CDC is not enabled at the database, but at the master node. (This is the case for Postgres; others may differ.) If a failover occurs, logical replication must be explicitly re-enabled on the new active node. The failover event may not be apparent to the application, particularly when using a Cloud-based service. Trigger-based CDC does not suffer from this approach, but is markedly less efficient than the log-based model.
By all accounts, when it comes to harvest an outbox, scraping is doing it the 'hard way'. This path was chosen because, while it is harder on goharvest
, it is easier on the user — create the outbox table and fire away. No portability issues and no menial DBA work. Also, MP/R does not impact update performance for applications logging to the outbox — there are no triggers on the commit path. Finally, while goharvest
is presently limited to Postgres, there is a strong desire to support other relational databases, which makes proprietary CDC solutions less tractable.
Anecdotally, most scrapers keep track of their offset in the outbox — the sequential ID of the last row returned by a polling query. The next query uses this as the offset. Typically, this is implemented using Kafka Connect, which takes care of contention, availability and state management.
This approach is prone to skipping over records as a result of out-of-order transactions. Often, this is not caught in development, where the traffic volume is insufficient to induce this condition. Fundamentally, Kafka Connect is designed for reading monotonic CDC audit tables, not for scraping an outbox. This is not common knowledge; there is a low-key discussion on Stack Overflow that touches on this. Nonetheless, judging by the Internet chatter, this appears to be the most common initial implementation. (At least until the problems begin to emerge.)
Once the problem with the above approach is identified, the go-to fix is typically to allow for a grace period for latent records, rather than relying solely on sequential IDs. Effectively, each subsequent query trails the time of the oldest record returned by the last query by a fixed period — in the order of seconds or minutes. The challenge here is to find a suitable grace period, such that it is generous enough to account for the longest-running transactions, yet not so generous that it blows out the query. Taking this route implies dropping Kafka Connect in favour of a custom scraper, which means all of a sudden having to deal with contention and availability. This means using an atomic broadcast protocol or a distributed lock service built on top of one, or alternatively, using the database itself as a centralised lock store. NELI is another option.
A variation of the two approaches above, this tactic recognises that gaps in ID sequences are possible, but are relatively infrequent — caused by rollbacks or otherwise failed writes.
Instead of querying by time, this approach starts off by optimistically querying by a starting ID, advancing linearly for as long as the returned IDs are densely populated. If sparsity is detected, the model pins the starting offset to the point of the observed discontinuity. Naturally, it cannot wait forever, so a grace period is accorded – after which the offset starts advancing again.
The challenge with this approach is the complexity of the implementation, being bimodal in nature — requiring special treatment of gaps. It can also be stalled by discontinuities that take a long time to resolve (such as long-running transactions) or are never resolved (for example, rollbacks). When these events occur, the harvester will 'stutter', creating undue lag in outbox message delivery.
The non-monotonic nature of outbox queries is owed largely to transaction isolation. Remove isolation on the harvester (by setting it connection to Read Uncommitted), and transaction side-effects become immediately observable.
This approach is unsuitable for several reasons:
- It does not support rollbacks. Once the effects of a transaction have been observed, they cannot be revoked. Even if the outbox write is the last statement in a transaction, there is still a remote likelihood of a rollback in databases that support MVCC, such as Oracle and Postgres.
- While dirty reads might appear to be monotonic, there is (at least) one case where they are not. Specifically, the atomic allocation of numbers from a sequence is independent of that number being subsequently used. It is possible for T0 to acquire a smaller number that T1, but for T1 to use its acquired number to insert a row first, thereby appearing sooner than T0 to an observer and leaving a gap in the sequence. T0 may appear a split second later to fill the gap, at which point the query may have already advanced past that point.
- Postgres does not support Read Uncommitted; the lowest isolation level is Repeatable Read.
Database-assigned timestamps are used to efficiently identify records that have been changed. Because time only runs forward, especially when it is sampled from a centralised source, there is a misconception that time-based predicates are inherently monotonic. They are not, for the following reasons:
- The monotonicity of the underlying time source depends on a range of factors that are outside of our control, such as the operating system, the presence of a hypervisor, the use of NTP, and the database implementation itself. As it stands, Postgres uses the real-time clock today, which is not monotonic. (It may appear to run backwards under some circumstances.)
- Under Read Committed isolation, transaction side-effects may be observed out of order.
- Postgres serves each database connection out of a dedicated POSIX thread. When two queries are issued on different connections, the two may return mutually-unordered timestamps, due to the behaviour of
clock_gettime(CLOCK_REALTIME,...)
across multiple OS-level threads.
An alternative approach to time-based querying is to drop the pipeline and process records one batch at a time. In other words, read a batch of records, write to Kafka, await confirmations, delete the records (or mark them as done), then repeat — always reading from the head (not dissimilar to MP/R). A straightforward model that is bulletproof, owing largely to the simplicity of not having to manage individual record state — we implicitly know which records are in flight because we are blocked on them.
While state management is simplified, the process still needs to account for contention and availability. A notable drawback of this approach is its blocking nature. While a batch is being processed and the scraper waits for acknowledgements from the brokers, pending outbox records are held up. The lack of pipelining means it can only do one thing — either query and publish records, await confirmations, or update its state.
One of the most elaborate solutions for monotonic table reads was presented by Leigh Brenecki in Efficiently fetching updated rows with Postgres. In summary, it injects a monotonically increasing internal transaction ID into each record, using a trigger function. Subsequent SELECT
queries locate the oldest transaction that is currently active and use it as a predicate for filtering records. In this manner, the query acts like a sliding window — advancing only when the oldest transaction commits or rolls back. This technique is Postgres-specific; other databases may permit similar queries, but due to their proprietary nature, each is likely to differ substantially.
Briefly, there are several challenges with Brenecki's approach:
- Triggers impact insert/update performance.
- Assuming a
LIMIT
clause is in place (as it should be), a long-running transaction will stall the value returned bytxid_snapshot_xmin(txid_current_snapshot())
. The harvester will not see records beyond its query window until the late-running transaction completes. In the meantime, there may be multiple short-running transactions that would have deposited records into the outbox; they will not be processed until the window advances. - Postgres transaction IDs are 32-bit counters padded to a 64-bit integer. They are recycled after approximately 4B transactions, causing a discontinuity in the number sequence.
There is a known variation of this model that uses commit timestamps instead of transaction IDs. It has similar limitations, with the exception of the txid
recycling issue.
This model applies pipelining to the previous model. Records may be read and published independently of receiving confirmations. The issue here is that the poll query returns both pending and in-flight records. We need to discern among them; otherwise, records will be published in duplicate. So the scraper keeps track of all previously read records and filters them out of the query's result set. Because queries can get quite large, this model significantly limits the number of records that may be in-flight, making it harder to scale under high-latency broker connections, where the number of in-flight records must increase to allow for reasonable throughput.
Of the approaches outlined, application-level diff is the closest relative of MP/R. The main difference is that MP/R does not track in-flight records in memory — it only keeps their count for throttling purposes, which does not functionally affect the algorithm. The in-flight tracking is accomplished by atomically 'marking' the leader_id
on each record as soon as it has been read, which doubles as a predicate for the subsequent query. Marking is the database equivalent of the compare-and-swap instruction, used to implement synchronization primitives in multithreaded systems. A mark only returns those records that are either pending, or have been left in an indeterminate state by a failed harvester; having returned those rows, it atomically excludes these records from subsequent queries, for as long as the same leader ID is maintained.
It may appear that MP/R should not need to rely on the atomicity of the marking process, as marking is performed on a dedicated thread that is uncontended — owing to the overarching leader election process curated by NELI. And that is, indeed, the case. However, if the query were not atomic, we would need a way knowing which records were marked, as a plain UPDATE
statement only returns the number of affected rows. In the absence of atomicity, the easiest way is to assign a unique identifier as part of each UPDATE
, then perform a follow-up SELECT
with the same identifier as its WHERE
predicate.
In addition to acting as a synchronization primitive, the atomicity property is useful in eliminating multiple round-trips and index traversals. A single UPDATE...RETURNING
clause employs one index traversal to locate the candidate records, update them, and return them in a result set.
MP/R requires one read and two writes for each outbox record. By comparison, application-level diff gets away with one read and one write. On the flip side, the read queries are larger. Because a query returns both pending and in-flight records, the limit of the query is the maximum number of in-flight records. By contrast, MP/R supports small, incremental queries that are uncorrelated to the in-flight limit. To appreciate the difference, consider a scenario where the harvester is configured with an in-flight limit of 1,000 records, and is currently sitting on a full backlog. It may safely stop polling to save database I/O. As soon as an acknowledgement arrives for the oldest record, the backlog drops to 999 records and the record is struck out. It can now poll the database again. But here is the snag: it must query for another 1,000 records despite only having one transmit slot available — it doesn't have a way of querying for just the new records. It cannot simply use the offset of the most recent enqueued record as the starting point, owing to the non-monotonic nature of the outbox.
The descriptions above are just some of the tactics that have been employed by the industry to scrape database tables, not just for implementing a transactional outbox, but for general-purpose replication scenarios. Likely, each has several variations, plus a host of proprietary tactics for specific database servers. This is not a trivial problem, and will likely require several attempts to get right.
The main objective behind goharvest
was to create a working model that is straightforward to use and reasonably performant. Unencumbered of proprietary techniques, it may be adapted to other database technologies and partially ordered event streaming brokers in the future.