The tricky part at this point is coordinating the start of data streaming at the recovering partition and the source partition so that they can agree on a txn id without stalling when only heartbeats are running through the system. A txn id is agreed upon by having the recovering partition suggest to the source partition a txn id (or heartbeat txn id) at which streaming of recovery data should start. The source partition responds with the id of the next txn it hasn't executed whose id is >= the suggestion, if one exists, or otherwise with the minimum safe txn id based on heartbeats. This guarantees that the recovering partition has access to all of the procedure invocations necessary to resume at the point in the global order at which the partition's data was streamed.
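To make the source partition's side of that handshake concrete, here is a minimal sketch, assuming txn ids are plain integers and that the source keeps its ordered-but-unexecuted txn ids in a sorted list. The names SourcePartition, pending_txn_ids, min_safe_txn_id, and respond_to_suggestion are hypothetical and are not VoltDB's actual classes or methods.

```python
import bisect
from dataclasses import dataclass, field
from typing import List


@dataclass
class SourcePartition:
    """Hypothetical model of the source partition's view of the global order."""

    # Txn ids that have been ordered but not yet executed, kept sorted ascending.
    pending_txn_ids: List[int] = field(default_factory=list)
    # Safe point derived from heartbeats: nothing new can arrive ordered below it.
    min_safe_txn_id: int = 0

    def respond_to_suggestion(self, suggested_txn_id: int) -> int:
        """Return the txn id at which streaming of recovery data should start."""
        # Find the first unexecuted txn whose id is >= the suggestion.
        i = bisect.bisect_left(self.pending_txn_ids, suggested_txn_id)
        if i < len(self.pending_txn_ids):
            return self.pending_txn_ids[i]
        # Only heartbeats are flowing, so fall back to the heartbeat-derived
        # safe txn id instead of waiting for real work to show up.
        return self.min_safe_txn_id


busy = SourcePartition(pending_txn_ids=[105, 110, 120], min_safe_txn_id=100)
print(busy.respond_to_suggestion(107))  # next unexecuted txn at or after 107

idle = SourcePartition(pending_txn_ids=[], min_safe_txn_id=130)
print(idle.respond_to_suggestion(125))  # heartbeat-only case
```

The fallback branch is the piece that avoids stalling: when no real transactions are queued, the source can still answer immediately using what heartbeats have told it.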
The process of streaming the data is pretty simple once started. It uses the same serialization format and code for tuple data that the wire protocol uses. Code for loading blocks of raw tuples into a table already exists for restoring tables from a file and for bulk loading. All that was necessary was to add code for breaking a table up into blocks, serializing them, and sending/receiving the serialized blocks at each partition. The RecoverySiteProcessor class and its two implementations, RecoveryDestinationProcessor and RecoverySourceProcessor, take care of this. The JNI method for enabling COW mode for online snapshots was changed to activateTableStream, which handles activating various forms of table streaming for both snapshots and recovery. The serializeMore method was renamed to tableStreamSerializeMore and extended to handle serializing data for multiple kinds of streams.
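The block-at-a-time transfer can be sketched as follows. This is an illustration only, assuming a hypothetical two-column table of 64-bit integers and a made-up block layout (a 4-byte tuple count followed by fixed-width rows); it is not the actual VoltDB wire format, and serialize_blocks and load_block are hypothetical names.

```python
import struct
from typing import Iterator, List, Tuple

Row = Tuple[int, int]  # hypothetical table with two BIGINT columns

ROW_FORMAT = struct.Struct(">qq")   # big-endian fixed-width row
COUNT_FORMAT = struct.Struct(">i")  # 4-byte tuple count at the start of each block


def serialize_blocks(rows: List[Row], rows_per_block: int) -> Iterator[bytes]:
    """Source side: break the table up into blocks and serialize each one."""
    for start in range(0, len(rows), rows_per_block):
        chunk = rows[start:start + rows_per_block]
        parts = [COUNT_FORMAT.pack(len(chunk))]
        parts.extend(ROW_FORMAT.pack(*row) for row in chunk)
        yield b"".join(parts)


def load_block(block: bytes, table: List[Row]) -> None:
    """Destination side: deserialize one block and bulk-insert its tuples."""
    (count,) = COUNT_FORMAT.unpack_from(block, 0)
    offset = COUNT_FORMAT.size
    for _ in range(count):
        table.append(ROW_FORMAT.unpack_from(block, offset))
        offset += ROW_FORMAT.size


source_table = [(i, i * i) for i in range(10)]
dest_table: List[Row] = []
for block in serialize_blocks(source_table, rows_per_block=4):
    load_block(block, dest_table)  # in reality each block crosses the network
```

Fixed-size blocks keep memory bounded on both sides and map naturally onto the existing "load a block of raw tuples" path at the destination.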
A secondary design goal (secondary because this is not a 1.2 feature) was to leave the API open for "pauseless" online recovery. To facilitate this, the execution engine at each partition can also serialize progress and type information into the header in addition to tuple data. This allows the execution engine to encode things like deletes and updates as well as the straight tuple insertions that occur in the initial transfer.
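A header carrying type and progress information might look something like the sketch below. The message types, field names, and 17-byte layout are all hypothetical, chosen only to show how one stream could interleave the initial inserts with later updates and deletes.

```python
import struct
from enum import IntEnum
from typing import NamedTuple, Tuple


class StreamMessageType(IntEnum):
    # Hypothetical types; the initial transfer would only ever use INSERT.
    INSERT = 1
    UPDATE = 2
    DELETE = 3


class BlockHeader(NamedTuple):
    msg_type: StreamMessageType
    tuples_sent: int   # progress: tuples streamed so far
    tuples_total: int  # tuples the source expects to stream in total


# 1-byte type + two 8-byte counters, big-endian with no padding (17 bytes).
HEADER_FORMAT = struct.Struct(">Bqq")


def frame_block(header: BlockHeader, payload: bytes) -> bytes:
    """Prepend the type/progress header to a block of serialized tuple data."""
    return HEADER_FORMAT.pack(int(header.msg_type), header.tuples_sent,
                              header.tuples_total) + payload


def split_block(block: bytes) -> Tuple[BlockHeader, bytes]:
    """Recover the header and the tuple payload from a framed block."""
    msg_type, sent, total = HEADER_FORMAT.unpack_from(block, 0)
    header = BlockHeader(StreamMessageType(msg_type), sent, total)
    return header, block[HEADER_FORMAT.size:]


framed = frame_block(BlockHeader(StreamMessageType.DELETE, 40, 100), b"payload")
header, payload = split_block(framed)
print(header.msg_type.name, header.tuples_sent, header.tuples_total)
```

Because the receiver dispatches on the type field, adding update and delete messages later would not require changing the framing used by the initial transfer.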
In a nonblocking scheme the source partition continues to process requests while maintaining a COW-style log of changed data. In the worst case the partition will have to throttle so that shipping the COW-style log can transfer data faster than it is mutated, but that is just another special case of an under-provisioned cluster. The nice thing about this log, compared with the actual COW log used for online snapshots, is that only the primary key (for deletes) or a 4-byte integer index (for inserts/updates) needs to be logged for each change, rather than entire tuples.
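A minimal sketch of such a change log follows, assuming an integer primary key and a table modeled as a list of slots indexed by a 4-byte tuple index. ChangeLog, log_delete, log_upsert, and ship are hypothetical names. The point is that inserts and updates log only the slot index and the current tuple is read from the table when the log is shipped, while deletes must log the key because the tuple itself is gone.

```python
import struct
from typing import Any, List, Optional, Tuple

Slot = Optional[Tuple[Any, ...]]  # None marks a free slot

DELETE_ENTRY = struct.Struct(">cq")  # tag + 8-byte primary key (assumed integer PK)
UPSERT_ENTRY = struct.Struct(">cI")  # tag + 4-byte tuple index


class ChangeLog:
    """Hypothetical COW-style change log for nonblocking recovery."""

    def __init__(self) -> None:
        self.entries: List[bytes] = []

    def log_delete(self, primary_key: int) -> None:
        self.entries.append(DELETE_ENTRY.pack(b"D", primary_key))

    def log_upsert(self, tuple_index: int) -> None:
        # struct.error is raised if the index does not fit in 4 unsigned bytes.
        self.entries.append(UPSERT_ENTRY.pack(b"U", tuple_index))

    def size_in_bytes(self) -> int:
        return sum(len(entry) for entry in self.entries)

    def ship(self, table: List[Slot]) -> List[Tuple[str, Any]]:
        """Resolve entries against the current table contents and clear the log."""
        out: List[Tuple[str, Any]] = []
        for entry in self.entries:
            if entry[:1] == b"D":
                _, pk = DELETE_ENTRY.unpack(entry)
                out.append(("delete", pk))
            else:
                _, idx = UPSERT_ENTRY.unpack(entry)
                row = table[idx]
                # Simplification: a slot freed after the change is skipped,
                # since its own delete entry covers it.
                if row is not None:
                    out.append(("upsert", row))
        self.entries.clear()
        return out


table: List[Slot] = [(1, "a"), (2, "b"), None]
log = ChangeLog()
log.log_upsert(0)
table[0] = (1, "a2")     # a later update still ships the latest contents
log.log_delete(2)
table[1] = None          # the tuple with primary key 2 lived in slot 1
print(log.size_in_bytes(), log.ship(table))
```

Each entry here is 5 or 9 bytes regardless of how wide the tuples are, which is the space advantage over logging whole tuples.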
Failures concurrent with rejoin, the thing that tends to keep us up at night, are handled with the existing failure detection hammer. Timeouts ensure that progress is made toward completion, and because the recovering node commits hara-kiri upon detecting a node failure, the existing failure detection hammer takes care of every scenario we have come up with so far.
Statements and opinions presented here are not those of VoltDB Inc. unless specifically presented as such.