Rubrik CDM is scale-out and fault-tolerant. Our software runs as a clustered system consisting of multiple nodes, where each node runs an identical copy of our software stack and is equally able to perform operations like data protection and recovery. To increase capacity, a user simply adds more nodes. The system continues to operate when a node fails: other nodes pick up the workload while the node is offline.

Scale-out, fault-tolerant products are built on distributed systems. Some examples from our stack include:

  1. Distributed job scheduler: A masterless job scheduler schedules jobs to run both periodic and on-demand application code across nodes of the cluster.

  2. Distributed file system: Our distributed file system spreads data across disks attached to nodes in the cluster to provide fault tolerance against disk, node, and even rack failures.

These distributed components are in turn built on top of a distributed metadata store. 

  1. The job scheduler uses tables in the distributed metadata store as a queue. The queue stores all pending jobs with their target run time.

  2. The filesystem stores inode equivalents (filesystem objects like files or a directory) in the distributed metadata store.
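
As a rough illustration of the queue idea, the sketch below models a table of pending jobs ordered by target run time. It uses an in-memory heap in place of the metadata store, and the names `PendingJob`, `JobQueue`, `enqueue`, and `claim_due` are hypothetical, not Rubrik's actual schema or API.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class PendingJob:
    run_at: float                        # target run time, in seconds
    job_id: str = field(compare=False)   # not used for ordering


class JobQueue:
    """In-memory stand-in for a 'pending jobs' table ordered by run time."""

    def __init__(self):
        self._rows = []

    def enqueue(self, job_id, run_at):
        heapq.heappush(self._rows, PendingJob(run_at, job_id))

    def claim_due(self, now):
        """Remove and return the ids of all jobs whose run time has arrived."""
        due = []
        while self._rows and self._rows[0].run_at <= now:
            due.append(heapq.heappop(self._rows).job_id)
        return due


queue = JobQueue()
queue.enqueue("snapshot-vm-1", run_at=100.0)         # periodic job, due later
queue.enqueue("expire-old-snapshots", run_at=50.0)   # periodic job, due soon
queue.enqueue("on-demand-restore", run_at=0.0)       # on-demand: run immediately
first_batch = queue.claim_due(now=60.0)
```

In a real cluster, several nodes would be claiming jobs from the same table concurrently, which is where the guarantees of the underlying metadata store start to matter.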

This post and the ones that follow cover our metadata store journey, which is divided into 3 parts:

  1. Cassandra as our v1 metadata store

  2. Transitioning to CockroachDB

  3. Developing on top of CockroachDB

In this blog series, we will dive into choosing a metadata store that meets your use cases and how we went about it at Rubrik.

Database Concepts

A database is an organized collection of data. Transactions are atomic units of execution on a database. A transaction can consist of multiple steps, all of which must succeed or fail together.
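
To make atomicity concrete, here is a minimal sketch using Python's built-in `sqlite3` module (a single-node database, chosen only because it needs no setup). The `accounts` table and the `transfer` helper are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (name TEXT PRIMARY KEY, balance INTEGER)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [("a", 100), ("b", 0)])
conn.commit()


def transfer(conn, src, dst, amount):
    """Move `amount` from src to dst as one atomic unit."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE name = ?",
                (amount, src),
            )
            (balance,) = conn.execute(
                "SELECT balance FROM accounts WHERE name = ?", (src,)
            ).fetchone()
            if balance < 0:
                raise ValueError("insufficient funds")
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE name = ?",
                (amount, dst),
            )
        return True
    except ValueError:
        return False


def balances(conn):
    return dict(conn.execute("SELECT name, balance FROM accounts"))


ok = transfer(conn, "a", "b", 30)        # both steps are applied
failed = transfer(conn, "a", "b", 500)   # the first step is rolled back
```

After the failed transfer, the debit from the first step is not visible: the balances are the same as they were before the transaction started.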

Rubrik's requirements from a database

Rubrik requires its database to be

  1. Scalable

        a.    It should be able to scale (throughput, storage) based on the size of the cluster

  2. Fault tolerant

        a.     It should not lose any data and continue functioning in the presence of node failures (as long as too many nodes don't fail simultaneously)

  3. Performant

        a.    High-performance point queries - Point queries should have low latencies

        b.    High throughput range scans - Multiple range scans on a table should not impact the performance of other queries on the table, and range scans should have reasonable latencies (though the latency per row may be greater than point queries).

A distributed database is necessary

Requirements (1) and (2) mean that a distributed database has to be used. Distributed databases replicate data (store multiple copies of the same data) to provide fault tolerance. If a copy of the data is lost, the database can continue to function correctly with the surviving copies. Consensus protocols (popular ones are Paxos and Raft) are used to keep a quorum of the replicas in sync even in the presence of node failures.
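
The quorum arithmetic behind "as long as too many nodes don't fail simultaneously" is simple for majority-based protocols. The helper names below are hypothetical; the formula is the usual majority rule.

```python
def quorum_size(replicas):
    """Smallest majority of `replicas` copies."""
    return replicas // 2 + 1


def tolerated_failures(replicas):
    """Copies that can be lost while a majority still survives."""
    return replicas - quorum_size(replicas)


summary = {n: (quorum_size(n), tolerated_failures(n)) for n in (3, 5)}
```

With 3 replicas the quorum is 2 and one failure is tolerated; with 5 replicas the quorum is 3 and two failures are tolerated.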

The CAP theorem is an important concept to understand when working with distributed databases. It states that a database can’t provide all 3 of Consistency, Availability, and Partition tolerance at the same time. Since network partitions can happen in the real world, it essentially boils down to whether the database chooses to provide Consistency or Availability in the presence of network partitions. For a backup application, consistency is always more important than availability in the presence of network partitions. Without consistency, the information stored in the database will not be reliable and applications could make incorrect decisions based on this data.

With this background, the next section describes our experience with Cassandra as the first version of our metadata store.

Choosing Cassandra

Cassandra is a distributed masterless NoSQL database. Clients querying Cassandra use CQL (Cassandra Query Language). Some features of Cassandra that were appealing are:

  1. Higher-order column types like Maps / Lists.

  2. High-performance point queries

  3. Easy to set up, deploy and maintain

At first glance, it seemed to provide what we required, making it ideal for building our Minimum Viable Product (MVP).

Challenges

As we continued using Cassandra, we started facing a few challenges. Though some of these are well-known issues now, they were not obvious when Rubrik started out and only surfaced once Rubrik reached a certain scale of customers.

Uneven distribution of data across nodes

In Cassandra, the schema of every table has a primary key, which consists of a partition key and an optional clustering key (each key can have one or more columns). Every row in the table is replicated among n (where n is configurable and is usually 3 / 5) nodes for fault tolerance (availability in the presence of node failures). The nodes selected for each row are dependent on the hash of the partition key (Cassandra uses consistent hashing). Each node owns a range of hash values.
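
The placement rule can be sketched in a few lines. This is a simplified model rather than Cassandra's implementation: it assumes MD5 as a stand-in hash function, that each node owns the range ending at its token, and that additional replicas go to the next nodes clockwise on the ring. The `Ring` class and the token values are made up for the example.

```python
import bisect
import hashlib


def token(value):
    """Map a partition key to a position on the ring (a 64-bit integer)."""
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:8], "big")


class Ring:
    def __init__(self, node_tokens, replication_factor=3):
        # node_tokens: {node_name: token}; a node owns the range ending at its token
        self._ring = sorted((tok, node) for node, tok in node_tokens.items())
        self._tokens = [tok for tok, _ in self._ring]
        self._rf = min(replication_factor, len(self._ring))

    def replicas(self, partition_key):
        """Owner of the key's hash, followed by the next nodes clockwise."""
        start = bisect.bisect_left(self._tokens, token(partition_key))
        size = len(self._ring)
        return [self._ring[(start + i) % size][1] for i in range(self._rf)]


ring = Ring(
    {"A": 2**62, "B": 2**63, "C": 3 * 2**62, "D": 2**64 - 1},
    replication_factor=3,
)
nodes_for_row = ring.replicas("vm-42")
```

Because placement depends only on the hash of the partition key, every row with the same partition key lands on the same set of nodes.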
 


When a new node is added, let’s say D, it divides one of the existing ranges into two ranges and takes ownership of one of those ranges.
 


One of the problems we faced was that depending on the random numbers generated for range allocation, some nodes would be allocated more rows (hashes) than others. For example, in the above figure, node C has ownership of a lot more data than the other nodes.

Workaround

Cassandra provides a feature where one Cassandra node can be considered as m virtual nodes. For a given value of m, say 256, it would mean that each node would own 256 separate ranges for the hash values. Setting m to 256 helped reduce the data imbalance between nodes at Rubrik.

One of the drawbacks of doing the above was that full range scans on a table caused N * m sub-queries instead of N sub-queries (where N is the number of nodes), leading to poor performance.
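
To make the trade-off concrete, here is a small simulation sketch. It is not Cassandra's actual token allocation: it assumes tokens are drawn uniformly at random on a ring of length 1, that each token owns the range ending at it, and that N is the number of nodes. All function names are hypothetical.

```python
import random


def max_share(num_nodes, vnodes_per_node, rng):
    """Largest fraction of the ring owned by any one node, for random tokens."""
    tokens = sorted(
        (rng.random(), node)
        for node in range(num_nodes)
        for _ in range(vnodes_per_node)
    )
    owned = [0.0] * num_nodes
    prev = tokens[-1][0] - 1.0  # wrap around: the last token precedes the first
    for tok, node in tokens:
        owned[node] += tok - prev  # each token owns the range ending at it
        prev = tok
    return max(owned)


def average_max_share(num_nodes, vnodes_per_node, trials=20, seed=0):
    rng = random.Random(seed)
    total = sum(max_share(num_nodes, vnodes_per_node, rng) for _ in range(trials))
    return total / trials


def range_scan_subqueries(num_nodes, vnodes_per_node):
    """One sub-query per owned range: N * m."""
    return num_nodes * vnodes_per_node


single = average_max_share(num_nodes=4, vnodes_per_node=1)
virtual = average_max_share(num_nodes=4, vnodes_per_node=256)
scan_cost = (range_scan_subqueries(4, 1), range_scan_subqueries(4, 256))
```

In this model a perfectly balanced 4-node ring gives every node a share of 0.25. The busiest node's share with m = 256 should sit much closer to that ideal than with m = 1, while a full scan goes from 4 sub-queries to 1024.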

Secondary Indexes

Secondary indexes are useful whenever we want to query/filter by columns that are not a prefix of our primary key. They could be useful for displaying data in our UI – where we can have custom filters – and also in application logic where we want to filter by different sets of columns.

Secondary indexes in Cassandra have some known inefficiencies.

Typically, distributed databases store secondary indexes as internal tables or a global sorted map, but Cassandra's secondary indexes are not distributed like normal tables. Each node stores an index only for the data it holds. As a result, a secondary index query issued from any node has to collect local results from every node in the cluster, even if the required data is present on only a few of them. In large clusters, this fan-out led to poor performance.

Workaround

To work around this, we used separate tables to store our secondary indexes. These tables are distributed as they are native Cassandra tables. For example, if the primary keys of a table are (p1, p2), and we want a secondary index on column (c1), we create a new table with columns (p1, p2, c1). This does involve the additional overhead of keeping the original table and the secondary index table in sync. 

Since applications can crash after updating only one of the two tables (Cassandra does not support cross-table transactions), a separate process is required to ensure that the secondary index is always up to date. This separate process adds query load on the database, which could have been avoided if native secondary indexes were performant.
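
The dual-write pattern and the background fix-up can be sketched as follows. This is an illustrative model only: a dict and a set stand in for the base table and the index table, and `write_row` and `reconcile` are hypothetical names, not Rubrik's code.

```python
def write_row(base, index, key, c1, crash_before_index=False):
    """Write the base row, then the index row; the two writes are not atomic."""
    old = base.get(key)
    base[key] = c1
    if crash_before_index:
        return  # simulate the application dying between the two writes
    if old is not None:
        index.discard(key + (old,))
    index.add(key + (c1,))  # index row holds columns (p1, p2, c1)


def reconcile(base, index):
    """Background pass that makes the index table match the base table."""
    expected = {key + (c1,) for key, c1 in base.items()}
    stale = index - expected
    missing = expected - index
    index.difference_update(stale)
    index.update(missing)
    return len(stale), len(missing)


base, index = {}, set()
write_row(base, index, ("p1a", "p2a"), "red")
write_row(base, index, ("p1b", "p2b"), "blue", crash_before_index=True)
write_row(base, index, ("p1a", "p2a"), "green", crash_before_index=True)
drift_before = reconcile(base, index)   # (stale entries, missing entries)
drift_after = reconcile(base, index)
```

Note that `reconcile` has to read both tables in full to find the drift, which is the extra query load described above.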

Out of Memory Crashes

Cassandra is a Java process with a fixed heap size. On some clusters, our query pattern or large row sizes made Cassandra prone to "out of memory" crashes, where a process crashes because it exceeds its defined memory limits. All Java apps can potentially run into "out of memory" issues if not enough heap space is given. At Rubrik, we gave sufficient heap space to Cassandra and it ran fine for most of the clusters, but a few clusters ran into these errors. Identifying the queries responsible for these crashes was challenging.

As a part of best practices, we avoided using large partitions. Rows belonging to a single partition key were limited to at most 1 MB. We also avoided individual partitions becoming hotspots due to high read/update QPS for a partition key.

Resurrections

Finally, and most importantly, we faced issues with Cassandra where deleted rows partially resurfaced. By partially, we mean that some columns of the row were null when it returned! We aptly called this problem “resurrection.” This was like a 2-ton straw that broke the camel's back, and it was the main problem we wanted to move away from.

When we delete a row from Cassandra, Cassandra marks the row with a "tombstone" marker and doesn't immediately delete the row. The tombstone is somewhat like a new value that overrides any previous value for the partition/column. Tombstones have a configurable lifespan (for example, 24 hours), after which they are garbage collected and the row is deleted.

Tombstones can make reads slower, so we try to keep their number low. We do this by using row/range tombstones, where a single tombstone covers many deleted values:

  1. Deleting a contiguous set of clustering keys in a "wide" partition creates one range tombstone. By wide, we mean a partition key that has a lot of clustering keys.

  2. Deleting the whole partition creates just one tombstone.

The main problem we had with tombstones happens when a node goes down for longer than a tombstone's lifespan. For example, consider the following sequence in a 3-node cluster with 3-way replicated rows:

  1. Insert a row R

  2. Node A goes down

  3. The client issues a delete with QUORUM consistency. Cassandra adds a tombstone to nodes B and C

  4. 24 hours pass. On nodes B and C the tombstone is garbage collected (via compaction) and the row is deleted.

  5. Node A comes back up. It notices that it has row R which is not present on nodes B and C. It is unaware that the row was deleted since the tombstone was garbage collected. It replicates row R back to nodes B and C. This causes the "resurrection."
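
The five steps above can be replayed with a toy model. This is a deliberately simplified sketch, not Cassandra's storage engine: each replica keeps one timestamped cell per key, the newest timestamp wins, and `repair` copies the newest surviving cell to all live replicas. The class and function names are hypothetical, and the model resurrects the whole row rather than a subset of its columns.

```python
TOMBSTONE = object()
HOUR = 3600


class Replica:
    def __init__(self, name):
        self.name = name
        self.up = True
        self.cells = {}  # key -> (timestamp, value or TOMBSTONE)

    def apply(self, key, timestamp, value):
        current = self.cells.get(key)
        if current is None or timestamp > current[0]:
            self.cells[key] = (timestamp, value)

    def purge_tombstones(self, now, lifespan):
        for key, (ts, value) in list(self.cells.items()):
            if value is TOMBSTONE and now - ts >= lifespan:
                del self.cells[key]

    def read(self, key):
        cell = self.cells.get(key)
        return None if cell is None or cell[1] is TOMBSTONE else cell[1]


def write(replicas, key, timestamp, value, quorum=2):
    live = [r for r in replicas if r.up]
    if len(live) < quorum:
        raise RuntimeError("not enough live replicas")
    for r in live:
        r.apply(key, timestamp, value)


def repair(replicas, key):
    """Copy the newest cell held by any live replica to all live replicas."""
    live = [r for r in replicas if r.up]
    cells = [r.cells[key] for r in live if key in r.cells]
    if cells:
        ts, value = max(cells, key=lambda cell: cell[0])
        for r in live:
            r.apply(key, ts, value)


a, b, c = Replica("A"), Replica("B"), Replica("C")
nodes = [a, b, c]
write(nodes, "R", timestamp=0, value="data")              # 1. insert row R
a.up = False                                              # 2. node A goes down
write(nodes, "R", timestamp=1 * HOUR, value=TOMBSTONE)    # 3. quorum delete
for r in (b, c):                                          # 4. 24 hours pass
    r.purge_tombstones(now=25 * HOUR, lifespan=24 * HOUR)
deleted_view = b.read("R")
a.up = True                                               # 5. node A returns
repair(nodes, "R")
resurrected_view = b.read("R")
```

Once the tombstone is gone, nothing on B or C records that the row was ever deleted, so A's old copy looks like the newest data.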

Workarounds

To mitigate the problem of resurrections, we used the following:

  1. We only purge tombstones that are "repaired." This means that all replicas of a row have acknowledged the presence of the tombstone. We ran "repair" on Cassandra once a day to make sure tombstones get purged.

  2. Use CONSISTENCY_ALL for deletes if possible. Deletes are not processed in this case if the tombstone cannot be added to all replicas of a row. The drawback of this approach is that deletes will fail if a node in the cluster goes down.

  3. If Cassandra on a node is down for more than 24 hours (the tombstone lifespan), prevent the node from re-joining the Cassandra cluster. This makes sure that the node does not up-replicate deleted data.
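
Two of these guards are easy to express as predicates. The sketch below is illustrative only: `may_purge` and `may_rejoin` are hypothetical helpers, and the 24-hour lifespan is the example value used in this post.

```python
from datetime import datetime, timedelta

TOMBSTONE_LIFESPAN = timedelta(hours=24)


def may_purge(acknowledged_by, all_replicas):
    """A tombstone is 'repaired' once every replica of the row has it."""
    return set(all_replicas) <= set(acknowledged_by)


def may_rejoin(went_down_at, now, lifespan=TOMBSTONE_LIFESPAN):
    """A node may rejoin only if it was down for less than the tombstone lifespan."""
    return now - went_down_at < lifespan


down_at = datetime(2020, 1, 1, 0, 0)
short_outage = may_rejoin(down_at, datetime(2020, 1, 1, 6, 0))    # 6 hours down
long_outage = may_rejoin(down_at, datetime(2020, 1, 2, 12, 0))    # 36 hours down
purge_partial = may_purge({"B", "C"}, {"A", "B", "C"})
purge_full = may_purge({"A", "B", "C"}, {"A", "B", "C"})
```

In the earlier scenario, either guard would have prevented the resurrection: the tombstone would not have been purged while A had not seen it, and A would not have been allowed back in after being down for longer than the tombstone lifespan.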

Conclusion

All systems must make tradeoffs, and there is no free lunch. Though using Cassandra had its advantages, it did have its set of challenges. We worked around these by setting guidelines for best practices. We designed our schema very carefully and put in checks to avoid problems like resurrections. 

At a certain point, maintaining these practices became a lot of overhead and we decided it was time to switch to a different distributed database.

In the next few blog posts, we'll talk about:

  1. Choosing the Right Metadata store

  2. Developing on top of CockroachDB