
Q: Why piggy-back on Kafka for leader election?

The piggy-backing approach using simplified NELI is one option. Another approach is the use of an external Group Management Service (GMS) or, more likely, a Distributed Lock Manager (DLM) which is built on top of a GMS specifically for arbitrating leadership among contending processes.

A DLM/GMS, such as Consul, Etcd, Chubby or ZooKeeper, is an appropriate choice in many cases. A point raised by the NELI paper (and the main reason for its existence) is that such infrastructure may not be readily available to provide this capability. Further to that point, someone needs to configure and maintain this infrastructure and ensure its continuous availability — otherwise it becomes a point of failure in itself. This problem is exacerbated in a µ-services architecture, where it is common practice for services to own their dependencies. Should DLMs be classified as service-specific dependencies, or should they be shared? Either approach has its downsides.

The NELI approach is particularly attractive where the middleware used by applications for routine operation can also be employed in a secondary role for deriving leader state. There are no additional dependencies to maintain; everything one needs is at hand.

An alternative approach that does not require external dependencies is to use the database (that hosts the outbox table) to arbitrate leadership. This can be achieved using a simple, timestamp-based lease protocol. Contending processes collectively attempt to set a common field in a table, using transaction scope to ensure that only one of them succeeds. The others back off, waiting for the lease to expire. If the leader is alive, it can renew the lease; otherwise, it falls to one of the contending processes to take over the lease.
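As an illustration of the lease idea (not part of goharvest), the sketch below acquires or renews a lease with a single UPDATE against a hypothetical leader_lease table; the table, its columns and the lease duration are all assumptions, and the atomicity of the single statement serves the same purpose as explicit transaction scope.

```go
package sketch

import (
	"database/sql"
	"time"
)

const leaseDuration = 30 * time.Second

// tryAcquireLease attempts to take or renew the lease on behalf of 'owner'.
// The single UPDATE is atomic, so at most one contender can succeed for any
// given lease period: either the previous lease has expired, or the caller
// already holds it and is renewing. Table and column names are hypothetical.
func tryAcquireLease(db *sql.DB, owner string) (bool, error) {
	res, err := db.Exec(`
		UPDATE leader_lease
		SET owner = $1, expires_at = now() + ($2 * interval '1 second')
		WHERE expires_at < now() OR owner = $1`,
		owner, leaseDuration.Seconds())
	if err != nil {
		return false, err
	}
	rows, err := res.RowsAffected()
	return rows == 1, err
}
```

The sketch assumes the leader_lease table has been seeded with exactly one row; the leader calls this well within the lease duration to keep renewing, while the others retry only after the lease has lapsed.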

The lease algorithm is a simple and robust way of achieving consensus within a process group, using a centralised service that is already at their disposal. One of its drawbacks is that contention requires continuous querying of the database by all processes. This querying is in addition to the harvesting of the outbox table. As the group grows in size, the load on the database increases, making the lease model less scalable. Another drawback is that the best-case failure detection period equates to the lease interval. In other words, if a process acquires a lease and fails, no other process can step in until the lease period elapses.

By comparison, NELI does not place additional load on the database. Leader election is done via Kafka; once a leader is elected, only that process accesses the database. No assumptions are made as to the scalability needs of applications utilising goharvest — it is designed to work equally well for tiny deployments and massive clusters, provided Kafka is appropriately sized. (A relational database is not designed to scale horizontally and must be viewed as a scarce resource. By comparison, Kafka was designed for horizontal scalability from the outset.)

Failure detection is also more responsive. If a process fails or experiences a network partition, the group coordinator (a component within Kafka) will detect the absence of heartbeats and will initiate a group rebalance. This is configured by the session.timeout.ms consumer property, which is ten seconds by default. If the process departs gracefully, closing its Kafka connections in an orderly manner, the partition rebalance will occur sooner.
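For illustration, the session timeout could be tightened through the consumer configuration handed to goharvest; the BaseKafkaConfig field and import path below are assumptions based on the library's map-style Kafka configuration, whereas session.timeout.ms itself is a standard Kafka consumer property.

```go
package sketch

import "github.com/obsidiandynamics/goharvest"

// newConfig lowers the session timeout so that a failed leader is detected
// sooner, at the cost of greater sensitivity to transient stalls. The field
// names are assumptions; consult the goharvest documentation for the exact
// configuration surface.
func newConfig() goharvest.Config {
	return goharvest.Config{
		BaseKafkaConfig: goharvest.KafkaConfigMap{
			"bootstrap.servers":  "localhost:9092",
			"session.timeout.ms": 6000,
		},
	}
}
```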

Q: Why should I change the default leader topic and group ID?

Config.LeaderTopic and Config.LeaderGroupID must be shared among contending application instances and must also be unique outside that group. As such, goharvest defaults to the name of the application's executable binary, which works out reasonably well most of the time, provided applications are well named; for example, bin/billing-api rather than bin/api.

The default unravels when using a debugger such as Delve, which builds the application into a file named __debug_bin. Unless Config.LeaderGroupID is set explicitly, anyone debugging their Go application will end up sharing the same group ID.
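A minimal sketch of pinning both values explicitly (Config.LeaderTopic and Config.LeaderGroupID are the fields named above; the import path and example values are assumptions):

```go
package sketch

import "github.com/obsidiandynamics/goharvest"

// leaderConfig pins the leader topic and group ID so that they are shared by
// all instances of this application and by nothing else, sidestepping the
// __debug_bin collision under Delve. Values are illustrative.
func leaderConfig() goharvest.Config {
	return goharvest.Config{
		LeaderTopic:   "billing-api.neli",
		LeaderGroupID: "billing-api.neli",
	}
}
```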

Q: Can Kubernetes be used to solve the contention and availability problems?

As a control system for arbitrating service resources and ensuring their continued availability, Kubernetes is indispensable. However, it doesn't offer a robust solution for ensuring process exclusivity — this is not its purpose. Limiting the maximum number of replicas to one does not ensure that at most one replica is live: a new pod may be launched as part of a deployment and run side by side with the existing pod while taking over its traffic. Also, if one of the containers in a pod fails a health check, it will be summarily replaced; during the transition period, the failed pod may be running alongside its replacement.

Q: Why throttle the total number of in-flight records?

Limiting the number of in-flight records minimises the memory pressure on the client (as records are queued in an in-memory buffer before being forwarded to the broker). This method of flow control transfers the buffering responsibility to the database, reducing the memory footprint of the goharvest client.
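The flow-control technique can be pictured as a counting semaphore built from a buffered channel, as in the sketch below; this is an illustration of the concept only, not goharvest's actual implementation.

```go
package sketch

// inFlightLimiter caps the number of records awaiting broker acknowledgement.
// Illustrative only; goharvest's internals may differ.
type inFlightLimiter chan struct{}

func newInFlightLimiter(maxInFlight int) inFlightLimiter {
	return make(inFlightLimiter, maxInFlight)
}

// acquire blocks once the cap is reached, leaving the backlog in the outbox
// table rather than in process memory.
func (l inFlightLimiter) acquire() { l <- struct{}{} }

// release is called from the delivery callback once the broker has
// acknowledged (or rejected) the record.
func (l inFlightLimiter) release() { <-l }
```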

Q: Why throttle in-flight records by key?

Messages are generally published in batches for efficiency, which is particularly crucial in high-latency networks (where Kafka is not collocated with the application deployment). One of the challenges of enqueuing multiple messages relates to the prospect of observing intermittent errors, where those errors may affect a part of the transmission. (The producer client splits the backlog of queued messages into multiple batches under the hood; some batches may fail in isolation, while others may succeed.) Under the naive model of sending all marked messages, causality is only respected when there are no I/O errors during publishing. Given causally related records R0, R1 and R2, and the asynchronous nature of publishing messages, it is conceivable that R0 might succeed, R1 might fail, while R2 might again succeed. The harvester would detect the error for R1, by which time it may be too late — R2 may already have been persisted on the broker. Retrying R1 (assuming it will eventually succeed) will result in records appearing out of order; specifically, R0, R2, R1.

A related condition is where an existing leader process fails with a batch of messages in flight (messages sent to the broker for which an acknowledgement is pending). The new leader will re-mark the in-flight records, scavenging the workload of the outgoing leader. It will then send the records again, being unaware of their previous status. This may result not only in one-off record duplication, but in the duplication of a contiguous sequence of records; for example, R0, R1, R2, R0, R1, R2. The mid-point of the sequence, where R0 follows R2, is unacceptable — it constitutes order reversal. Conversely, one-off duplication, such as the sequence R0, R1, R1, R2, is acceptable; while suboptimal, duplicate message delivery is an inherent characteristic of messaging systems that are based around at-least-once delivery patterns.

MP/R deals with this problem by constraining not only the total number of in-flight records, but also the number of records in flight for any given key. Because causality in Kafka is captured via a record's key, ensuring that at most one record per key is in an indeterminate state solves both the issue of I/O errors and that of abrupt leadership changes.

In the absence of the per-key scoreboard, this would have to be addressed by changing the normally asynchronous mark-purge/reset pipeline to behave fully synchronously. This would involve setting Config.MaxInFlightRecords to 1, effectively annulling the MP/R pipeline. Throughput would drop by several orders of magnitude and, crucially, become tied to the round-trip latency of a single Kafka write. By utilising a scoreboard, MP/R maintains high throughput, provided the keys are reasonably distributed. (Conversely, if records are clustered around a small set of unique keys, throughput will degrade considerably; but this would affect any algorithm that maintains partial order.)
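The per-key scoreboard can be illustrated with a map of keys to in-flight counts, guarded by a mutex and condition variable; again, this is a conceptual sketch rather than goharvest's code.

```go
package sketch

import "sync"

// scoreboard tracks the number of in-flight records per key, admitting at
// most one record per key into an indeterminate state. Illustrative only.
type scoreboard struct {
	mu       sync.Mutex
	cond     *sync.Cond
	inFlight map[string]int
}

func newScoreboard() *scoreboard {
	s := &scoreboard{inFlight: map[string]int{}}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// begin blocks while another record with the same key is still awaiting an
// acknowledgement, preserving per-key order across I/O errors and leader churn.
func (s *scoreboard) begin(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.inFlight[key] > 0 {
		s.cond.Wait()
	}
	s.inFlight[key]++
}

// done is invoked from the delivery report, releasing the key.
func (s *scoreboard) done(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.inFlight[key]--
	if s.inFlight[key] == 0 {
		delete(s.inFlight, key)
	}
	s.cond.Broadcast()
}
```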

Q: What's the performance like?

There are several factors affecting the performance profile:

  • The database query throughput and latency;
  • The Kafka publishing throughput and latency; and
  • The distribution of message keys.

Based on extensive benchmarking under a variety of network conditions with appropriate configuration tuning, the performance is in the order of thousands of records per second. goharvest generally operates in near-real-time, keeping up with the rate of outbox insertion under high load.

The main limitation is the one-at-a-time processing of unique record keys, which is essential for maintaining correct order among causally related records. To that point, the factors affecting MP/R the most are the publishing latency and the distribution of keys.

In its most basic form, the MP/R algorithm accommodates publishing parallelism for well-distributed records, but is throttled when records are clustered around a small number of unique keys. This is exacerbated by round-trip times (from publishing a message to receiving broker ACKs).

This is addressed by employing key-based sharding and concurrent producer clients to maximise throughput. This enables records to be processed independently, based on causal chains. For example, if two records are unrelated, they may be safely published in parallel. Records will still be throttled for identical keys (this cannot be avoided), but unrelated keys will flow unimpeded. When dealing with high publishing latency, the appropriate response is to increase the value of Config.Limits.SendConcurrency. The greater this number, the more producer clients are employed to push unrelated records out in parallel.
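Key-based sharding can be pictured as hashing the record key onto one of Config.Limits.SendConcurrency producer clients, so that identical keys always land on the same client (and stay ordered) while unrelated keys fan out in parallel; the helper below is purely illustrative.

```go
package sketch

import "hash/fnv"

// shardIndex maps a record key onto one of 'sendConcurrency' producer
// clients. Identical keys always map to the same client, preserving their
// relative order; unrelated keys spread across clients and flow in parallel.
func shardIndex(key string, sendConcurrency int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(sendConcurrency))
}
```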

In addition, a three-stage pipeline allows for concurrent marking and publishing, whereby a batch of marked records is sent across several producer clients, throttled as necessary, while the next batch is marked in the background. This compensates for slow database connections. To vary the buffering between the marking and sending stages of the pipeline, set the Config.Limits.SendBuffer field.
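Conceptually, the hand-off between the marking and sending stages behaves like a buffered channel whose capacity plays the role of Config.Limits.SendBuffer, drained by Config.Limits.SendConcurrency sender goroutines; the sketch below illustrates the shape of the pipeline, not its actual implementation.

```go
package sketch

// record stands in for a marked outbox record; illustrative only.
type record struct {
	key   string
	value []byte
}

// runPipeline overlaps marking with sending: while the senders drain the
// buffered channel, the next batch is being marked in the background.
func runPipeline(markNextBatch func() []record, send func(record), sendBuffer, sendConcurrency int) {
	marked := make(chan record, sendBuffer)

	// Sending stage: one goroutine per producer client.
	for i := 0; i < sendConcurrency; i++ {
		go func() {
			for rec := range marked {
				send(rec)
			}
		}()
	}

	// Marking stage: blocks only when the buffer is full.
	for {
		for _, rec := range markNextBatch() {
			marked <- rec
		}
	}
}
```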

The defaults for both Config.Limits.SendConcurrency and Config.Limits.SendBuffer are intentionally conservative, to avoid overwhelming Kafka with excessive connections and to prevent a buildup of records in memory. The intention is for these numbers to be wound up, rather than down.

Q: Why didn't you use a native Go Kafka library?

The issue isn't so much with whether a library is native or not, but with the implementation itself. There are two mainstream native Go libraries: Segment.io's kafka-go and Sarama. Both are relatively new compared to the C/C++ and Java libraries, and both have similar limitations that would significantly complicate the harvester implementation:

  1. No rebalance notifications. The libraries support group-based assignment of partitions, but do not inform the application when partitions have been assigned or revoked. We piggy-back on Kafka for leader election, which is more difficult (but not impossible) to achieve when the client library is not forwarding partition assignment changes. (See the NELI paper on how this can be done in the absence of notifications.)
  2. Lack of fine-grained delivery reports. When using the asynchronous API, there is no notification of a successful or failed message delivery — which is needed for the purge/reset operation. The synchronous API can report errors, but only if messages are sent one at a time, which is inefficient, especially over high-latency links. When batching messages, it is not possible to determine which of the messages could not be sent.
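By contrast, the C-based confluent-kafka-go client (the librdkafka binding implied by this question) attaches a delivery report to every message, even when publishing asynchronously in batches. The sketch below is illustrative only, but the Produce and Events calls are the library's actual API.

```go
package sketch

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// publishBatch enqueues a batch asynchronously, then matches each delivery
// report back to the exact message it pertains to, which is what the
// purge/reset step requires. Illustrative only.
func publishBatch(p *kafka.Producer, topic string, values [][]byte) {
	enqueued := 0
	for _, v := range values {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          v,
		}
		if err := p.Produce(msg, nil); err != nil { // nil: reports arrive on p.Events()
			fmt.Printf("enqueue failed: %v\n", err)
			continue
		}
		enqueued++
	}

	// Some messages in a batch may fail while others succeed; each report
	// identifies its message individually.
	for enqueued > 0 {
		switch ev := (<-p.Events()).(type) {
		case *kafka.Message:
			enqueued--
			if ev.TopicPartition.Error != nil {
				fmt.Printf("record failed: %v\n", ev.TopicPartition.Error)
			} else {
				fmt.Printf("record delivered to %v\n", ev.TopicPartition)
			}
		}
	}
}
```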