Part 1 of this series covered the challenges Rubrik initially faced with its distributed database; our applications had become quite complex in order to work around some of them. Part 2 covered how we chose a new distributed database (CockroachDB) and how we performed the migration.
Next, we will dive into some of the challenges we faced after migrating to CockroachDB and how we developed on top of CockroachDB to mitigate them.
Challenges with CockroachDB
The first challenge was CockroachDB's instability in the presence of clock skew between the nodes of a cluster. To see why, it helps to understand how CockroachDB uses clocks.
Serializability in CockroachDB
CockroachDB relies on timestamps to order transactions, but the clocks (wall time) on different nodes can be slightly out of sync. Hence CockroachDB uses hybrid logical clock (HLC) timestamps instead of relying on wall time alone.
An HLC timestamp is a tuple (Wall Time, Logical Time). Wall Time is the highest physical time a node has seen so far, either from its local clock or from messages from other nodes. Logical Time is a counter that is incremented to produce a later timestamp when the Wall Time has not changed.
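To make the two components concrete, here is a minimal sketch of the textbook HLC rules for local events and for receiving a message. It is not CockroachDB's own implementation, which differs in details. The physical clock is injected as a function (a hypothetical `physicalNow`) so the example is deterministic, and wall times are plain integers.

```go
package main

import "fmt"

// Timestamp is an HLC timestamp: (Wall Time, Logical Time).
type Timestamp struct {
	WallTime int64 // highest physical time seen so far
	Logical  int32 // orders events that share the same WallTime
}

// Less reports whether a is ordered before b.
func (a Timestamp) Less(b Timestamp) bool {
	return a.WallTime < b.WallTime || (a.WallTime == b.WallTime && a.Logical < b.Logical)
}

// HLC is a minimal hybrid logical clock for one node.
type HLC struct {
	physicalNow func() int64 // hypothetical injected wall-clock source
	last        Timestamp
}

// Now returns a timestamp for a local event (or for sending a message).
func (c *HLC) Now() Timestamp {
	if pt := c.physicalNow(); pt > c.last.WallTime {
		c.last = Timestamp{WallTime: pt} // physical time advanced: reset counter
	} else {
		c.last.Logical++ // wall time unchanged (or behind): bump the counter
	}
	return c.last
}

// Update merges a timestamp received from another node and returns the
// timestamp of the receive event, which is always greater than remote.
func (c *HLC) Update(remote Timestamp) Timestamp {
	pt := c.physicalNow()
	switch {
	case pt > c.last.WallTime && pt > remote.WallTime:
		c.last = Timestamp{WallTime: pt}
	case remote.WallTime > c.last.WallTime:
		c.last = Timestamp{WallTime: remote.WallTime, Logical: remote.Logical + 1}
	case c.last.WallTime > remote.WallTime:
		c.last.Logical++
	default: // equal wall times: take the larger counter and bump it
		if remote.Logical > c.last.Logical {
			c.last.Logical = remote.Logical
		}
		c.last.Logical++
	}
	return c.last
}

func main() {
	// Node B's physical clock lags node A's by 10 units (clock skew).
	a := &HLC{physicalNow: func() int64 { return 100 }}
	b := &HLC{physicalNow: func() int64 { return 90 }}

	eventA := a.Now()          // event A on node A
	eventB := b.Update(eventA) // node B receives A's message: event B
	fmt.Println(eventA, eventB, eventA.Less(eventB)) // {100 0} {100 1} true
}
```

Even though node B's physical clock is behind, the receive event B is ordered after A because B adopts A's Wall Time and bumps the Logical Time.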
The example below shows how HLC works. The blue boxes show how the wall time of different nodes progresses (wall time can jump), and the purple/yellow boxes show how the HLC timestamps are updated. Arrows indicate RPCs/messages between nodes, each carrying the source node's HLC timestamp to the target node.
With HLC, the ordering of related events is easy to reason about because the HLC timestamp always increases: as events progress, either the Wall Time or the Logical Time component increases. For example, using HLC, we can be sure that event A occurred before the causally connected event B.
However, the global ordering of unrelated events (like B and C) is difficult to reason about.
To order unrelated events, CockroachDB expects the clock skew (difference in wall time) between nodes to be less than 500ms. Given this guarantee, if two events are more than 500ms apart, their order is certain. If their wall times differ by less than 500ms, their order is ambiguous, and when the order of two conflicting transactions is ambiguous, one of them is retried at a later time.
CockroachDB goes down when this constraint is violated. Some of our physical clusters were prone to clock skew because of misbehaving NTP servers.
Clock skew was even more widespread in our virtual clusters, as mentioned in part 1 of this blog series, because:
Virtual machines can build up an interrupt backlog, and some clocks are driven by interrupts. An artificial catch-up mechanism is used to work through these interrupt backlogs.
Hypervisors can adjust the clocks of virtual machines, causing clock jumps inside the virtual machines.
NTP helps correct clock jumps, but it does not prevent them. In some cases, when the NTP server malfunctions, NTP itself causes clock jumps, which crash CockroachDB.
Solution - A custom time server - Kronos
To solve the clock skew problem, we built a distributed time service and plugged it into CockroachDB, patching CockroachDB to get wall time from this service instead of from the system clock. The properties of the time service are:
It has the same rate of flow as real-time.
It returns the same time on all nodes in a cluster.
It is immune to system time jumps.
It is always monotonic. Time can never go backward.
It is fault-tolerant, i.e., it keeps working in the presence of node failures as long as a quorum (more than half of the nodes) is alive.
The time service runs inside CockroachDB on each node. It elects an "Oracle" within the cluster to drive the time selection. Selecting the Oracle is a consensus problem, and we used a distributed state machine backed by Raft for this selection.
All nodes periodically sync time with the Oracle. The source code for our time service can be found here.
Kronos is now also used for application features that need protection against clock jumps, such as:
Retaining snapshots for the correct duration of time
Comparison with Network Time Protocol daemon (NTPD)
The Network Time Protocol daemon (NTPD) is an operating system daemon that sets the system time of day and keeps it in sync with Internet-standard time servers.
Kronos and NTPD solve different problems. The objective of Kronos is to provide access to monotonic time which is guaranteed to be nearly the same on all nodes in the cluster at all times, irrespective of changes to system time. The key differences are:
Kronos provides an API to access time. It does not modify system time like NTPD.
Kronos time is monotonic. NTPD is not resilient to backward system time jumps.
Kronos time will not have large forward jumps (Go v1.9 onwards), since it uses CLOCK_REALTIME only as a reference point and uses CLOCK_MONOTONIC to measure the progress of time. NTPD does not prevent large system time jumps (forward or backward); it corrects them after they happen, and this correction can take a while.
If a Kronos node finds itself partitioned away from the cluster, it will stop serving time requests. Hence if Kronos time is served by a node, it can be assumed to be accurate.
The same cannot be said for system time. It is (almost) always accessible and there is no guarantee that it would be nearly the same as other nodes of the cluster.
The time provided by Kronos may differ from the system time and from the UTC time of any time server. Kronos guarantees only that its time is monotonic and nearly the same on all nodes of the cluster. Hence it should be used when the order of events matters, not when the absolute time at which they happened matters.
It should therefore be used alongside NTPD, which keeps system time in sync with Internet-standard time servers (but can be prone to time jumps).
Kronos does not attempt to measure RTT variance and is not yet optimized for cross-data-center deployments. NTPD can synchronize time across data centers.
Kronos is not a replacement for NTPD. It's recommended to use NTPD in conjunction with Kronos so that applications can choose which timestamp they want to use (system time vs Kronos time).
This section talks about how each property of the time service is achieved.
Time is always monotonic.
For a running service, this is easy to ensure: store the last served timestamp in memory and flat-line if a backward jump is detected. Kronos time is exposed through a service, so to keep time monotonic when all Kronos services restart, an upper bound on time (e.g., current time + 10s) is periodically persisted in the Raft cluster. The time service never returns a time greater than this time cap, so resuming from the time cap after a restart cannot cause a backward time jump.
It is immune to large jumps in system time. It has the same rate of flow as real-time.
Kronos uses CLOCK_MONOTONIC along with CLOCK_REALTIME. CLOCK_REALTIME represents the machine's best guess as to the current wall-clock, time-of-day time.
CLOCK_MONOTONIC represents the absolute elapsed wall-clock time since some arbitrary, fixed point in the past. It isn't affected by changes in the system time-of-day clock.
CLOCK_MONOTONIC is used to compute the progress of time from a fixed reference picked from CLOCK_REALTIME.
Time used in Kronos is given by:
local_time = start_time.Add(time.Since(start_time))
start_time is a fixed point of reference taken from CLOCK_REALTIME.
time.Since(start_time) is the monotonic time elapsed since start_time. Time computed this way is immune to large system time jumps and flows at the same rate as real time. This relies on the monotonic clock readings in Go's time package, available from Go 1.9 onwards.
It returns nearly the same time on all nodes (within a few ms). It is fault-tolerant and works in the presence of node failures. It supports adding or removing nodes from the cluster.
Kronos elects a single "Oracle" from within the cluster: a node with which all other nodes periodically sync time. Periodic synchronization suffices because each node is already immune to large system time jumps. Time is synchronized with the Oracle over RPCs. Kronos time on each node is given by local_time + delta, where delta is the offset between this node's local_time and the Oracle's Kronos time; delta is updated on every synchronization with the Oracle.
The Oracle is elected using Raft, a distributed consensus protocol that manages a replicated state machine. Kronos stores the identity of the current Oracle and an upper bound on time (for safety) in this state machine.
Raft is failure tolerant and can continue to work as long as a quorum of nodes is healthy. It also supports the addition and removal of nodes to the cluster.
Syncing time with the Oracle
Time synchronization with the Oracle happens over RPCs. Kronos has an upper limit on the RTT allowed for this RPC. If the RTT is too high, then that data point is ignored. The goal is to keep the Kronos time of the follower in sync with Kronos time of the Oracle.
Let's say a follower requests the Oracle's Kronos time at $t_s$ and receives the response at $t_e$ (both measured in the follower's Kronos time), and the Oracle responds with $t_o$. The adjustment must account for the round-trip time, $\mathrm{RTT} = t_e - t_s$: when the response arrives, the Oracle's true time lies in the interval $[t_o, t_o + \mathrm{RTT}]$. If the follower's Kronos time $t_e$ does not lie within this interval, delta is adjusted so that it does (Kronos time on the follower is local_time + delta).
This ensures that time on all followers is nearly the same as on the Oracle. In practice, most nodes in the cluster had a time within a few hundred microseconds of the Oracle's. Integrating Kronos with CockroachDB solved the clock skew problem.
The next challenge we faced with CockroachDB was backpressure when large rows were updated frequently. Backpressure is a mechanism in CockroachDB that fails updates when certain limits are hit.
CockroachDB creates a copy of a row each time it is updated, for MVCC, as explained previously. The stale copies are eventually garbage collected after a configurable expiry. However, if a row is updated frequently, the total size of its copies can grow very large. Usually, CockroachDB splits a range when it grows large, but a single row together with its MVCC versions cannot span two ranges. In this scenario, CockroachDB backpressures further updates to the affected rows so that ranges don't grow too large.
We did have cases where our applications updated rows frequently and hit backpressure. For example, when a user-visible task runs in Rubrik, it records its progress in our database so that the progress can be shown in the UI. Sometimes these updates were too frequent, causing backpressure.
We have also seen cases where CockroachDB garbage collection lags. Even though we set the TTL for garbage collection to 5 minutes, we saw some ranges in CockroachDB which were not garbage collected for 12+ hours when the range was hitting backpressure.
We analyzed cases where we hit backpressure and changed the application to avoid frequent updates to the same row. We also added internal retries in our Object-relational Mapper (ORM) when an update fails due to backpressure. These steps helped reduce the number of backpressure errors we faced in our application.
We also added patches to CockroachDB to more aggressively run garbage collection on ranges prone to backpressure. This mitigated the problem of lagging garbage collection.
The final challenge we wanted to solve was to improve the multi-node fault tolerance of CockroachDB.
CockroachDB provides fault tolerance by replicating data. The number of replicas is configurable. Data is available as long as a quorum (majority) of replicas are live. For example, if we use 3-way replication for a range, the cluster will be 1 node fault-tolerant, and 5-way replication would be 2 node fault-tolerant (not considering rack failure tolerance).
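The majority rule translates directly into arithmetic: a range with replication factor rf needs rf/2 + 1 live replicas, so it survives the loss of the rest. The helper names below are ours, for illustration only.

```go
package main

import "fmt"

// quorum is the number of live replicas a range needs to stay available.
func quorum(replicationFactor int) int {
	return replicationFactor/2 + 1
}

// faultTolerance is how many replicas a range can lose while keeping a quorum.
func faultTolerance(replicationFactor int) int {
	return replicationFactor - quorum(replicationFactor)
}

func main() {
	for _, rf := range []int{3, 5} {
		fmt.Printf("rf=%d quorum=%d tolerates=%d\n", rf, quorum(rf), faultTolerance(rf))
	}
	// rf=3 quorum=2 tolerates=1
	// rf=5 quorum=3 tolerates=2
}
```

An even replication factor does not help: rf=4 needs 3 live replicas and still tolerates only 1 failure.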
Let's say each CockroachDB range is 3-way replicated. The node-range allocation could then look as follows, where N1 to N6 are 6 nodes and R1 to R6 are 6 ranges, each 3-way replicated.
Losing any one node would not lead to data loss since we would only lose 1 copy of any range.
But if we lose 2 nodes simultaneously, some ranges lose 2 of their 3 copies, and we lose their data. In the example below, we lose ranges R1 and R3.
Assuming each node fails independently, multi-node failures are more likely in a large cluster (say, 100 nodes) than in a 4-node cluster. Losing 2 nodes simultaneously in a 100-node cluster will likely lead to data loss, because with many ranges spread across the cluster, some range almost certainly has replicas on both failed nodes. This is something we wanted to improve.
Solution - Copysets
To improve fault tolerance in large clusters, we can do smarter replica allocation. Copysets can be used to improve failure tolerance.
Assuming each range is 3-way replicated, the nodes in the cluster are divided into disjoint 3-sets (copysets), and every replica of a range is placed within a single one of these 3-sets. Non-disjoint 3-sets would give lower failure tolerance but faster recovery from failures, since more nodes would be available to up-replicate the data of failed nodes.
An example of range allocation with disjoint 3-sets can look like the following.
Losing 1 node in each copyset does not lead to any data loss, since each range loses only one of its three copies. For example, the following 2-node failure scenarios do not lead to any data loss.
However, losing 2 nodes within the same copyset still leads to data loss (in the example below, we lose ranges R1, R4, and R6).
Advantages of using copysets
Lower probability of data loss.
In the best case, we can lose a third of the cluster (one node per copyset) with no data loss; for example, 33 out of 100 nodes.
No extra cost at steady state.
Disadvantages of using copysets
Higher magnitude of data loss, if it occurs.
More expensive rebalancing of ranges; for example if a node goes down.
The node to copyset allocation can also be done taking rack fault tolerance into account. Note that failures of nodes in a rack can be correlated (a rack can lose power).
If we allocate nodes to copysets as shown below, the cluster tolerates the failure of a rack: losing a rack means losing only one node per copyset. Each copyset below is color-coded.
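One way to get this property is to build each copyset from one node per rack. The sketch below is a hypothetical illustration, assuming exactly as many racks as replicas and the same number of nodes in every rack; it is not the allocation code in our fork.

```go
package main

import "fmt"

// rackAwareCopysets builds copysets by taking the i-th node from each rack,
// so no copyset contains two nodes from the same rack. It assumes every rack
// has the same number of nodes.
func rackAwareCopysets(racks [][]string) [][]string {
	var copysets [][]string
	for i := range racks[0] {
		cs := make([]string, 0, len(racks))
		for _, rack := range racks {
			cs = append(cs, rack[i])
		}
		copysets = append(copysets, cs)
	}
	return copysets
}

// maxLossPerCopyset returns the largest number of nodes any single copyset
// loses when the given nodes fail.
func maxLossPerCopyset(copysets [][]string, failed map[string]bool) int {
	worst := 0
	for _, cs := range copysets {
		down := 0
		for _, n := range cs {
			if failed[n] {
				down++
			}
		}
		if down > worst {
			worst = down
		}
	}
	return worst
}

func main() {
	racks := [][]string{
		{"N1", "N2", "N3"},
		{"N4", "N5", "N6"},
		{"N7", "N8", "N9"},
	}
	copysets := rackAwareCopysets(racks)
	fmt.Println(copysets) // [[N1 N4 N7] [N2 N5 N8] [N3 N6 N9]]

	// Losing an entire rack removes at most one node from each copyset.
	rack2Down := map[string]bool{"N4": true, "N5": true, "N6": true}
	fmt.Println(maxLossPerCopyset(copysets, rack2Down)) // 1
}
```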
Alternatives to Copysets
Instead of using copysets to improve tolerance of multi-node failures, a replication factor of 5 can be used instead of 3. This configuration tolerates simultaneously losing up to 2 nodes. The drawbacks of a replication factor of 5 are:
More space utilization (uses 67% more space than 3-way replication).
More resource utilization to keep 5 replicas in sync.
Tolerating 2 node failures is the best this configuration can do.
Note that a higher replication factor can go hand in hand with copysets: copysets can also be used with a replication factor of 5. We integrated copysets into CockroachDB's range rebalancing algorithm, published the RFC here, and implemented it in our private fork of CockroachDB.
Overall, migrating from Cassandra to CockroachDB was great for our apps. We no longer needed to deal with the challenges posed by Cassandra, and the new support for cross-table transactions (through CQLProxy) greatly simplified our application logic.
We gained an understanding of CockroachDB's public repo and contributed to it. This understanding also helped us integrate custom features into our private fork of CockroachDB, like Kronos (the distributed time server) and Copysets (for improved fault tolerance).
Last but not least, because of the CQL-SQL translation layer we built when migrating to CockroachDB, migrating to a new metadata store in the future, if we choose to do so, would not take much effort.
Interested in working on projects similar to this and developing innovative solutions to help secure and protect enterprise data? View the open positions on the Rubrik engineering team here.