Notes on the “ZooKeeper” paper

ZooKeeper is a service for coordinating the processes of distributed applications. Common coordination use cases are configuration, group membership and leader election. One approach to the coordination problem is to develop a separate service for each coordination need. ZooKeeper instead provides a coordination kernel: an API on top of which new primitives can be built to support high-level use cases like configuration.

As a service, ZooKeeper comprises an ensemble of servers: one leader and multiple followers. ZooKeeper data is replicated on these servers to achieve high availability and performance. Read requests are processed locally by the server a client is connected to, for low latency. Write requests are handled by the leader and propagated to the followers. Writes are linearizable, and each client's requests are executed in FIFO order; together these guarantees meet the consistency requirements of coordination.

ZooKeeper Services

Below is the ZooKeeper terminology:

  • client: the user of ZooKeeper service.
  • server: a process that provides the ZooKeeper service.
  • znode: an in-memory data node in ZooKeeper.
  • data tree: a hierarchical namespace organizing the znodes.
  • update and write: any operation that modifies the state of the data tree.
  • session: clients establish a session when they connect to ZooKeeper servers and obtain a session handle through which they issue requests.

Data Model

Naturally, the znodes and the data tree can be modeled as a file system. Using UNIX notation, the root of the data tree is the znode /, and all other znodes are descendants of /. Here is an example:

/
|_ /app1
|_ /app1/p_1
|_ /app1/p_2
|_ /app1/p_3
|_ /app2

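A znode is one of two types:

  • Regular: persists until a client explicitly deletes it.
  • Ephemeral: deleted automatically when the session that created it terminates (a client can also delete it explicitly). Ephemeral znodes cannot have children.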

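A znode can also be created with the SEQUENTIAL flag. ZooKeeper then appends the value of a monotonically increasing counter to its name. The counter is kept per parent, so the sequence value of a new znode is never smaller than that of any sequential znode previously created under the same parent.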
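A minimal in-memory sketch of these rules, assuming a single process and no networking. ToyDataTree and ToyMode are hypothetical stand-ins (the mode names loosely follow the real CreateMode values), not the ZooKeeper client. The sequence counter is simplified to advance by exactly one per sequential create under a parent; real ZooKeeper also keeps the counter per parent and pads it to 10 digits, but its values need not be consecutive.

```java
import java.util.*;

// Toy, single-process model of the ZooKeeper data tree (NOT the ZooKeeper
// client library). All names here are hypothetical.
enum ToyMode { PERSISTENT, EPHEMERAL, PERSISTENT_SEQUENTIAL, EPHEMERAL_SEQUENTIAL }

class ToyDataTree {
    private final SortedSet<String> nodes = new TreeSet<>(Collections.singleton("/"));
    private final Map<String, Long> ephemeralOwner = new HashMap<>(); // path -> session id
    private final Map<String, Integer> nextSeq = new HashMap<>();     // parent -> counter

    static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i == 0 ? "/" : path.substring(0, i);
    }

    // Returns the name actually created; with a SEQUENTIAL mode it carries a suffix.
    String create(String path, ToyMode mode, long sessionId) {
        String parent = parentOf(path);
        if (!nodes.contains(parent)) throw new IllegalArgumentException("no parent: " + parent);
        if (ephemeralOwner.containsKey(parent))
            throw new IllegalArgumentException("ephemeral znodes cannot have children");
        if (mode == ToyMode.PERSISTENT_SEQUENTIAL || mode == ToyMode.EPHEMERAL_SEQUENTIAL) {
            int n = nextSeq.getOrDefault(parent, 0);   // simplified per-parent counter
            nextSeq.put(parent, n + 1);
            path = path + String.format("%010d", n);   // 10 digits, zero-padded
        }
        if (!nodes.add(path)) throw new IllegalArgumentException("exists: " + path);
        if (mode == ToyMode.EPHEMERAL || mode == ToyMode.EPHEMERAL_SEQUENTIAL)
            ephemeralOwner.put(path, sessionId);
        return path;
    }

    boolean exists(String path) { return nodes.contains(path); }

    // Child names in sorted order (TreeSet iteration order).
    List<String> getChildren(String path) {
        List<String> out = new ArrayList<>();
        for (String p : nodes)
            if (!p.equals("/") && parentOf(p).equals(path)) out.add(p.substring(p.lastIndexOf('/') + 1));
        return out;
    }

    // Ending a session deletes every ephemeral znode it created.
    void closeSession(long sessionId) {
        ephemeralOwner.entrySet().removeIf(e -> {
            if (e.getValue() != sessionId) return false;
            nodes.remove(e.getKey());
            return true;
        });
    }
}
```

For example, if session 1 creates the regular znode /app1 and an ephemeral sequential child /app1/p_0000000000, closing session 1 removes only the child; /app1 survives.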

The data model is a file system that only supports reading and writing a znode's data in full. It is not designed for general data storage. Instead, the data tree mirrors the structure of the client applications and stores metadata for coordination purposes. In the example above, /app1/p_1 holds the metadata of process 1 of application 1.

API

create(String path, byte[] data, List<ACL> acl, CreateMode createMode): Create a znode at path and store data in it. acl is the access control list, and createMode determines the type of znode (persistent, i.e. regular, or ephemeral) and whether the SEQUENTIAL flag is set. Return the name of the new znode.

delete(String path, int version): Delete the node at the given path if the node's version matches version (a version of -1 matches any version).

exists(String path, boolean watch): Return the stat of the node at the given path, or null if no such node exists.

getData(String path, boolean watch, Stat stat): Return the data of the node at the given path and copy its stat into stat.

setData(String path, byte[] data, int version): Set the data for the node at the given path if such a node exists and the given version matches the version of the node (a version of -1 matches any version).

getChildren(String path, boolean watch): Return the list of the children of the node of the given path.

sync(String path, AsyncCallback.VoidCallback cb, Object ctx): Asynchronous sync; it flushes the channel between the server the client is connected to and the leader.

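The version makes updates conditional, which prevents race conditions such as lost updates when several clients modify the same znode concurrently.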
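A typical use of the version argument is optimistic concurrency: read a znode together with its version, compute a new value, and write it back conditioned on that version, retrying on a mismatch. The sketch below models a single znode in memory; ToyZnode and VersionedCounter are hypothetical, and unlike this toy (which returns false), the real Java client reports a mismatch by throwing KeeperException.BadVersionException.

```java
// Toy model of one znode with ZooKeeper-style conditional writes.
// Not the ZooKeeper client API; names are hypothetical.
class ToyZnode {
    static final class Snapshot {
        final String data;
        final int version;
        Snapshot(String data, int version) { this.data = data; this.version = version; }
    }

    private String data;
    private int version = 0;

    ToyZnode(String data) { this.data = data; }

    // Like getData(...): returns the data together with its version.
    synchronized Snapshot getData() { return new Snapshot(data, version); }

    // Like setData(path, data, version): applies only if expectedVersion equals
    // the current version or is -1. Every successful write bumps the version.
    synchronized boolean setData(String newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) return false;
        data = newData;
        version++;
        return true;
    }
}

class VersionedCounter {
    // Read-modify-write with retry: a lost race shows up as a version mismatch.
    static void increment(ToyZnode node) {
        while (true) {
            ToyZnode.Snapshot s = node.getData();
            String next = Integer.toString(Integer.parseInt(s.data) + 1);
            if (node.setData(next, s.version)) return;
        }
    }

    // Runs `threads` clients that each increment the counter `perThread` times.
    static void runConcurrently(ToyZnode node, int threads, int perThread) {
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) increment(node);
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

Without the version check, two clients that read the same value would both write value + 1, and one increment would be lost.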

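Setting watch to true lets a client be notified when the znode changes. Watches are one-time triggers: a watch is removed once it fires (or when the session closes), so the client must set a new watch to learn about later changes. The notification only says that the znode changed; it does not carry the new data.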
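These one-time semantics can be sketched with a small in-memory registry. ToyWatchManager is hypothetical and not part of the ZooKeeper client; it only shows that each registration fires at most once and that the watcher receives the changed path rather than the new data.

```java
import java.util.*;
import java.util.function.Consumer;

// Toy registry of one-time watches (hypothetical, not the ZooKeeper API).
class ToyWatchManager {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Like passing watch = true to getData/exists: register interest in path.
    void addWatch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
    }

    // Called when path changes. The registered watchers are removed first and
    // then fired once each; they get the path only, never the new data.
    void trigger(String path) {
        List<Consumer<String>> fired = watches.remove(path);
        if (fired == null) return;
        for (Consumer<String> w : fired) w.accept(path);
    }
}
```

A second change to the same znode goes unreported unless the client re-registers, which is why a client typically re-reads with watch = true inside its notification handler.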

Each operation has a synchronous and an asynchronous version. With the asynchronous versions, a client can have multiple outstanding ZooKeeper operations at a time, and ZooKeeper guarantees that their callbacks are invoked in the order the operations were issued.
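One way to get in-order callbacks is to send requests over a single ordered channel and deliver every completion from a single event thread. The sketch below models both with single-threaded executors and an in-memory map as the "server"; ToyAsyncClient and its methods are hypothetical, not the ZooKeeper client API.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

// Toy asynchronous client: requests are processed one at a time in issue
// order, and completions are queued on one event thread in that same order.
class ToyAsyncClient implements AutoCloseable {
    private final ExecutorService channel = Executors.newSingleThreadExecutor();
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    private final Map<String, String> store = new ConcurrentHashMap<>();

    void setDataAsync(String path, String data, Consumer<String> cb) {
        channel.execute(() -> {
            store.put(path, data);                        // the "server" applies the write
            eventThread.execute(() -> cb.accept(path));   // completion queued in issue order
        });
    }

    void getDataAsync(String path, Consumer<String> cb) {
        channel.execute(() -> {
            String value = store.get(path);
            eventThread.execute(() -> cb.accept(value));
        });
    }

    // Waits for outstanding operations, then for their callbacks.
    @Override
    public void close() {
        awaitQuiet(channel);
        awaitQuiet(eventThread);
    }

    private static void awaitQuiet(ExecutorService ex) {
        ex.shutdown();
        try {
            ex.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because each read is processed after every write issued before it, a read callback always observes the data written by the preceding writes of the same client.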

See the complete list of methods in https://zookeeper.apache.org/doc/r3.4.6/api/org/apache/zookeeper/ZooKeeper.html and examples of calling the APIs in https://www.tutorialspoint.com/zookeeper/zookeeper_api.htm.

ZooKeeper guarantees

  1. Linearizable writes: all requests that update the state of ZooKeeper are serializable and respect precedence.
  2. FIFO client order: all requests from a given client are executed in the order that they were sent by the client.

Primitives on ZooKeeper

How can powerful primitives be implemented with the ZooKeeper API?

Configuration Management

Assume a process reads its configuration from znode zc. It reads the data by calling getData(path_zc, true), where true sets a watch. zc is updated by a configuration push service. When zc is updated, the process is notified and calls getData(path_zc, true) again, which reads the new configuration and sets a new watch.
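The read-and-rewatch loop can be sketched against a toy in-memory store. ToyConfigStore and ConfigReader are hypothetical; a non-null watcher argument plays the role of watch = true, and watches fire synchronously inside setData for simplicity.

```java
import java.util.*;

// Toy znode store whose getData can leave a one-time watch.
// Hypothetical names; not the ZooKeeper client API.
class ToyConfigStore {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, List<Runnable>> watches = new HashMap<>();

    // getData(path, watch): a non-null watcher plays the role of watch = true.
    String getData(String path, Runnable watcher) {
        if (watcher != null) watches.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
        return data.get(path);
    }

    // Called by the configuration push service; fires (and clears) the watches on path.
    void setData(String path, String value) {
        data.put(path, value);
        List<Runnable> fired = watches.remove(path);
        if (fired != null) fired.forEach(Runnable::run);
    }
}

// A process that keeps its configuration in sync with znode zc.
class ConfigReader {
    private final ToyConfigStore store;
    private final String zc;
    String current;   // latest configuration read
    int reads = 0;    // how many times getData was called

    ConfigReader(ToyConfigStore store, String zc) {
        this.store = store;
        this.zc = zc;
        refresh();
    }

    // Equivalent of getData(path_zc, true): read the data and re-arm the watch.
    private void refresh() {
        reads++;
        current = store.getData(zc, this::refresh);
    }
}
```

With a real, asynchronous notification, several updates may land before the process re-reads; it then skips the intermediate values, which is fine when only the latest configuration matters.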

Group Membership

Group membership is built with ephemeral znodes. Assume a znode zg represents the group. A process registers itself in the group by creating an ephemeral znode under zg. If the process name is unique, the name is used as the znode name; otherwise the znode is created with the SEQUENTIAL flag, so the sequential value appended to the name makes it unique. The list of processes in the group is obtained by calling getChildren on zg. A process unregisters itself by deleting its znode under zg. If the process fails, its session ends and the znode is removed automatically.
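A minimal sketch of this recipe, with the znodes under zg modeled as an in-memory map from child name to owning session. ToyGroup and its methods are hypothetical; the comments name the ZooKeeper call each method stands for.

```java
import java.util.*;

// Toy group membership built on ephemeral znodes under zg.
// Hypothetical names; no real ZooKeeper calls are made.
class ToyGroup {
    private final String zg;
    private final TreeMap<String, Long> members = new TreeMap<>(); // child name -> owning session
    private int nextSeq = 0;

    ToyGroup(String zg) { this.zg = zg; }

    // Unique process name: create(zg + "/" + name, EPHEMERAL).
    String join(String name, long sessionId) {
        if (members.putIfAbsent(name, sessionId) != null)
            throw new IllegalStateException("znode exists: " + name);
        return zg + "/" + name;
    }

    // Non-unique name: create(zg + "/" + name, EPHEMERAL | SEQUENTIAL);
    // the counter, padded to 10 digits, is appended to the name.
    String joinSequential(String name, long sessionId) {
        String child = name + String.format("%010d", nextSeq++);
        members.put(child, sessionId);
        return zg + "/" + child;
    }

    // Explicit leave: delete(zg + "/" + child, -1).
    void leave(String child) { members.remove(child); }

    // Session ends (e.g. the process crashed): its ephemeral znodes disappear.
    void sessionExpired(long sessionId) { members.values().removeIf(s -> s == sessionId); }

    // getChildren(zg, false): the current members, sorted by name.
    List<String> getChildren() { return new ArrayList<>(members.keySet()); }
}
```

A process that crashes never has to clean up: once its session expires, it simply stops appearing in getChildren.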

Locks

Assume zl is the lock znode and a set of clients compete to enter the critical section it protects. The lock can be implemented with ephemeral, sequential znodes created under zl.

Get the lock

n = create(zl + "/lock-", EPHEMERAL | SEQUENTIAL)
loop {
    C = getChildren(zl, false)
    if n is the lowest znode in C, exit    // lock acquired
    p = znode in C ordered just before n
    if exists(p, true), wait for the watch event on p
    // p is gone (deleted, or already gone before exists): re-check
}

Release the lock

delete(n)

How does this work?

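Each client creates one sequential znode under zl, giving children lock-0000000001, lock-0000000002, lock-0000000003, and so on. The client with the lowest znode, lock-0000000001, holds the lock, while every other client waits for the znode immediately before its own to be deleted. For example, when lock-0000000002 is deleted after its owner has held and released the lock, only the client that created lock-0000000003 is notified, and it acquires the lock. This avoids the herd effect, where many clients are woken up to compete for a lock that was just released. A notified client still re-reads the children before taking the lock, because its predecessor may have disappeared without ever holding the lock (for example, its client crashed while waiting). Since the lock state is visible as znodes, the sequential values also help with debugging locking problems.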
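The recipe can be exercised with a single-threaded simulation. ToyLockDir stands in for zl and LockClient for one client; both are hypothetical, not the ZooKeeper client API. A client that has left simply ignores late notifications, standing in for ZooKeeper discarding the watches of a closed session. The simulation records which clients are woken, which makes the absence of the herd effect visible.

```java
import java.util.*;

// Single-threaded simulation of the lock recipe. ToyLockDir stands in for the
// lock znode zl; all names are hypothetical.
class ToyLockDir {
    private final TreeSet<String> children = new TreeSet<>();
    private final Map<String, List<Runnable>> watches = new HashMap<>();
    private int nextSeq = 0;
    final List<String> woken = new ArrayList<>(); // clients notified by a watch

    // create(zl + "/lock-", EPHEMERAL | SEQUENTIAL); returns the child name.
    String createSequential(String prefix) {
        String n = prefix + String.format("%010d", nextSeq++);
        children.add(n);
        return n;
    }

    // getChildren(zl, false)
    SortedSet<String> getChildren() { return new TreeSet<>(children); }

    // exists(zl + "/" + p, true): if p exists, leave a one-time watch on it.
    boolean existsWithWatch(String p, Runnable watcher) {
        if (!children.contains(p)) return false;
        watches.computeIfAbsent(p, k -> new ArrayList<>()).add(watcher);
        return true;
    }

    // delete(n): removes the znode and fires the watches set on it.
    void delete(String n) {
        children.remove(n);
        List<Runnable> fired = watches.remove(n);
        if (fired != null) fired.forEach(Runnable::run);
    }
}

class LockClient {
    final String name;
    final ToyLockDir zl;
    String n;                  // this client's znode, null once it has left
    boolean holdsLock = false;

    LockClient(String name, ToyLockDir zl) { this.name = name; this.zl = zl; }

    void lock() {
        n = zl.createSequential("lock-");
        check();
    }

    // The recipe's loop; re-run whenever the watched predecessor is deleted.
    private void check() {
        while (true) {
            SortedSet<String> c = zl.getChildren();
            if (c.first().equals(n)) { holdsLock = true; return; }
            String p = c.headSet(n).last();                   // znode just before n
            if (zl.existsWithWatch(p, this::onWatch)) return; // wait for the event
            // p vanished between getChildren and exists: loop and re-check
        }
    }

    private void onWatch() {
        if (n == null) return;  // left already: stands in for dropped session watches
        zl.woken.add(name);
        check();
    }

    // Release the lock, or give up while waiting: delete(n).
    void unlock() {
        String mine = n;
        n = null;
        holdsLock = false;
        zl.delete(mine);
    }
}
```

With clients a, b and c queued in that order, a.unlock() wakes only b; c stays asleep until b releases the lock.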

Going one step further, read/write locks can also be implemented with znodes under the lock znode zl:

Write lock

n = create(zl + "/write-", EPHEMERAL | SEQUENTIAL)
loop {
    C = getChildren(zl, false)
    if n is the lowest znode in C, exit    // write lock acquired
    p = znode in C ordered just before n
    if exists(p, true), wait for the watch event on p
}

Read lock

n = create(zl + "/read-", EPHEMERAL | SEQUENTIAL)
loop {
    C = getChildren(zl, false)    // re-read every iteration so deleted writers drop out
    if no write znode in C is lower than n, exit    // read lock acquired
    p = write znode in C ordered just before n
    if exists(p, true), wait for the watch event on p
}
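The decision step shared by both loops can be written as a pure function. RwLockStep is a hypothetical helper: given the child names of zl and the client's own znode n, it returns null when the lock is acquired, or the znode to watch otherwise. It compares znodes by sequence number, assuming read- and write- znodes share the one counter kept under zl; comparing full names as strings would instead order every read- znode before every write- znode.

```java
import java.util.*;

// Decision step of the read/write lock recipe. RwLockStep is a hypothetical
// helper, not part of ZooKeeper.
final class RwLockStep {
    // Sequence number = the trailing 10 digits. "read-" and "write-" znodes
    // under zl share one counter, so this is the order that matters.
    static long seq(String znode) {
        return Long.parseLong(znode.substring(znode.length() - 10));
    }

    // Returns null if the lock is acquired, otherwise the znode to watch:
    // a writer waits for any znode before it, a reader only for writers.
    static String toWatch(List<String> children, String n) {
        boolean isWriter = n.startsWith("write-");
        String best = null;
        for (String c : children) {
            if (seq(c) >= seq(n)) continue;                      // not ordered before n
            if (!isWriter && !c.startsWith("write-")) continue;  // readers ignore readers
            if (best == null || seq(c) > seq(best)) best = c;    // keep the closest one
        }
        return best;
    }
}
```

For the children read-0000000000, write-0000000001, read-0000000002, read-0000000003 and write-0000000004, the first reader holds the lock, both later readers watch write-0000000001 (so they are released together), and write-0000000004 watches read-0000000003.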

ZooKeeper Applications

ZooKeeper is widely used. At Yahoo!, it is used in the Fetching Service, Katta, and Message Broker for leader election, group membership and configuration management.

It is also used in other Apache projects such as Apache HBase, Apache Hadoop and Apache Solr.

Implementation

What are the requirements?

  • High availability and performance
  • Consistency
  • Recoverability

High availability and performance

High availability is achieved by replicating the ZooKeeper database on every server. The database is in memory and contains the entire data tree. The amount of data a single znode can hold is limited (about 1 MB by default).

High performance means high throughput and low latency. For read-heavy workloads, read requests are served locally without coordination and the data is in memory, so latency is dominated by the network. Read throughput can be increased by adding servers: the benchmark shows that the number of read requests processed grows roughly linearly with the number of servers.

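For write requests, ZooKeeper relies on the linearizable-writes and FIFO-client-order guarantees: because a client's requests are executed in the order it sent them, the client can issue writes asynchronously without waiting for each one to complete, and the servers pipeline them. This greatly reduces the average latency per operation.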

Consistency

For requests that require coordination, such as writes, ZooKeeper must keep all replicas consistent. It achieves this with atomic broadcast.

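The servers elect a leader among themselves when the service starts, and again whenever the current leader fails. They do this with a dedicated leader-election protocol, not through znodes: every change to the data tree goes through the leader, so the data tree cannot be used to choose it. The znode-based scheme (each participant creates a sequential ephemeral znode, the owner of the lowest one becomes the leader, and every other participant watches the znode just before its own, so that only the next in line is notified and takes over when the leader fails) is how applications built on ZooKeeper elect their own leaders.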

When client sends a write request,

  1. The server that receives the request forwards it to the leader.
  2. The leader translates the write request into a transaction. Unlike the request, a transaction is idempotent: it captures the new state of the affected znode rather than the operation that produces it. For a conditional setData, the leader compares the requested version with the znode's future version (the version it will have once all pending transactions are applied). If they match, it generates a transaction such as <SetDataTXN, /foo, f2, 2>, which sets the data of /foo to f2 and its version to 2; if they do not match, it generates an error transaction instead. Executing <SetDataTXN, /foo, f2, 2> more than once always leaves /foo with data f2 and version 2, so repeated execution does not affect correctness.
  3. The leader broadcasts the transactions to the followers with the Zab protocol. Zab works when a majority of the servers are correct, and it guarantees that changes broadcast by a leader are delivered in the order they were sent, and that all changes from previous leaders are delivered to an established leader before it broadcasts its own changes.
  4. The followers receive the transactions. Before applying a transaction to its in-memory database, a server first logs it to disk; this log is used for failure recovery.

(Figure: workflow of write requests)
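The idempotence argument in step 2 can be checked with a small sketch. ZnodeState, SetDataTxn and ToyLeader are hypothetical, not ZooKeeper internals; the sketch assumes there are no pending transactions, so the znode's current version stands in for its future version, and it returns null where the leader would generate an error transaction.

```java
import java.util.*;

// Replica state of one znode. Hypothetical toy types.
final class ZnodeState {
    final String data;
    final int version;
    ZnodeState(String data, int version) { this.data = data; this.version = version; }

    @Override public boolean equals(Object o) {
        if (!(o instanceof ZnodeState)) return false;
        ZnodeState z = (ZnodeState) o;
        return Objects.equals(data, z.data) && version == z.version;
    }
    @Override public int hashCode() { return Objects.hash(data, version); }
}

// <SetDataTXN, path, data, newVersion>: records the resulting state,
// so applying it once or many times gives the same result.
final class SetDataTxn {
    final String path;
    final String data;
    final int newVersion;
    SetDataTxn(String path, String data, int newVersion) {
        this.path = path; this.data = data; this.newVersion = newVersion;
    }

    void apply(Map<String, ZnodeState> db) { db.put(path, new ZnodeState(data, newVersion)); }
}

final class ToyLeader {
    // Turns a conditional setData request into a transaction by computing the
    // znode's resulting state. Returns null where the real leader would
    // generate an error transaction.
    static SetDataTxn toTxn(Map<String, ZnodeState> db, String path, String data, int expectedVersion) {
        ZnodeState cur = db.get(path);
        if (cur == null || (expectedVersion != -1 && expectedVersion != cur.version)) return null;
        return new SetDataTxn(path, data, cur.version + 1);
    }
}
```

By contrast, a transaction that said "increment the version of /foo" would leave version 3 after being applied a second time.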

Recoverability

If a server fails, it must replay all delivered transactions when it recovers. The transactions are logged to disk, as mentioned before, but replaying every transaction would take too long. ZooKeeper therefore takes periodic snapshots, so a recovering server only needs to replay the transactions committed after the snapshot started. The snapshots are fuzzy: ZooKeeper does not lock the data tree while taking a snapshot, so transactions keep being applied during the walk, and the snapshot may not match the state of the data tree at any single point in time. Because transactions are idempotent, replaying them in order on top of a fuzzy snapshot is safe, even for transactions the snapshot already reflects.
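A minimal sketch of recovery from a fuzzy snapshot, assuming each logged transaction simply sets a path to a value (its resulting state). Txn and FuzzyRecovery are hypothetical; zxids are plain longs and the log is assumed to be in zxid order.

```java
import java.util.*;

// Idempotent toy transaction: "set path to value" (the resulting state).
final class Txn {
    final long zxid;
    final String path;
    final String value;
    Txn(long zxid, String path, String value) { this.zxid = zxid; this.path = path; this.value = value; }

    void apply(Map<String, String> db) { db.put(path, value); }
}

final class FuzzyRecovery {
    // Rebuilds the data tree from a fuzzy snapshot: replay, in zxid order, every
    // logged transaction newer than the last one committed when the snapshot
    // started. Some of them may already be reflected in the snapshot; since they
    // are idempotent, applying them again is harmless.
    static Map<String, String> recover(Map<String, String> snapshot, List<Txn> log, long snapshotStartZxid) {
        Map<String, String> db = new HashMap<>(snapshot);
        for (Txn t : log) {   // the log is assumed to be in zxid order
            if (t.zxid > snapshotStartZxid) t.apply(db);
        }
        return db;
    }
}
```

For instance, with the log (1, /a, x1), (2, /b, y1), (3, /a, x2), (4, /b, y2) and a snapshot started after zxid 1 that captured /a = x2 but missed /b, replaying zxids 2 to 4 yields /a = x2, /b = y2, the same state as applying the whole log from scratch. Replaying in order matters: applying txn 1 last would wrongly reset /a to x1.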

Client — Server interaction

Because reads are served locally, a client may read stale values, for example when it reads while write requests are still in progress. How should this be handled?

Slight staleness is tolerable for many applications. If a client needs the latest data, it can issue a sync request followed by the read. Because of FIFO client order, the read is executed after the sync, so it reflects all updates that were pending when the sync was issued.

Another problem: what happens if the client connects to a new server, or to a server that has just recovered from a failure?

Each read response is tagged with a zxid, which corresponds to the last transaction seen by the server. When the client connects to a server, it sends the last zxid it has seen. If the server's last zxid is smaller than the client's, the server has a less recent view than the client and will not establish the session until it has caught up, so the client can connect to another server instead. The client is guaranteed to find one, because it only sees changes that have already been replicated to a majority of the ZooKeeper servers.
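The reconnect check can be sketched as follows. ToyServer and ToyClient are hypothetical, zxids are compared as plain longs, and a client that finds no acceptable server just gets null here instead of retrying as a real client would.

```java
import java.util.*;

// Toy model of the zxid check on (re)connect. Hypothetical names.
final class ToyServer {
    final String name;
    final long lastZxid;  // last transaction this server has applied
    ToyServer(String name, long lastZxid) { this.name = name; this.lastZxid = lastZxid; }

    // A server whose view is older than the client's must not serve it yet.
    boolean acceptsSession(long clientLastZxid) { return lastZxid >= clientLastZxid; }
}

final class ToyClient {
    long lastSeenZxid = 0;

    // Each response carries the zxid of the last transaction the server had seen.
    void onResponse(long zxid) { lastSeenZxid = Math.max(lastSeenZxid, zxid); }

    // Try servers in order; return the first whose view is at least as recent
    // as the client's, or null if none is.
    ToyServer connect(List<ToyServer> servers) {
        for (ToyServer s : servers) {
            if (s.acceptsSession(lastSeenZxid)) return s;
        }
        return null;
    }
}
```

A client that has seen zxid 42 skips a server at zxid 40 and connects to one at 42 or later, so it never observes the data tree moving backwards.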

Reference

ZooKeeper: Wait-free coordination for Internet-scale systems

https://www.tutorialspoint.com/zookeeper/index.htm

Software Engineer@Facebook