Checkpoint
The checkpoint entity forms the backbone of the coordination of information between nodes in the cluster. Abstractly, it is a "dictionary", "map", or database table data structure -- that is, a user provides an arbitrary data "key" that returns an arbitrary data "value". However a Checkpoint differs from these structures because it exists in all processes that are interested in it. A checkpoint is fully replicated to all nodes that are interested in it -- it is not a "distributed" dictionary where every node has partial data.
Uses
The primary use for checkpoint is synchronization of state between redundant programs. The "active" software writes a checkpoint, and the "standby" software reads it. This is more useful than a simple message interface because the checkpoint abstraction simultaneously presents the total state and the incremental state changes. Total state access is required when a "cold" standby is first started, when the standby fails and is restarted, and when the active fails over if the "warm" standby has not been actively tracking state changes. Incremental state changes (deltas) are required so a "hot" standby can continually update its program state in response to the active software's actions.
Checkpoint is also used whenever commonly-used data needs to be distributed throughout the cluster. Checkpoints can be single-writer, multiple-reader (more efficient), or multiple-writer, multiple-reader.
A Checkpoint is inappropriate when the data is not often used. Although a checkpoint may be written to disk for recovery during a catastrophic failure event, the entire checkpoint data set is stored in RAM. Therefore a traditional replicated database is more appropriate for a large and/or rarely used data set.
Major Features
Most major features can be selected at object creation time to optimize for speed or for utility
- Replicated: Replicated efficiently to multiple nodes 
- Nested: Checkpoint values can be unique identifiers that automatically resolve-and-lookup in another Checkpoint 
- Persistent: Checkpoints can automatically store themselves to disk (Persistence) 
- Notifiable: You can subscribe to get notified of changes to Checkpoints (Event) 
- Shared memory: For efficiency, a single service per node can maintain coherency (with the rest of the cluster) of a checkpoint. To do this the checkpoint is stored in shared memory 
- Transactional: Checkpoint operations can be wrapped in a cluster-wide Transaction 
- Partial record updates: A partial record update occurs when a checkpoint entry (key,value) exists and an application writes a subset of the value. For example, write 100 bytes to offset 10000100 to 10000200. The value of a partial record update becomes compelling when the value of the checkpoint entry is very long which is why the prior example was used. 
Design
In this document the term "Checkpoint" will be used to refer to the entire replicated checkpoint abstraction. The term "local replica" will be used to refer to a particular copy of the checkpoint data. "Shared replica" refers to a process that only accesses the checkpoint via shared memory.
Process Access
A Checkpoint can be located in process private memory or in shared memory based on an option when the Checkpoint is created.
A Checkpoint that is used by multiple processes on the same node should be located in shared memory for efficiency.
Checkpoint Creation
A checkpoint is always identified by a Handle. At the API layer, a string name can be used. If the latter, this name will be registered with the Name service using the checkpoint's Handle as the value.
Checkpoint Retention Timer and Deletion
Introduction
When all processes close a checkpoint wait N (configurable) seconds and then delete it from memory. This is the checkpoint "retention time".
Each checkpoint is provided with retentionDuration argument when it's opened. When the last call to checkpoint close is performed, a timer is started with retentionDuration and when the timer expires, data of this checkpoint will be deleted from memory. Let's say processes A and B have a checkpoint open which is configured for a 5 minute retention time. A exits. The checkpoint stays "alive". B exits. Now no process has the checkpoint open. The system does NOT close the checkpoint. At 3 minutes after there were no users, process C opens the checkpoint. It opens the original checkpoint data because it was retained for that time. Now process C closes the checkpoint. Again no process has it open so the timer starts. After 5 minutes the data is deleted from shared memory.
For persistent checkpoints, we should have another field "persistentRetentionTime" that configures how long the data is retained on disk.
The purpose of these retention times is to clean up unused resources, but not so quickly that a failure and restart will cause the data to be deleted.
Implementation
Because each checkpoint is stored in a file in shared memory, so, we will iterate all these files to see if there is a process opening or closing a checkpoint. To do this, we'll use process semaphore functions.
The implementation can periodically try_lock() (and give() right away). If it returns true, some process still has the checkpoint open. If it returns false, start the retentionTimer. To achieve this, we must create a process (temporarily named RetentionTimer). It monitors all the created checkpoint by periodically checking if a checkpoint open in a process and start the retention timer accordingly.
clCustomization.hxx
   1 SAFplusI 
   2 {
   3   enum {
   4     CkptUseCheckDuration = 60, /* This is the configured duration in second for which the program checks to see if there is any process with a checkpoint open */
   5     CkptRetentionDurationDefault = 28800, /* This is the default retention duration in second for the retention timer to decide if a checkpoint data is deleted from memory */
   6   };
   7   //...
   8 };
clCkptApi.hxx
In each Checkpoint constructor, add more argument named "retentionDuration" like this:
   1 Checkpoint(const Handle& handle, uint_t flags, uint64_t retentionDuration, uint_t size=0, uint_t rows=0);
   2 Checkpoint(uint_t flags, uint64_t retentionDuration, uint_t size=0, uint_t rows=0);
   3 Checkpoint(); // The default constructor, no argument supplied, retentionDuration will use the default value from clCustomization.hxx 
   4 
   5 /* this struct contains the timer pointer and retention duration for the timer */
   6 struct RetentionTimer {
   7   boost::asio::deadline_timer* timer;
   8   uint64_t retentionDuration; // in second
   9 };
  10 
  11 typedef std::pair<const SAFplus::Handle, RetentionTimer> CkptTimerMapPair;
  12 typedef boost::unordered_map<SAFplus::Handle, RetentionTimer> CkptTimerMap;
  13 
  14 extern CkptTimerMap ckptTimerMap; // a map contains ckptHandle as a key and struct RetentionTimer as a value. This map is defined in clckpt.cxx
  15 
clckpt.cxx
   1 CkptTimerMap ckptTimerMap; /* On checkpoint construction, add an item {ckptHandle, {NULL, retentionDuration}} to this map so that clRetentionTimer.cxx will reuse this map: it gets checkpoint handle from shared memory files then looks up item from the map, then create a timer object. The timer is started if there is no process opening it */
clRetentionTimer.cxx
#include <clCustomization.hxx> #include <clCkptApi.hxx>
boost::asio::io_service io; // io service to work with one or many deadline timers boost::asio::deadline_timer ckptUseTimer(io, boost::posix_time::seconds(CkptUseCheckDuration)); // the timer is to periodically check the use status of a checkpoint // the onTimeout function for ckptUseTimer void onCkptUseTimeout(const boost::system::error_code& e, const SAFplus::Handle& ckptHandle); // the onTimeout function for ckptRetentionTimer void onRetentionTimeout(const boost::system::error_code& e, const SAFplus::Handle& ckptHandle); // an entry for starting retention timer void startTimer(const SAFplus::Handle& ckptHandle, uint32_t retentionDuration); // an entry for stopping retention timer void stopTimer(const SAFplus::Handle& ckptHandle); // Run the io service void runIoService();
Intra-Node and Inter-Node Replication and Communication
There are two ways that processes can share checkpoint data: either via shared memory or via messaging. Typically, all processes on the same node will use shared memory to share checkpoint data (intra-node) and a designated process per node will handle communication with other nodes (inter-node). However, if a process opens the checkpoint without the shared memory flag, it cannot only use the messaging communication mechanism. It is in essence behaving as if it was running isolated on its own node. When this document refers to inter-node communication, messaging communication, a process being a "node leader", etc. it may refer to actual communication across nodes but it equally refers to communication to processes that have opened the checkpoint "unshared".
For example, it is possible to have a single checkpoint opened and shared by 3 processes and opened unshared by 2 processes all on the same node. In this case, there will be 3 copies of that checkpoint stored in RAM on the node; 1 copy located in shared memory and 2 copies in process local memory. As of the writing, this configuration's is mostly conceived for test and debugging, but the existence of this configuration proves that the code is written in a robust fashion.
Discovery
When a checkpoint is created, it will be opened in shared memory (if applicable). Data within the shared memory will be used to elect a "synchronization replica process" (see Election section). A process that opens an "unshared" checkpoint is defacto the "sync replica process" of its own checkpoint replica.
The job of the synchronization replica process is to keep the checkpoint synchronized with other replicas via the messaging interface. To do this, the synchronization replica process will register a new group with the Group service. It is identified by a well-known Handle or by the ClusterUniqueId returned by the Group registration. It may also be identified by a string entry in the Name service and all APIs that need a checkpoint will accept either a ClusterUniqueId or a string name.
So every checkpoint replica will have exactly one process that is a member of this checkpoint group. Each checkpoint will have a different group. So the Group service (and Name service) end up being implicitly responsible for ensuring that a particular checkpoint (identified by handle or name/handle) is shared across the cluster. In other words, Checkpoint replicas discover each other by joining the Checkpoint's group and iterating through the group members.
The active process designated in the checkpoint's group will be the checkpoint writer, in checkpoints that only allow a single writer. In this case, the process is called the "master replica".
Replication
Replication consists of copying the checkpoint and changes to the checkpoint to another node. There are several relevant cases:
- The replica has no data
- The replica has old data
- The replica is in sync
Efficient replication in the case of 2 and 3 requires that data already located in the replica be used, not retransmitted (delta replication). Two general approaches exist; differencing and time stamps. Differencing involves running a checksum over trees of data values and replicating portions with a checksum mismatch. This is a CPU-intensive approach that allows delta replication regardless of how old the local data is and simultaneously verifies correctness. It is most naturally implemented in a tree data structure, yet the fastest checkpoint lookup structure is a hash table. Timestamp replication annotates each change with a monotonically increasing change id located either in-data or in a separate log. This implementation uses timestamp replication to minimize CPU use and because verification of correctness is not necessary since nodes are trusted.
In general, there are 2 extremas of checkpoint table structure that may render replication strategies inefficient. The first use is "many small records", the second "few large records". The "many small record" extrema is space inefficient if replication requires per-record information. The "few large records" extrema is inefficient if record replication must occur atomically. But since the SA-Forum APIs allow partial record updates it is important for efficiency that a partial record update only transmit the changed portion of the record. However, identifying changes with a timestamp becomes problematic if sub-record delta replication must be supported because a record may have multiple change times.
The timestamp shall consist of 2 parts; a 4 byte "generation" and 4 byte change number stored in the checkpoint header. If the generation of the local replica does not match that of the master replica, all local replica data shall be invalidated. Each transaction that is written to the replica master shall increment the change number. The updated change number shall be written to the checkpoint header and associated with the data that was changed.
Two ways of associating change numbers to data are supported and can be selected by the user: change annotation and change logs. Change annotation allocates memory for the change number in each piece of data written. Change logs use a separate data structure -- a circular queue -- that associates the change number with the key and data sub-set that changed. Therefore, change logs are efficient for "few large partially-updated records", "rarely updated", or "many short records" checkpoint styles, whereas change annotation is efficient for rapidly updated records.
Replica Synchronization
The local replica shall begin replica synchronization as follows:
- Block all checkpoint read access.
- Subscribe to all change updates.
- Request all changes that have occurred after its change number.
- The master shall respond with all deltas between the passed change number and the master's current number. The master can respond with with a "too old" error if the generation (change annotation) does not match or the replica's change number is lower then the oldest change log record (change logs). In this case, the master shall also send the entire checkpoint data.
- The local replica shall apply changes to its database IF the record's local change number (if it exists) is less than the number passed in the synchronization message. If the record's local change number is greater, it must have been updated due to change update message.
- If in-data change numbers are used, the local replica shall apply change updates from 1 the moment they arrive. If a the change log technique is used, the local replica shall defer all updates until synchronization is complete.
Replica Updates
The local replica shall subscribe to checkpoint change updates (as described in 1 above). The master replica shall forward all changes to every subscribed replica, including the relevant change number. Replicas know that every change record has been received because the change number is monotonically increasing. If a replica receives a skipped or out-of-order change number, it can initiate synchronization as described above. At this point, it is not possible for the replica to request all change in a particular change number since a subsequent change may have overwritten some of the changes in this set (on the master). Therefore the master can only respond with all changes from a particular number to the latest (i.e. synchronization).
Replica synchronization may require a complete iteration through the keys of the checkpoint database, so effort should be made to not lose replica update messages.
