
Storage Tutorial

“Inspired by Amazon wiki”

1 Introduction

Many years ago, an engineer fresh from school was handed a first task simple enough: write code that updates database records. The code worked beautifully in the demo. It went to production, whereupon DB latencies exploded and the application slammed into the gutter like a car that blew all four tires. The thing moved so slowly, it was effectively dead.

The issue was, nobody told him about prepared statements.

After the incident, the engineer remarked “That’s something they don’t teach you in school.”

In retrospect, those were simple times for storage. The big decision was which version of Oracle to use. Since then, things have only become more complicated. As we reach the limits of scaling and performance, traditional solutions break down. In their place, dozens of new contenders have arisen and compete for various niches. All of them are filled with lessons “they don’t teach you in school”. The choice of which solution to use, and how to use it, can be overwhelming; and the risks of choosing poorly remain as critical as ever.

1.1 Storage Is About Tradeoffs

A common reaction to the complexity is “tell me what to use”, or “is there a decision tree?” Sorry to say, neither is practical. The fact is, nobody understands your requirements better than you, and only you can select the right answer. And, a decision tree wouldn’t be a tree, but rather hundreds of interrelated constraint logic declarations that would make tax forms look like beach reading.

The difficulty arises because storage is about tradeoffs. This is so important, it bears repeating: storage is about tradeoffs.

Every feature added to a storage solution comes with a cost. Whether the feature outweighs the cost can be a subjective, difficult decision. This is a decision only your team can make.

The goal of this tutorial is to document the tradeoffs, hopefully in a way that enables engineers to arrive at the right decision for their application. It also introduces special considerations that are often overlooked (e.g. cross-WAN replication, bulk uploads). In subsequent documents, the different implementations will be evaluated against these tradeoffs, and the hidden gotchas of these implementations will be called out.

The list of questions to consider is long. It ranges across issues like the customer experience, hardware TCO, headcount questions (can we afford two DBAs?), network reliability, and physical machine performance. Like no place else, storage is where computer science meets the road.

1.2 Myth of the Single Solution

When evaluating storage solutions, there is a tendency to seek the ONE, the product that meets all the application’s needs.

“My storage soulmate is out there, somewhere, waiting to meet…”

Unfortunately, the perfect storage partner doesn’t exist. No single solution can solve everything.

Consider a seemingly simple feature: customers create profiles for their pets. First, we need a place to persist the profile. Then, business owners will want to know how many are uploaded, and whether dogs are more popular than cats. So, we need to pump the information somewhere they can get to it (we don’t want them touching the production datastore, or continually nagging us with new questions). Next, we’ll probably have software bugs where the chihuahua is accidentally classified as a rodent, or something like that, and we need the ability to ad hoc query for broken chihuahuas and backfill them. Finally, the product folks want a feature to “search pets by name”, so we’ll need a search index on the profile.

Out of the gate, we already need a minimum of two solutions: a backend and a frontend datastore.

The remaining problems might be solved with a versatile product like Oracle or MySQL in “Swiss Army knife” fashion, doing a wide range of things adequately. This can work when volumes are low.

But, suddenly, the Crazy Cat Owners Club discovers the feature. Website usage goes nuts! They upload millions of cats. They’re updating their pet status every ten seconds. Some customers have hundreds of pets and we need to introduce pagination. They want the ability to sort by pet name, or by breed. Searches for “Mr. Whiskers” begin to time out from too many results. The data outgrows a single machine. Business owners decree it a tier-1 product and want dozens of new features.

At this point, the Swiss Army knife breaks down. And that’s to be expected. It’s impossible for one solution to optimize for everything. As the scale grows, specialized solutions become more effective. Searches might be offloaded to a Lucene fleet, or website queries might be offloaded to a high-availability datastore, or a cache layer could be added. The greater the scale, the richer the ecosystem tends to become.

The important takeaway is that a single solution was never adequate, even at the beginning. As scale grows, the implementation fragments further.

When searching for storage, the question isn’t “which solution is right for me”, but rather “which solutions are right for me”. Recognize when different requirements are competing against each other, and be prepared to find different solutions for each.

2 Latency

Everyone wants to be fast. So, here’s good news - most solutions are fast in isolation.

The bad news: latency is a trade-off against nearly everything else. For this reason, it’s important to know the real latency requirements of your app.

Latency can be treated like a budget. A storage solution might begin at 4ms for a simple query. Then, you add partitioning (+7ms), read consistency (+30ms), or downgrade to cheaper hardware (+10ms). Knowing your limits is important. An artificially low latency budget can result in missing out on valuable features, while going over budget is just as bad.

How low is too low? How high is too high? To help answer that, we’ll describe the most common latency metrics and provide targets for typical use cases.

2.1 Measurements

tp99.9, also known as “three nines”. If a service is meeting a tp99.9 of 200 milliseconds, it means 99.9% of queries are answered in less than 200ms, and 0.1% (1 out of 1000) are answered in more than 200ms.
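The definition above can be sketched as code. This is a minimal nearest-rank implementation for illustration; real metrics systems typically use streaming estimators rather than sorting raw samples.

```python
import math
from fractions import Fraction

def tp(samples, percent):
    """Nearest-rank percentile: the smallest observed latency such that
    at least `percent` percent of samples are at or below it.
    (Fraction avoids float rounding error at values like 99.9.)"""
    ordered = sorted(samples)
    rank = math.ceil(Fraction(str(percent)) * len(ordered) / 100)
    return ordered[max(rank - 1, 0)]

# Synthetic latencies: 1 ms, 2 ms, ..., 1000 ms.
latencies = list(range(1, 1001))
print(tp(latencies, 50))    # tp50   -> 500
print(tp(latencies, 99.9))  # tp99.9 -> 999
```

With this sample set, 999 of 1000 queries finish at or under 999 ms, so tp99.9 is 999 ms; the single slowest query is the 0.1% outlier.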

So, what’s the fascination with tp99.9?

It began as a way to detect customers with consistently poor experiences. Consider the “select a shipping address” page in the order pipeline. Most customers have two or three addresses and the page loads quickly. However, if a customer has 100 addresses, the page might take ten seconds to load. This is not detectable from average latencies because it’s averaged away by the sheer number of other customers. Only by examining the outliers do we see the poor experience.

Tp99.9 is representative of the worst experience, and we want the worst experience to be good.

Customers with the worst experience also tend to be our best customers. Someone has hundreds of addresses because they are placing hundreds of orders. We want them to be happy.

Over the years, tp99.9 has become a general indicator for outlier problems. It shows failure modes that might otherwise go undetected, such as sporadic lock contention, high packet loss, excessive GC pauses, and other resource constraints.

Why tp99.9, and not tp100? Because failure is unavoidable. Packet loss will occur, disks will die… Going beyond three nines merely shows the noise that is cost prohibitive to eliminate.

Back to the original question – what latencies should be measured? The answer is:

Measure reads, writes, and range queries (if applicable).

2.2 Website Latency Targets

Services providing data to the retail websites should target the following latencies:

          Adequate         Good             Excellent       Outstanding
tp99.9    Under 2000 ms    Under 1000 ms    Under 500 ms    Under 200 ms
tp50      Under 200 ms     Under 100 ms     Under 50 ms     Under 20 ms

2.3 Backend Latency Targets

          Adequate          Good             Excellent        Outstanding
tp99.9    Under 10,000 ms   Under 5000 ms    Under 2000 ms    Under 1000 ms
tp50      Under 1000 ms     Under 500 ms     Under 200 ms     Under 100 ms

Whether a service needs to be adequate or outstanding depends on how many times the service is called throughout a workflow. A service called in just about every step of a workflow is going to have different constraints than a service called only once.

2.4 Limits of tp99.9

For services called via HTTP, it is not worthwhile to aim for SLAs below what the network can provide.

Today, when clients exceed a rate of approximately 100 requests per second, they will begin seeing bands of timeouts to downstream services at 200ms and 3000ms (TCP retransmits) and 5000ms (DNS timeouts).

Another limitation exists for low request rates. Tp99.9 measures 1 failure out of 1000. If a service receives fewer than 1000 requests in a given time interval, it makes no sense to measure tp99.9. Use a longer time interval, or measure a lower percentile.

2.5 Tavg vs Tp50

Q: When should I use average latency (tavg) vs. median latency (tp50)?

Both numbers are lies. It depends on how you want to lie. Well, they’re not really lies, but they distill complex distributions into a single number, thereby dropping the details.

Consider a service with the following latencies:

In this distribution, the average latency may skew downward due to cache hits (maybe on the datastore, the filesystem, or an explicit cache), and fast failures (invalid input exceptions).

Once, we had a client begin calling us from the Detail Page with a missing parameter. We immediately returned an error in 1 ms, and our average latencies took a turn toward the fan-tas-tic!

Whether tp50 or tavg is more “accurate” depends on your distribution and what you want to measure.
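To make the “both numbers are lies” point concrete, here is a toy distribution (the numbers are invented for illustration). Fast failures and cache hits drag both summary statistics well below the latency of a real query, and they lie by different amounts:

```python
from statistics import mean, median

# Invented distribution: 100 fast failures at 1 ms, 500 cache hits
# at 5 ms, and 400 real queries at 100 ms each.
latencies = [1] * 100 + [5] * 500 + [100] * 400

print(mean(latencies))    # tavg -> 42.6
print(median(latencies))  # tp50 -> 5.0
```

A real query takes 100 ms, yet tavg reports 42.6 ms and tp50 reports 5 ms. Neither single number describes the slow path.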

Human nature tends to select the smallest number when showing off to others, and the biggest number when estimating throughput for capacity planning (so we can hoard more hardware). That’s where evolution got us.

2.6 Benchmarks

When evaluating storage solutions, it’s tempting to say “show me the benchmarks”. Latency benchmarks are interesting, but never completely accurate or comparable to your use cases.

Your latencies will depend on how you use (or abuse) the storage solution; the total amount of data, access patterns, lock conflicts, hardware types, etc…

When it comes down to it, you’ll have to benchmark it yourself. Select two or three candidates and build a demo usage. Test with the anticipated hardware, datasets, and access patterns. This can be a couple weeks of work, but it’s work that needs to happen. Better to find the flaws now than in production.

2.7 Relationship to Throughput

Latency != throughput. The actual throughput of the system will depend on concurrency.

For this reason, it’s important to benchmark using the anticipated concurrency rates of the production system.
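One handy rule of thumb for relating the two is Little’s Law: concurrency = throughput × latency. The sketch below rearranges it to estimate an upper bound on throughput (numbers invented for illustration):

```python
def max_throughput(concurrency, latency_ms):
    """Little's Law, rearranged: upper bound on sustained requests per
    second for `concurrency` workers, each blocking `latency_ms` per call."""
    return concurrency / (latency_ms / 1000.0)

print(max_throughput(1, 5))   # 1 worker at 5 ms/call   -> ~200 req/s
print(max_throughput(50, 5))  # 50 workers at 5 ms/call -> ~10000 req/s
```

The same 5 ms latency supports wildly different throughputs depending on concurrency, which is why a latency benchmark alone says little about system capacity.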

3 Availability

3.1 What is it?

Availability measures whether your service is alive and performing at 100% of its abilities.

Availability goals are commonly classified by tier.

The requirements placed on your app will impact the storage solutions available to you.

For example, a tier-1 service may give up useful features (like serialization or consistency) in favor of availability.

3.2 How do I know my tier?

Sometimes, tier is selected by business value. Other times, tier is thrust upon you.

Consider the following cases:

Tier-1 services used to be a rarity. Prior to the year 2000, there were none. If a critical database went down, the site would be offline for 30 minutes until a standby was fired up. The first tier-1 systems relied primarily on replicated Oracle databases. They were expensive, complex, and an operational headache to run. They succeeded in keeping the site alive, but their owners sometimes felt half-dead. Tier-1 was a dedicated art for a few specialist teams.

As the decade progressed, new techniques brought tier-1 storage to the masses. Innovation arose in areas like replication and quorum systems. Most important was the introduction of hosted storage. Suddenly, teams could integrate with tier-1 storage in a matter of minutes (assuming a hosted solution met their needs).

Today, it is rare for website features to aim for less than tier-1 availability. Backend systems aim for tier-2. Tier-3 is becoming as rare as tier-1 used to be; mainly relegated to internal UIs that get used here-and-there, but aren’t on the critical path for anyone’s job.

Whether an application must be tier-1 is a cost/benefit decision involving the technical and business owners of the product. The operational costs of tier-1 (described below) should be weighed against the cost of outages or inconsistencies. Every product has its own constraints, and its own answer.

3.3 Tier 1 Storage Techniques

Technique 1) Hosted Solutions

Running a tier-1 storage might be easier than it once was, but is never easy. When a hosted solution meets the requirements, it’s the way to go. Hosted solutions are particularly applicable to cases of simple PUTs and GETs within a single availability zone. For example, capturing a customer preference, or persisting a JSON document. A range of options exist for this case.

Of course, hosted solutions only do what they do. When your round peg doesn’t fit in the square hole, you need to roll your own storage.

Costs of technique: Loss of flexibility. Vendor lock-in. Risk of service “going out of business”.

Technique 2) Tier-1 Read, Tier-2 Write

Not every application requires tier-1 availability across both reads and writes. Consider the tools used by third-party merchants to upload inventory and manage their orders. Ensuring the items are available on the retail website (i.e. reads) is tier-1. Ensuring a merchant can upload and manage their inventory (i.e. writes) is tier-2.

One of the most difficult aspects of tier-1 availability is ensuring writes happen cleanly. Punting writes to tier-2 availability vastly simplifies the problem.

Tier-1 reads might be achieved using periodic snapshots to a replicated fleet, caches atop an authoritative store, or even something as simple as a static BDB that is deployed to the fleet.

Costs of technique: Writes can still become unavailable for minutes. Lag time for writes to appear. Snapshot mechanism can be difficult to make reliable.

Technique 3) Service Redundancy

Rather than building redundant datastores for a single service, build two tier-2 services. Each is a single-point-of-failure. Call one service first. If it fails, call the other. The two tier-2 services combine to make tier-1 availability.

This relies on the principle of redundancy. If the probability of service failure is one in one thousand, the probability of a joint failure of both services is one in one million.

It may sound funny at first (why don’t they make the first service tier-1?), but it can work in practice.
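The client side of this pattern can be a few lines. The sketch below is illustrative (the service names are hypothetical, and a real client would also need timeouts, metrics, and careful exception classification):

```python
def call_with_failover(primary, secondary, request):
    """Call the primary tier-2 service; on any failure, fall back to
    the secondary. Two independent 1-in-1000 failure rates combine to
    roughly 1-in-1,000,000 (assuming no common-mode failure)."""
    try:
        return primary(request)
    except Exception:
        return secondary(request)

# Hypothetical services: a tokenizer that is down, and a backup that
# vends a temporary token for later swap-out.
def primary_tokenizer(card_data):
    raise RuntimeError("primary down")

def backup_tokenizer(card_data):
    return "TEMP-TOKEN"

print(call_with_failover(primary_tokenizer, backup_tokenizer, "card-data"))
# -> TEMP-TOKEN
```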

Consider the case of credit card tokenization.

Because of security constraints on credit card numbers, it is not permitted to generate the token from a hash of the card number. It must be a random mapping. Guaranteeing the unique mapping requires that all requests be routed, serially, to a single master datastore. The master knows whether it’s seen the card before and can vend the same token.

By definition, a single datastore is a single point of failure and not tier-1. To attain tier-1 availability, a second service was introduced. In the event of tokenization outage, the second service vends a “temporary token”. Later, when the original service is healthy again, clients receive a callback to swap the temporary token with the permanent one.

This technique is most useful for backend services with a limited number of clients and where data is not customer facing (enabling data to change with minimal impact).

Costs of technique: Multiple services to run. Assumes no common mode failure. Might require a callback to clients.

Technique 4) Multi-Master Replication

When a system requires 100% availability across both reads and writes, the common solution is multi-master replication. Multiple datastores run in parallel; updates can go to any member of the group, and the members replicate changes amongst each other.

Multi-master replication is also the most complicated solution. The complications are described in the chapter on consistency. In essence, the use of multiple writers requires sacrifice. The sacrifice can include giving up consistency, increasing latencies, or increasing hardware costs by orders of magnitude. There are many decisions to be made in optimizing across these factors. In addition, for engineers unaccustomed to programming in multi-master environments, it can be a jarring shift from traditional techniques. For example, one typically has to give up serialization (see next chapter) and the benefits it provides, such as the ability to use fine-grained record locking, counters, aggregators, and uniqueness constraints.

Multi-master techniques are powerful, but introduce a major step up in engineering complexity and runtime costs. Before jumping into multi-master systems, one should ensure the business requirements justify it and that no other tier-1 techniques suffice.

Costs of technique: Large step-up in costs, complexity, or latency. May require a different style of programming. Many decisions to be made when balancing the tradeoffs.

4 Concurrency

4.1 What is it?

Concurrency means many users accessing the data at the same time, either by reading, writing, or both.

4.2 Why is it important?

If you want more than one person to use your application, concurrency affects you.

Applications with poor concurrency may sporadically slow down, lock up, or mangle data, causing your customers and teammates to volley expletives in your direction. But, with a few techniques you can keep the love of your peers.

In practice, concurrency problems are caused by three things:

  1. Faulty Read-Check-Write Logic
  2. Mishandling Partially Constructed Data
  3. Not Understanding the Locking Implementation of the Datastore

We’ll discuss each and how to avoid the pitfalls.

4.3 Concurrency Problem 1: Faulty Read-Check-Write Logic

The classic example of Read-Check-Write logic is updating an account balance. For example, imagine we want to add $5 to a customer’s account. A typical process will:

  1. Read the current balance
  2. Compute the new balance (current balance + $5)
  3. Write the new balance back

If two workers concurrently attempt to add $5, both may read the same starting balance of $0, and both will write back $5.

The ending balance should be $10, but instead it’s $5. Without concurrency controls, the account balance becomes invalid. Read-Check-Write situations arise whenever logic reads a value, makes a decision or computation based on it, and writes back a result.
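The lost update can be replayed deterministically (a toy sketch, not real datastore access):

```python
# Deterministic replay of the race: both workers read the balance
# before either writes.
balance = 0

def read_balance():
    return balance

def write_balance(new_value):
    global balance
    balance = new_value

a = read_balance()    # worker A reads $0
b = read_balance()    # worker B reads $0
write_balance(a + 5)  # worker A writes $5
write_balance(b + 5)  # worker B writes $5, clobbering A's update

print(balance)        # -> 5, not the expected 10
```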

The following techniques can be used to ensure Read-Check-Write logic is executed correctly.

Single Worker

Using a single worker solves the concurrency problem by avoiding it. Only one worker is active at a time.

Theoretically, this can be effective for backend processes where execution times are controlled. For example: a daily job that sums transactions over the previous day and updates account balances. Or, a workflow that finds shipments in condition A and moves them to condition B.

While simple sounding, it can be very difficult to ensure only one worker is active at a time. The naïve approach said “we’ll just run it on one box” and assumed concurrency problems were solved. We saw this break down in several ways.

Different techniques were explored to mitigate the deficiencies. Some were hokey (“we’ll run two boxes; one working on odd-numbered IDs, the other on even-numbered IDs”). Other groups wrote complex leader election schemes, where a fleet of workers would vote amongst themselves to choose a single worker (and hope it worked perfectly! Which it usually didn’t.) Finally, to scale, multiple leaders could be elected for subsets of data, who report up to other leaders, who roll up to…

The basic idea of a single worker leads to some exceedingly difficult problems in computer science. Most of us are paid to solve other problems. For this reason, teams shifted toward the mindset of “concurrency happens” and built systems to handle it. We’ll describe those systems next.

In the research community, work continued on distributed consensus algorithms, which is the fancy way of saying “getting a bunch of processes on different machines in different locations to all agree on something”.

Whitepapers from the 1980s have titles like “Impossibility of distributed consensus”, whereas by the 2000s research focused on different ways to achieve consensus via the Paxos algorithm and its variants.


Serialization

Serialization is a technique to catch concurrent requests and execute them one-by-one, so that they don’t overlap. It relies on locking to control access to a resource.

Serialization is by far the most common concurrency control in use today. This is in large part due to the popularity of transactional databases (like Oracle) and the fact that these products provide locking and serialization as part of their core functionality. For example, if you open two sessions on a transactional database and update the same record from each session, you will find the second session blocks until the first commits or rolls back.

In the early days, pessimistic locking (or explicit locking) was the norm. Pessimistic locking works like this: acquire an explicit lock on the record (e.g. “select … for update”), execute the business logic, then write the result and commit, releasing the lock.

Unfortunately, deadlocks were common. Processes would freeze and engineers were paged in the middle of the night. Database scripts were written to monitor for long-lived locks and send emails (“Your process has held a lock on the shipments table for 15 minutes!”). It was miserable all-around. Pessimistic locking is also expensive; it acquires and holds a lock for the lifetime of the transaction. Long-held locks reduce the total throughput of the system. A move to non-blocking lock acquisition (“select … for update no wait”) solved the problem of deadlocks and allowed engineers to get a full night’s sleep, but didn’t improve the efficiency.

Eventually, it was recognized that lock conflicts were extraordinarily rare in most of our systems, and we moved toward optimistic locking. A typical implementation for optimistic locking might look like this:

// The table
DATA                   VARCHAR(200)
RECORD_VERSION_NUMBER  NUMBER(38)   NOT NULL   // trigger-maintained to increment on updates

// The code
try
  (data, version) = "select data, record_version_number from foo where foo_id = 1234"
  ... execute some logic ...
  rowsUpdated = "update foo set data = <newData> where foo_id = 1234 and record_version_number = <version>"
  if (rowsUpdated == 0)
      throw new OptimisticLockException();
catch (OptimisticLockException ex)
  // retry once or twice, depending on SLA and use cases

Optimistic locking has served well for many years. It provides cheap serialization and allows for retries in the rare event of collision.
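The same pattern can be translated to runnable form. This is a sketch using SQLite; the trigger is replaced by an explicit version increment in the update statement:

```python
import sqlite3

# In-memory stand-in for the real database.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE foo (
    foo_id                INTEGER PRIMARY KEY,
    data                  TEXT,
    record_version_number INTEGER NOT NULL)""")
conn.execute("INSERT INTO foo VALUES (1234, 'old', 1)")

def update_with_optimistic_lock(conn, foo_id, new_data):
    # Read the record along with its current version number.
    data, version = conn.execute(
        "SELECT data, record_version_number FROM foo WHERE foo_id = ?",
        (foo_id,)).fetchone()
    # ... execute some logic on `data` ...
    # Write back only if nobody else bumped the version in the meantime.
    cur = conn.execute(
        "UPDATE foo SET data = ?, record_version_number = ? "
        "WHERE foo_id = ? AND record_version_number = ?",
        (new_data, version + 1, foo_id, version))
    if cur.rowcount == 0:
        raise RuntimeError("optimistic lock conflict; retry once or twice")
    conn.commit()

update_with_optimistic_lock(conn, 1234, "new")
print(conn.execute(
    "SELECT data, record_version_number FROM foo WHERE foo_id = 1234"
).fetchone())  # -> ('new', 2)
```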

However, serialization has its limits. Consider the tier-1 service owner who prioritizes availability above all else. To achieve 100% availability, a tier-1 service can’t rely on just one database. A single database can die. Instead, tier-1 services typically run a fleet of databases in multi-master mode. All the databases are available to receive writes, replicating amongst each other at a delayed interval. So, what happens to serialization in this case?

While each database can serialize its own writes, how will it serialize across all nodes in the system? The problem becomes extraordinarily difficult and leads to topics like master election and two-phase commits; the graveyards of many well-intentioned products. For this reason, tier-1 services typically sacrifice serialization in deference to other goals. They may write software differently to avoid read-check-write logic (see next topic), or give up concurrency controls entirely.

Rewrite software to avoid Read-Check-Write logic

Depending on the use case, it is sometimes possible to eliminate Read-Check-Write logic, and thereby eliminate the concurrency concerns. Let’s go back to the case of incrementing an account balance. Instead of updating, it might be possible to insert transactions individually:

insert into accounts (ID=1, amount=$5, date="sept 1, 2014")
insert into accounts (ID=2, amount=$5, date="sept 1, 2014")

The daily sum can be calculated on the fly:

select sum(amount) from accounts where date="sept 1, 2014"

When updates and deletes can be transformed into inserts, concurrency problems vanish. A related trick exists for duplicate detection. Consider the case of concurrent address creation:

insert into addresses (ID=1, value="123 Main St.");
insert into addresses (ID=2, value="123 Main St.");

In the above example, IDs are generated from a sequence. Instead, consider generating IDs deterministically by hashing the address data:

"123 Main St." -> hash function -> "KSLK2LKJ3LKJ2OIJ2J3"

Now the inserts become:

insert into addresses (ID="KSLK2LKJ3LKJ2OIJ2J3", value="123 Main St.");
insert into addresses (ID="KSLK2LKJ3LKJ2OIJ2J3", value="123 Main St.");

Both inputs create the same output, eliminating the duplicate. As in many programming disciplines, concurrency problems can be avoided by adhering to two principles: keep data immutable, and make operations deterministic (and thereby idempotent).
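The deterministic-ID trick can be sketched as follows (the hash choice and the 19-character truncation are purely illustrative, and a dict stands in for the datastore):

```python
import hashlib

def address_id(address):
    """Derive the record ID deterministically from the address content,
    so duplicate inserts collide on the same key."""
    return hashlib.sha256(address.encode("utf-8")).hexdigest()[:19].upper()

addresses = {}  # toy stand-in for the datastore, keyed by ID

def insert_address(address):
    # Duplicate input -> same key -> the second insert is a no-op.
    addresses.setdefault(address_id(address), address)

insert_address("123 Main St.")
insert_address("123 Main St.")  # concurrent duplicate
print(len(addresses))           # -> 1
```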

That’s the theory, at least. But, I’ll be straight with you. Never in my experience have these techniques been applicable to storage of customer data. Maybe it works in other domains. But we have too much baggage.

Much of the baggage is backwards compatibility. Our platform grew from the seed of Oracle databases and the origins show. For example, hundreds of systems persist AddressIDs as 32-bit integers. A forced migration to a 128-bit AddressID generated from a hash function is economically unjustifiable. So, we continue to generate them from sequences, and idempotent ID generation remains a dream.

It can also be difficult to maintain immutability in practice. For example, addresses are generally immutable.

But, bugs can arise. In one example, the frontend began persisting addresses with a missing dash in 9 digit US postal codes (“981090580” instead of “98109-0580”). This broke fulfillment. It became necessary to backfill postal codes. Immutability constraints normally wouldn’t allow this. We would have to delete the old address and create a new one. However, we couldn’t delete the old address without impacting several other systems that were hanging onto the AddressID. So, we broke immutability in deference to operational reality.

The misapplication of immutability and deterministic ID generation has caused numerous headaches when confronted with real world necessities. Be very careful when choosing these techniques. Are you really certain immutability and idempotence can be guaranteed, always and without exception? It takes an enormous effort to back out the wrong implementation. If in doubt, don’t do it.

Give Up

When leader election fails, when serialization breaks down, when you can’t be immutable or deterministic, there’s one option remaining: Give Up. Sometimes, it’s a great option.

Consider the case of concurrent, duplicate address creation.

What’s the impact of this race? Two duplicate addresses in the address book. Who cares! It’s not 200, or 2000. The customer can delete the duplicate, or software can hide it when displaying the address book. And how often does this happen? Almost never. If the choice was between uniqueness or availability, then availability wins by a landslide.

This doesn’t apply to everything. The cost of duplicate orders is much higher than the cost of duplicate addresses. But, when you understand the considerations, you can choose when it’s appropriate to ignore them. Faulty read-check-write logic can be a perfectly acceptable tradeoff against other goals.

4.4 Concurrency Problem 2: Mishandling Partially Constructed Data

Users of Oracle and other transactional databases may be blessedly unfamiliar with partially constructed data. Transactional databases provide isolation between users precisely to avoid this. However, not everyone is using a transactional database; and, those who do still face the problem when populating multiple databases (i.e. distributed transactions).

The most common opportunity for partially constructed data is when populating an auxiliary index in another location. Consider the case of a system that is sharded into 10 partitions, plus an 11th datastore containing a routing table. A record might be inserted into partition 7, plus an additional entry into the routing table specifying the ID can be found in partition 7. These are two separate transactions. Engineers must choose the sequence of the transactions, plus prepare for partial failure of one.

You may have heard of two-phase commits as a means to address the problem of distributed transactions. They promise to make your distributed transaction appear as a single transaction, thereby freeing you from concern. Be wary. We have never seen two-phase commit work in real-time latencies and within our unreliable network. You should assume there’s an ogre rampaging through the datacenter with an axe chopping Ethernet cables at will. It feels that way. He will get yours at any moment. When he does, your two-phase commit will hang or abort uncleanly. Your database driver will probably hang too, since the commercial products we’ve used behave very poorly in the face of ogres (despite what the marketing says). An elf will repair your cable, but the damage is done to the transaction. (The ogre also lights fires and smashes cooling systems.) Two-phase commit also requires all components to support transactions, which is not always the case. The secondary system might be a search index without transactional semantics.

Without transactions, one has to prepare for partial failure. More importantly, one must gracefully handle partially constructed data when querying. In the simplest case, this is accomplished by inserting into the index first, then into the authoritative datastore. If the transaction fails midway, an error is returned to the client and the dangling index is left in place.

Subsequent index scans will join with the authoritative datastore, notice the missing data, and simply ignore the index entry.
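The index-first write order and the tolerant scan can be modeled in a few lines (a toy sketch; dicts stand in for the routing table and the partition):

```python
# Toy model: routing index written first, authoritative store second.
index = {}        # record_id -> partition number
partition_7 = {}  # authoritative datastore for partition 7

def insert(record_id, value, fail_midway=False):
    index[record_id] = 7              # step 1: write the routing entry
    if fail_midway:
        raise RuntimeError("crashed between index and datastore writes")
    partition_7[record_id] = value    # step 2: write the real record

def scan():
    # Join the index against the authoritative store, and silently
    # ignore dangling index entries whose data never landed.
    return [partition_7[rid] for rid in index if rid in partition_7]

insert("a", "good record")
try:
    insert("b", "lost record", fail_midway=True)
except RuntimeError:
    pass  # the client saw an error; the index entry for "b" dangles

print(scan())  # -> ['good record']
```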

In other cases, abandonment is impractical. Consider a merchant who uploads a product for sale. The product is inserted into a catalogue table and also into search indexes so customers can find and buy it. Data cannot be abandoned midway. An unsearchable product is not acceptable. When you get to this point, consider whether you have a workflow rather than a distributed transaction. If any steps can be performed asynchronously, they should be. Working via asynchronous, repeatable steps can be immeasurably easier to implement than a distributed transaction.

4.5 Concurrency Problem 3: Not Understanding the Locking Implementation of the Datastore

Serialization relies on locking, and every datastore can lock differently. If you jump into an implementation without understanding the locking mechanisms, you may be surprised when the system underperforms.

The first thing to consider is the granularity of the lock. Locks may occur on the database, on a table, on a page, or on a single record. Different activities may generate different locks. For example, updating a single record may require a page-level lock, whereas adding a column requires a table lock, and backups require a database-level lock. So, begin by listing what activities will be performed (including operational stuff like backups) and understanding what they lock.

Be especially careful with page-level locks. If a table is small, in the ballpark of 1000 entries or less, the entire table may fit within a page. Page locking effectively becomes table locking and throughput is considerably reduced on the table. The same problem occurs in heavily utilized, small, consecutive sub-ranges of a large table.

The other aspect to consider is read vs. write locks. There exists a wide range of options. Oracle dedicates 38 pages to describe the knobs and dials available for controlling read & write isolation, and the ANSI/ISO SQL standard (SQL92) defines four levels of transaction isolation with differing degrees of impact. As an application owner, it can be overwhelming. The best approach is to begin with the system defaults (there is a reason they are the defaults) and ask the following questions:

You may find a database’s locking implementation is not appropriate for you, and you can move on to other options.

In most cases, the defaults are adequate for typical use cases. The primary gotcha to look out for is overly aggressive locking. For example, some simpler implementations will block all reads while a write is ongoing. Combine this with table-level locking and you have a recipe for disaster. To illustrate, consider a small domain table receiving 1000 reads per second. If someone updates the domain table with a burst of writes, the full table is locked until the writes complete. It’s like sending a pack of cats running across a freeway. All the readers slam on their brakes while the writes proceed, serially, one-by-one. It doesn’t take long for 1000 reads-per-second to pile into a massive traffic jam.

Latencies spike, queues grow, and if timeouts aren’t configured well, the blocked requests linger and cause a multi-minute outage until everything clears out. Applications in this situation are better off using a read-only snapshot or caching the records in memory.

Lock management isn’t necessarily hard, but it’s often overlooked, resulting in applications that significantly underperform in both latency and throughput.

5 Idempotence and Resource Acquisition

5.1 What is Idempotence?

An idempotent operation will produce the same results if executed once or multiple times.

In service oriented architecture, the goal is typically to avoid ill effects from duplicate requests to an API.

5.2 Why is it important?

Consider the “Submit Payment” button found on many real websites. If the action lacks idempotence, a double-click is treated as two payment requests, rather than one.

A billing team was frequently refunding duplicate (or triplicate) credit card charges. Duplicate shipments were sent out the door. One time, an application got stuck in a loop, sending the same email to a customer hundreds of times. The battle cry arose – “be idempotent”.

In essence, one needs to prepare for duplicate requests. Duplicates will arise from double-clicks, retries, and misbehaving clients. In addition, asynchronous messaging architectures (such as AMP) will prefer duplicating a message to possibly losing one. Duplicates are a fact of life. We must deal with them.

5.3 What is Resource Acquisition?

Resource acquisition is the travelling buddy of idempotence.

Imagine having 1 ticket to a Bon Jovi concert. Surprisingly, many people want it.

The first is Bob, asking “Can I have the ticket?” He wins.

The next is Bill, asking “Can I have the ticket?” He’s out of luck.

The next is Bob again. He REALLY loves Bon Jovi and is manic. He asks again “Can I have the ticket?”.

This is resource acquisition + idempotence. You email Bob: “Yes”. You email Bill: “Sorry”. You email Bob: “Yes” (again)

(It’s the same problem with 100 Bon Jovi tickets; just slightly more bookkeeping.)
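The Bon Jovi exchange above can be sketched as a tiny program. The class and names here are illustrative, not from any real system: the first requester acquires the resource, and every requester idempotently hears the same answer on retry.

```python
# Resource acquisition + idempotence: remember each requester's answer
# and replay it for duplicate requests.
class TicketVendor:
    def __init__(self, tickets=1):
        self.remaining = tickets
        self.answers = {}  # requester -> "Yes" / "Sorry"

    def request_ticket(self, requester):
        # Duplicate request: repeat the original answer, unchanged.
        if requester in self.answers:
            return self.answers[requester]
        # First-time request: acquire the resource if any remain.
        answer = "Yes" if self.remaining > 0 else "Sorry"
        if answer == "Yes":
            self.remaining -= 1
        self.answers[requester] = answer
        return answer

vendor = TicketVendor(tickets=1)
print(vendor.request_ticket("Bob"))   # Yes
print(vendor.request_ticket("Bill"))  # Sorry
print(vendor.request_ticket("Bob"))   # Yes (again)
```

With 100 tickets, only the bookkeeping in `answers` and `remaining` grows; the replay logic is identical.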

5.4 Relationship between Idempotency and Resource Acquisition

At their core, both problems reduce to one of Duplicate Detection. The difference is how they track & respond to duplicates.

An important thing to recognize when tracking duplicates is the difference between one-shot actions and resource acquisitions.

An example one-shot action is “Send Confirmation Email for Order #002-491010-49290432”.

Regardless of who makes the request, or how many duplicates arrive, everybody can hear the same response: “Success”.

And, it’s true. The email was initiated. Everyone’s a winner.

It’s communal idempotence.

In contrast, a resource acquisition can have losers. A resource acquisition is “Buy a Wii Console”, or “Issue a $5 refund for a $100 purchase” (or “Buy Bon Jovi tickets”).

Idempotence spans both cases. But, in the latter case, idempotence is combined with resource acquisition and the responses are customized to each request. Everyone hears either “yes” or “no”, and they continue to idempotently hear the same “yes” or “no” regardless of how many times they repeat their request.

The correct behavior all comes down to identifying a duplicate request appropriately.

5.5 Recognizing Duplicate Requests

A naive way to track a unique purchase is with the tuple of customerID + ASIN. It might work for the simple case of selling a Bon Jovi ticket, but we don’t want to limit our customers to one lifetime purchase per product.

It does introduce the question though: what is the right way to identify duplicates? The answer isn’t always clear cut.

Inferred Duplicate Detection

This is the easiest case. As mentioned above, one-shot actions are inherently recognizable as duplicates and require no special effort.



Updates to a terminal state, whether Completed or Cancelled, are one-shot. The application need only check the current state and ignore duplicates. Another Example:

createAddress(customerId="8583", name="Rufus", street="1200 12th Ave S", city="Seattle", state="WA", country="US")
createAddress(customerId="8583", name="Rufus", street="1200 12th Ave S", city="Seattle", state="WA", country="US")

Both operations result in the same outcome – an insertion into the address book for customerId “8583”.

(It can be cumbersome to compare all the fields of a request. More commonly, the fields are combined and hashed into a checksum, and the checksum constitutes a unique request ID.)
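For instance, a checksum-based request ID might be computed like this. This is a sketch: the field names mirror the createAddress() example above, and SHA-256 is an assumed (not prescribed) choice of hash.

```python
import hashlib

def request_id(**fields):
    # Canonicalize: sort the fields so argument order can't change the hash.
    canonical = "|".join(f"{k}={v}" for k, v in sorted(fields.items()))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

a = request_id(customerId="8583", name="Rufus", street="1200 12th Ave S",
               city="Seattle", state="WA", country="US")
b = request_id(customerId="8583", name="Rufus", street="1200 12th Ave S",
               city="Seattle", state="WA", country="US")
assert a == b  # identical requests hash to the same request ID
```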

You might wonder what happens if the address is deleted between these two requests. Should the second creation succeed or fail? We’ll touch on that in a moment.

Explicit Detection

As opposed to one-shot actions, repeatable actions require an explicit identifier. Consider this account creation API:

createAccount(login="rufus@zzz.com", pwd="woof");
createAccount(login="rufus@zzz.com", pwd="woof");

Do the requests originate from one customer who double-clicked, or two customers with conflicting credentials? The answer cannot be inferred. In this case, it becomes necessary to add a requestId (or clientReferenceId) parameter.

createAccount(login="rufus@zzz.com", pwd="woof", clientReferenceId="X"); // Return Success.
createAccount(login="rufus@zzz.com", pwd="woof", clientReferenceId="X"); // Duplicate. Return Success.
createAccount(login="rufus@zzz.com", pwd="woof", clientReferenceId="Y"); // Conflict. Return Error.

Clients can choose a clientReferenceId in a way most convenient for them. A batch import job might have its own primary keys to use as reference IDs, whereas a website might embed a random string into a hidden form field. The only requirements are:

  1. it should be unique (of course)
  2. it should be defined at the point-of-entry (so it stays consistent across all systems)
  3. it should accurately identify a unique request

A common trick to help with #1 is to grant domains or namespaces to clients. Then, clients don’t need to worry about overlapping requestIDs.

But, what is the right way to answer #3? That’s the crux of the problem. And, unfortunately, every team must answer that on their own. A general rule of thumb is offered next.

5.6 But what if… ?

Idempotence discussions can slip down the theory rat hole. For example, in the createAddress() example, what if somebody deletes the address between the creation attempts? It leads to discussions like “a composition of idempotent methods isn’t necessarily idempotent”.

Don’t overcomplicate the problem. The goals are simple. Choose the rules that work best for your situation. If customers are happy, and your teammates are happy, then you’ve got the right solution.

Generally, if you’re thinking about the problem, you’re already ahead of the game. The worst mistakes arise when duplicate detection is overlooked completely.

5.7 Solutions in Strongly Consistent Systems

Duplicate Detection is a read-check-write problem. The previous section on Concurrency described the gotchas around read-check-write logic and how to implement it safely.

To reiterate, duplicate request handling is like this:

  1. Read: Have I seen this request before?
  2. Check: Is the current request a duplicate or a conflict?
  3. Write: Apply the request

Duplicate detection is simply the act of stopping at #2 if the request has been seen before, and returning the proper result.

Duplicate detection is trivial to achieve in a strongly consistent system. Users can know the state of the world at any given time. Therefore, it is easy to answer questions like “does this record exist?”. Common solutions include uniqueness constraints or locking a resource to serialize access to it.
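As a sketch, duplicate detection via a uniqueness constraint might look like the following. SQLite stands in for the strongly consistent datastore, and the table and column names are illustrative.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE requests (request_id TEXT PRIMARY KEY, response TEXT)")

def apply_request(request_id, work):
    try:
        # Write: the PRIMARY KEY constraint atomically rejects duplicates.
        db.execute("INSERT INTO requests VALUES (?, 'PENDING')", (request_id,))
    except sqlite3.IntegrityError:
        # Duplicate: replay the previously recorded response.
        row = db.execute("SELECT response FROM requests WHERE request_id = ?",
                         (request_id,)).fetchone()
        return row[0]
    response = work()  # only the first writer executes the action
    db.execute("UPDATE requests SET response = ? WHERE request_id = ?",
               (response, request_id))
    db.commit()
    return response

print(apply_request("X", lambda: "Success"))  # first attempt: executes the work
print(apply_request("X", lambda: "Success"))  # duplicate: replays the answer
```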

5.8 Solutions in Weakly Consistent Systems

Perfect duplicate detection, in the traditional sense, is unattainable in weakly consistent systems.

Consider the following account creation scenario: two customers, connected to two different replicas, each create an account with the same login at the same time. Neither node is synchronized with the other. There is no way to catch the duplicates. The end result is conflicting accounts.

There are several techniques to manage the problem, each customized to a specific use case.

5.8.1 Case 1) Writing a unique record into a local datastore


Let’s continue with the example of account creation: the act of writing a unique record into a local datastore.

Solution A) Stop the duplicates

This is the old saw that the best way to solve a problem is to avoid it. We could prevent users from double-clicking by using JavaScript to disable duplicate form submissions.

This is good practice, and helps, but doesn’t solve the resource acquisition problem. Other customers could attempt to acquire the same credentials. In addition, not everyone uses the UI. When we buy new subsidiaries, they will often write scripts to copy their existing customer base into our systems. These scripts are run repeatedly as they’re being verified for accuracy and performance. Duplicates inevitably arise.

Solution B) Move to a strongly consistent system

This is another punt on the problem. Essentially, if perfect idempotence is required, then weak consistency is the wrong choice. Consider moving to something with higher consistency, such as master election or quorums. Prepare for extra downtime or higher hardware costs as a tradeoff.

The example of account creation optimizes for 100% availability, so it is willing to make tradeoffs. In addition, accounts can be created globally. Strong consistency is not feasible on a global scale (with any decent performance), so relying on consistency was never an option.

Solution C) Deterministic Inserts

This was described in the Concurrency section, in the discussion on how to “Rewrite software to avoid Read-Check-Write logic”. Essentially, if you can ensure duplicate inserts generate the exact same data, and if your data store offers insert-or-update capabilities, then you can simply “put” the record into the data store and let it naturally coalesce the duplicates.

The challenge is that systems require a Primary Key to uniquely identify a record, and it can be difficult to deterministically generate a Primary Key. Obviously, a sequence number doesn’t work, because the nodes are working independently and would fetch different sequence values. Instead, a deterministic insert is generally achieved by hashing the message contents to generate a Primary Key.

This has the side-effect of requiring immutability. Otherwise, a change in the contents will change the Primary Key, generating a different record (which is what we’re trying to avoid!) Some wiggle room is allowed for metadata, such as state management. It’s OK to have an attribute indicating the record is ACTIVE, COMPLETED, CANCELLED, etc… But, ideally, there should be no loops in the state transitions. (For example, if the state can toggle between CLOSED and OPEN, what does it mean when something tries to insert an OPEN copy of a CLOSED record. Is it an old delayed message that should be ignored, or a new state transition? By sticking to linear transitions, the question is trivial to answer: CLOSED always comes after OPEN, so CLOSED wins.)
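A minimal sketch of a deterministic insert, with a dict standing in for an insert-or-update datastore and SHA-256 as an assumed hash:

```python
import hashlib

store = {}

def put_record(contents):
    # The same (immutable) contents always hash to the same Primary Key,
    # so duplicate inserts land on the same record.
    key = hashlib.sha256(contents.encode("utf-8")).hexdigest()
    store[key] = contents  # insert-or-update coalesces the duplicates
    return key

k1 = put_record("order=42,item=guitar")
k2 = put_record("order=42,item=guitar")  # duplicate insert, silently coalesced
assert k1 == k2 and len(store) == 1
```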

Unfortunately, this technique doesn’t work for account creation. How can we deterministically select a unique CustomerID?

One registration pipeline (asking customers to choose their own globally unique CustomerID) didn’t test well with customers.

Alternatively, we could generate a customerID by hashing the email address. But, that means the customer could never change their email. Another poor experience. Instead, account creation uses best-effort duplicate detection (described next).

Solution D) Best Effort Duplicate Detection

This technique relies on Best-Effort Consistency to provide duplicate detection under most circumstances. Every once in a while (if a machine crashes, for example), the perfection disappears for a couple minutes until consistency is reestablished.

If the rate of duplicates is low, and the window of inconsistency is small, then the probability of both occurring at the same time is very small. Good duplicate detection can be achieved by riding the averages.

This is the situation we have with account creation. It is rare for two users to concurrently create the same account, let alone do it during the window of inconsistency. I’ve witnessed one ticket for the problem. It was a single user who accidentally created two conflicting accounts. One of the accounts was unused, so it was deactivated. End of problem.

5.8.2 Case 2) Requesting action on a remote resource


Example: Requesting a credit card charge, or sending a password reset email.


Solution: Separate the request acknowledgement from execution.


It may so happen that the need to request an action is tier-1, but execution is tier-2. Password reset is an example; a customer clicks a button on the website to request an email, and that button should always succeed. On the other hand, the customer anticipates the email will take time to arrive.

In other words, this is a workflow. A very simple, two step workflow.

The solution is to divide the problem into two parts. First is a tier-1 workflow initiation mechanism. This could involve enqueueing a message for guaranteed delivery, or writing to a datastore with deterministic inserts. On the backend, a highly consistent system (with potentially less availability) will listen to the messages or sweep the datastore. The backend system can recognize duplicates. It is responsible for handling them correctly.

Checkout works on this principle. The “Order Now” button simply initiates a workflow. Duplicate workflow initiation messages could be delivered by the message broker, and it’s the responsibility of the order processing manager to handle them.

The implication of this technique is that immediate customer feedback is not possible. If the customer’s credit card authorization is declined, the customer will be notified later via email. While this could be frustrating to the subset of customers with credit card declines, it is preferable over forcing every customer to wait upwards of 10 seconds for a credit card authorization to process (and to see fatals when authorization is unavailable).
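The two-part pattern (a tier-1 acknowledgement, with deduplication and execution on the backend) can be sketched as follows. The in-memory queue, the names, and the dedup-by-value approach are all illustrative.

```python
from collections import deque

queue = deque()   # stands in for a guaranteed-delivery message broker
executed = set()  # the backend's duplicate-detection state

def request_password_reset(customer_id):
    # Tier-1: acknowledge immediately. At-least-once delivery (or a
    # double-click) may enqueue the same request twice.
    queue.append(("reset_email", customer_id))
    return "OK"

def backend_worker():
    # Backend: a (potentially less available) consistent system that
    # owns duplicate handling.
    while queue:
        request = queue.popleft()
        if request in executed:
            continue  # duplicate message; drop it
        executed.add(request)
        # ... actually send the email here ...

request_password_reset("8583")
request_password_reset("8583")  # duplicate
backend_worker()
assert len(executed) == 1  # the email is initiated exactly once
```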

5.8.3 Case 3) Vending a limited resource


Example: Selling a product with a limited amount of inventory, or offering a promotion to the first 100 customers.


Solution: Oversubscribe or undersubscribe, and be prepared to apologize.


Imagine we have a big catalogue of products for sale (it’s a stretch). The catalogue contains many popular items with limited inventory.

In addition, it is critical that the ordering system remain 100% available to accept purchases from the catalogue.

To stay highly available & consistent, we could use quorums. But, it might be cost prohibitive to replicate the huge catalogue across the multitude of replicas necessary to support quorums. Or, the latency may be undesirable. So, high availability is achieved through eventual consistency.

The unfortunate side-effect of eventual consistency is that we never know exactly how much inventory remains available for sale. On the backend, we could be carefully decrementing inventory in a serialized manner as orders flow through the pipeline. But, the frontend, to stay available, is reading potentially stale copies of the current inventory.

In the absence of perfection, we must choose how to err. Options include undersubscribing (stop selling before inventory is truly exhausted, at the risk of leaving sellable items unsold) or oversubscribing (keep selling against a possibly stale count, and apologize when an order cannot be fulfilled). The latter option is the most common.

There can be a natural reaction of horror to this approach. How could we sell a product without knowing whether it’s available? But, it’s an illusion to believe the inventory count was ever a known quantity. As Pat Helland was fond of saying, the warehouse associate might accidentally drop the package and break it. Now, a sold item is no longer available for delivery. Or, the picker might go to retrieve the item and discover it’s missing, perhaps due to internal theft or mistaken inventory counts. The amount of inventory is always a best guess.

We must be ready to apologize for unfulfillable orders. An eventually consistent inventory continues in the same vein.

Just as we try to minimize the number of damaged/missing products in the warehouse, there are techniques we can use to minimize the amount of oversold inventory. One hybrid technique is to use eventually consistent inventory counts only when inventory is high. For example, if 5000 items are available for purchase, there is little risk of sellout. On the other hand, if an item is selling quickly or inventory has reached single digits, we might attempt a consistent read of the inventory. The risk is that a consistent read might take longer. Or, it might be unavailable, forcing us to fallback to the inconsistent view. In either case, the hybrid approach can blend the strengths of each: the majority of inventory checks are distributed across a fleet of eventually consistent storage nodes and remain highly available. In extreme cases, we attempt a consistent read, and consider the side-effects worthwhile.
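A sketch of the hybrid read strategy described above. The threshold and the read functions are illustrative assumptions, not part of any real system.

```python
LOW_STOCK_THRESHOLD = 10  # below this, stale counts are too risky

def available_to_sell(item, replica_read, consistent_read):
    stale_count = replica_read(item)      # cheap, highly available, maybe stale
    if stale_count > LOW_STOCK_THRESHOLD:
        return stale_count > 0            # plentiful stock: stale is good enough
    try:
        return consistent_read(item) > 0  # slower, and may be unavailable
    except TimeoutError:
        return stale_count > 0            # fall back to the stale view

# Plentiful stock: the eventually consistent answer suffices.
print(available_to_sell("wii", replica_read=lambda i: 5000,
                        consistent_read=lambda i: 4998))  # True
```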

6 Access Patterns

6.1 What is it?

As the name implies, access patterns describe how the data will be accessed. For example: lookups by primary key, range queries, updates, sorting…

It also includes the rate at which data is accessed. For example, high reads vs. high writes.

6.2 Why is it important?

A mismatch between access pattern and implementation can cause unacceptably high latencies, primarily due to excessive disk seeks, and sometimes also excessive CPU.

A mismatch in this vein is particularly sinister because problems only appear later, when the dataset has grown large. For example, using a full scan to retrieve a single record will perform fine when only 10 records exist. But, when the size approaches 1 million entries, performance will drop off considerably. In other words, performance degrades exactly at the point when it’s needed most (because there are lots of users), and it’s too late to change the implementation quickly.

This chapter will survey the most common access patterns and their implementation tradeoffs.

6.3 Common Read Patterns

To help the discussion, we’ll enumerate the common patterns and define some terminology.

Definitions work best with an example, so we’ll be referring to a simplistic CUSTOMER record with the following attributes:

CUSTOMER_ID        = 12345
LOGIN              = "bonjovi" 
PASSWORD           = "xmqi28zoiw" 
FIRST_NAME         = "John" 
LAST_NAME          = "Bon Jovi" 
ABOUT_ME           = "I like cars, guitars, and New Jersey" 
CREATION_DATE      = "May 13 2013 15:32:12"
LAST_UPDATED_DATE  = "May 14 2013 01:21:07"

6.3.1 Key Lookups

This is the simplest type of query. It is based on an equality comparison against a unique key and returns zero-or-one results.

There are two flavors of key: Primary Key and Secondary Key (or Alternate Key).

Both are similar, except an auxiliary index is used to map the secondary key to the primary data location.

Some storage systems use auxiliary indices for the Primary Key as well, in which case the difference reduces to one of semantics. One commonly applied semantic difference is that Secondary Keys can be null, whereas the Primary Key is mandatory.

In the CUSTOMER example, CUSTOMER_ID is the Primary Key and LOGIN is the Secondary Key.

Key lookups are typically achieved via a Hash Index or B-tree Index. Theoretically, hash lookups are O(1) while binary search is O(log n). But, of course, performance is always dependent on the implementation and dataset.

Despite the fact that hash lookups are generally faster, they are not the default on many implementations as they don’t support range queries (described below).
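To make the tradeoff concrete, here is a sketch in which a dict plays the role of a hash index and a sorted list (searched with `bisect`) plays the role of a b-tree. The keys and values are illustrative.

```python
import bisect

# Hash index: O(1) key lookup, but no notion of "next key", so no ranges.
hash_index = {12345: "record@offset1", 67890: "record@offset2"}
print(hash_index.get(12345))  # record@offset1

# Sorted (b-tree-like) structure: O(log n) lookup, and range queries work.
sorted_keys = [12345, 67890, 99999]
lo = bisect.bisect_left(sorted_keys, 20000)   # lower bound of the range
hi = bisect.bisect_left(sorted_keys, 100000)  # upper bound of the range
print(sorted_keys[lo:hi])  # [67890, 99999]
```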

6.3.2 Range Queries

A range query retrieves records where a value falls between a lower and an upper bound. For example, finding all customers where CREATION_DATE is between JAN-01-2014 and JAN-31-2014.

A range query is sometimes called an ordered query or ordered range query because the results are sorted. This is because effectively implementing a range query requires the use of a sorted, searchable data structure.

The most common implementation of a range query uses B+ trees.

6.3.3 Pagination

Because Range Queries can match an unbounded number of results, it is desirable to limit and iterate over the output. There are two approaches to pagination.

Random Access Pagination

This is the type of pagination most familiar from the web. There exists a pagesize (maybe 10 elements per page), and users can jump between pages in any sequence. For example, a user can select page 1000 without going through previous pages.

The performance of random access pagination can be linear in the page number, because reaching the N’th page may require scanning (and discarding) all results on the preceding pages. Most solutions are optimized for the fact that few users go beyond the first page.

Iterative Pagination

Iterative pagination only provides forward and backward operations to move one page at a time. This can be effective when the goal is to fetch a range of results in sequential fashion. For example, fetching all customers whose name starts with ‘J’.

Iterative pagination is accomplished by returning a value to clients which indicates the position of the next element. When querying by name, an iterator could be as simple as saying the next name is “Jamie”. Clients pass the iterator back and the system fetches the next set of customers where name >= “Jamie”. (In practice, a customerID needs to be appended to differentiate multiple customers named “Jamie”.)

Iterative pagination is constant time, regardless of which page is being retrieved.
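A sketch of iterative pagination, using a sorted in-memory list in place of a real index. The token is the (name, customerID) pair of the last element returned; all data is illustrative.

```python
import bisect

customers = sorted([("Jamie", 1), ("Jamie", 7), ("Janet", 3), ("John", 2)])

def fetch_page(after_token=None, page_size=2):
    # Resume just past the token; first page starts at the beginning.
    start = 0 if after_token is None else bisect.bisect_right(customers, after_token)
    page = customers[start:start + page_size]
    # A short page means we've reached the end.
    next_token = page[-1] if len(page) == page_size else None
    return page, next_token

page1, token = fetch_page()
page2, token = fetch_page(token)
print(page1)  # [('Jamie', 1), ('Jamie', 7)]
print(page2)  # [('Janet', 3), ('John', 2)]
```

Each call is a single bounded lookup, regardless of which page is requested.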

6.3.4 Full Scans

As the name implies, a full scan examines every record to answer a query. An example might be counting the number of customers created per month.

By their nature, full scans are often too slow for frontend-facing use cases. Their applicability is mainly in backend domains such as billing or business analytics.

Products like Hadoop and ElasticMapReduce are often used to accelerate full scans. They provide highly parallelized scans by partitioning the work across a fleet of hosts.

6.3.5 Full Text Search

This is the action of searching against words within a document. Using the CUSTOMER example, a full text search would enable querying the ABOUT_ME field for customers who discuss “guitars new jersey”.

There are many variations on full text searching, with innumerable ways to index the document and rank query results. At their core, they depend on inverted indexes.
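At its simplest, an inverted index maps each word to the set of documents containing it, and a query intersects those sets. A sketch, with tokenization and ranking omitted and illustrative data:

```python
from collections import defaultdict

docs = {
    12345: "I like cars, guitars, and New Jersey",
    67890: "I like boats and Oregon",
}

# Build the inverted index: word -> set of document IDs.
index = defaultdict(set)
for doc_id, text in docs.items():
    for word in text.lower().replace(",", "").split():
        index[word].add(doc_id)

def search(query):
    # Intersect the posting sets of every query word.
    results = [index[w] for w in query.lower().split()]
    return set.intersection(*results) if results else set()

print(search("guitars jersey"))  # {12345}
```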

6.3.6 Spatial Queries

Also called multi-point range queries, these enable lookups like “Find stores within 10 miles”. Spatial queries are based on R-trees or space-filling curves.

6.3.7 Other Query Flavors

Non-Unique Key Lookup

An interesting case is where the upper and lower bounds of a range query are equal. For example, querying for customers with CREATION_DATE between 2010 and 2010. In this case, the range query reduces to what’s sometimes called a non-unique key lookup. Another example is searching for customers where FIRST_NAME=’John’.

In practice, a non-unique key lookup is really just a range query. Somewhere, the non-unique key is accompanied by a rownum or other metadata to uniquely identify the record. The hidden metadata provides the range sorting.

Compound Key Lookup

A compound key is composed of multiple attributes. In the CUSTOMER example, a compound key could exist on (LOGIN,PASSWORD). The same functionality could be achieved by adding a new column to the table:

LOGIN_AND_PASSWORD = "bonjovi|xmqi28zoiw";

However, keeping this column in sync places additional burden on programmers and consumes unnecessary space. Compound keys are a convenience offered by storage engines to save users from this burden.

Prefix Query

A prefix query is a convenience method to execute a range query where the boundaries are computed by the storage engine. For example, searching customers where (FIRST_NAME = ‘J*’) is functionally equivalent to searching customers where (FIRST_NAME >= ‘J’ AND FIRST_NAME < ‘K’).
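A sketch of how a storage engine might compute those boundaries, ignoring edge cases such as a prefix ending in the last character of the alphabet:

```python
def prefix_bounds(prefix):
    # Increment the last character of the prefix to form the exclusive
    # upper bound: 'J' -> ['J', 'K'), 'Jo' -> ['Jo', 'Jp').
    upper = prefix[:-1] + chr(ord(prefix[-1]) + 1)
    return prefix, upper

lo, hi = prefix_bounds("J")
print(lo, hi)  # J K
assert lo <= "John" < hi  # 'John' falls inside the computed range
```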

Negative Query

This is the opposite of an equality query. For example, finding all customers where FIRST_NAME != ‘John’. In practice, this is rarely effective in a system with tight latency constraints. The set of strings that don’t equal ‘John’ is so large as to be effectively infinite, and the only way to implement such a query is by a full scan. A negative query should be replaced with an equality query if the list of keys can be known ahead of time.

Self-Referential Queries

A self-referential query matches against values within the entity itself. For example, querying all customers where LAST_UPDATED_DATE > CREATION_DATE. No key is passed in. Instead, the lookup is based on comparisons of elements within the record.

Self-referential queries are achievable by a full scan, but they can also be optimized with a function-based index. The query might need to be rewritten to utilize the function-based index.

6.3.8 Other Query Enhancements

This category includes techniques like filtering, subqueries, joins, and stored procedures. None of these techniques are necessary to find data. Instead, they provide mechanisms to manipulate results, limit the result size, or minimize back-and-forth communication between the client and datastore.

These techniques can be critical to building a cost-effective & performant system. We don’t cover them here for space reasons, and because the concepts are generally well understood. For those seeking additional information, good documentation can be found in most SQL texts.

It’s worth noting that it’s possible to live without these features. For example, joins can be eliminated by denormalizing data. The tradeoff will be an increased size of the datastore, or increased write latencies. The net impact varies by application. In summary, one may find that it’s technically possible to live without these features, but not economically justifiable.

6.4 Common Write Patterns

Besides the common write operations (insert, update), there are a few others worthy of mention.


Queues

Queues have unique challenges. They need to support what would, in most circumstances, be mutually competing goals (for example, strictly ordered reads combined with high rates of interleaved writes and deletes). Satisfying these goals requires specific optimizations to meet the throughput and performance requirements.

At low volume, it’s possible to use a plain vanilla b-tree & range queries as a queue.

At high scale, it’s important to pick a product specifically optimized to support queueing features.

Bulk Writes

A bulk write is the action of writing many records within a short timeframe.

Bulk writes can arise in a few situations: initial data loads, migrations from another datastore, and large backfills.

A storage engine optimized for website use cases (high reads/low writes) will generally fail when bulk writes are attempted through the same interface.

A common failure mode is that indexing throughput drops considerably as the index size grows, to the point that an operation expected to take days can require months. Another failure mode can occur if the logging capability, storage space, or replication system is not appropriately sized to handle the influx of writes.

In the worst case, a project can be delayed if requirements for bulk writes are overlooked. For example, if a user expects to export Oracle data accumulated over many years into a BDB, they may discover it will take 6 months to backfill via 1-by-1 inserts.

Special tools and import mechanisms are generally necessary for bulk writes. When evaluating storage solutions, look for documentation on how it supports them (if at all).

Bundled Writes

A bundled write is the ability to combine multiple writes into a single unit that is written or failed as a whole.

Transactional databases support this feature via commit/rollback capabilities. Other storage solutions may offer “bundled writes” in a non-transactional fashion. For example, a batch of writes applied eventually, but with no guarantee of timeline, sequence, or ability to rollback. The latter can be useful, but is very different from a transaction, and care should be taken not to confuse the two.

When using the latter as a pseudo-transaction, users need to be prepared for partially committed data.


Insert-or-Update

For storage solutions that differentiate between inserts and updates, it is sometimes desirable to bundle the two. For example, setting 1-Click enabled for a customer requires inserting the preference if it doesn’t already exist, or modifying the preference if it does.

This can be a hassle to get right, especially when uniqueness constraints and race conditions come into play. It is a nice convenience if a storage solution offers this feature natively.

The feature is also sometimes called “upsert”, “make-it-so”, “blind writes” (because existing data isn’t read first) or simply “put”.

Update From Query

This is the ability to update multiple records based on the result of a query. For example, if we wanted to disable 1-Click on all customer addresses, we might execute (in SQL): “update addresses set one_click_enabled=false where customerID = X and one_click_enabled=true”.

This is a contrived example, and would be dangerous if the customer had 100,000 addresses. However, the technique can be useful when the result size is bounded at a known limit. It offers a performance improvement because rows don’t need to be read and then written back.

Deletes in a replicated system

Deletes require special handling in a replicated system. To understand why, consider the following sequence of operations:

  1. Insert record X
  2. Delete record X

Replication layers often place no guarantee on the ordering of message delivery. So, another node might hear:

  1. Delete record X (a no-op, since X doesn’t exist yet)
  2. Insert record X

A record that should be deleted now exists on the node.

To handle this, records are tombstoned instead of deleted, where the tombstone is simply a bit on the record indicating a logical deletion. The operation now becomes:

  1. Insert record X
  2. Update record X, setting the tombstone bit

And, if another node hears the reverse, the tombstone update arrives first and creates a tombstoned stub of X; the late-arriving insert is coalesced into it, and the tombstone wins. Everything is consistent in the end.

To reclaim the space, an out-of-band sweeper will scan for tombstoned records older than the maximum replication delay (generally a week or two) and permanently delete the data.
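A minimal sketch of this tombstone scheme, with a dict standing in for the replicated store; the field names, timestamps, and retention window are illustrative.

```python
import time

store = {}  # key -> {"value": ..., "tombstone": bool, "deleted_at": float}

def replicate_insert(key, value):
    # An insert never resurrects a tombstoned record.
    if key in store and store[key]["tombstone"]:
        return
    store[key] = {"value": value, "tombstone": False, "deleted_at": None}

def replicate_delete(key):
    # Insert-or-update: a delete arriving first still leaves a tombstone.
    store[key] = {"value": None, "tombstone": True, "deleted_at": time.time()}

def sweep(max_replication_delay):
    # Out-of-band sweeper: permanently reclaim old tombstones.
    now = time.time()
    for key in [k for k, r in store.items()
                if r["tombstone"] and now - r["deleted_at"] > max_replication_delay]:
        del store[key]

# Out-of-order delivery: the delete is heard before the insert.
replicate_delete("X")
replicate_insert("X", "hello")
assert store["X"]["tombstone"]  # consistent: X stays logically deleted
```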

Tombstone deletion can be a source of operational pain. One type of implementation will scan the datastore looking for tombstoned records. If data resides on magnetic disk, this can place a severe burden on the I/O layer. Users must ensure enough IOPs (I/O operations per second) are available for the sweeper. Another implementation type will add the ID of the deleted record into a pending deletion queue. This has the odd side-effect of increasing the size of the datastore upon deletion, until the record is swept.

When using deletes in a replicated system, it’s important to understand how tombstones are managed and to ensure machine resources are available for the additional overhead.

Tombstones have another weakness in that they don’t reliably support re-creation of records. If a tombstone is in the way, the creation attempt is ignored. Also, the sweepers might not be in-sync with each other, and so a race condition exists. The record might get accepted on a subset of hosts, producing inconsistent replication.

6.5 Read vs Write Rates

Storage systems are optimized for either a high rate of reads, or a high rate of writes.

Doing both in one implementation is usually impossible, or obscenely expensive. The best way to combine the features is with two systems: one accepting high writes and publishing asynchronously to a different system supporting high reads. The latter may take advantage of parallelization or bulk update features to get all the data in place.

A high read rate is generally considered to be a 90/10 ratio of reads to writes, although some systems can go as low as 70/30.

Examples of high-read applications

Most customer-facing data is high read. For example, the customer’s name and email is accessed many times, but rarely updated.

Examples of high-write applications

High-write applications are also called logging applications.

The classic logging application is metrics collection. Many individual data points are captured and then shipped off to another reporting layer which summarizes and presents the information.

Tax audit records are another example. At the time a payment occurs, upwards of 12 tax audit records can be written. These records are not accessed until later, when refunds are needed or a government initiates an audit. Most records are never read at all.

6.6 Performance Implications

At the time of this writing, the primary bottleneck in storage applications is disk I/O. It far outweighs the other bottlenecks, such as CPU, lock contention (see Concurrency) or network degradation (see Latency).

Some storage systems, notably SABLE, have attempted to sidestep the I/O problem by keeping all data resident in memory. However, this is cost prohibitive for most applications.

The name of the game in I/O tuning is reducing disk seeks. A seek is the time it takes to move the head of the disk to the correct physical location to read/write data. On a typical machine, disk seeks can average around 9ms.

(Minor clarification: Disk I/O latencies are a combination of seek time, rotational time, and transfer time. Seek time is the dominant factor, so people often refer to the latency simply as “disk seeks”.)

Range Query Optimizations

To highlight a worst case scenario, imagine we wish to query all customers whose last name = “Bezos” and return the first 100 results. The query starts with a range scan on a name index. Then, for each matching record, we load the customer details. If each customer record resides in a different location on disk, then loading the details requires (100 seeks * 9ms) = 900ms; or, nearly 1 second in I/O.

The principal ally in the fight against disk seeks is data locality. The goal is for data accessed together to be stored together on disk.

For range queries, data locality can be achieved by using clustered indexes. A clustered index stores the data blocks in the same order as the index itself. In other words, the customer records would be stored on disk sorted by last name, to match the index.
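As a toy illustration of the locality win, imagine customer records kept physically sorted by last name, with a sorted Python list standing in for the disk blocks (all data here is made up):

```python
import bisect

# Toy clustered index: records are stored physically sorted by the
# clustering key (last name), so a range query reads one contiguous
# run of "disk" instead of paying one seek per matching record.
records = sorted([
    ("Adams", "a@example.com"),
    ("Bezos", "jeff@example.com"),
    ("Bezos", "mark@example.com"),
    ("Clark", "c@example.com"),
])
keys = [last for last, _ in records]

def range_scan(last_name, limit=100):
    lo = bisect.bisect_left(keys, last_name)
    hi = bisect.bisect_right(keys, last_name)
    return records[lo:hi][:limit]  # one contiguous slice, no extra seeks
```

All matching rows come back from a single contiguous slice, which is the whole point of clustering.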

By its nature, only one clustered index can exist on a table. So, a clustered index doesn’t help if we want to search by something other than last name. For example, a search by telephone number derives no benefit from the clustered index on name.

When range queries can hit multiple dimensions of a table (name, phone, email…), disk seeks can be minimized by using covering indexes for each dimension. A covering index “copies all the data into the index”. There might be one index for name, another for phone, etc…, and each index contains a copy of the customer details.

There are two obvious costs with the latter approach. The first is explosive growth in the size of the datastore as the customer details get copied into each index. The other problem is explosive growth in the latency of writes when each modification gets applied to all the indexes.

It’s the age-old tradeoff: big indexes and low read latencies, or small indexes and high read latencies. There is no magic bullet, except to avoid disk seeks entirely by keeping data in memory or using SSD drives. (As of 11/2010, both options are still cost prohibitive for most applications.) There are, however, a couple of ways to ameliorate the problem.

The high write latencies can be hidden from users by updating indexes asynchronously. In other words, if users don’t need to read the indexes immediately, it is not necessary to block them while indexes are updated. A background process can update indexes at its leisure.

The other problem of huge index sizes can be minimized if the application is lucky enough to have the notion of “hot data sets”. This means that, even if lots of queries are hitting different dimensions of the data, they are all returning the same core results. For example, imagine that instead of querying all customers, we had a single customer that was querying their own personal address book. They may want to sort their addresses by first name, last name, creation date, etc… All the queries hit different dimensions (name, date…), but they all return the same information (the customer’s addresses).

In this case, it is possible to keep the hot data (the customer’s address book) spatially close together on disk, or cached in memory, and simply join the query results with this data. It is not necessary to copy the details into each index.

Write Optimizations

The clustered indexes described above are the enemy of write throughput. Instead of simply writing data wherever convenient, the storage engine needs to find where to write it, seek to that location, and perhaps even rebalance all the existing data to make room for the new addition.

Applications optimized for write throughput typically give up range queries. (To be clear, they can asynchronously push data to something that does support range queries, but synchronous indexing is out.)

A write intensive application might use something like a log structured file system. Such a system will write data sequentially, to the head of a log file. In the background, another thread will periodically scan the new data and write it to an index.
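A minimal sketch of that write path, with the background indexing step modeled as an explicit method call (the class and names are hypothetical):

```python
# Log-structured write path: every write is a sequential append to the
# head of the log; a background pass later folds new entries into a
# hash index (key -> log offset) so reads can find the latest value.
class LogStore:
    def __init__(self):
        self.log = []         # append-only "log file"
        self.index = {}       # key -> offset of the newest value
        self.indexed_upto = 0

    def write(self, key, value):
        self.log.append((key, value))  # sequential write, no seek

    def reindex(self):
        # In a real system a background thread runs this periodically.
        for offset in range(self.indexed_upto, len(self.log)):
            key, _ = self.log[offset]
            self.index[key] = offset
        self.indexed_upto = len(self.log)

    def read(self, key):
        self.reindex()
        return self.log[self.index[key]][1]

store = LogStore()
store.write("pageviews", 17)
store.write("pageviews", 18)   # newer value appended; old one untouched
```

Writes never seek; the cost is that reads may see stale data until the indexing pass catches up.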

6.7 Summary of Tradeoffs

The trade-offs form a sort of tunable performance dial with “write throughput” on one end and “query complexity” on the other.

The dial starts with logging-based storage that has no query capabilities. This works perfectly for applications like metrics collection. Data is written sequentially, wherever convenient. The data is shipped elsewhere for analysis.

Turn the dial one notch and Primary Key lookups become possible. Data is written to any convenient location in the heap, but an additional hash-based index lets us find the data.

Another notch, and Primary Keys begin to use clustered Indexes. It’s now faster to read by Primary Key, at the expense of having to write new records to a specific location rather than anywhere in the heap.

Keep turning the dial and Secondary Indexes become available. There is more work for each write, and the data store is getting bigger, but there are more ways to access the data.

Past the halfway point, the dial reaches into Range Queries. Writes get more expensive, but the supported query expressions become much richer. Now we can iterate over multiple records.

Finally, we reach the other end of the dial. We have multiple Covering Indexes supporting a variety of range queries with high disk locality. Query performance is great, but we’re feeling pain on the datastore size and write latencies.

From these core settings, multiple systems can be assembled to form a complementary whole. Metrics is the classic example of a complementary system. On one extreme is metrics collection: essentially write-only storage with extremely high throughput. Periodically, the data is shipped to another system where the dial is cranked fully in the opposite direction: no writes are permitted, but users can execute a huge variety of rich queries.

The cost of a complementary system is the loss of synchronous updates, as well as the operational burden of piping data between systems.

7 Scaling

7.1 Scaling Dimensions

There are five limiting factors to datastore scalability:

  1. Write throughput, primarily limited by:
    • Lock conflicts. Multiple writes vying for the same resource become serialized. In practice, this can be the biggest drag on throughput. See #Concurrency.
    • Indexing Complexity - The more indexes, the more work upon data modification. See #AccessPatterns.
    • Replication overhead – While there are many flavors of replication, all impact write throughput to a degree. For example, synchronous replication will block until a change has been ACKed by a replication peer, which can be delayed if the peer is slow or in a less available location. Alternatively, asynchronous replication enqueues the change into a “pipeline” for later replication, but there is a limit at which the pipeline fills up. The limit is a function of the number of replication participants, as well as the quantity of data and the retention period.
    • Disk IO
  2. Read throughput
    • Primarily limited by disk IO. In particular, random access.
  3. Datastore size
    • The size of the persisted datastore in comparison to the available disk space. Or alternatively, the size of the “working set of data” in comparison to the available cache space, where frequent fallback to disk results in an unacceptable rise in latency.
  4. Overlay network
    • A storage system with many participating nodes typically maintains an overlay network, or a routing table, tracking which peers exist and the paths between them. At the extreme, every member tracks every other. In practical experience based on typical hardware, there is a limit of 500-1000 participants in such a group before the overhead of maintaining the topology impacts the performance of the datastore. You’ll see memory contention and scheduling/resource management issues. (The number of TCP connections isn’t typically a problem, assuming the machine is tuned for it. See SocketPressure).
  5. Operational support burden
    • When a storage product scales by linearly adding people to maintain it, it’s not really scalable.

7.2 Scaling Solutions

7.2.1 Bigger Hardware

A bigger machine helps scale along several dimensions: more IO, more disk, more CPU, and no additional overhead for humans to maintain. “Big iron” was the way most teams scaled before other solutions were available, and has since attained something of a reputation as a punch line: eventually, teams arrived at the biggest available machine and had nowhere else to go.

However, the technique still works for systems growing at rates under approximately 30% per year. Essentially, this is scaling by Moore’s Law and its variants. If the annual increase in computing power matches the increase in utilization, one can be in the enviable position of scaling by doing nothing more than handling lease returns.

7.2.2 Compression

Compression helps to reduce the data store size, and also helps read/write throughput by reducing IO and allowing more records to fit within a cache.

By its nature, compression is a one-shot operation and not a repeatable scaling solution. However, users often see a 50% reduction in data size and the technique is valuable.

When a storage product offers a built-in compression mechanism, it is generally wise to use it rather than rolling your own. The product can compress more things, more intelligently, than a user typically can, including indexes and other internal structures.

The tradeoffs for compression include slightly higher CPU utilization, plus slightly higher latencies when querying non-indexed fields where many records may need to be uncompressed to test whether they match the query expression.

7.2.3 Caching

The prevailing perception of caching is that it scales read throughput beyond what a disk-backed store can support, by keeping the most frequently used data in memory.

In such an architecture, a cache failure is cascading and unrecoverable. In the event of failure, all traffic falls back to disk, which is not scaled to handle the load without the cache. IO contention hits its limit, requests back up, and clients see timeouts. When the cache is brought back online, it is empty, so queries continue to hit the disk and continue to time out. The only way to recover is by shedding load until traffic matches the datastore capacity, then gradually ramping traffic back up as the cache is repopulated. This is guaranteed to be a prolonged outage.

Cache failures aren’t rare. Of course, a bad deployment or bad hardware can wreak havoc. But, they can also fail if the cache is “blown out”. For example, a client can iterate over the data store, or backfill at an unsustainable rate. These operations can fill a cache with a huge volume of useless records and flush useful data from the limited space, effectively simulating a cache failure. (A Segmented LRU cache can help protect against blowouts.)
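The Segmented LRU idea mentioned above can be sketched briefly: new keys enter a probationary segment, and only keys hit a second time are promoted to a protected segment, so a one-pass scan can only blow out probation. (This is a toy model, not any particular cache product.)

```python
from collections import OrderedDict

# Segmented LRU sketch: a scan floods the probationary segment, but
# the protected segment only holds keys that were hit at least twice.
class SegmentedLRU:
    def __init__(self, probation_size, protected_size):
        self.probation = OrderedDict()
        self.protected = OrderedDict()
        self.probation_size = probation_size
        self.protected_size = protected_size

    def get(self, key):
        if key in self.protected:
            self.protected.move_to_end(key)
            return self.protected[key]
        if key in self.probation:
            value = self.probation.pop(key)  # second hit: promote
            self.protected[key] = value
            if len(self.protected) > self.protected_size:
                # demote the protected LRU victim back to probation
                old_key, old_val = self.protected.popitem(last=False)
                self._put_probation(old_key, old_val)
            return value
        return None

    def put(self, key, value):
        if key in self.protected:
            self.protected[key] = value
            self.protected.move_to_end(key)
        else:
            self._put_probation(key, value)

    def _put_probation(self, key, value):
        self.probation[key] = value
        self.probation.move_to_end(key)
        if len(self.probation) > self.probation_size:
            self.probation.popitem(last=False)  # never touches protected

cache = SegmentedLRU(probation_size=2, protected_size=2)
cache.put("hot", 1)
cache.get("hot")                 # promoted to protected
for i in range(100):             # a scan floods probation only
    cache.put(f"scan{i}", i)
assert cache.get("hot") == 1     # survived the blowout
```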

When using a cache to scale read throughput, it is critical to either:

  • plan for the failure: accept the risk of an outage, and rehearse the recovery procedure of shedding load and gradually warming the cache; or
  • ensure the backing datastore can survive without the cache.

The alternative strategy for caches is to treat them as a way to improve latencies, but not throughput. For example, if the cache fails, then requests fallback to disk and clients see higher latencies. However, clients don’t see an outage because the backing system is scaled to survive without the cache. This implies caching is no longer a scaling solution. It only helps latencies.

Another problem, from a manager’s perspective, is that it costs money. One has to pay for the entire disk-backed system to handle the load, plus a caching system to reduce latencies. And a final problem, from an engineering perspective, is that few people will effectively monitor when traffic has exceeded the capacity of the disk-backed system. Only on failure is it recognized that, oh crud, there isn’t actually enough capacity in the backing system.

Caches are scary. After human error, caches are the most likely reason for a catastrophic, prolonged failure of your system. They are also extraordinarily useful from an economic and performance angle: it is always cheaper and faster to serve data from an in-memory cache. As usual, one must recognize the risks of the tradeoff and plan for the consequences.

For reference, here are two interesting examples of cache failure:

7.2.4 Replication

Replication helps scale read throughput. It does this, of course, by making more copies of the datastore available for reads. In addition, some implementations use stickiness on a particular replica to improve cache hit rates and achieve higher throughput (with the risks of relying on cache hits, as described above).

Replication also solves problems not directly related to scaling, such as availability and backups.

Replication does not help with write throughput, and can hurt it. Replication overhead introduces additional expense on writes.

Synchronous vs Asynchronous Replication

Asynchronous replication uses log shipping or some other delayed transport mechanism to apply updates to all members of the replication group. Asynchronous replication is great in that it doesn’t interfere with real-time traffic. It has problems, however. The first problem is that a machine can die, taking unreplicated writes with it. The other problem is that asynchronous replication introduces eventual consistency on the replicas, which is not a tolerable solution for everyone.

Synchronous replication attempts to propagate changes to peers in real time, blocking on the result. The tradeoff is that when a peer is unavailable or slow, users will experience a drop in availability or poor latencies.

In practice, a hybrid approach is typically used. Synchronous replication is attempted to a set of peers. Users can tune the number of peers to block on depending on the goals for consistency, latency, and durability. If a peer fails to ACK within the allowed timeframe, an asynchronous mechanism takes responsibility for driving the updates to the remainder of the replication group.

Physical versus Logical Replication

Physical replication copies the disk blocks from one machine to another, byte by byte.

In contrast, logical replication transports a set of replayable statements to a peer. For example, if a user executed the statement “update customers set email = ‘jjj@xxx.com’”, the statement would be transported and rerun on the peer.

The trouble with physical replication is that disk corruption in one machine can replicate to others like a virus. Sometimes the corruption is immediately apparent. Other times, a single corrupt block can lurk undetected for an extended period of time. Later, when a user attempts to modify the block, the entire replication group will suddenly crash simultaneously with no obvious root cause. In this case, it can be a painstaking effort to find the offending corruption and determine a course of action to fix it. Often, the only recourse is to restore from backup.

On the other side of the fence, the trouble with logical replication is that it tends to be buggy. One source of bugs is that writes must be replicated and applied in the correct order. This can be difficult to achieve atop asynchronous replication with no guaranteed message ordering. Another problem is that functions must be resolved prior to replication. For example, the statement “update customers set last_authentication_date = sysdate()” must have the sysdate() value resolved prior to replication, else each peer generates a different value. The more complex the storage system, the more likely bugs will arise within logical replication.
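The function-resolution problem can be sketched with a toy statement transport (the `resolve` helper is hypothetical; real products bind values inside the replication layer):

```python
import datetime

# Before a statement is shipped to replication peers, volatile
# functions like sysdate() must be bound to concrete values at the
# origin; otherwise each peer would compute its own, diverging value.
def resolve(statement, functions):
    for name, fn in functions.items():
        statement = statement.replace(name + "()", repr(fn()))
    return statement

origin_now = datetime.datetime(2010, 11, 1, 12, 0, 0)
stmt = "update customers set last_authentication_date = sysdate()"
resolved = resolve(stmt, {"sysdate": lambda: origin_now.isoformat()})
# Every peer now replays the same literal:
# update customers set last_authentication_date = '2010-11-01T12:00:00'
```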

Logical replication tends to work best in simple key/value database products that are built from the ground up to support it. These systems have versioning and conflict resolution mechanisms baked in, and they don’t have to worry about side effects from features such as foreign key constraints and unique secondary indexes.

In practice, it’s hard to avoid physical replication. For example, when introducing a new machine to a logically replicated group, it’s infeasible to bootstrap the new machine via logical replication. The logical history isn’t maintained forever, and if it were, it would take ages to apply. Instead, a new machine is typically bootstrapped by physically copying the data from a peer, then logically applying the most recent updates. One gets to enjoy both disk corruption and inconsistencies!

Inconsistency Resolution

Consistency is like inventory. It shouldn’t be wrong, but always is. To be clear, we’re talking about rare, lingering, unplanned inconsistencies between members of the replication group. The primary sources of inconsistency are bugs, conflicting concurrent updates, and machine/network failures (often in conjunction). Inconsistencies can also arise by design, typically due to compromises made to maintain availability in the event of machine/network failure.

Inconsistency resolution is sometimes called “anti-entropy” in reference to the tendency for consistency to naturally decay unless external forces are applied. There are three general approaches to resolving inconsistencies:

  1. Do nothing proactive.

This can be OK when the price of inconsistency is small and correctable. For example, an inconsistency in a customer’s 1-Click status is annoying, but not critical. The customer simply clicks a button to reenable 1-Click, which re-updates the replicas and hopefully restores consistency.

Quorum systems, because they are reading from multiple nodes simultaneously, also have the ability to detect inconsistencies and repair them at read time. This is known as “read resolve”.

  2. Get fancy

Some systems employ tricks to identify inconsistencies at a (theoretically) low cost. A well-known example is the exchange of Merkle trees between nodes. A Merkle tree contains checksums for various levels of the storage hierarchy. The idea is that one can quickly test for inconsistencies between two datastores as a whole by comparing the top-level checksum, then zoom in on the location of the inconsistency by drilling down into the lower-level checksums.

These fancy solutions are problematic under high write rates. The problem is that it takes time for a node to ship its checksums to a peer for comparison. By the time it arrives at the peer, it is very likely to be stale. It is difficult for multiple nodes to agree on a single snapshot in time under high writes, especially in a multi-master environment where all nodes are receiving & propagating different updates.

  3. Brute force

The brute force approach involves one machine iterating over records one-by-one and calling out to check if another node contains the same information. When a conflict is discovered, it can choose the correct answer (in whatever way makes the most sense) and repair the broken node.

The brute force approach can be made relatively efficient by only iterating over recently modified records, and by electing a single master to perform the iteration. Along these lines, some teams designate a single member of the replication group as an “admin”. The admin machine does not serve customer facing traffic. Instead, the purpose of the machine is to run administrative tasks such as consistency checks, backups, and bootstrapping new members of the group. Using an isolated machine for these purposes ensures that customer latency is not impacted by these activities.
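The Merkle-tree approach from option 2 can be sketched over a toy keyspace of four buckets (the bucket contents and helper names are made up, and this assumes a power-of-two bucket count):

```python
import hashlib

# Merkle-tree anti-entropy sketch: compare the root hash first, then
# drill only into children whose hashes differ, localizing the
# inconsistency without comparing every record.
def h(data):
    return hashlib.sha1(data.encode()).hexdigest()

def merkle(buckets):
    """Build tree levels from a list of bucket-content strings:
    leaves first, root last. Assumes a power-of-two bucket count."""
    level = [h(b) for b in buckets]
    levels = [level]
    while len(level) > 1:
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        levels.append(level)
    return levels

def diff_buckets(a, b):
    """Return indexes of leaf buckets whose hashes differ, walking
    down from the root and pruning identical subtrees."""
    ta, tb = merkle(a), merkle(b)
    if ta[-1] == tb[-1]:
        return []  # roots match: the stores are consistent
    suspects = [0]  # node indexes at the current level, top-down
    for depth in range(len(ta) - 2, -1, -1):
        suspects = [child for i in suspects
                    for child in (2 * i, 2 * i + 1)
                    if ta[depth][child] != tb[depth][child]]
    return suspects

node_a = ["k1=v1", "k2=v2", "k3=v3", "k4=v4"]
node_b = ["k1=v1", "k2=STALE", "k3=v3", "k4=v4"]
# Only bucket 1 needs repair; the other subtrees are pruned after a
# single hash comparison each.
```

The staleness problem described above shows up here too: `node_b`'s tree must be a snapshot, and under heavy writes the snapshot is outdated by the time it is compared.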

7.2.5 Partitioning

The purpose of partitioning is to increase the ability to handle larger write rates and datastore size by dividing the information across multiple physical machines.

Partitioning can increase the complexity of the overlay network because it introduces more bookkeeping to find the appropriate home for a record. These scaling limitations will be discussed later.

There are two primary means of partitioning: static partitions & distributed hash tables.

Static Partitions

Static partitioning splits the data into N fixed size segments containing subsets of the data.

Within each partition, there can be one-or-more replicas containing identical copies of the data. Nothing is shared between partitions. This type of setup is also called sharding.

Determining the Partition

A routing layer is necessary to find the correct home for a piece of data.

Routing involves translating some sort of ID into a partition, and forwarding requests to the correct partition. The ID is called the Partitioning Key. To understand a good routing scheme, let’s begin with a bad one. Imagine using customerID as a Partition Key. Bad routing might use a modulus of the customerID to determine the partition. For example:
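A sketch of the naive scheme and its two failure modes, using made-up IDs:

```python
# Naive routing: partition = customerID mod 4.
def route(customer_id, num_partitions=4):
    return customer_id % num_partitions

# Problem 1: repartitioning reshuffles nearly everything. Going from
# 4 to 5 partitions, a key stays put only when id % 4 == id % 5:
stayed = sum(1 for i in range(10000) if i % 4 == i % 5) / 10000
# stayed == 0.2 -> 80% of keys must move, not the hoped-for 20%

# Problem 2: customerIDs increment by 10 within a region, so the
# modulus only ever produces two of the four partition numbers:
used = {route(i) for i in range(1000, 2000, 10)}
# used == {0, 2} -> two of four partitions are completely unused
```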

The first problem arises when repartitioning. Under increased load, it may be desired to grow from 4 to 5 partitions. The most efficient way to accomplish this is to move 20% of the records from each partition to a new partition. Unfortunately, under the modulus scheme, there is no way to keep 80% in place and move only 20%. This is because, under a new modulus, each key can map to a new partition. Nearly all the data must be reshuffled. This is more complicated to implement and execute.

The other problem is that CustomerIDs are not randomly distributed. In fact, customerIDs increment by 10 within a particular region.

Under the naive modulus scheme, customers are unevenly distributed across partitions. In fact, two of four partitions are completely unused.

Most routing mechanisms solve these problems with two techniques. First, the Partition Key is hashed to a uniformly distributed value. (For the sake of example, we’re going to use a hash function with 32K hash buckets, simply because 32K is a nice small number and easily divisible.)

Next, a lookup table maps the hash value to a partition.

Growing from 4 to 5 partitions is a matter of updating the lookup table. 20% of each range is carved out for the new partition.
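The hash-plus-lookup-table scheme can be sketched with the 32K-bucket example; the partition numbers and helper names here are illustrative:

```python
import hashlib

BUCKETS = 32768  # 32K hash buckets, as in the example above

def bucket(partition_key):
    """Hash the Partition Key to a uniformly distributed bucket."""
    digest = hashlib.sha1(str(partition_key).encode()).digest()
    return int.from_bytes(digest, "big") % BUCKETS

# Lookup table mapping bucket ranges to partitions: four equal ranges.
table = [(range(p * 8192, (p + 1) * 8192), p + 1) for p in range(4)]

def route(partition_key, table):
    b = bucket(partition_key)
    for bucket_range, partition in table:
        if b in bucket_range:
            return partition

def grow_to_five(table):
    """Growing from 4 to 5 partitions is just a table update: the last
    20% of each existing range is carved out for partition 5."""
    new_table = []
    for bucket_range, partition in table:
        keep = len(bucket_range) * 4 // 5
        new_table.append((bucket_range[:keep], partition))
        new_table.append((bucket_range[keep:], 5))
    return new_table
```

Only keys hashing into the carved-out 20% of each range change partitions; everything else stays put.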

Of course, the underlying data must be moved as well. This will be discussed later.

Formally speaking, the output of the hash function is called the keyspace. The lookup table is called the keyspace partitioning scheme. One should choose a keyspace that is safely larger than the number of potential partitions, to allow plenty of room to grow. Many applications simply use SHA-1 to hash into a 160-bit keyspace. The expense of the SHA-1 hashing is often minimal compared to other lookup costs.

Fine-Grained Partitioning Control

Sometimes, a single Partition Key can experience abnormally high volumes. For example, wishlist addresses are owned by CustomerID 7 (it’s a long story). This means whichever partition holds customerID 7 will be out of balance compared to the others. Another example is a catalog entry which happens to be an in-demand toy for a holiday season.

For this reason, it can be useful to have finer-grained control over partitioning. This typically means a “manual override” that forces a particular key into a particular location, regardless of other routing rules. The other location can be an isolated datastore that isn’t shared with any other keys.

Secondary Keys

Consider the case of a Customer entity with a primary key (CustomerID) and a secondary key (Email). Partitioning is based on CustomerID. How can users query by Email?

A secondary key lookup is required to translate an Email into a CustomerID. If the number of records is small, the mapping table can fit into a single datastore. If the mappings are numerous, they may need partitioning too.
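The two-step lookup can be sketched as follows; the data, the mod-2 routing stand-in, and the helper names are all hypothetical:

```python
# Secondary-key lookup sketch: a mapping table translates Email ->
# CustomerID, then normal partition routing by CustomerID finds the
# record. The cost is two lookups instead of one.
email_to_customer = {"jeff@example.com": 1232131}  # the mapping table

customers_by_partition = {
    0: {},
    1: {1232131: {"name": "Jeff", "email": "jeff@example.com"}},
}

def partition_for(customer_id):
    return customer_id % 2  # stand-in for the real routing scheme

def query_by_email(email):
    customer_id = email_to_customer[email]  # lookup #1: secondary key
    partition = partition_for(customer_id)  # lookup #2: primary routing
    return customers_by_partition[partition][customer_id]
```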

Physically routing to the correct partition

If each datastore can be fronted with an HTTP webservice, then the simplest routing implementation is to toss a load balancer in front of each partition.

The alternative is for the routing layer to maintain a routing table itself. Maintaining a routing table has a few benefits. First, one can “subpartition” the partitions, consistently routing to a single machine:

Consistent routing to one machine, also called stickiness, will improve the cache hit rate and can help with consistency as well. (This is discussed later in Consistency). Another benefit to maintaining a routing table is ease of scaling because adding a new partition will not require provisioning & configuring a load balancer.

As mentioned previously, there is an effective limit of 500-1000 machines in a centrally managed routing table before the overhead of maintaining the table begins to impact performance.


Repartitioning

Building a new partitioned system is easy. Repartitioning is hard. Consider the case of growing from two to three partitions:

We need to remap 1/3 of the keyspace (or hash buckets) from each partition to the new repository. However, we cannot physically move the data in an instant. A new partition is slowly brought up to date. There will be an interval where the keyspace is jointly owned by two partitions.

To begin, we copy existing data and begin replicating new writes for the keyspace undergoing migration:

Two problems immediately present themselves. The first is that data is not neatly arranged by keyspace in the existing partitions. Finding the relevant records to copy requires a full scan. If not throttled, this will consume excessive IO resources and blow out caches by bringing inactive keys into memory. Latencies on the machine will spike, impacting customers.

The other problem is that persisting new updates to both partitions requires a complex replication model that isn’t always supported by existing products. For example, partition 3 needs to participate in the replication groups for a subset of writes to partition 1 and 2. This would be totally bizarre in any other circumstance. If the replication product doesn’t support this arrangement (and most don’t), then the user bears the brunt of copying updates to the new partition.

Now, assuming partition 3 is synchronized & up-to-date, traffic can be rerouted:

Next, we need to delete the old data from the original partitions. This introduces yet more problems. Full scans are needed (again!) to find the information to delete. Then, deletes are executed, which are not cheap (see Access Patterns, Deletes in a Replicated System). Finally, after the records are physically deleted, we’re often left with a fragmented datastore consuming the same space.

Finally, the original partitions need to be defragmented. In some implementations, the only way to defragment is to delete all files and restore from a peer. After defragmentation, repartitioning is complete.

The important takeaways are that:

  1. Repartitioning takes time. It can take hours, days, or even weeks if the job is scheduled at low priority to avoid customer facing impacts.
  2. Repartitioning takes resources. It can put huge burdens on the systems to scan, copy, and delete large numbers of records.


When short of capacity, repartitioning doesn’t help. Repartitioning kills.

Effective monitoring is critical to ensure a system never reaches this juncture. If it does, the only recourse is to shed load, take an outage, or attempt to ride it out until traffic decreases during an off-peak window, during which emergency measures can be taken.

Repartitioning By Doubling

Because of the problems described above, a common shortcut to repartitioning is to double capacity. First, each replication group is doubled in size.

Adding new members to a replication group involves a copy operation, which is cheap. After the new members are synchronized, they can be segmented into new partitions.

After the partitions are split, half the data can be removed from each.

Like an amoeba, each partition has divided into two new entities. It’s a simple mechanism that works with most replication technologies. The problem is that it scales at exponential rates: it’s OK when going from 2 to 4 machines, but harder to justify when scaling from 1024 to 2048 partitions.

Despite this limitation, some teams have run for many years with such a scheme. This is because the time between repartitioning also tends to grow exponentially. If 1 partition lasts a year, then 2 partitions last 2 years. By the time one reaches 16 partitions, it might be good for 16 years. These teams are overprovisioned, but have made the economic choice to waste hardware cycles rather than to waste engineering cycles building and running a more complicated solution.

Static Partitioning vs. Distributed Hash Tables

Problems with Static Partitioning

Historically, static partitions required lots of manual setup and configuration. One needed to calculate the number of partitions required, and then devise a way to designate whether a host is part of a partition, and then build a routing layer. There were no out-of-the-box solutions and different teams constructed different mechanisms, with partition membership often based on things like hostclass or static configuration files. With large numbers of hosts, these mechanisms can grow unwieldy to manage. Imagine a file with a thousand machine names, and keeping that file accurate during host exchanges and lease returns. This overhead placed an effective limit on the number of partitions and hosts.

In addition, scaling for writes and datastore size required repartitioning. Since it was not done often, repartitioning most likely performed poorly, lacked documentation, or didn’t exist at all.

Repartitioning also increased the size of the partitioning table exponentially. We saw this previously when going from 4 to 5 partitions. The table size doubled.

Now imagine a 160-bit keyspace where the number of partitions has grown incrementally from 2,3,4,5,6,7,8,9… to 100 partitions. The mapping table would contain 633825300114114700748351602688 entries (2^99). This places an effective limit on the number of repartitioning events. This is another reason why simpler repartitioning schemes, such as “Repartitioning By Doubling”, are popular. In simpler schemes, the lookup table grows linearly with the number of partitions.
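The table-growth arithmetic can be checked directly: each step from n to n+1 partitions carves a slice out of every existing range, doubling the table, and going from 2 to 100 partitions takes 98 such steps.

```python
# Starting from 2 table entries at 2 partitions, each repartitioning
# event doubles the entry count; 100 partitions means 98 doublings.
entries = 2
for partitions in range(3, 101):
    entries *= 2
assert entries == 2 ** 99 == 633825300114114700748351602688
```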

In addition, static partitioning doesn’t allow fine-grained scaling. For example, consider this system with 4 partitions:

There are only two scaling dimensions. One could add a new partition (25% increase), or add a new host to each partition (33% increase). There is no way to scale either dimension by a smaller value. For example, it’s impossible to add one machine and have it take a little burden off all other machines across all partitions.

Finally, there is rarely a mechanism to descale. For example, in the “Repartitioning By Doubling” scheme, it’s easy to split 1 node into 2. However, there is no corresponding trick to merge 2 nodes back into 1. This means it’s impractical to split partitions for holiday load, then remerge them in January when traffic comes back down.

In summary, static partitions are static (duh!). They don’t change easily. It takes human effort to scale up and down, and the effort increases with the number of partitions. This, together with limitations on the size of the partitioning table and the overlay network, effectively limits the size of a partitioned system to 500-1000 nodes. In addition, teams will say “repartitioning is painful, so let’s scale up big so we don’t have to repartition again for a long time”. For this reason, statically partitioned systems tend to run overprovisioned, with wasteful hardware utilization.

Theoretical fixes by Distributed Hash Tables (DHTs)

DHTs are often promoted as an antidote to static partitions. DHTs strive for elasticity and unbounded scaling. For some, the dream is of a storage system that can adapt to load by automatically scaling up and down, and grow to millions of nodes without bottlenecks.

To accomplish this, DHTs eliminate the notion of statically defined partitions and the management burden that comes with them. Instead of allocating the keyspace with a static mapping table, the keyspace is typically modeled as a ring:

As machines are added, they take ownership of a range of the keyspace. This effectively “partitions” the keyspace, but the partition boundaries are fluid. The boundaries shift automatically as machines come and go. (In real life, multiple machines store redundant copies of the key space to prevent data loss when a single machine dies. But, we’ll ignore redundancy for the sake of keeping this example simple.)

The other line of attack by DHTs is to eliminate the central routing table. There is no single authority that says “for key X, the data can be found on machine Y”.

Instead, each machine knows only its current position on the ring plus the position of a few neighbors. When a query arrives, it lands on a random machine. This machine will take the Partition Key (let’s say CustomerID 1232131), and hash it into a keyspace value (let’s say position 13932). Then, the random machine asks “do I own position 13932”? If not, it picks a neighbor closer to position 13932 and says “Hey Bob, pass this on.” In this way, the request hops across machines until eventually arriving at one that says “Yes, I own position 13932” and returns the query result.
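The hop-by-hop lookup can be sketched as follows. This is a toy: the keyspace size, node positions, and naive successor-only routing (which takes O(n) hops; real implementations keep richer routing state to reach O(log n)) are all illustrative assumptions.

```python
import hashlib

KEYSPACE = 2**16  # toy keyspace; real DHTs use e.g. a 160-bit space

def position(key):
    """Hash a partition key (e.g. a CustomerID) to a ring position."""
    return int(hashlib.sha1(str(key).encode()).hexdigest(), 16) % KEYSPACE

class Node:
    def __init__(self, pos):
        self.pos = pos
        self.successor = None  # each node knows a neighbor, not a global table

def owns(node, pos):
    """A node owns the arc from its position up to its successor's."""
    lo, hi = node.pos, node.successor.pos
    return lo <= pos < hi if lo < hi else (pos >= lo or pos < hi)

def lookup(start_node, key):
    """Hop neighbor-to-neighbor until reaching the node that owns the key."""
    pos, node, hops = position(key), start_node, 0
    while not owns(node, pos):
        node = node.successor  # "Hey Bob, pass this on"
        hops += 1
    return node, hops

# Build a toy ring of four nodes at even intervals.
nodes = [Node(p) for p in (0, 16384, 32768, 49152)]
for i, n in enumerate(nodes):
    n.successor = nodes[(i + 1) % len(nodes)]

found, hops = lookup(nodes[0], "CustomerID-1232131")
print(found.pos, hops)  # at most 3 hops in a 4-node ring
```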

Because there is no central routing table, there is no theoretical limit to the number of nodes in the system. It can grow to millions without hitting a bottleneck. The tradeoff is that additional hops are required to find data. It can be an art form to devise a network topology that minimizes the number of hops (and the distance of each hop), but a good implementation can achieve lookups in O(log n) time, where n is the number of nodes in the system.


The previous illustration showed storage nodes distributed around a ring in perfect intervals. In practice, node assignment is often based on the random output of a hash function. The resulting assignments can be “clumpy”. To help balance load in a clumpy world, most solutions use the concept of multiplicity. This means a single physical machine will contain data from many logical nodes on a ring:

With enough nodes, and enough multiplicity, the clumpiness is averaged away and each physical machine carries a balanced load.
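A minimal sketch of multiplicity (often called “virtual nodes”), with illustrative hostnames and counts: each physical machine claims many logical positions on the ring, and the hash clumpiness averages away.

```python
import hashlib

def ring_position(label, keyspace=2**32):
    return int(hashlib.md5(label.encode()).hexdigest(), 16) % keyspace

def build_ring(machines, vnodes_per_machine):
    """Place many logical (virtual) nodes per physical machine on the ring."""
    return {
        ring_position(f"{machine}#vnode{i}"): machine
        for machine in machines
        for i in range(vnodes_per_machine)
    }

def owner(ring, key):
    """Owner = first virtual node at or after the key's position (wrapping)."""
    pos = ring_position(key)
    points = sorted(ring)
    for p in points:
        if p >= pos:
            return ring[p]
    return ring[points[0]]  # wrapped past the top of the keyspace

# With enough virtual nodes, each physical machine ends up with a
# roughly equal share of keys despite the "clumpy" hash placement.
ring = build_ring(["hostA", "hostB", "hostC"], vnodes_per_machine=64)
counts = {}
for k in range(10000):
    m = owner(ring, f"key{k}")
    counts[m] = counts.get(m, 0) + 1
print(counts)  # roughly a third each, give or take clumpiness
```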

For simplicity of illustration, further examples in this chapter will continue to ignore the difference between logical and physical nodes. Readers should be aware that real implementations will be more complicated than the examples might indicate.

Problems with Distributed Hash Tables

Distributed Hash Tables originated with peer-to-peer file sharing networks. Only recently have they been adapted into datastores for websites. If there is one major fault with DHTs in this context, it’s that they’re new. Life on the bleeding edge isn’t easy. One can read about the pains of Dynamo and Cassandra. Another implementation, called Project Voldemort, explicitly says “It is still a new system which has rough edges, bad error messages, and probably plenty of uncaught bugs”.

Naturally, first implementations tend to be feature-poor, too. They might only support key-value lookups (no range queries or secondary key indexes). They might also lack backup functionality and other operational tools needed in a tier-1 authoritative datastore.

However, beyond the normal growing pains, a number of critical flaws pervaded the first generation of DHTs.

The biggest flaw is that the fluid partitioning in DHTs means repartitioning occurs every time a machine joins or leaves the group. (The pain of repartitioning was described previously.) In other words, DHTs took the most painful & expensive aspect of static partitioning and made it a common everyday event, without any effort to reduce the pain. When a machine dies in a DHT, it triggers a repartition. If a machine is added, it’s a repartition. Quarterly lease returns? That’s a fleet of repartitioning events. Q4 scaling? Better begin in September, because it will take weeks to introduce all the hosts. They need to be trickled into service slowly to avoid swamping the system with repartitioning events and impacting latencies.

In particular, it’s worth emphasizing that scaling up read capacity by adding more machines is a repartitioning event under DHTs. This is not the case under static partitioning, where adding replicas simply involves cloning the existing datastores. Because scaling for read capacity is typically the most common scaling event, it is important to recognize this difference.

Another problem with traditional DHTs is the number of hops required to find a query result. Because of tight performance goals, the practical implication is that only one hop is desired. To achieve this, every node must be aware of every other node, so requests can be routed to the correct owner in one hop. This requires a full mesh topology.

As mentioned previously, there is a limit of 500-1000 participants in a full mesh before the overhead of maintaining the topology begins to constrain system resources. Gossip protocols can be used to reduce the communication overhead and scale to larger sizes, but such a system cannot reach “unbounded scaling”.

Finally, there is the question of adding a new machine. Where should it go?

Ideally, we could reshuffle all the nodes to make room for the new addition. This would take a little load off everybody.

However, reshuffling means a global simultaneous repartitioning of the world. With the high cost of repartitioning, this would be a prolonged latency spike; possibly an outage. Instead, most DHTs use a variant of Consistent Hashing. A new node only impacts the immediate neighbors.

Still the question remains: where should the new node go? Since the goal is to avoid human intervention, an automated decision must be made. The node can be placed in a random location, or the system can make an effort to determine the “busiest” node and take load off of it. The busiest node might have the most disk utilization or the highest request rates. (Warning – if multiple machines are added simultaneously, some naive implementations will assign all hardware to the exact same busy location in the ring.) Regardless of how the decision is made, the new node will probably end up being placed midway between two other nodes.

In a well structured arrangement, all the nodes should be equally balanced; meaning they store similar amounts of data and handle similar loads. In such an arrangement, adding one machine puts the system out of balance.

A system is only scaled to the level of its weakest participants. It doesn’t matter if nodes E & B are OK if the remaining nodes are low on disk space. Multiple hosts must be added to maintain balance.

Let’s look at the final result. What effectively happened is that capacity doubled. This outcome is identical to the previous discussion on Repartitioning By Doubling. But instead of static partitions, we used a complex mechanism to achieve the same result.

So who should use DHTs, if anybody?

DHTs are not currently appropriate for small scale systems storing homogeneous data with predictable growth rates. An example would be storing Customer records. Each record is the same size and the growth rates follow a predictable pattern over the season. The entire datastore can probably run atop commodity hardware in under a dozen partitions. This is easily manageable with static partitioning. A DHT is unnecessary complexity. In addition, most DHT solutions perform poorly at low scale. They are optimized to run on fleets containing hundreds or thousands of nodes.

On the flip-side is a product like S3. It contains heterogeneous data with unpredictable request rates. Anybody can stick anything in there, and query it whenever they want. It would be extraordinarily difficult to keep a system like S3 balanced on static partitions. DHTs are a practical necessity. These systems also have huge scale, and they have dedicated engineers to work through the technical challenges.

Most of us don’t own massive, generic storage products, and don’t have engineering headcount to play with bleeding edge technologies. For this reason, DHTs aren’t useful to most engineering teams.

New Directions with Partitioning and DHTs

The failures of DHTs don’t mean the problems with Static Partitioning have gone away. Solutions are still needed.

The current trends have DHTs and Static Partitioning merging into a hybrid. These hybrids often keep the simplicity of static partitions, but introduce cheaper and faster ways to rebalance.

Because the largest bottleneck in repartitioning is the random IO required to find/copy/delete subsets of data, the new approaches usually focus on ways to minimize random IO. For example, Dynamo was refined by dividing the keyspace into many tiny, static partitions. Each tiny partition was stored in its own file. A physical machine might hold 100 of these tiny partitions, in 100 individual files. During a repartitioning event, a subset of the files would be handed off to a new peer. This is cheap and eliminates the random IO.
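The file-handoff idea can be sketched as below. This is a hedged illustration, not Dynamo’s actual mechanism: the ownership bookkeeping and filenames are invented, and the point is only that rebalancing moves whole files (sequential copies) rather than individual records (random IO).

```python
def rebalance(ownership, new_machine):
    """Hand a share of tiny partition files from each machine to a newcomer.

    ownership: machine -> list of partition filenames."""
    ownership[new_machine] = []
    total = sum(len(files) for files in ownership.values())
    target = total // len(ownership)
    for machine in list(ownership):
        while (machine != new_machine
               and len(ownership[machine]) > target
               and len(ownership[new_machine]) < target):
            # Moving a whole file = one sequential copy, no record-level IO.
            ownership[new_machine].append(ownership[machine].pop())
    return ownership

# Three machines, each holding 100 tiny partition files.
ownership = {f"machine{m}": [f"part-{m}-{i}.db" for i in range(100)]
             for m in range(3)}
rebalance(ownership, "machine3")
print({m: len(files) for m, files in ownership.items()})
# each of the 4 machines now holds 75 tiny partition files
```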

Another approach to eliminate the random IO impact is to eliminate the spinning disk. Solid state drives can provide this, and experiments are underway.

Repartitioning remains a challenging problem for scalable storage. Research is active.

8 Consistency

Consistency problems arise from replication. They are a problem of multiple machines, where reads can land on different machines than writes.

There are a range of consistency modes, but they generally fall into three buckets:

  1. Eventual Consistency
  2. Best Effort Consistency
  3. Strong Consistency

8.1 Eventual Consistency

Eventual Consistency was all the rage in 2007, heightened by products like S3 and SimpleDB, and by Werner Vogels’ paper on the subject.

Practically speaking, eventual consistency means no useful consistency of any kind. Reads can return stale data for an undetermined duration, called the inconsistency window. The window typically has no firm SLA on the upper bound (or alternatively, the SLA is an embarrassingly large value like 7 days).

Several teams tried to adopt eventual consistency for customer-facing use cases. It didn’t work well.

For example, if a customer on a website adds a new address, the customer expects to see the address on the next page when their address book reloads. Similarly, a new address added in the order pipeline must be immediately available to all downstream services to calculate things like delivery estimates and tax. Telling a customer to wait for the inconsistency window to close is not an option.

In response to customer feedback, both S3 and SimpleDB eventually added enhanced consistency modes. The SimpleDB team gave a presentation about the problems customers experienced with eventual consistency.

Eventual consistency is only useful for applications where the delay between writes and reads is of sufficiently long duration, and where the reader can retry in the event of failure. This is typically backend applications. Eventually consistent systems also benefit from modeling data as immutable. With immutable data, there is no ambiguity over whether a record is the current version or not. Readers either see the right data or no data, and they can retry in the latter case.

8.2 Best Effort Consistency

The CAP theorem states that one cannot have both 100% availability and 100% consistency in the event of failure, where failure might be a machine dying or a network connection severing.

Eventual Consistency sacrifices consistency all the time to attain high availability. Best Effort Consistency says “Wait a minute! We only need to sacrifice consistency during failures, not all the time.” These mechanisms make a best effort to remain consistent in the steady state. However, if a box dies or falls off the network, there might be a brief window of inconsistency (typically on the order of a few seconds) before consistency is reestablished.

8.2.1 Sleep Statements

Let’s assume an inconsistency window is around 50 milliseconds at the four-nines level, meaning 9999 out of 10000 writes replicate within 50 milliseconds. An application can choose to block for 50 milliseconds after writing, but before responding to a caller. Therefore, by the time the caller hears a response, replication has probably completed and any subsequent reads retrieve the current data.
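A minimal sketch of the sleep-statement pattern, assuming the 50-millisecond window above (the store class is a toy stand-in, not a real datastore client):

```python
import time

REPLICATION_P9999 = 0.050  # assumed four-nines replication window (seconds)

class InMemoryStore:
    """Toy stand-in for an eventually consistent datastore."""
    def __init__(self):
        self.data = {}
    def put(self, key, value):
        self.data[key] = value

def write_then_sleep(store, key, value):
    """Write, then block so replication has (probably) finished before
    the caller hears a response and issues its next read."""
    store.put(key, value)
    time.sleep(REPLICATION_P9999)  # 9999/10000 writes replicate within this

store = InMemoryStore()
start = time.monotonic()
write_then_sleep(store, "address-book/123", "new address")
print(time.monotonic() - start)  # >= 0.05s of added write latency
```

The tradeoff is visible in the last line: every write now pays the full window as latency, which is exactly the critique that follows.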

This approach has been used, often as an emergency response to the problems of eventual consistency. It’s not the greatest response, however. First, it adds latency, which is a dirty word. Second, it’s tough to pick a sleep duration that is long enough to minimize inconsistencies, but short enough to escape notice by clients. In practice, clients either see an annoying number of inconsistencies, or annoyingly high latencies for writes. It’s a no-win game.

8.2.2 Consistent Cache

Using the time honored tradition of combining two systems to make a whole, a highly consistent cache can be combined with an eventually consistent datastore to make an overall system that is both consistent and available. New writes are written to both the cache and the datastore. Subsequent reads hit the cache first. By the time the record expires from the cache, replication will have completed. Clients only see inconsistencies when the cache fails before replication is complete, which is a rare event.

What makes the solution attractive is that many applications already run caches, like memcached, as a cheap means of scaling reads. The consistency gains are a free bonus. Some teams have chosen to enhance the caches further by adding quorum reads/writes (quorums are discussed later). This makes consistency more resilient because a single failure in the cache no longer causes an impact to consistency.
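The write-both/read-cache-first pattern can be sketched as follows. The class names and the dict-backed stores are illustrative stand-ins for something like memcached plus a replicated database:

```python
class DictStore:
    """Toy stand-in for both the cache and the replicated datastore."""
    def __init__(self):
        self._data = {}
    def put(self, key, value):
        self._data[key] = value
    def get(self, key):
        return self._data.get(key)

class ConsistentCachedStore:
    """Consistent cache over an eventually consistent datastore.

    Writes go to both; reads check the cache first. By the time an entry
    expires from the cache, replication in the datastore has usually
    completed, so either path returns current data."""
    def __init__(self, cache, datastore):
        self.cache = cache
        self.datastore = datastore
    def put(self, key, value):
        self.datastore.put(key, value)  # replicas catch up asynchronously
        self.cache.put(key, value)
    def get(self, key):
        cached = self.cache.get(key)
        return cached if cached is not None else self.datastore.get(key)

store = ConsistentCachedStore(DictStore(), DictStore())
store.put("customer/42/address", "1 New Street")
print(store.get("customer/42/address"))  # read-your-writes via the cache
```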

Recently, this technique has been trumpeted as an amazing innovation. An overblown example comes from GenieDB, which announced a white paper titled Beating the CAP theorem. Their website exclaims “GenieDB is a revolutionary layered database technology that is changing the way we think about scaling the web.” It’s just a cache over a replicated database.

8.2.3 Stickiness

Stickiness takes a key (SessionID, CustomerID, etc…) and maps it to a physical machine. All reads and writes for the same key are routed to the same machine, thereby ensuring consistency. For example, if data for customer XYZ is written to foo-6001.iad6.com, then subsequent reads for customer XYZ will also be routed to foo-6001.iad6.com. This provides consistency as long as the machine stays alive. If the machine disappears, there can be an interval of inconsistency while customer XYZ is mapped to a new home.

One problem with stickiness is that it conflicts with load balancing. If a huge burst of concurrent requests arrive for a single customer, they are routed to a single machine and can cause a localized DoS event. Stickiness is most appropriate when caching isn’t feasible and the risk of traffic spikes for a single key can be minimized. An example of this is address books. Customers can have thousands of addresses and query them across many dimensions (sorted by name/date, starting at letter “P”, jump to page 12). It’s infeasible to pre-cache every query dimension. Fortunately, customers don’t make multiple concurrent requests to view their address books. This makes stickiness an effective way to enable multiple query dimensions against address books while ensuring that recently added addresses appear in the query results.
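A sticky router can be as simple as a hash over the key. The hostnames below are illustrative:

```python
import hashlib

HOSTS = ["foo-6001.iad6.com", "foo-6002.iad6.com", "foo-6003.iad6.com"]

def sticky_host(key, hosts=HOSTS):
    """Deterministically map a key (CustomerID, SessionID, ...) to one
    host, so all reads and writes for that key land on the same machine."""
    digest = int(hashlib.sha1(str(key).encode()).hexdigest(), 16)
    return hosts[digest % len(hosts)]

# Every request for customer XYZ routes to the same machine, which
# therefore serves its own latest writes while it stays alive.
print(sticky_host("XYZ"))
```

Note this same determinism is what causes the load-balancing problem described above: a burst of traffic for one hot key all lands on one host.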

Both stickiness and caching are in use as best-effort consistency mechanisms for customer data on websites. Customers don’t want frequent inconsistencies, but they show tolerance to once-in-a-blue-moon inconsistencies, as long as the problem disappears upon retry.

8.3 Strong Consistency

8.3.1 Master Election

Master election is similar to stickiness, except that an algorithm tries very hard to get distributed consensus on the “right” host. Master election isn’t common. One reason is that distributed consensus is a hard problem, only recently solved by services like ALF. The other is that if the master dies or becomes unreachable, the system is unavailable for writes until a new master is elected.

Some master election schemes attempt to maintain availability by allowing two masters to run temporarily.

Eventually, the masters are coalesced back into one. This can be considered an overcomplicated implementation of stickiness. The only claimed benefit is automatic resolution of conflicting writes when the masters are coalesced. Unfortunately, the resolution methods usually aren’t helpful. For example, BDB-HA simply throws away all the writes from one of the masters.

8.3.2 Quorum

Quorum requests are blasted out to several machines concurrently. When over 50% of the machines respond, the action is considered complete (the actual percentage is usually configurable). If the action was a write, then everything is finished. If the action was a read, then all results are compared and the most popular answer is the “right” answer.
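A sequential sketch of a quorum read (real implementations fan out in parallel; the replica data here is illustrative):

```python
from collections import Counter

def quorum_read(replicas, key, quorum):
    """Collect responses until a quorum is reached, then return the most
    popular answer as the "right" one."""
    responses = []
    for replica in replicas:  # real systems query replicas in parallel
        responses.append(replica.get(key))
        if len(responses) >= quorum:
            break
    if len(responses) < quorum:
        raise RuntimeError("quorum unavailable")
    value, _votes = Counter(responses).most_common(1)[0]
    return value

# Three replicas, one of which hasn't seen the latest write yet.
replicas = [{"cart": "v2"}, {"cart": "v2"}, {"cart": "v1"}]
print(quorum_read(replicas, "cart", quorum=3))  # "v2" wins, 2 votes to 1
```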

The SABLE team uses quorums and gave a good presentation on how it’s implemented.

One challenge of quorums is deciding the number of members. Two machines can technically form a quorum, but that isn’t a safe choice: if one machine dies, only 50% of capacity remains, while quorum reads need more than 50% of the members available to reach consensus on the right answer. The entire system is therefore considered unavailable whenever 50% or less of the fleet remains.

Three machines is a safer choice, but still scary: we aim to survive a data center outage plus at least one machine being out of service. To achieve that, the magic number turns out to be 9. Having 9 members in a quorum means that 3 hosts can disappear during a datacenter outage, and one machine can be a lemon, and that still leaves 5 of 9 machines (over 50%) available for quorum.
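The sizing arithmetic can be checked directly (the “3 hosts per datacenter” split is the assumption from the paragraph above):

```python
def quorum_available(members, failed):
    """A quorum system needs a strict majority of members reachable."""
    return (members - failed) > members / 2

# 9 members spread over 3 datacenters: a datacenter outage (3 hosts)
# plus one lemon still leaves 5 of 9 -- a majority.
assert quorum_available(9, failed=3 + 1)
# 3 members can't absorb a one-host datacenter outage plus a lemon.
assert not quorum_available(3, failed=1 + 1)
# 2 members can't even absorb a single failure.
assert not quorum_available(2, failed=1)
```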

Quorums are a great way to achieve both strong consistency and strong availability. The problem is the expense. It can require 3X more hardware to use quorums instead of cheaper best-effort consistency mechanisms.

For this reason, quorum usage is limited to applications where consistency is highly desired and the cost is justifiable. This is primarily the shopping cart and the checkout process.

Because quorums are so expensive, efforts are often made to expire or archive data out of the quorum when the consistency requirements are no longer required. For example, after an order is completed, it is immutable and rarely accessed. These orders can be archived in a cheap location like S3.

Beside the expense, there are two common knocks against quorums. The first is that they’re slower. This is because requests are sent to many machines and the result is gated on the slowest response. In practice, most quorum implementations offer tunable knobs to control the number of responses to block on, depending on the application goals for latency and consistency. Turning down the knob is often called “sloppy quorum”. If very strong consistency is a requirement, then the application needs to block for more responses and latencies will indeed rise to that of the slowest participant.

The other knock against quorums is that they sacrifice availability in the event that less than 50% of the fleet is reachable. In practice, this isn’t a common event.

First, few applications would stay alive if over 50% of capacity were annihilated. We scale our applications to lose a little over 1/3 of capacity, but not 50%. So, quorum or not, the application would fail if over 50% of hardware disappeared. (Fortunately, losing 50% of datacenter capacity is an almost unheard-of event.) The other scenario often mentioned is a network partitioning event, where a subset of the fleet is still reachable by website traffic but isolated from its peers. The isolated group can’t reach quorum and becomes effectively unavailable. This is a failure scenario that is much discussed but rarely experienced.

9 Cross-WAN Replication

9.1 Overview

To see cross-WAN replication in action, go to www.zzz.com. (The website is hosted from the US.) Create a customer account and some addresses. Now wait a few minutes and go to www.zzz.co.uk. (The website is hosted from the UK.) Log in with the same account and look at your address book. You have just witnessed cross-WAN replication in action.

There are two facts about cross-WAN replication:

  1. It’s a pain-in-the-behind to implement.
  2. It’s going to become more common.

As of 2010, only Identity data is replicated globally. (This includes Customers, Addresses, Payment Instruments, and a subset of Merchant data.)

In contrast, if one were to order something on zzz.com, the order would not be visible in the UK. Orders and Catalog entries are not replicated between websites.

There is a pent-up business objective called “List Once, Sell Globally”. The goal is to allow third-parties to upload inventory in one region and offer it for sale around the world. If this initiative is funded, it will have implications on the number of teams requiring cross-WAN replication.

9.2 Implementations

Global Replication

Global replication is simple to understand. Take a local replication group:

and extend it to replicate to other regions:

Queries can fetch data locally.

While simple to understand, global replication carries repercussions.

The first impact is to the amount of data stored per region. To explain the problem, consider the case where regions are isolated (the common situation today). Not all regions are equally sized. Let’s say a service holds 500G of data in NA, which is the busiest region. EU is the next busiest, with 200G of data. Tiny FE holds 50G. For the sake of example, we’ll say the service stores data in partitions of 50G each.

If replicated globally, it means every region now holds the combined sum of all regions.

The total number of partitions has increased from 15 to 45. That’s a 3X increase in storage costs. Even more troubling is that most data is never accessed outside the region in which it was created. For example, how many people really create accounts on zzz.com and then use them to shop on zzz.co.uk? It’s less than 1%. Much of the data is never accessed.

Another repercussion of global replication is that replication cannot be synchronous. The WAN links are too slow and flaky. Availability would plummet if an application attempted to block writes until data had been copied to remote regions.

The implication is that remote regions are eventually consistent. One cannot write in NA and immediately read from EU. This will cause problems if consumers are unaware of the limitation. In addition, because replication is eventually consistent, it opens the door to conflicting writes:

Applications must be prepared for conflicts and have resolution plans in place.

Homing & Cross-WAN Requests

The alternative to global replication is “homing”. Data lives in a single authoritative location.

To explain, consider two international sports stars. Michael Jordan creates an account on zzz.com. His account resides in a database in Virginia. David Beckham creates an account on zzz.co.uk. His account resides in a database in Ireland.

A globally replicated table indicates where each customer is homed.

David Beckham is traded to the Los Angeles Galaxy. He begins shopping on zzz.com. However, his data remains homed in the EU. Fetching his data requires a real-time cross-WAN lookup.

This is implemented by a service delegating to itself.

For example, if a client called the North American instance of AddressService.getAddressById(), it would delegate to the European instance of AddressService.getAddressById() and pass back the result.
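The self-delegation pattern can be sketched as below. This is a hypothetical illustration: the homing table contents, region names, addresses, and class shape are all invented, not real service code.

```python
HOMING_TABLE = {"MichaelJordan": "NA", "DavidBeckham": "EU"}  # globally replicated

class AddressService:
    def __init__(self, region, local_db, peers):
        self.region = region
        self.local_db = local_db
        self.peers = peers  # region -> remote AddressService instance

    def get_address_by_id(self, customer_id, address_id):
        home = HOMING_TABLE[customer_id]
        if home == self.region:
            return self.local_db[(customer_id, address_id)]  # local code path
        # Remote code path: a real-time cross-WAN call (~200ms) to the
        # home region's instance, whose result is passed back.
        return self.peers[home].get_address_by_id(customer_id, address_id)

eu = AddressService("EU", {("DavidBeckham", "addr-1"): "1 Example Rd, London"}, {})
na = AddressService("NA", {("MichaelJordan", "addr-1"): "2 Example Ave, Chicago"},
                    {"EU": eu})
# A call to the NA instance for Beckham delegates to the EU instance.
print(na.get_address_by_id("DavidBeckham", "addr-1"))  # "1 Example Rd, London"
```

Note the two code paths inside one method; the doubled testing burden this creates is discussed later in this section.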

There are a number of complications to this setup. The first is latency. Cross-WAN lookups add an additional 200ms to every request. As David proceeds through the order pipeline, there will be an additional 200ms to authenticate, 200ms to load his address book, and 200ms to load his payment instruments. There could even be 200ms on nearly every page to fetch the name and display “Hello David”. Aggressive caching is required to minimize cross-WAN lookups.

Strange things can happen, too. For reasons out of our control, cross-WAN traffic can temporarily be routed through France instead of direct to Ireland. This causes cross-WAN latencies to jump with no obvious reason why.

Beyond the connectivity issues, there are a number of programming challenges.

First, a customerID isn’t always available. During sign-in, we only have a login and password. When fetching an address, we may only have an addressID. We need customerID to find the geographic home, so secondary key mappings are required to translate from email/address to customerID.

If David creates a new address, where should the secondary keys be inserted? Logically, one would think in the home region.

However, this doesn’t work. If David is shopping in the US, the next lookup will come from the US. Remote data won’t have replicated back to NA soon enough.

This means that secondary keys must be inserted in the current region.

There is a similar story with caching. Caches must be invalidated in all regions.
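The write path just described can be summarized in a short sketch, with invented region names and in-memory dicts standing in for the real datastores, mapping tables, and caches:

```python
REGIONS = ["NA", "EU", "FE"]
datastores = {r: {} for r in REGIONS}      # authoritative records
secondary_keys = {r: {} for r in REGIONS}  # addressID -> customerID
caches = {r: {} for r in REGIONS}

def add_address(customer_id, address_id, address, current_region, home_region):
    # The authoritative record goes to the customer's home region.
    datastores[home_region][(customer_id, address_id)] = address
    # The secondary key goes to the *current* region: the next lookup
    # will come from here, long before replication could carry the
    # mapping back from the home region.
    secondary_keys[current_region][address_id] = customer_id
    # Caches must be invalidated in all regions, not just locally.
    for region in REGIONS:
        caches[region].pop(customer_id, None)

# David, homed in EU, adds an address while shopping in the US.
add_address("DavidBeckham", "addr-9", "3 Example Blvd, LA", "NA", "EU")
print(secondary_keys["NA"])  # {'addr-9': 'DavidBeckham'}
```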

To overemphasize what should be obvious by now, cross-WAN lookups are not simple.

It’s worth noting that because of the globally replicated mapping tables, this solution also suffers the same eventual consistency problems as the previous solution. Even though the data has an authoritative home, that home isn’t discoverable until the mapping tables replicate. In addition, the mapping tables must be used to enforce global uniqueness constraints, such as customer logins. (It’s impractical to check all remote regions in real time to determine if a login is already used by another customer.) The mapping tables are eventually consistent between regions. So, just like the previous solution, two different customers can concurrently create the same login/password in two different regions. The conflicts must be caught & dealt with.

To keep piling on additional problems, cross-WAN lookups are a headache when it comes to worker utilization. Consider all the participants in a cross-WAN lookup.

Several resources are tied up for a long duration. Below is a picture of what happens if cross-WAN availability degrades. Stare at it long enough and you’ll see the opportunity for a global outage. The outage will affect all clients, not just those making cross-WAN calls.

To prevent such an outage, brownout detection is required on both sides of the fence. This prevents all workers from blocking on a single resource. Unfortunately, brownout detection thresholds easily fall out of date as scaling continually changes the number of applications on both sides of the WAN.

Finally, cross-WAN lookups introduce complexity to coding and testing. Each incoming request must be checked to see whether it’s a local or remote query. Based on the outcome, the request proceeds down two different code paths. This means every API must be implemented twice; once for local lookups, and again for remote lookups. On top of that, both code paths must be tested, meaning the test cases double as well. In practice, a significant number of bugs arise from this. One code path always seems to hit a different mapping or translation layer, resulting in different outputs depending on where the data is homed. It also means that deployments cannot be isolated to a particular region. A deployment to the EU fleet, for example, will leak into NA query results due to cross-WAN lookups.

9.3 Choosing a Solution

Currently, Identity Services use Homing & Cross-WAN Lookups. This was a regretted decision.

While normally loath to dictate a solution, this is one area in which people are comfortable saying: USE GLOBAL REPLICATION.

What about the wasted hardware costs? This is one of the primary factors that drove the Identity Team to use cross-WAN lookups. However, in retrospect, wasted engineering resources outweighed the hardware costs. Engineering time was consumed by implementing a cross-WAN lookup framework, modifying every API to use it, doubling the test cases, and then spending years fixing all the bugs that arose (especially in edge cases like caching).

In addition, the Identity team continues to field questions from clients wondering why latencies are so high for certain customers, or apologizing for outages & latency spikes due to cross-WAN flakiness. This operational burden will never disappear.

Instead, the hardware costs should be attacked by finding cheaper storage solutions. Because much of the globally replicated data is rarely accessed, it is a perfect candidate for archived storage. At the time of this writing, the cost of storing 750G in S3 ranges between $41 and $105 per month. That’s nothing. (And that’s the public pricing.) Services like Cascadia are trying to make tiered storage easy. They automatically move data to cheaper storage locations as it ages.

In spite of this, some teams still feel determined to make cross-WAN lookups work. A few counterproposals often arise.

The first is that we should lease dedicated cross-WAN links. That might help with stability, but unfortunately the speed of light places a firm limit on latencies. It’s a tough sell, too, as Networking has no desire to lease dedicated links and they actively discourage users from relying on cross-WAN connections for real-time activities.

Another proposal is to relocate customers to a new home if they are frequent cross-WAN users. This turns a complicated problem (cross-WAN lookups) into three complicated problems: (1) cross-WAN lookups, (2) cross-WAN frequency detection, and (3) rehoming. In addition, it doesn’t prevent a bad customer experience. It only responds to a bad customer experience. And, if the customer continues to shop in multiple regions, the problem can’t be solved by rehoming.

Finally, another proposal is to use cross-WAN lookups for most customers, but move the frequent cross-WAN users into a special pool that’s globally replicated. Similar to the previous solution, this transforms one problem into four problems: (1) cross-WAN lookups, (2) cross-WAN frequency detection, (3) rehoming, and (4) global replication. This is the worst of all solutions.

The recommendation is to globally replicate. It provides the best customer experience, and disks are cheap.

What about the eventual consistency problem? Well, one has to deal with it. Eventual consistency is unavoidable when replicating between geographically distant data centers. There is no magic solution to avoid it.

10 Record Size

When choosing a storage solution, an important consideration is the size of the individual records that will be stored.

There isn’t an absolute value on what’s considered big vs. small. However, as a rule of thumb, one needs to start thinking carefully when individual records approach 4K in size. This is the size of a typical disk page. When records span multiple pages, multiple disk seeks may become necessary and latencies will increase.
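The page arithmetic is simple (4K is the typical page size assumed above):

```python
import math

PAGE_SIZE = 4096  # a typical disk page

def pages_for_record(record_bytes):
    """Pages a record spans; beyond one page, extra seeks may be needed."""
    return math.ceil(record_bytes / PAGE_SIZE)

print(pages_for_record(3000))  # 1 -- fits in a single page
print(pages_for_record(9000))  # 3 -- may cost multiple seeks
```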

It’s time to get seriously worried if the size of individual records approaches 1/100000 of the available RAM on a machine, particularly if the application performs range queries. This is because range queries can return many results, and those results tend to be copied repeatedly. For example, the results might go through an ORM translation, and then a BSF marshalling translation. Each translation layer copies the data, multiplying memory utilization. If a range query returns one hundred very large objects, the translations make multiple copies of the data, and multiple queries are running on the box, it is very likely that the process will run out of memory.
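A back-of-envelope calculation makes the multiplication effect concrete. Every number below is an illustrative assumption, not a measurement:

```python
# Peak memory for a range query whose results are copied by each
# translation layer (e.g. an ORM pass, then a marshalling pass).

def peak_query_memory(results, record_bytes, translation_layers, concurrent_queries):
    copies = 1 + translation_layers          # original result set + one copy per layer
    return results * record_bytes * copies * concurrent_queries

# 100 results of 1 MB each, two translation layers, 10 queries in flight:
print(peak_query_memory(100, 1_000_000, 2, 10) / 1e9)   # 3.0 (GB) on a single box
```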

For this reason, hosted solutions generally put limits on the size of individual records, plus on the maximum results per query. These limits are non-negotiable. They exist to maintain quality of service. They prevent one client from swamping the resource with huge volumes of data and degrading the experience of all other clients.

Finally, while most problems relate to large records, there can be quirks with small records as well. For example, a datastore with many small records will often have a higher “bloat factor”: the ratio of the datastore’s on-disk size to the raw data it contains. Backfilling 1G into a BDB might produce a file that is 2G in size. The bloat comes from indexes and from the metadata associated with each record. The more individual records there are, the more metadata there is, and therefore the bigger the bloat. A database containing one record of size 2G will obviously require less overhead than a database containing 2147483648 records of 1 byte each.
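A toy model shows why many small records bloat more than one large record. The 64 bytes of per-record metadata/index overhead is an assumed figure, not a property of any particular datastore:

```python
# Rough model of the "bloat factor": on-disk size divided by raw data size.

def bloat_factor(num_records, avg_record_bytes, per_record_overhead=64):
    raw = num_records * avg_record_bytes
    on_disk = raw + num_records * per_record_overhead   # metadata scales with record count
    return on_disk / raw

print(bloat_factor(1, 2**31))    # ~1.0 -- one 2G record: overhead is negligible
print(bloat_factor(2**31, 1))    # 65.0 -- 2G one-byte records: overhead dominates
```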

Another quirk of small records is that deleting them doesn’t always free disk space. If many records fit within a page, and only a few records are deleted per page, then it just creates holes. A defragmentation/compaction tool needs to be run to reclaim disk space.
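A minimal sketch of what such a compaction pass does, assuming a hypothetical append-only format with one JSON record per line and a `None` value acting as a deletion tombstone:

```python
import json
import os

def compact(path):
    """Rewrite a datastore file keeping only live records,
    reclaiming the holes left by deletions."""
    live = {}
    with open(path) as f:
        for line in f:
            rec = json.loads(line)
            if rec["value"] is None:
                live.pop(rec["key"], None)       # deletion tombstone: drop the record
            else:
                live[rec["key"]] = rec["value"]  # later writes overwrite earlier ones
    tmp = path + ".compact"
    with open(tmp, "w") as f:
        for key, value in live.items():
            f.write(json.dumps({"key": key, "value": value}) + "\n")
    os.replace(tmp, path)                        # atomically swap in the compact file
```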

11 Durability

Durability means preventing data loss. It is a promise that, when a write is accepted, it won’t subsequently disappear.

In the olden days, a database was one machine. The failure modes in this case are obvious: power failure, disk failure, fire, tornado, cooling system breakdown… All have the potential to wipe data from existence. Common tricks arose to prevent this.

First and foremost, durability required data be written to disk before acknowledging a write as successful. Otherwise, the data wasn’t really persisted: simply unplugging the machine would wipe it. However, writing to disk is expensive. One has to write the data itself, plus indexes, with multiple disk seeks to the various locations. To achieve both durability and low latency, the trick is to write the minimum amount as cheaply as possible, and to batch up the expensive work to perform later out-of-band. This is typically achieved via write-ahead logging (commonly called “redo logs”, from Oracle terminology). These logs contain a history of insert/update/delete events that can be replayed to reapply changes to a database. When a write is executed, the database structures are modified in-memory and a very short redo log entry is written to disk. Periodically, the actual database modifications are flushed to disk. Should the machine lose power, the database can be restored by starting with the old files and reapplying the redo logs since the last flush.
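The technique can be illustrated with a toy store (not a real database): writes are acknowledged after a cheap fsync’d log append, the full table is flushed out-of-band, and recovery replays the log on top of the last flush:

```python
import json
import os

class TinyStore:
    """Toy write-ahead-logging store: durable log append per write,
    periodic flush of the full table, log replay on recovery."""

    def __init__(self, log_path, data_path):
        self.data_path = data_path
        self.table = {}
        if os.path.exists(data_path):        # start from the last flushed snapshot
            with open(data_path) as f:
                self.table = json.load(f)
        self.log = open(log_path, "a+")
        self.log.seek(0)
        for line in self.log:                # replay redo records written since the flush
            op = json.loads(line)
            self.table[op["k"]] = op["v"]

    def put(self, key, value):
        # Cheap durable part: one small append + fsync, then the in-memory update.
        self.log.write(json.dumps({"k": key, "v": value}) + "\n")
        self.log.flush()
        os.fsync(self.log.fileno())          # on disk before we acknowledge the write
        self.table[key] = value

    def flush(self):
        # Expensive part, done out-of-band: persist the table, truncate the log.
        with open(self.data_path, "w") as f:
            json.dump(self.table, f)
            f.flush()
            os.fsync(f.fileno())
        self.log.truncate(0)
```

Killing the process between `put` and `flush` loses nothing: the next startup replays the redo log and rebuilds the in-memory table.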

This solved the problem of power loss, but didn’t solve permanent disk failure. Drives can still crash and send the files to database heaven. To address this, redundancy is required. Data must be written to more than one disk. The simplest redundancy uses RAID, or some variation of it, to write a change to multiple physical drives. If one drive fails, copies of the data remain safely available on other disks. Advanced implementations even allow disks to be hot swapped in-and-out without shutting down the system.

RAID supports durability in the event of single drive failure, but has the unfortunate property of keeping all disks in one physical location. If a tornado strikes, or Godzilla rampages across the townscape, all disks are wiped out together. RAID is highly susceptible to correlated failures. Achieving the next level of durability requires replicating to geographically separated machines.

The simplest remote replication mechanism is asynchronous replication: periodically, changes are sent to a peer. This could be as simple as shipping the redo logs to another machine every 5 seconds and applying the recent changes to a copy of the database. In terms of durability, the trouble with asynchronous replication is the delay. If a tornado strikes and destroys the machine, every write since the last log shipment can be lost.

Synchronous replication improves durability by sending changes to peers immediately and blocking until a peer acknowledges the event. In this way, every write is guaranteed to be written to at least two physical machines before the change is considered committed. An unfortunate side-effect is that blocking will impact latency and availability if the other machine is slow or unreachable. So, in practice, systems only block for a limited time and fall back to asynchronous replication if the peer is unavailable. Some systems also run larger replication groups where changes are broadcast to many participants at once. The broadcaster only needs to block until at least one peer (the fastest) acknowledges the change. In this way, it’s unnecessary to wait for the slowest peer.
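The block-until-the-fastest-peer pattern can be sketched as follows, where `send_to_peer` stands in for a hypothetical network call that raises on failure:

```python
import concurrent.futures

def replicate(peers, record, send_to_peer, timeout=0.5):
    """Broadcast a write to all peers and block only until the fastest
    one acknowledges; slow peers continue in the background."""
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=len(peers))
    futures = [pool.submit(send_to_peer, peer, record) for peer in peers]
    done, _pending = concurrent.futures.wait(
        futures, timeout=timeout,
        return_when=concurrent.futures.FIRST_COMPLETED)
    pool.shutdown(wait=False)    # never wait for the slowest peer
    # True: at least one extra copy exists, so commit synchronously.
    # False: no peer answered in time; the caller falls back to async mode.
    return any(f.exception() is None for f in done)
```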

Synchronous replication is the best durability technique available, but still isn’t perfect. A tornado could rip through a datacenter, sever the connection between the datastores (cutting the replication pipeline), and then destroy the primary datastore a few seconds later. All writes occurring within those few seconds would be lost because they were never copied to a peer.

Perfect durability is impossible. Somebody with a hammer, scissors, and a knowledge of where to cut can cause permanent data loss. But, with synchronous replication, the odds of such an event occurring can be made extraordinarily improbable.

The final safety net in the durability game is backups. These are periodic snapshots archived in a separate location from the primary datastore. In the event of catastrophic failure, they cap the damage. For example, if all datastore instances became corrupted, the system can restore from backup. An hour of writes might be lost (or, however long since the last backup), but that is better than losing all data permanently.

11.1 Options

As durability techniques have evolved, some of the earlier approaches have ceased to be required.

For example, in a synchronously replicated system, RAID is no longer necessary. In the era of elastic capacity, it’s easier to swap a lemon host for a new machine (and restore from a peer) than to try to recover a failed drive. Peers provide the same redundancy as RAID. In fact, they are more durable than RAID if peers are geographically separated.

Another interesting question is whether disk persistence remains a durability requirement in a synchronously replicated system. Could the redo logging be eliminated? Or, maybe even eliminate the disk entirely and hold everything in memory? It is definitely feasible to run this way, although a number of factors make it rare. The first is that it’s much easier to cause a failure scenario without disk-backed persistence. All one has to do is restart an application. A poorly executed deployment could bounce all machines simultaneously and lose all copies of the data. Or, a bug could cause every application to crash and restart simultaneously in response to a common event (such as a corrupt write replicated to every machine). The higher the probability of single failure events, the higher the probability they combine into one big catastrophe. The second factor is that life is simply easier with disk-backed storage. One can hold more data at lower cost. Deployments are faster, because they don’t require a full restore from a peer. And other processes (such as backups) can run independently and read from the file.

Finally, the question arises whether backups remain necessary. The answer is emphatically YES. Even with a replicated datastore, a number of events can require a restore from backup. The first is man-made corruption. A software team could accidentally deploy a release that writes incorrect or malformed data. Or, a user could accidentally upload the wrong data. In these situations, it can be extraordinarily useful to have a historical snapshot of clean data.

Another reason for backup is that, even with a replicated system, it is possible for all machines to crash. True story:

Regarding backups, there’s a common maxim that backups aren’t the goal. Restore is the goal, and restore is the thing that has to work. Unfortunately, because restore is rarely executed, there are many stories of teams who discover too late that backup formats are unusable, or that restore scripts have been neglected for years and no longer run. To ensure that restoring from backup works, it is necessary to run periodic firedrills to test the functionality. Even better is to make restoration a regular part of the application lifecycle. For example, bringing a new machine online might restore from backup first, then sync the latest updates from a peer. In this way, restoration is frequently exercised.

12 Security and Auditability

12.1 Customer Data Protection

Customer Data includes any of the following:

Customer Data must not be retrieved or persisted by any application without undergoing a security review. In addition, your application will be considered “red”, meaning it will undergo increased scrutiny to ensure customer data is protected at all times. If you do not need customer data, the best option is to avoid using it.

12.2 SOX Compliance

The Sarbanes-Oxley act requires “an adequate internal control structure and procedures for financial reporting”. This, of course, means nothing to an engineer implementing a replicated storage solution with a caching layer and Data Warehouse integration. What controls need to go where?

SOX implementation is open to interpretation, and guidance continues to evolve.

Basically, the goal is to provide an audit trail for changes to financial data, and for transactions that lead to financial data. When SOX became law, we stored almost everything in Oracle databases that shared the same password. This was obviously not an auditable situation. The quick fix was to remove permission for engineers to modify databases except through special accounts. The special accounts had a limited lifetime and had to be tied to a trouble ticket for auditing purposes. Many databases were probably caught up in the exercise that didn’t need to be, but it was better to err toward being overly cautious, and time was limited.

Since then, many teams have adopted non-Oracle storage technologies. What do these teams need to do for SOX compliance?

This isn’t the forum for legal advice. However, it is probably safe to say that any team storing financial information will want to restrict access and maintain an audit log of manual changes. This is simply a best practice, regardless of congressional mandates. (You’re already doing this, right?) Following such practices will go a long way toward satisfying the spirit of SOX.

13 Cost of Ownership

Ownership and costing models fall into three buckets.

(1) Run it yourself

This is a solution where you acquire hardware, install a product, and operate the database under your own control. Examples include third-party products such as Oracle, MySQL, and BDB.

The cost factors to consider are the hardware, product licensing, support contracts, and operational burden.

Product licensing and support contracts are generally negotiated by the company and hidden from individual engineers. When amortized over the company as a whole, the cost per team is low enough as to be ignorable compared to other expenses.

Hardware cost is a matter of how much you want to pay to reduce latencies. The cheapest machine will have small amounts of RAM and a slow drive, forcing up latencies as all requests vie for scarce resources. You can spend your way out of the problem by purchasing bigger machines with multiple drives and huge amounts of memory for cacheability. At the extreme, one can store all data in memory and avoid disk seeks entirely.

Something to note is that when a datastore is small (say 10G) and request volumes are light, run-it-yourself solutions will have poor hardware utilization. This makes them less cost effective than hosted solutions where many services can colocate data on shared machines.

Finally, one of the most overlooked costs is the operational burden. In practice, at least one full-time headcount will be devoted to running the database. There will be ongoing performance tuning, scaling, and patches that need to be applied, plus the typical oncall responsibilities like dealing with bad boxes, lease returns, and other runtime headaches. Simply diagnosing a high tp99.9 latency can be a painstaking effort, requiring weeks to troubleshoot and resolve.

The benefit of all these costs, of course, is the ability to control all aspects of the datastore and configure it exactly as desired.

(2) Managed solution

Managed solutions are a relatively new development, popularized by RDS.

They seek to eliminate the most boring and time-consuming administrative tasks, such as backups and database upgrades, while still giving users direct access to the database.

There is a slight loss in flexibility as users must choose from a fixed list of databases and configurations. However, the list of available options is growing.

(3) Hosted solution - pay per use

Hosted solutions eliminate the operational burden of running a database. Problems still arise, such as availability drops or high latencies, but the problems can be transferred to the storage team for resolution.

Providers of hosted storage will talk about the 80/20 rule; the idea that 80% of people have simple requirements that can be satisfied by a hosted solution. The remaining 20% require custom functionality. There is some truth to this, but there’s a variation on this rule too. From the consumer’s point of view, hosted storage products will typically provide 80% of the needed functionality and make the other 20% harder to achieve. For example, a hosted solution may offer a low latency key/value store, but not support reverse lookups or cross-WAN replication. It could be renamed the 80/80 rule: solving 80% of the problems for 80% of the people.

In other words, even when using hosted solutions, it is often necessary to extend the feature set with customized code, or else sacrifice functionality.

Pay-per-use services, such as S3 and SimpleDB, charge a published rate for various operations. Users can add up the costs and compare against the run-your-own solutions to judge whether the sacrifice in functionality, or the cost of building extensions, is justified by overall cost savings.
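Such a comparison might be sketched as follows. All rates, volumes, and salary figures below are made-up assumptions for illustration, not published AWS prices:

```python
# Toy monthly cost comparison: pay-per-use vs. run-it-yourself.
# Every number here is an illustrative assumption.

def hosted_cost(requests, gb_stored, per_million_requests=0.40, per_gb=0.25):
    return requests / 1e6 * per_million_requests + gb_stored * per_gb

def self_run_cost(hardware=1200.0, ops_fraction=0.25, loaded_salary=15000.0):
    # Hardware amortization plus the often-overlooked operational headcount.
    return hardware + ops_fraction * loaded_salary

print(hosted_cost(requests=50e6, gb_stored=100))   # 45.0
print(self_run_cost())                             # 4950.0
```

For a small, lightly-used datastore the operational headcount dominates, which is exactly the poor-utilization effect noted above.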

Today, internal clients don’t receive a bill for AWS usage. This will change. In the future, clients will receive a TCO summary for AWS products in the same manner that infrastructure currently provides for hardware utilization. There won’t actually be an exchange of money, but teams will be accountable for their resource utilization.
