Notes on the “ZooKeeper” paper

ZooKeeper is a service for coordinating the processes of distributed applications. Common coordination use cases are configuration, group membership and leader election. One approach to the coordination problem is to develop a separate service for each coordination need. ZooKeeper instead provides a coordination kernel on which new primitives can be built to support higher-level use cases like configuration.

As a service, ZooKeeper comprises an ensemble of servers, with one leader and multiple followers. ZooKeeper data is replicated on these servers to achieve high availability and performance. Read requests are processed locally by the server the client is connected to, for low latency. Write requests are handled by the leader and propagated to the followers. FIFO ordering of each client's requests is guaranteed to meet the consistency requirement.

ZooKeeper Services

  • client: a user of the ZooKeeper service.
  • server: a process that provides the ZooKeeper service.
  • znode: an in-memory data node in the ZooKeeper data.
  • data tree: a hierarchical namespace organizing the znodes.
  • update and write: any operation that modifies the state of the data tree.
  • session: a client establishes a session when it connects to ZooKeeper and obtains a session handle through which it issues requests.

Data Model

/
|_ /app1
|_ /app1/p_1
|_ /app1/p_2
|_ /app1/p_3
|_ /app2

A znode can be either

  • Regular: persistent until the client deletes it.
  • Ephemeral: deleted automatically when the session that created it terminates (the client can also delete it explicitly).

A znode can also be created with the SEQUENTIAL flag. With this flag, a monotonically increasing counter is appended to the znode's name as its sequence number.
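To make the two flags concrete, here is a minimal sketch of a toy in-memory tree. It is not the real ZooKeeper client: `ToyTree`, `create` and `close_session` are hypothetical names, and the ten-digit zero padding is simply the choice made for this sketch.

```python
class ToyTree:
    """Toy in-memory stand-in for the data tree; not the real ZooKeeper client."""

    def __init__(self):
        self.nodes = {"/": None}   # path -> owning session (None means regular)
        self.counters = {}         # parent path -> next sequence number

    def create(self, path, session=None, ephemeral=False, sequential=False):
        if sequential:
            parent = path.rsplit("/", 1)[0] or "/"
            seq = self.counters.get(parent, 0)
            self.counters[parent] = seq + 1
            path = f"{path}{seq:010d}"   # zero-padded counter appended to the name
        self.nodes[path] = session if ephemeral else None
        return path

    def close_session(self, session):
        # Ephemeral znodes disappear together with the session that created them.
        self.nodes = {p: s for p, s in self.nodes.items() if s != session}


tree = ToyTree()
tree.create("/app1")                                           # regular znode
p1 = tree.create("/app1/p_", "s1", ephemeral=True, sequential=True)
p2 = tree.create("/app1/p_", "s2", ephemeral=True, sequential=True)
tree.close_session("s1")                                       # removes p1 only
```

The counter is kept per parent, so siblings created under the same znode get increasing sequence numbers regardless of which session created them.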

The data model is essentially a file system that only supports full data reads and writes. It is not designed as general-purpose data storage. Instead, the data tree is an abstraction of the client applications and stores metadata for coordination purposes. In the example above, /app1/p_1 holds the metadata of process 1 of application 1.

API

delete(String path, int version): Delete the node with the given path if the node’s version matches version.

exists(String path, boolean watch): Return the stat of the node of the given path.

getData(String path, boolean watch, Stat stat): Return the data and the stat of the node of the given path.

setData(String path, byte[] data, int version): Set the data for the node of the given path if such a node exists and the given version matches the version of the node (if the given version is -1, it matches any node’s versions).

getChildren(String path, boolean watch): Return the list of the children of the node of the given path.

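sync(String path, AsyncCallback.VoidCallback cb, Object ctx): Asynchronously wait until all updates pending at the start of the operation have propagated to the server the client is connected to.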

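The version enables conditional updates and so avoids race conditions: an update is applied only if the znode's current version matches the version the client passed in, which means no other client has changed the znode since it was read.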
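As an illustration of how the version check prevents lost updates, here is a small sketch of a read-modify-write loop against a toy znode. `ToyZnode`, `BadVersion` and `increment` are made-up names for this example; only the version-matching rule (including -1 matching any version) comes from the API description above.

```python
class BadVersion(Exception):
    """Raised by the toy znode when the expected version does not match."""


class ToyZnode:
    def __init__(self, data=b""):
        self.data, self.version = data, 0

    def get_data(self):
        return self.data, self.version

    def set_data(self, data, version):
        # -1 matches any version; otherwise the versions must be equal.
        if version != -1 and version != self.version:
            raise BadVersion(f"expected {version}, found {self.version}")
        self.data = data
        self.version += 1


def increment(znode):
    """Increment a counter stored in the znode, retrying on a version conflict."""
    while True:
        data, version = znode.get_data()
        try:
            znode.set_data(str(int(data.decode() or "0") + 1).encode(), version)
            return
        except BadVersion:
            continue   # someone else updated the znode first; read again
```

A client that writes with a stale version gets an error instead of silently overwriting a newer value, and simply re-reads and retries.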

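A watch allows a client to be notified when the znode is updated. A watch is a one-time trigger: it fires once and must be set again if the client wants further notifications. The notification only says that something changed; it does not carry the new data.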
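The one-time nature of watches is easy to get wrong, so here is a rough sketch with a toy znode (`WatchedZnode` is a hypothetical class, not part of any ZooKeeper client). A client that wants continuous notifications has to set the watch again each time it fires.

```python
class WatchedZnode:
    """Toy znode whose watches fire once and are then cleared."""

    def __init__(self, data):
        self.data = data
        self.watchers = []

    def get_data(self, watcher=None):
        if watcher is not None:
            self.watchers.append(watcher)
        return self.data

    def set_data(self, data):
        self.data = data
        fired, self.watchers = self.watchers, []   # clear before notifying
        for w in fired:
            w("changed")                           # the event carries no data


z = WatchedZnode(b"v1")

once = []
z.get_data(once.append)            # watch set once, never renewed

seen = []
def keep_watching(event):
    # Re-read the data and set the watch again in the same call.
    seen.append(z.get_data(keep_watching))

z.get_data(keep_watching)
z.set_data(b"v2")
z.set_data(b"v3")
```

After the two updates, the watcher that was never renewed has been called only once, while `keep_watching` has observed both new values.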

Each operation has a synchronous version and an asynchronous version. With the asynchronous version, a client can have multiple outstanding ZooKeeper operations. The client's callbacks are guaranteed to be invoked in order.

See the complete list of methods in https://zookeeper.apache.org/doc/r3.4.6/api/org/apache/zookeeper/ZooKeeper.html and examples of calling the APIs in https://www.tutorialspoint.com/zookeeper/zookeeper_api.htm.

ZooKeeper guarantees

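  1. Linearizable writes: all requests that update the state of ZooKeeper are serializable and respect precedence.
  2. FIFO client order: all requests from a given client are executed in the order that they were sent by the client.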

Primitives on ZooKeeper

Configuration Management

Group Membership

Locks

Get the lock

n = create(l + "/lock-", EPHEMERAL | SEQUENTIAL)
loop {
    C = getChildren(l, false)
    if n is lowest znode in C, exit              // lock acquired
    p = znode in C ordered just before n
    if exists(p, true), wait for watch event     // if p is already gone, just retry
}

Release the lock

delete(n)

How does this work?

Each client creates one sequential znode, so the znodes look like /lock-000000001, /lock-000000002, /lock-000000003, … The client with the lowest sequence number (/lock-000000001) holds the lock, while every other client waits for the znode immediately before its own to be deleted. For example, when /lock-000000002 is deleted, only the client that owns /lock-000000003 is notified, and it acquires the lock. This avoids the herd effect, where many clients compete for a lock that has just been released. The sequence numbers also help when debugging the locking process.
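To see why only one waiter wakes up per release, here is a small single-threaded sketch. `ToyLockDir` is a hypothetical toy model of the children of the lock znode; it records which client would be notified on each delete instead of actually blocking.

```python
class ToyLockDir:
    """Toy model of the children of the lock znode; not the real client API."""

    def __init__(self):
        self.children = []    # sequential names, in creation order
        self.watches = {}     # watched znode -> znode of the waiting client
        self.next_seq = 0
        self.notified = []    # log of (deleted znode, znode that was woken up)

    def create(self):
        name = f"lock-{self.next_seq:010d}"
        self.next_seq += 1
        self.children.append(name)
        return name

    def try_acquire(self, n):
        """One pass of the loop: True if n holds the lock, else watch the predecessor."""
        i = self.children.index(n)
        if i == 0:
            return True
        self.watches[self.children[i - 1]] = n
        return False

    def delete(self, n):
        self.children.remove(n)
        waiter = self.watches.pop(n, None)
        if waiter is not None:
            self.notified.append((n, waiter))   # exactly one client is woken


d = ToyLockDir()
a, b, c = d.create(), d.create(), d.create()
got_a, got_b, got_c = d.try_acquire(a), d.try_acquire(b), d.try_acquire(c)
d.delete(a)                        # release: only b's owner is notified
got_b_after = d.try_acquire(b)
```

Because each waiter watches a different znode, releasing the lock produces a single notification no matter how many clients are queued.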

Going one step further, read/write locks can be implemented with znodes in the same way:

Write lock

n = create(l + "/write-", EPHEMERAL | SEQUENTIAL)
loop {
    C = getChildren(l, false)
    if n is lowest znode in C, exit
    p = znode in C ordered just before n
    if exists(p, true), wait for event
}

Read lock

n = create(l + "/read-", EPHEMERAL | SEQUENTIAL)
loop {
    C = getChildren(l, false)                    // refresh the children on every pass
    if no write znodes lower than n in C, exit
    p = write znode in C ordered just before n
    if exists(p, true), wait for event
}
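The difference between the two procedures is only in which znode a client waits on. The sketch below captures that decision as a pure function; `seq` and `next_step` are hypothetical helper names, and child names are assumed to look like `read-<number>` or `write-<number>`.

```python
def seq(name):
    """Sequence number at the end of a child name such as 'read-0000000002'."""
    return int(name.rsplit("-", 1)[1])


def next_step(n, children):
    """Return None if n holds the lock, otherwise the znode n should watch."""
    lower = sorted((c for c in children if seq(c) < seq(n)), key=seq)
    if n.startswith("read-"):
        # Readers only wait for writers; other readers never block them.
        lower = [c for c in lower if c.startswith("write-")]
    return lower[-1] if lower else None


children = [
    "read-0000000000",
    "write-0000000001",
    "read-0000000002",
    "read-0000000003",
]
```

With these children, the first reader holds the lock, the writer waits for that reader, and both later readers wait for the writer, so they are released together once it finishes.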

ZooKeeper Applications

ZooKeeper is used in other Apache projects such as Apache HBase, Apache Hadoop and Apache Solr.

Implementation

What are the requirements?

  • High availability and performance
  • Consistency
  • Recoverability

High availability and performance

High performance means high throughput and low latency. For read-heavy workloads, read requests are handled locally without coordination and the data is in memory, so the latency is primarily network latency. Higher throughput can be achieved by adding servers. The benchmark shows that the number of read requests that can be processed scales linearly with the number of servers.

For write requests, ZooKeeper relies on its linearizable-writes and FIFO-client-order guarantees. Write requests can be sent asynchronously and are pipelined in the servers, which substantially reduces the average latency.

Consistency

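A leader is elected among the servers when the ZooKeeper service starts, and a new one is elected if the leader fails. The ensemble cannot rely on znodes for this, because creating a znode is itself a write that needs a leader; the election is part of the underlying agreement protocol. The recipe based on sequential ephemeral znodes is what client applications use to elect their own leader on top of ZooKeeper: every candidate creates a sequential ephemeral znode, the one with the lowest sequence number becomes the leader, and each other candidate watches the znode just before its own. Once the leader fails, the next candidate in line is notified and takes over.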
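The sequential ephemeral znode recipe mentioned above boils down to two small rules, sketched here as pure functions. `leader` and `watch_target` are hypothetical names, and the znode names are assumed to be zero-padded so that string order matches numeric order.

```python
def leader(candidates):
    """candidates maps znode name -> participant id; the lowest name wins."""
    return candidates[min(candidates)] if candidates else None


def watch_target(candidates, name):
    """The znode just before `name`, or None if `name` is already the lowest."""
    lower = [c for c in candidates if c < name]
    return max(lower) if lower else None


candidates = {
    "n-0000000000": "A",
    "n-0000000001": "B",
    "n-0000000002": "C",
}
first = leader(candidates)
c_watches = watch_target(candidates, "n-0000000002")
del candidates["n-0000000000"]     # A's session ends, its ephemeral znode goes away
second = leader(candidates)
```

When the first znode disappears, only the participant watching it is notified, and it finds itself at the head of the list.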

When a client sends a write request:

  1. The server that receives the request forwards it to the leader.
  2. The leader translates the write request into a transaction, which is idempotent and captures the new state of the system with the help of the znode version. For example, a conditional setData on /foo becomes <SetDataTXN, /foo, f2, 2> only if the version in the request matches the znode's future version; the transaction records the new data f2 and the new version 2. Applying the transaction more than once leaves the znode in the same state, so re-execution does not affect the correctness of the znode data.
  3. The leader broadcasts the transactions to the followers with the Zab protocol. Zab works as long as a majority of the servers are correct. It guarantees that changes broadcast by a leader are delivered in the order they were sent, and that all changes from previous leaders are delivered to an established leader before it broadcasts its own changes.
  4. The followers receive the transactions. Before updating the in-memory database, they first log the transactions to disk. This log is used for failure recovery.

Figure: Workflow of write requests
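The idempotency of transactions in step 2 can be illustrated with a short sketch. It simplifies things by treating `state` as the leader's view of the future state; `to_txn`, `apply_txn` and the `ErrorTXN` tuple layout are assumptions made for this example.

```python
def to_txn(state, path, data, expected_version):
    """Leader side: turn a conditional setData request into a transaction."""
    node = state.get(path)
    if node is None or node["version"] != expected_version:
        return ("ErrorTXN", path)
    # The transaction records the resulting state, not the condition.
    return ("SetDataTXN", path, data, node["version"] + 1)


def apply_txn(state, txn):
    """Replica side: applying the same transaction again changes nothing."""
    if txn[0] == "SetDataTXN":
        _, path, data, new_version = txn
        state[path] = {"data": data, "version": new_version}


state = {"/foo": {"data": "f1", "version": 1}}
txn = to_txn(state, "/foo", "f2", 1)
apply_txn(state, txn)
apply_txn(state, txn)              # replayed, for example during recovery
stale = to_txn(state, "/foo", "f3", 1)
```

Because the version check happens once on the leader and the transaction only carries the outcome, a replica can safely replay its log after a crash.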

Recoverability

Client — Server interaction

Because reads are served locally, a client may see slightly stale data, which is usually tolerable. If the client really needs the latest data, it can issue a sync request followed by the read request. The FIFO order guarantees that the read is served only after all writes that were pending when the sync was issued have been applied.
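A rough way to picture the sync-then-read pattern is a follower with a queue of writes it has not applied yet. `ToyFollower` is a hypothetical toy; in the real system the sync request is ordered through the leader, which this sketch leaves out.

```python
class ToyFollower:
    """Toy follower that serves reads locally and applies writes lazily."""

    def __init__(self):
        self.data = {}
        self.pending = []          # committed writes not yet applied locally

    def deliver(self, key, value):
        self.pending.append((key, value))

    def read(self, key):
        return self.data.get(key)  # local read, may be stale

    def sync(self):
        # Apply every write that was pending when sync was issued.
        for key, value in self.pending:
            self.data[key] = value
        self.pending.clear()


f = ToyFollower()
f.data["/cfg"] = "v1"
f.deliver("/cfg", "v2")
before = f.read("/cfg")            # stale value
f.sync()
after = f.read("/cfg")             # sees the write that preceded the sync
```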

Another question: what happens if the client connects to a new server, or to one that has just recovered from a failure?

ZooKeeper tags the response to each read request with a zxid, which corresponds to the last transaction seen by the server. When a client connects to a server, it sends the last zxid it has seen. If the server's last zxid is smaller than the client's, the server has a less recent view than the client and will not establish the session until it has caught up; in the meantime, the client can connect to another server. ZooKeeper guarantees that the client can find one, because the client only sees changes that have been replicated to a majority of the ZooKeeper servers.
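The zxid comparison can be sketched as a one-line check. `pick_server` is a hypothetical helper that models the client trying servers in order; the real handshake happens inside the client library and the server.

```python
def pick_server(client_last_zxid, servers):
    """servers maps server name -> last zxid applied; return the first usable one."""
    for name, server_zxid in servers.items():
        # A server is usable only if its view is at least as recent as the client's.
        if server_zxid >= client_last_zxid:
            return name
    return None


servers = {"s1": 5, "s2": 7, "s3": 9}
chosen = pick_server(7, servers)   # s1 is behind the client, s2 is recent enough
```

This is what keeps a client from observing the data going backwards in time after it switches servers.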

Reference

https://www.tutorialspoint.com/zookeeper/index.htm

Software Engineer@Facebook
