Kafka is a distributed messaging system that developed for collecting and delivering high volumes of log data with low latency.
Basic Concepts
A stream of message of a particular type is defined by a topic. A producer can publish messages to a topic. The pushed messages are then stored at a set of servers called brokers.
Kafka uses pull model instead of push, in which the broker forwards data to consumers, since each consumer can retrieve the messages at themaximum rate it can sustain and avoid being flooded by messages pushed faster than it can handle.
To subscribe a topic, a consumer first creates one or more message streams for the topic. The messages published to that topic will be evenly distributed into these sub-streams. The message stream iterator never terminates. If there are currently no more messages to consume, the iterator blocks until new messages are published to the topic.
Kafaka supports both the point-to-point delivery modelin which multiple consumers jointly consume a single copy of all messages in a topic, as well as the pub/sub model in which multiple consumers each retrieve its own copy of a topic.
To balance load, a topic is divided into multiple partitions and each broker stores one or more of those partitions. Multiple producers and consumers can publish and retrieve messages at the same time.
Design Principles
Kafka has a very simple storage layout. Each partition of a topic corresponds to a logical log. Physically, a log is implemented as a set of segment files of approximately the same size (e.g. 1GB). Kafka flushes the segment file to disk only after a configurable number of messages have been published or a certain amount of time has elapsed, A message is only exposed to the consumer after it is flushed.
Each message in Kafka is addressed by its logical offset in the log. Thus the message ids are increasing but not consecutive. A consumer always consume messages from particular partition sequentially. Under the cover, the consumer is issuing asynchronous pull requests to the broker to have a buffer of data ready for the application to consume. Each pull request contains the offset of the message from which the consumption begins and an acceptable number of bytes to fetch. Each broker keeps in memory a sorted list of offsets as index, including the offset of the first message in every segment file.
Kafka avoid explicitly caching messages in memory at the Kafka layer. Instead, it relys on the underlying file system page cache which benefits the sequential access, normal operating system caching heuristics are very effective.
Kafka exploit the sendfile API to avoid extra copy from page cache to application buffer and kernel buffer, and efficiently deliver bytes in a log segment file from a broker to a consumer.
In Kafka, the information about how much each consumer has consumed is not maintained by the broker, but by the consumer itself. This design decision results in a stateless broker. However, this makes it tricky to delete a message since a broker doesn’t know whether all subscribers have consumed the message. Kafka solves this problem by using a simple time-based SLA for the retention policy.
One important benefit of this design is that a consumer can deliberately rewind back to an old offset and re-consume data. One example is that the consumer application might flush the data to disk periodically. If the consumer crashes, the unflushed data is lost. In this case, the consuemr can checkpoint the smallest offset of the unflushed messages and re-consume from that offset when it is restarted. Rewind is much easier to support in pull model than push model.
Producer & Consumer Coordination
Each producer can publish a message to either a random selected partition or a partition semantically determined by a partitioning key and a partitioning fucntion.
Each consumer group consists of one or more consumers that jointly consume a set of subscribed topics. Different consumer groups each independently consume the full set of subscribed messages and no coordination is needed across consumer groups.
To make the coordination easier, the first decision is to make a partition within a topic the smallest unit of parallelism. This means at any given time, all messages from one partition are consumed only by a single consumer within each consumer group. The second decision is to not have a master node, but instead let consumers coordinate among themselves in a decentralized fashion since adding master can complicated the system, for example, master failure needs to be handled.
Thus Kafka uses ZooKeeper for coordinating following tasks:
- Detecting the addition and the removal of brokers and consumers.
- Triggering a rebalance process in each consumer when the above events happen.
- Maintaining the consumption relationship and keeping track of the consumed offset of each partition (per consumer group).
Specifically, when each broker or consumer starts up, it stores its information in a broker or consumr registry in ZooKeeper. The broker registry contains the broker’s host name and port, and the set of topics and partitions it stored on it. The consumer registry includes the consumer group to which a consumer belongs and the set of topics that it subscribes to. Each consumer group is associated with an ownership registry and an offset registry in ZooKeeper. The ownership registry has one path for every subscribed partition and the consumer which currently owns this partition. The offset registry stores last consumed offset of the messages in the partition.
The ZooKeeper node created for broker registry, consumer registry and ownership registry is ephemeral, and persistent for the offset registry. If a broker fails, all partition on it are automatically removed from the broker registry. Same as consumer, which causes it to lose its entry in the consumer registry and all partitions it owns in ownership registry. Each consumer registers a ZooKeeper watcher on both broker and consumer registry, and will be notified when ther is a broker set or consumer group change. A rebalance process will be initialized to determine the new subset of partition each consumer should consume from when receiving notification, please refer to the orginal article for more details.
The notification may come at slightly different times at the consuemrs. So it is possible that one consumer tries to take ownership of a partition stilled owned by another consumer. When this happens, the first consumer simply release all the partitions that it currently owns, wait a bit and retries the reblance process.
Guarantees
In general, Kafka only guarantees at-least-once delivery. Most of time, the message is delivered exactly once to each consumer group. In case when a consumer process crashes without a clean shutdown, after it restarts, it might get duplicates after the last offset successfully commited to ZooKeeper.
Kafka guarantees that messages from a single partition are delivered to a consumer in order. However, there is no guarantee on the ordering of messages coming from differernt partitions.
To avoid log corruption, Kafka stores CRC (Cyclic Redundancy Check) for each message in the log.
The paper doesn’t discuss about replications in Kafka, but it should already be supported.