Identity/CryptoIdeas/05-Queue-Sync

Last updated: 2013/10/29

Queue-Based Data Synchronization

  • Chris Karlof, Brian Warner, May-2013


Summary: like Identity/CryptoIdeas/04-Delta-Sync, but more stream-oriented than whole-version-oriented.

(this borrows ideas liberally from Chromium, so there is some terminology overlap with that project)

Some more real-time discussion/questions are on https://id.etherpad.mozilla.org/picl-queue-sync-server

Syncable Service, Sync Mediator, Registration

A "Syncable Service" is any service that wants to synchronize data with a PICL account (and thus with corresponding services on other devices). Bookmarks, passwords, open-tabs, etc, are all examples of Syncable Services.

Each Syncable Service is required to register with the "Sync Mediator" at browser startup. In the registration call, the service includes parameters to identify:

  • the name of the service (this allows a service on one device to connect with the same service on other devices: both must use the same name)
  • whether this service uses one-collection-per-device or one-shared-collection
  • whether this service's data goes into class-A or class-B


The service also provides callback functions as follows:

  • mergeDataAndStartSyncing
  • something for downstream changes


Registration returns a function for the service to call when it has upstream changes that need delivery to the server.
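As a concrete illustration, a registration API along these lines might look like the following TypeScript sketch (all names here are illustrative, not the actual PICL interfaces):

```typescript
// Illustrative sketch of Syncable Service registration with the Sync Mediator.
type Change = { op: "ADD_SET" | "DELETE"; key: string; value?: string };

interface SyncableService {
  name: string;                              // must match the name used on other devices
  collectionMode: "per-device" | "shared";   // one-collection-per-device vs one-shared-collection
  dataClass: "A" | "B";                      // whether the data goes into class-A or class-B
  // called at the start of a re-sync with the server's full dataset
  mergeDataAndStartSyncing(serverRecords: Map<string, string>): Promise<Change[]>;
  // called when downstream changes arrive from other devices
  applyDownstreamChanges(changes: Change[]): Promise<void>;
}

interface SyncMediator {
  // returns the function the service calls when it has upstream changes to deliver
  register(service: SyncableService): (upstream: Change[]) => void;
}
```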

Changes vs Records

PICL models the local datastore as a collection of "records", each of which has a globally-unique key (GUID) and some arbitrary value. The server must be able to supply a full set of (encrypted) records at any time (both for new clients which are not yet in sync, and for existing clients that fall out-of-sync for whatever reason).

Once clients are "in sync", they exchange "changes" instead of records. However, if we use a record-based storage format (as opposed to a field-based format, see below), then these are almost identical. The significant difference is that "changes" can also represent a deleted record, and the memory of that deletion needs to stick around for a while.

Suppose the client browser performs the following actions:

  • step 1: create record key=1, value=A
  • step 2: create record key=2, value=B
  • step 3: create record key=3, value=C
  • step 4: modify record key=1 to value=D
  • step 5: delete record key=3
  • step 6: modify record key=1 to value=E


The "current dataset" after each step is:

  • after step 1: (1=A)
  • after step 2: (1=A, 2=B)
  • after step 3: (1=A, 2=B, 3=C)
  • after step 4: (1=D, 2=B, 3=C)
  • after step 5: (1=D, 2=B)
  • after step 6: (1=E, 2=B)


The changes we deliver to implement each step are:

  • step 1: ADD/SET 1=A
  • step 2: ADD/SET 2=B
  • step 3: ADD/SET 3=C
  • step 4: ADD/SET 1=D
  • step 5: DELETE 3
  • step 6: ADD/SET 1=E
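A minimal sketch of how a client might replay this change stream over an empty dataset (illustrative TypeScript, not PICL code):

```typescript
// Replaying the six changes above reproduces the "current dataset" after each step.
type Change = { op: "ADD_SET" | "DELETE"; key: string; value?: string };

function applyChange(dataset: Map<string, string>, c: Change): void {
  if (c.op === "DELETE") {
    dataset.delete(c.key);          // deletions remove the key entirely
  } else {
    dataset.set(c.key, c.value!);   // ADD/SET creates or overwrites the key
  }
}

const changes: Change[] = [
  { op: "ADD_SET", key: "1", value: "A" },
  { op: "ADD_SET", key: "2", value: "B" },
  { op: "ADD_SET", key: "3", value: "C" },
  { op: "ADD_SET", key: "1", value: "D" },
  { op: "DELETE",  key: "3" },
  { op: "ADD_SET", key: "1", value: "E" },
];

const dataset = new Map<string, string>();
changes.forEach(c => applyChange(dataset, c));
// dataset is now { 1: "E", 2: "B" }, matching "after step 6" above
```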

Queues

For each service, the Mediator maintains two queues. The "upstream" or "outbound" queue contains local changes that were made to the native datastore (e.g. the Places database), in response to user actions. The upstream queue holds these changes until:

  • network connectivity is available
  • some batching timeout has expired (a Nagle-like algorithm to improve efficiency by sending infrequent large updates instead of frequent tiny updates)
  • any pending downstream changes have been applied and merged into the local datastore


After upstream entries have been sent to the server, they may remain in the queue until the server acknowledges receipt, at which point they are finally deleted. If the server receives an update from some other device (which has not yet been seen by the local device), the server sends a NACK instead, at which point the client will try to merge the other change into the local datastore. Entries in the upstream queue may be removed or modified before transmission as a result of merge actions.

The "downstream" or "inbound" queue contains changes that arrive from the server which have not yet been fully applied to the local datastore.

Each queue contains plaintext changes. The client exchanges only encrypted records/changes with the server. Upstream changes are encrypted just before transmission, and downstream changes are decrypted before being added to the queue.
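A rough sketch of the per-service queue state and the flush condition described above (names are illustrative):

```typescript
// Illustrative per-service queue state held by the Mediator.
type Change = { op: "ADD_SET" | "DELETE"; key: string; value?: string };

interface ServiceQueues {
  upstream: Change[];    // local plaintext changes awaiting encryption and delivery
  downstream: Change[];  // decrypted server changes awaiting application/merge
}

// Upstream entries are only sent when all three conditions above hold.
function canFlushUpstream(q: ServiceQueues, online: boolean, batchTimerExpired: boolean): boolean {
  return online && batchTimerExpired && q.downstream.length === 0;
}
```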

Server Data Model

Concepts: collection version numbers, combined change/record rows, tombstones, "fetch changes since X", hash chain, conflict detection

Browsers send encrypted change records up to the server. Each change record contains the following fields:

  • record id: hash of header
  • "header": (version number, PreviousRecordId, unencrypted key (GUID), hash of encrypted value)
  • encrypted value (or "DELETE")
  • signature: HMAC (using a client-managed key) of record id


Each change record represents an ADD/SET or a DELETE of a specific key. A complete collection is represented by a bunch of ADD/SET changes for non-overlapping keys (exactly one change per key).
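A sketch of the change-record layout described above, with illustrative field names:

```typescript
// Illustrative layout of one change record as described above.
interface ChangeRecordHeader {
  versionNumber: number;       // collection version that first includes this change
  previousRecordId: string;    // record id of the previous change (all zeros for the first)
  guid: string;                // unencrypted record key
  encryptedValueHash: string;  // hash of the encrypted value (or of the DELETE token)
}

interface ChangeRecord {
  header: ChangeRecordHeader;
  recordId: string;            // hash of the serialized header
  encryptedValue: string;      // ciphertext, or the special "DELETE" token
  signature: string;           // HMAC (client-managed key) over recordId
}
```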

The "collection version number" is a collection-wide sequential integer, incremented with each change. It will eventually get get very large (think 8-byte storage). For any given version number, there is a specific set of key/value pairs which make up that version (although the server may not be able to produce that set for arbitrary version numbers). Each version differs by exactly one key (note: this is a significant constraint, and needs more discussion). Each change record has a copy of the collection version number that first includes the new change. Version numbers are generated by the client, when it produces (and hashes/signs) a new change record for delivery to the server.

These change records form a hash chain: each record id validates all previous records back to the very first one (which has a PreviousRecordId of all zeros). Clients which are "in-sync" and receiving only new records will require valid signatures, sequential version numbers, and matching PreviousRecordId values. These clients cannot be made to accept false records, or be tricked into omitting a valid record (some attacks are still possible during a "resync", see below).

The server receiving upstream records cannot check the (symmetric) signature, but it validates all the other fields. It then stores the various fields in a database. The server schema needs to support two kinds of read operations:

  • read op 1: "please give me all changes from version number N to the present"
  • read op 2: "please give me all current records"


If the server does not have enough history to answer the first kind of read with a contiguous set of changes for the requested range, it should return a distinctive error, causing the client to perform a resync. But the server must always be able to answer the second kind of read.

The server should use whatever schema proves to be most efficient, but one possible approach would be:

  • columns: userid, collectionId, buildNumber, guid, isCurrent (bool), recordId, header, encryptedValueOrDELETE, signature
  • UNIQUE INDEX (userid, collectionId, buildNumber)
  • for any (userid, collectionId, guid), there is exactly one row with isCurrent=True
  • read #1: SELECT buildNumber>N ORDER BY buildNumber
  • read #2: SELECT isCurrent=True AND encryptedValueOrDELETE!=DELETE
  • deleted keys are represented by "tombstones" with encryptedValue set to a special "DELETE" token
  • writes that modify or delete an existing key will clear isCurrent from the old row and set isCurrent on the new row.
  • servers can periodically compress their datasets. They can remove rows where isCurrent=false, and also rows where isCurrent=true and encryptedValue=DELETE. Ideally, servers will retain enough history (including encryptedValue=DELETE) to return changes to bring all known clients up to date, but if a client has not caught up for a long time, servers can compress away their history anyway, and that client will perform a resync if/when it comes back.


With such a schema, the example above would result in the following rows:

  • ver=1 current=False key=1 val=A
  • ver=2 current=True key=2 val=B
  • ver=3 current=False key=3 val=C
  • ver=4 current=False key=1 val=D
  • ver=5 current=True key=3 val=DELETE
  • ver=6 current=True key=1 val=E


When the server compresses out old rows, it would be left with just:

  • ver=2 current=True key=2 val=B
  • ver=6 current=True key=1 val=E


Servers must be able to answer these queries with paginated responses, rather than attempting to hold the entire response in memory or transmitting it in a single network message.
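For illustration, here is how the two read operations might look against an in-memory copy of the example rows (a real server would express these as indexed, paginated SQL queries; names are illustrative):

```typescript
// Illustrative in-memory version of the suggested schema and the two read operations.
interface Row {
  buildNumber: number;
  guid: string;
  isCurrent: boolean;
  encryptedValueOrDELETE: string;
}

// read op 1: all changes from version N to the present, in order
function changesSince(rows: Row[], n: number): Row[] {
  return rows.filter(r => r.buildNumber > n)
             .sort((a, b) => a.buildNumber - b.buildNumber);
}

// read op 2: all current records, with tombstones excluded
function currentRecords(rows: Row[]): Row[] {
  return rows.filter(r => r.isCurrent && r.encryptedValueOrDELETE !== "DELETE");
}
```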

Server Conflict Detection

When the server receives upstream records, it compares the submitted versionNumber and PreviousRecordId against its current record: the versionNumber should be one greater, and the PreviousRecordId should match the current RecordId. The server will NACK the upstream message and discard the record unless these match. If both match, the server will store the upstream record (thus incrementing its current buildNumber) and ACK the message.

When two clients race to submit different records, the first will be accepted, and the second will be NACKed. The second client will receive both the NACK and the announcement of the other record, causing it to merge the other record and then re-submit its own (possibly modified as a result of the merge).
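A sketch of the server-side check, assuming illustrative field names:

```typescript
// Illustrative server-side conflict detection for one upstream record.
interface Submitted {
  versionNumber: number;
  previousRecordId: string;
}

function acceptUpstream(current: { versionNumber: number; recordId: string },
                        submitted: Submitted): "ACK" | "NACK" {
  const versionOk = submitted.versionNumber === current.versionNumber + 1;
  const chainOk = submitted.previousRecordId === current.recordId;
  // On ACK the server stores the record durably first; on NACK it discards it.
  return versionOk && chainOk ? "ACK" : "NACK";
}
```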

Initial Merge / Re-Sync

Concepts: mergeDataAndStartSyncing, per-datatype merge functions, race detection and merge. Large upstream change stream.

A Queue-Sync client is generally in one of two states: "in-sync" or "out-of-sync". Newly configured clients are initially out-of-sync, then perform a "re-sync" operation to get in-sync. They fall back to the out-of-sync state when they crash during certain critical code windows. We can reduce the frequency of de-synchronization by introducing more durability and linking transactions between different databases, at the cost of code complexity and potentially stalling the UI.

A re-sync begins with the client fetching the server's entire dataset (all current records). Then this dataset is submitted to the Syncable Service's "mergeDataAndStartSyncing" method. This should compare the server dataset against the local datastore, by walking both lists and looking for records that are present in both, only on the server, or only on the client. Server-only records are added to the client's datastore. Client-only records are scheduled for upload to the server. Records that are the same in both datasets are ignored.

When records with the same GUID are present in both datasets, but with different contents, then a merge must be performed. Hopefully the records will contain enough information (e.g. deltas from the previous version) to preserve user changes in most situations. If not, some fallback policy must be used, like "always use server" or "always use local", and some data loss is to be expected.
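A sketch of the comparison performed by mergeDataAndStartSyncing; the conflict policy (resolveConflict below) is per-datatype and left open by this design:

```typescript
// Illustrative dataset walk: classify records as server-only, client-only, or conflicting.
function mergeDatasets(
  server: Map<string, string>,
  local: Map<string, string>,
  resolveConflict: (guid: string, serverVal: string, localVal: string) => string
): { applyLocally: Map<string, string>; uploadToServer: Map<string, string> } {
  const applyLocally = new Map<string, string>();
  const uploadToServer = new Map<string, string>();

  for (const [guid, serverVal] of server) {
    const localVal = local.get(guid);
    if (localVal === undefined) {
      applyLocally.set(guid, serverVal);           // server-only: add to the local datastore
    } else if (localVal !== serverVal) {
      const merged = resolveConflict(guid, serverVal, localVal);
      applyLocally.set(guid, merged);              // conflicting: merge, then apply...
      uploadToServer.set(guid, merged);            // ...and schedule the merged record for upload
    }                                              // identical records are ignored
  }
  for (const [guid, localVal] of local) {
    if (!server.has(guid)) uploadToServer.set(guid, localVal);  // client-only: upload
  }
  return { applyLocally, uploadToServer };
}
```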

Per-record version numbers may help determine how to merge: if the server's record version is higher than the client's, then letting the server's version "win" is slightly more likely to be correct. A bitmap of which fields were changed is also useful: you combine the two records, preferring the modified fields of each. However neither tool is perfect, especially when one side has experienced multiple changes since the records were last in-sync. In general, only a complete list of the semantic changes on each side (which might be deduced from a complete series of deltas on each side) will be enough to merge with confidence. Most of the schemes we're discussing are basically cheaper compressed forms of this essential history.

These two-record merges are handled by the "syncable service", using a function it provides during registration. Different datatypes will need to perform different kinds of merges, so this functionality cannot live in the generic Sync Mediator. Merges may also require awareness of the rest of the dataset (e.g. to maintain invariants like bookmarks that must always have a parent folder), so the merge function is actually specified to accept the whole dataset at once.

Note that the re-sync process will not, in general, be complete: some changes may be lost (especially deletes: the client is likely to resurrect data that some other client had tried to delete). So the basic goal is to avoid re-sync merges as much as possible, given reasonable limits on code complexity.

The re-sync process may generate some number of upstream changes. These are queued until the process is complete and then delivered to the server in batches. If this delivery completes successfully, then the client is said to be "in-sync".

Staying In-Sync, Durability Requirements

To remain in-sync, the client needs to make sure it does not lose any updates, in either direction. Downstream changes are fairly easy to manage, as the local queues will live in a database, and the server provides collection version numbers on each one (so we can re-fetch them until they've been safely committed to the downstream queue).

Ideally, for the upstream direction, all UI-generated local changes would share an atomic transaction with the corresponding entry in the upstream queue, which would persist until the server acknowledged the change, or it was merged with a winning change from some other client (possibly replacing the upstream change with a new version, etc). If we could do this, then only two things would cause us to drop out-of-sync: an extended period offline (such that one side stopped holding enough changes to deliver all intermediate states to the other), or a server consistency failure (i.e. database rollback).

However, adding two Sync queues to an existing database (e.g. the Places DB) sounds neither easy nor clean. We can decouple these two components by putting the Sync queues into their own database, joined with an in-memory list of new changes. As long as this in-memory list is empty when the browser quits (or crashes), we will still be in-sync the next time the browser starts. So we can reduce the problem to storing a single additional sequence number in both databases (e.g. the Places DB and the sync-queue DB) and comparing the values at startup. If they differ, a re-sync must be performed. Only a crash during the short window that follows a UI-generated change will cause a re-sync.
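A sketch of the startup comparison, assuming hypothetical accessors for the two stored sequence numbers:

```typescript
// Illustrative boot-time check: compare the sequence number stored in the native
// datastore (e.g. Places) with the one stored in the sync-queue database.
async function needsResync(placesSeq: () => Promise<number>,
                           queueSeq: () => Promise<number>): Promise<boolean> {
  const [a, b] = await Promise.all([placesSeq(), queueSeq()]);
  return a !== b;  // differing numbers mean a change was lost mid-window: re-sync
}
```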

Another option is to unconditionally perform a re-sync at every browser boot. This avoids the dependence on a sequence number. The network hit of doing this sync can be mitigated by retaining a local cache of the server's dataset (updated atomically as change records are received from the server). The re-sync can be done lazily (a minute or two after boot), but we must still consider the local disk IO and CPU costs, to read both datasets and perform the merge on each boot.


Race Conditions

We are still trying to identify the race conditions that must be handled. Things are pretty simple if the re-sync can be atomic, but this is unlikely to be achievable in practice. If the merge takes a while and the UI allows local changes to be made before it is done, we need to detect these changes and find a way to treat them as if they occurred after the merge finished. Downstream changes that arrive while the merge is running can be deferred until the merge is complete (upstream changes produced by the merge will then conflict, and must be merged again).

The easiest option may simply be to respond to any re-sync-time conflict by restarting the whole re-sync operation.

Upstream Change Delivery

Concepts: new collection-version-number calculation, hash-chain calculation, ACK/NACK.

Once the client is in-sync, each local change (generated by the local UI) must cause a new entry to be added to the upstream/outbound queue. This change record should include the full contents of the modified record, along with a bitmap of which fields were modified (which essentially compresses the delta that a more detailed system might send). The record includes a per-record version number (which starts at 0 for each GUID and is incremented every time the client modifies that record).

The client may choose to not send the change right away: it can use a Nagle-like algorithm to accumulate changes until some amount of time has passed, or some high-priority change has been made. This can reduce network bandwidth, especially if high-churn datatypes (like open-tabs) are modifying the same key multiple times in quick succession. The longer the client waits, the more likely it is that some other client will make a change that must be merged. In addition, it increases the risk that the client will crash or be disconnected before the change is published. Different datatypes will need different heuristics to decide when to publish the batched changes.

Once the downstream queue is empty (i.e. any necessary merging is complete), and the client decides to publish its changes, the process looks like:

  • the client speculatively calculates the next sequential collection-version number and assigns it to the entry
  • it also computes the hash of the record, and combines it with the previous record's hash-based ID, to compute the new record's ID. This forms a hash-chain, which is sufficient to confirm that the client produced the new record with full knowledge of the previous records (i.e. there were no intervening changes).
  • finally, it creates an HMAC signature on this new record ID
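A sketch of producing one signed upstream record, using Node-style crypto primitives for brevity (the real client would use whatever primitives the browser provides; names are illustrative):

```typescript
import { createHash, createHmac } from "crypto";

// Illustrative construction of a signed upstream record.
function buildUpstreamRecord(prev: { versionNumber: number; recordId: string },
                             guid: string,
                             encryptedValue: string,
                             hmacKey: Buffer) {
  const versionNumber = prev.versionNumber + 1;          // speculative next collection version
  const header = JSON.stringify({
    versionNumber,
    previousRecordId: prev.recordId,                     // ties this record into the hash chain
    guid,
    encryptedValueHash: createHash("sha256").update(encryptedValue).digest("hex"),
  });
  const recordId = createHash("sha256").update(header).digest("hex");   // new record's ID
  const signature = createHmac("sha256", hmacKey).update(recordId).digest("hex");
  return { header, recordId, encryptedValue, signature };
}
```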


The client then sends the signed records to the server, using a batching protocol (rather than always trying to send all records in a single request). For each record, the server either responds with a positive ACK, or a negative NACK. It ACKs the record if no other client submitted a competing record first, which means the speculative collection-version number was exactly one greater than the server's previous collection version, and the previous record ID matches (i.e. the hash-chain is intact). The server commits the new record to durable storage before returning the ACK, then distributes the new record to all other connected clients (using some push-notification protocol that hasn't been defined yet).

If either of these conditions fails, the server will NACK the update instead: this indicates that the client was out-of-date, probably because some other client had submitted a competing record and won the ensuing race condition. If one record is rejected, the client can expect that all subsequent records in the same batch will be rejected too (since they build upon the same hashed record ID). The client can also expect to hear about the competing record(s) at about the same time, depending upon what sort of push mechanism is being used. The client will need to incorporate the new downstream records into the local datastore, possibly modify the pending upstream records, then generate new collection-version-numbers and hashes, and finally attempt to deliver them again.

It may be easiest to have the client perform a full re-sync if this occurs. This is lossy, but perhaps it will occur infrequently enough that the code simplicity is worth the tradeoff.

Downstream Change Application

Concepts: race detection and merge, scanning/modifying the upstream queue. Filtering downstream changes from the upstream observer.

Once a client is in-sync, its downstream queue will be populated by new change records from the server. These records come from other clients (if they exist: an account which has just one client will never send downstream records to that client). The downstream records will appear some time after the other client produces them, including delays in the push-notification approach we use, intentional delays added by the client (to improve network efficiency), and unavoidable network propagation delays.

The client will use a batching protocol to fetch these records, which may spread the records out over time. However the order will be maintained, and the client should be able to apply earlier records without yet knowing about later ones.

Each downstream record should be checked for validity:

  • the HMAC signature must be correct
  • the record ID hash must match the record contents
  • the previous record ID hash must match the previous record
  • the collection-version-number must be one greater than the previous record's
  • i.e. the hash-chain must be intact
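A sketch of these checks for a single downstream record (Node-style crypto for brevity; field names illustrative):

```typescript
import { createHash, createHmac } from "crypto";

// Illustrative validation of one downstream record against the previous one.
interface Received {
  header: string;           // serialized header (the fields below are duplicated from it for clarity)
  versionNumber: number;
  previousRecordId: string;
  recordId: string;
  signature: string;
}

function isValid(rec: Received,
                 prev: { versionNumber: number; recordId: string },
                 hmacKey: Buffer): boolean {
  const expectedId = createHash("sha256").update(rec.header).digest("hex");
  const expectedSig = createHmac("sha256", hmacKey).update(rec.recordId).digest("hex");
  return rec.signature === expectedSig &&              // HMAC signature is correct
         rec.recordId === expectedId &&                // record ID hash matches the contents
         rec.previousRecordId === prev.recordId &&     // hash chain is intact
         rec.versionNumber === prev.versionNumber + 1; // sequential collection version
}
```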


If the client's upstream queue is empty, then there are no conflicts or merges necessary. The client can apply the changes to the local dataset, and mark each change as applied so it does not re-fetch or re-apply it in the future. The client must somehow refrain from echoing these changes back to the server (to reduce traffic and avoid an infinite loop), either by recognizing its own changes as they reappear on the outbound queue (and removing them before transmission), by adding some flag that prevents them from being delivered to the observer, or by something ugly and race-prone like disabling the observer while they are being applied.

If the client's upstream queue is *not* empty, the client is faced with a merge. If the downstream record's GUID does not match anything in the upstream queue, then the records do not directly overlap, and a merge is only necessary for more complex datatypes like bookmarks (e.g. the downstream change deletes a folder, while the upstream change moves a bookmark into that folder).

When a conflict is detected, the client should compare the downstream record with the local datastore's current contents and the corresponding upstream record to decide on a merged version. Several possibilities exist:

  • the client might leave the upstream record alone, especially if the changes did not conflict
  • if the downstream record produced the same effect, it will delete the now-redundant upstream change record
  • cross-record invariants might require the creation of new records, e.g. the un-deletion of a folder that is no longer empty, or modification of child pointers in a folder to match a new bookmark


In any case, the upstream records will need new collection-version numbers and new hash-chains, to base them upon the most-recently-received downstream record.

Downstream Cache

For simplicity (in particular to decouple transactionality between the native datastore and the downstream queue), we may have the browser perform a "resync" at every boot. To avoid re-fetching the entire server dataset each time, we can maintain a full copy of the server's dataset in a "Downstream Cache". This cache is updated when we receive downstream changes, with a transaction that updates the cached data and the stored build number together. With this, we can safely request only new changes each time. In the ideal case (where nothing has changed on the server), a single roundtrip (returning or confirming the current build number) is enough to make sure we're up-to-date.
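A sketch of the boot-time cache refresh, assuming a hypothetical fetchChangesSince helper:

```typescript
// Illustrative refresh of the Downstream Cache using the stored build number.
type CachedChange = { key: string; value: string | "DELETE" };

interface DownstreamCache {
  buildNumber: number;
  records: Map<string, string>;
}

async function refreshCache(
  cache: DownstreamCache,
  fetchChangesSince: (n: number) => Promise<{ buildNumber: number; changes: CachedChange[] }>
): Promise<void> {
  const resp = await fetchChangesSince(cache.buildNumber);  // single roundtrip in the common case
  for (const c of resp.changes) {
    if (c.value === "DELETE") cache.records.delete(c.key);
    else cache.records.set(c.key, c.value);
  }
  // In the real client, the cached data and build number would be committed in one transaction.
  cache.buildNumber = resp.buildNumber;
}
```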