2. Flow consistency - read-your-writes consistency

 

2. Consistency, Availability and low Latency in Distributed Systems

(working around the CAP/PACELC theorems)

 

Introduction: "1. Cache and Data Consistency in Distributed systems (CAP/PACELC/CRDT)"

 
TL;DR
Full strong Consistency in geographically Distributed systems can only be achieved by sacrificing Availability (per CAP theorem) and with prohibitive Latency costs (per PACELC theorem).
However, we can still design systems that are consistent enough, continue to function when one geographical region is down, and avoid paying the inter-region latency most of the time.

While eventual Consistency is often acceptable, there are still cases where we want strong read-after-write consistency for certain flows. There is an optimal design that assures strong Consistency inside read-after-write flows. Arguably, this is the highest Consistency level that can be assured without a prohibitive impact on Availability and Latency.

The trick begins with fully asynchronous replication between geographical regions. This gives eventual consistency, with minimum latency (local reads), for the reads that can tolerate it. Such replication normally completes in less than 1 second. Reads that can tolerate eventual consistency will always use the local data directly.

For flows that need read-after-write (strong) consistency, we design a mechanism to detect the rare cases when the local value is older than data previously written in that flow. For this, the flow needs to carry a small piece of consistency information.

I argue that no solution that avoids carrying consistency information in the flow can achieve lower latency, and it will show higher latency in certain scenarios.

This solution to flow consistency requires assigning an increasing "version" to each value at every update. When a flow writes data, it receives the (always increasing) version of the updated data. That version must replicate asynchronously with the value to all geographical nodes, overwriting the old value and old version - this can be made atomic with minimum cost.

When a subsequent read is done in another geographical node, the request flow presents the version received after the previous write. If replication was not fast enough, the node serving the read can detect that it has an older version and wait a few more milliseconds for replication to catch up. Alternatively, the queried node can make a request directly to the writing node.

If the writing node becomes unreachable in that short period, the read can fail or fall back to eventual consistency - depending on business requirements. What is important is that the eventually consistent requests will continue to be served with minimum latency, assuring the highest possible Availability.

This solution assumes that it is OK to sometimes receive even newer data - from concurrent writes in other flows. Assuring isolation between flows is not covered here; however, it might be addressable with a similar design with stricter requirements on the version, similar to detecting a merge conflict in git.


Implementation

If the write is always done in a single node (the "main region"), it is enough to store, for each value, its update timestamp. The timestamp may occasionally need to be bumped so that it always increases. The timestamp is returned to the writer. The writer presents the timestamp when reading from another region. The reading region checks whether the replicated data has at least that timestamp. If replication has not caught up yet, it polls the data until a timeout or requests the value from the main region.
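A minimal flow-level sketch of this protocol, in Python, assuming hypothetical main_region and eu_region store clients (none of these names come from a specific library); it mirrors the "pay in US, read in EU" example discussed later:

```python
# Flow-level sketch (hypothetical store clients): the write returns a timestamp
# version, which the flow carries along to the read made in another region.

def enable_service_flow(main_region, eu_region, user_id, service):
    # 1. Write in the main region; the store returns the update timestamp.
    version = main_region.write(f"services:{user_id}", service)

    # 2. Later, possibly from another actor in the flow, read in the EU region,
    #    passing the version so that region can tell whether its local replica
    #    is fresh enough to serve the read.
    return eu_region.read(f"services:{user_id}", min_version=version)
```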

There are solutions that allow local writes in any region. The design is similar; however, the "version" handling gets a little more complex.

 

 

And now the details

 

Real-life problem

In a distributed system, we want to use local cached data as much as possible, to assure low latency and partial availability in case of network problems. However, we need a mechanism to assure that a business flow that writes something in one region and then reads that data from another region will not get stale data due to caching.
Assuring this level of consistency often implies cache invalidation that adds inter-region latency to many requests, while at the same time reducing cache efficiency. We want to maximize cache efficiency and minimize the additional request latency due to the consistency protocol.

Solution
1. To start, replicate data/cache asynchronously with (fast) eventual consistency
We will propagate data replicas as much as possible. A local cache filled on read is the simplest way. However, we can assure even lower average latency if all data is fully replicated asynchronously to all regions - think CRDT. Unlike other solutions, the presented one does not add a latency cost when data is replicated or cached in more regions.
In the rest of the article, "replicated" data and "cached" data will be used interchangeably.
Various databases provide fast asynchronous replication (<1-2 seconds). Cassandra and Redis can do this, and even MySQL with some throughput limitations. Replication must be asynchronous so that the local write is not blocked while data is propagated. So far we only have eventual consistency - we will build on that.

2. Assign a new, monotonic version at each update
When someone reads from a geographical node, the system needs to decide if the local data is new enough. Because replication usually propagates in less than 1 second, most of the time the cached data will be sufficiently new. However, for the tiny fraction of cases when the data was updated very recently, we need a way to detect this.
We don't want to block each write until all the other nodes are updated. Because the write finishes before it is fully replicated, a read-after-write business flow can immediately read from another region that still has the old version of the data. We want to assure that a read-after-write in certain business flows never gets a value older than the write.
Note: you may ask why we would read something that was just written, and why not use the same node, where consistency is easier to ensure. The answer is that the read and the write are often done by different actors in a flow. For example, one user pays for a service in the US portal, the provider's server configures the service, then the user is redirected to a local EU service instance that checks the user's enabled services. The user expects to see the new service configured in the EU.
To enforce read-your-writes consistency, we need to associate an "update version" with each data unit - an SQL row, a NoSQL key/value, etc. The version should be monotonic: any new update should result in an increased version.
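As a sketch of what such a versioned data unit could look like, here is a hypothetical in-memory store that bumps a per-key counter atomically with the value; in a real system the same atomicity would come from the database (for example, a version column updated in the same transaction as the value):

```python
import threading

# Illustrative data model: each data unit carries a monotonic "update version".
# The in-process lock only stands in for the atomicity a real store provides.

class VersionedStore:
    def __init__(self):
        self._data = {}                # key -> (value, version)
        self._lock = threading.Lock()

    def update(self, key, value):
        """Write a new value and return its new, strictly increasing version."""
        with self._lock:
            _, old_version = self._data.get(key, (None, 0))
            new_version = old_version + 1
            self._data[key] = (value, new_version)
            return new_version

    def get(self, key):
        """Return (value, version) for the local replica, or (None, 0)."""
        with self._lock:
            return self._data.get(key, (None, 0))
```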
 
The synchronization point issue 
An important observation is that, when we write, propagate asynchronously, and then read in another region, the "knowledge" about the previous write needs to somehow travel to the node where the read is done. To achieve this, we need a way to assure that any previous write in that flow was propagated before a subsequent read in the flow is served.
The classical implementation of read-your-writes consistency requires the read and the write to visit a "synchronization point" that validates whether the new read must necessarily see that write. This requires paying, in total, at least the latency between regions for each consistent read-after-write. Since the read-after-write flow already spans regions, that latency is paid once anyway. One issue is that we often need to pay it twice - once more in the read request itself. The bigger issue is that, in this approach, we need to pay that latency even if the read is made multiple times, long after the information was replicated.
Having the flow itself carry the consistency data (about the previous write) takes over the role of the synchronization point. This allows serving many consistent reads from already replicated data. In the corner case (<1 second) when the data has not been replicated yet, the read will simply poll until the data is received, basically paying only the natural time necessary for the write to propagate across regions.
Any solution that does not carry some consistency data will often show higher latency. When using an external synchronization point, that point will be too far from either the write node or the read node. The most efficient synchronization point is the one that "goes with the flow" :), as it is always at minimum distance from both the write and the read nodes.
A simpler implementation would be a kind of "pragma-no-cache" flag on read, forcing the request to bypass any cache or local data and go to the writing region - that would be the synchronization point. The issue with this solution is that you must always pay the latency on read, even if the read happens long after the write. Also, the "no-cache" solution has reduced Availability.

 
Simplified case: write in main region, read anywhere

If the write is always done in a single "main region", it is enough for each value to have a "version" number that increases at each write.
A simple solution is to keep a global counter that increments at each write and to copy its value as the version of each written value. While this has advantages in some scenarios, it can become a bottleneck and a SPOF (Single Point of Failure).
The scalable solution is to keep a separate counter for each value and increment it only when that value is updated.
No matter how the version increase is done, at every value update the new version must be stored along with the new data. The version is also returned to the requesting flow, which will pass it to the next request. In parallel, the version replicates asynchronously with the data to the other geographical nodes.

When reading from another region, the flow will provide the version received when writing. If the local data has a lower version, the node needs to poll, until a timeout, for the data to be replicated. For the cases when replication is lagging, the node can make a direct request ("homing") to the main region to retrieve the data out of order. In many cases it is acceptable to receive an even newer version than the one written in the flow, and you can have separate handling for such events - based on the version.
When the writing node becomes unavailable, we can fail the transaction or retry it. In some cases, the business can decide that in such rare situations it is better to serve stale data than to fail - and have a later process reconcile the inconsistencies.
The good thing is that only particular flows will be affected. Even if we read from a node that gets disconnected from the "main region", we can still read the last known version of any data. We can choose different compromises for different data. Some flows might prefer to fall back to stale data on partitioning; some might prefer to fail and retry when the system can answer consistently.
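A sketch of how the reading region could handle such a request, assuming a hypothetical local replica whose get() returns (value, version) with version 0 for unknown keys, and a main_region_client for homing; the timeouts and poll interval are illustrative, and the final fallback could equally be a failure or the stale value, per the business requirements:

```python
import time

# Sketch of the reading node's handling of a read that carries a minimum version.

def read_consistent(replica, main_region_client, key, min_version,
                    poll_timeout_s=1.0, poll_interval_s=0.02):
    value, version = replica.get(key)
    if version >= min_version:
        return value                      # the usual case: already replicated

    # Replication is lagging for this key: poll until it catches up.
    deadline = time.monotonic() + poll_timeout_s
    while time.monotonic() < deadline:
        time.sleep(poll_interval_s)
        value, version = replica.get(key)
        if version >= min_version:
            return value

    # Still not there: "home" to the main region for this key. Depending on
    # business requirements, this could instead fail or serve the stale value.
    value, _version = main_region_client.get(key)
    return value
```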
 
Additional caching on each server's memory
You can additionally cache data in each server instance's memory without the need for a separate invalidation mechanism. If the request needs flow consistency and the cached data is too old (by version), the server will ignore it and read from the replicated storage.
If the replicated storage is also too old (by version), it will wait for the data to be replicated or make a request to the "main region". After reading the newer data, the server's cache is also updated. On the other hand, requests that can tolerate some eventual consistency can be served directly from the server cache without even querying the replicated storage - improving resiliency and Latency.
The nice thing is that you always have a choice based on the particular scenario. When there is an outage, whatever can work with eventual consistency will keep working. If the outage is very long, you can decide to manually switch other flows from strong flow consistency to eventual consistency - and deal with reconciling the inconsistencies later.
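A sketch of this layered read path, with all names (local_cache, replicated_store, main_region_client) being illustrative:

```python
# Layered read sketch: in-process cache first, then the replicated storage in
# this region, then the main region. All store APIs are hypothetical and
# return (value, version) pairs.

def read_with_cache(local_cache, replicated_store, main_region_client,
                    key, min_version=0):
    # 1. In-process cache: good enough if its version satisfies the flow.
    cached = local_cache.get(key)           # (value, version) or None
    if cached is not None and cached[1] >= min_version:
        return cached[0]

    # 2. Replicated storage in this region.
    value, version = replicated_store.get(key)
    if version < min_version:
        # 3. Too old here as well: wait for replication or ask the main region.
        value, version = main_region_client.get(key)

    local_cache.put(key, (value, version))  # refresh the in-process cache
    return value
```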

 

Using a timestamp as version

Instead of incrementing a counter, we can assign the current timestamp to the new value. If the current timestamp is smaller than or equal to the previous one, we might need to increment it - to keep the version always increasing. Of course, the timestamp should be updated atomically to prevent reusing the same value.

If we use milliseconds, we can have a maximum of 1000 updates per second, which is usually enough for a single key. We can use the more expensive nanosecond time for higher update rates.
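A sketch of assigning such a timestamp version, where an in-process lock stands in for whatever atomic update the real storage provides:

```python
import threading
import time

# Sketch: use the current time in milliseconds as the version, but never let
# it go backwards or repeat.

class TimestampVersioner:
    def __init__(self):
        self._last = 0
        self._lock = threading.Lock()   # stands in for an atomic update

    def next_version(self):
        with self._lock:
            now_ms = int(time.time() * 1000)
            # If the clock did not move forward (or moved back), bump by 1
            # so the version still strictly increases.
            self._last = max(now_ms, self._last + 1)
            return self._last
```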

Having the version as a timestamp brings some advantages. First, we always know how old the data is. When we do additional caching of the data, the value can lag in the cache far more than the replication latency. Having the exact update time allows making more granular decisions for cached data. Even without caching, replication can sometimes lag, and we can use the timestamp to decide how old is too old.

When we want to enforce strong flow consistency, no older data is good enough. When we settle for eventual consistency, we favor Availability by allowing stale data, as long as that data is not too old. Different business flows might have different age requirements for the same data.

Note that you cannot have synchronized clocks across regions. The clock can only be consistent with the update order inside one region - when taken from a single server. When you write in multiple regions, you cannot reliably compare clocks when the distance between timestamps is small (milliseconds).


Example of age trade-offs

Unlike plain counters, a timestamp can be used more creatively, based on the "age" of the cached data. For example, even when a game player's rank updates every 5 seconds, you can read with a maximum age of 3 minutes during the game - to allow aggressive caching and low latency.

However, after each game you can require a maximum age of 6 seconds to have the best chance of receiving the last updated value. If the local data is older than this, you can just show "data is not available". The important thing is that you will not block that request for too long, even if there are network issues between nodes.

Some data, like age or name, can be cached for days - maximizing Availability - and only the flows that update them should require a very fresh value - enforcing Consistency.
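A sketch of an age-based read built on the timestamp version; the store API and key names are illustrative:

```python
import time

# Sketch: the caller states how stale is acceptable, expressed as a maximum
# age of the update timestamp that serves as the version.

def read_with_max_age(replica, key, max_age_s):
    value, version_ms = replica.get(key)    # version is an update timestamp
    age_s = time.time() - version_ms / 1000.0
    if age_s <= max_age_s:
        return value
    return None                             # "data is not available"

# During the game: aggressive caching, a 3-minute-old rank is fine.
#   rank = read_with_max_age(replica, "player:42:rank", max_age_s=180)
# After the game: require a very fresh value, otherwise show "not available".
#   rank = read_with_max_age(replica, "player:42:rank", max_age_s=6)
```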

 

Global counter versus local counter

Having a monotonic version that is global to the data repository opens the possibility for additional consistency requirements. For example, some flows might want to never read data that is older than any data written or read before in the flow. This can assure consistency over multiple chunks of data even if the replication order is not guaranteed. The flow must simply present the maximum of the versions obtained from previous reads and writes.

However, the global counter can become a bottleneck for the system. The more scalable approach is to have a separate counter for each value, which only increases when that value is updated. In this case you cannot compare versions across different data; however, the system becomes more scalable.
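A sketch of the flow token used with a repository-global version; the store API in the usage comments is hypothetical:

```python
# Sketch: with a global monotonic version, the flow carries the maximum
# version it has seen, so later reads never observe data older than anything
# previously read or written in the flow.

class FlowToken:
    def __init__(self):
        self.min_version = 0

    def observe(self, version):
        """Record the version returned by a read or write in this flow."""
        self.min_version = max(self.min_version, version)

# Usage sketch (hypothetical store API returning global versions):
#   token = FlowToken()
#   token.observe(store.write("cart:42", cart))
#   value, version = store.read("user:42", min_version=token.min_version)
#   token.observe(version)
```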

 

Bigger consistency domains

Note that you can assign versions at whatever granularity your case requires. If your data needs to be consistent across multiple keys, you can assign one version for all those keys. You only need to assure that, locally in each region, the version is updated atomically with all the data it guards.
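A sketch of such a bigger consistency domain, where an in-process lock stands in for the transactional atomicity a real database would provide:

```python
import threading

# Sketch: one version guards a group of keys and is updated atomically with
# all of them. In a real database this would be a single transaction.

class ConsistencyDomain:
    def __init__(self, keys):
        self._values = {k: None for k in keys}
        self._version = 0
        self._lock = threading.Lock()

    def update(self, changes):
        """Apply changes to several keys of the domain under one new version."""
        with self._lock:
            self._values.update(changes)
            self._version += 1
            return self._version

    def read(self, key):
        """Return (value, domain version) for any key in the domain."""
        with self._lock:
            return self._values[key], self._version
```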

Just remember that using bigger strong-consistency domains results in higher latency - because there is a bigger chance that another flow updates the same data and requires an even higher version than this flow would need.

You can theoretically create very long consistent flows. The drawback is that the latency increases with dependencies and it is harder to deal with concurrent writes from other flows.

In the limit, if you try to make all your data a single consistency domain... then you end up with the CAP/PACELC limitations you wanted to avoid. The trick is to isolate small consistency domains for each flow that has a business need for read-your-writes consistency.

 

Writing in multiple regions

A similar strategy can be applied when writing in multiple regions. The difficulty is creating monotonic versions across regions without coordination - for example, using vector clocks. This will be the subject of a future article.

 

Read also:

3. Data Consistency and the "Theory of relativity"

