Checkpoint
The checkpoint entity forms the backbone of the coordination of information between nodes in the cluster. Abstractly, it is a "dictionary", "map", or database table data structure -- that is, a user provides an arbitrary data "key" that returns an arbitrary data "value". However a Checkpoint differs from these structures because it exists in all processes that are interested in it. A checkpoint is fully replicated to all nodes that are interested in it -- it is not a "distributed" dictionary where every node has partial data.
Uses
The primary use for checkpoint is synchronization of state between redundant programs. The "active" software writes a checkpoint, and the "standby" software reads it. This is more useful than a simple message interface because the checkpoint abstraction simultaneously presents the total state and the incremental state changes. Total state access is required when a "cold" standby is first started, when the standby fails and is restarted, and when the active fails over if the "warm" standby has not been actively tracking state changes. Incremental state changes (deltas) are required so a "hot" standby can continually update its program state in response to the active software's actions.
Checkpoint is also used whenever commonly-used data needs to be distributed throughout the cluster. Checkpoints can be single-writer, multiple-reader (more efficient), or multiple-writer, multiple-reader.
A Checkpoint is inappropriate when the data is not often used. Although a checkpoint may be written to disk for recovery during a catastrophic failure event, the entire checkpoint data set is stored in RAM. Therefore a traditional replicated database is more appropriate for a large and/or rarely used data set.
Major Features
Most major features can be selected at object creation time to optimize for speed or for utility
- Replicated: Replicated efficiently to multiple nodes 
- Nested: Checkpoint values can be unique identifiers that automatically resolve-and-lookup in another Checkpoint 
- Persistent: Checkpoints can automatically store themselves to disk (Persistence) 
- Notifiable: You can subscribe to get notified of changes to Checkpoints (Event) 
- Shared memory: For efficiency, a single service per node can maintain coherency (with the rest of the cluster) of a checkpoint. To do this the checkpoint is stored in shared memory 
- Transactional: Checkpoint operations can be wrapped in a cluster-wide Transaction 
- Partial record updates: A partial record update occurs when a checkpoint entry (key,value) exists and an application writes a subset of the value. For example, write 100 bytes to offset 10000100 to 10000200. The value of a partial record update becomes compelling when the value of the checkpoint entry is very long which is why the prior example was used. 
Design
In this document the term "Checkpoint" will be used to refer to the entire replicated checkpoint abstraction. The term "local replica" will be used to refer to a particular copy of the checkpoint data. "Shared replica" refers to a process that only accesses the checkpoint via shared memory.
Process Access
A Checkpoint can be located in process private memory or in shared memory based on an option when the Checkpoint is created.
A Checkpoint that is used by multiple processes on the same node should be located in shared memory for efficiency.
Checkpoint Creation
A checkpoint is always identified by a Handle. At the API layer, a string name can be used. If the latter, this name will be registered with the Name service using the checkpoint's Handle as the value.
Checkpoint Retention Timer and Deletion
Introduction
When all processes close a checkpoint wait N (configurable) seconds and then delete it from memory. This is the checkpoint "retention time".
Each checkpoint is provided with retentionDuration argument when it's opened. When the last call to checkpoint close is performed, a timer is started with retentionDuration and when the timer expires, data of this checkpoint will be deleted from memory. Let's say processes A and B have a checkpoint open which is configured for a 5 minute retention time. A exits. The checkpoint stays "alive". B exits. Now no process has the checkpoint open. The system does NOT close the checkpoint. At 3 minutes after there were no users, process C opens the checkpoint. It opens the original checkpoint data because it was retained for that time. Now process C closes the checkpoint. Again no process has it open so the timer starts. After 5 minutes the data is deleted from shared memory.
For persistent checkpoints, we should have another field "persistentRetentionTime" that configures how long the data is retained on disk.
The purpose of these retention times is to clean up unused resources, but not so quickly that a failure and restart will cause the data to be deleted.
Implementation
Because each checkpoint is stored in a file in shared memory, so, we will iterate all these files to see if there is a process opening or closing a checkpoint. To do this, we'll use process semaphore functions (ProcGate in clThreadApi.hxx).
The implementation can periodically try_lock() (and give() right away). If it returns true, some process still has the checkpoint open. If it returns false, start the retentionTimer. To achieve this, we must have a process (temporarily named RetentionTimer). It monitors all the created checkpoint by periodically checking if a checkpoint open in a process and start the retention timer accordingly. This process should be started along with other SAFplus servers (safplus_amf, safplus_log,...)
clCustomization.hxx
   1 SAFplusI 
   2 {
   3   enum {
   4     CkptUseCheckDuration = 60, /* This is the configured duration in second for which the program checks to see if there is any process with a checkpoint open */
   5     CkptRetentionDurationDefault = 28800, /* This is the default retention duration in second for the retention timer to decide if a checkpoint data is deleted from memory */
   6   };
   7   //...
   8 };
clCkptApi.hxx
In each Checkpoint constructor, add more argument named "retentionDuration" like this:
   1 Checkpoint(const Handle& handle, uint_t flags, uint64_t retentionDuration, uint_t size=0, uint_t rows=0);
   2 Checkpoint(uint_t flags, uint64_t retentionDuration, uint_t size=0, uint_t rows=0);
   3 Checkpoint(); // The default constructor, no argument supplied, retentionDuration will use the default value from clCustomization.hxx 
   4 
clckpt.cxx
   1 /* On checkpoint init, currently we name the shared memory file in the format: ckpt_handle.id[0]:handle.id[1]. The retention timer has no way to take the retentionDuration because it belongs to another process. So, my first suggestion, in order to take the retentionDuration, the shared memory file format should be appended with retentionDuration so that RetentionTimer process can parse the files and read it. Consequently, the new format should be:  ckpt_handle.id[0]:handle.id[1]:retentionDuration */
clRetentionTimer.cxx
   1 #include <clCustomization.hxx>
   2 
   3 /* this struct contains the timer pointer and retention duration for the timer */
   4 struct RetentionTimer {
   5   boost::asio::deadline_timer* timer;
   6   //uint64_t retentionDuration; // in second (obsolete)
   7   // And maybe a ProcGate associated with this checkpoint used to check its status
   8   SAFplus::ProcGate gate;
   9   bool isRunning; // states that if this timer is running or not 
  10 };
  11 
  12 typedef std::pair<const SAFplus::Handle, RetentionTimer> CkptTimerMapPair;
  13 typedef boost::unordered_map<SAFplus::Handle, RetentionTimer> CkptTimerMap; // a map contains ckptHandle as a key and struct RetentionTimer as a value
  14 
  15 boost::asio::io_service io; // io service to associate with one or many deadline timers. In this case, it associates with ckptUseStatusCheckTimer and ckptRetentionTimer
  16 boost::asio::deadline_timer ckptUseStatusCheckTimer(io, boost::posix_time::seconds(CkptUseCheckDuration)); // the timer is to periodically check the use status of a checkpoint
  17 // the onTimeout function for ckptUseStatusCheckTimer
  18 void onCkptUseStatusCheckTimeout(const boost::system::error_code& e); 
  19 // the onTimeout function for ckptRetentionTimer
  20 void onRetentionTimeout(const boost::system::error_code& e, const SAFplus::Handle& ckptHandle); 
  21 // an entry for starting retention timer
  22 void startTimer(const SAFplus::Handle& ckptHandle, uint32_t retentionDuration);
  23 // an entry for stopping retention timer
  24 void stopTimer(const SAFplus::Handle& ckptHandle);
  25 // Run the io service
  26 void runIoService();
  27 // Fill out the retentionTimerMap, also update in case there is a new checkpoint created
  28 void updateRetentionTimerMap();
  29 
  30 int main()
  31 {
  32   // Add work to io service so that the deadline timer continues working after it restarts
  33   boost::asio::io_service::work work(io);
  34   // start the ckpt use status checking
  35   ckptUseStatusCheckTimer.async_wait(onCkptUseStatusCheckTimeout);
  36   // run the io service
  37   runIoService();  
  38 }
  39 
  40 void onCkptUseStatusCheckTimeout(const boost::system::error_code& e)
  41 {
  42   updateRetentionTimerMap();
  43   // Loop thru the map for each ckpt: check to see if there is any process opening this checkpoint
  44   for(CkptTimerMap::iterator iter = ckptTimerMap.begin(); iter != ckptTimerMap.end(); iter++)
  45   {
  46     SAFplus::Handle& ckptHandle = iter->first;
  47     SAFplus::ProcGate& gate = iter->second.gate;
  48     if (gate.try_lock()) // there is a process opening this ckpt
  49     {
  50       stopTimer(ckptHandle);
  51     }
  52     else
  53     {
  54       startTimer(ckptHandle);
  55     }
  56   }
  57   // start a new status check circle
  58   ckptUseStatusCheckTimer.async_wait(onCkptUseStatusCheckTimeout);
  59 }
  60 
  61 void updateRetentionTimerMap()
  62 {
  63   /* TODO: using boost library to walk thru the /dev/shm/ to get all the checkpoint shared memory files following the format: cktp_handle.id[0]:handle.id[1]:retentionDuration
  64      Loop thru all files to get the ckpt handle, retention duration, handle.id[1] (as a ProcGate semId)
  65   */
  66   // Add item to map
  67   boost::asio::deadline_timer* timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(retentionDuration));
  68   SAFplus::ProcGate gate(ckptHandle.id[1]);
  69   RetentionTimer rt(timer, gate);
  70   CkptTimerMapPair value(ckptHandle, rt);
  71   ckptTimerMap.insert(value);
  72 }
  73 
  74 void onRetentionTimeout(const boost::system::error_code& e, const SAFplus::Handle& ckptHandle)
  75 {
  76   if (e != boost::asio::error::operation_aborted)
  77   {
  78     // Timer was not cancelled, the timer has expired within retentionDuration, so delete data    
  79     char sharedMemFile[256];
  80     // TODO: construct the checkpoint name based on its handle and retention duration   
  81     boost::interprocess::shared_memory_object::remove(sharedMemFile);
  82     // Remove this item from the map, too
  83     CkptTimerMap::iterator contents = ckptTimerMap.find(ckptHandle);
  84     if (contents != ckptTimerMap.end())
  85     {
  86       // delete the timer first
  87       delete contents->second.timer;
  88       ckptTimerMap.erase(contents);
  89     }
  90   }
  91   else
  92   {
  93     logDebug("CKPSVR", "TIMEOUT", "timer was cancelled because the specified checkpoint is opened");
  94   }
  95 }
  96 
  97 void startTimer(const SAFplus::Handle& ckptHandle)
  98 {
  99   CkptTimerMap::iterator contents = ckptTimerMap.find(ckptHandle);
 100   if (contents != ckptTimerMap.end())
 101   {
 102     boost::asio::deadline_timer* timer = contents->second;   
 103     timer->async_wait(boost::bind(onRetentionTimeout, boost::asio::placeholders::error, ckptHandle));
 104     contents->second.isRunning = true;
 105   }
 106   else
 107   {
 108     // Warning: there is no timer associated with the specified handle in the map
 109   }
 110 }
 111 
 112 void stopTimer(const SAFplus::Handle& ckptHandle)
 113 {
 114   CkptTimerMap::iterator contents = ckptTimerMap.find(ckptHandle);
 115   if (contents != ckptTimerMap.end())
 116   {
 117     boost::asio::deadline_timer* timer = contents->second;
 118     if (contents->second.isRunning) 
 119     {
 120       timer->cancel();
 121       contents->second.isRunning = false;
 122     }
 123   }
 124   else
 125   {
 126     // Warning: there is no timer associated with the specified handle in the map
 127   }
 128 }
 129 
 130 void runIoService()
 131 {
 132   io.run();
 133 }
Intra-Node and Inter-Node Replication and Communication
There are two ways that processes can share checkpoint data: either via shared memory or via messaging. Typically, all processes on the same node will use shared memory to share checkpoint data (intra-node) and a designated process per node will handle communication with other nodes (inter-node). However, if a process opens the checkpoint without the shared memory flag, it cannot only use the messaging communication mechanism. It is in essence behaving as if it was running isolated on its own node. When this document refers to inter-node communication, messaging communication, a process being a "node leader", etc. it may refer to actual communication across nodes but it equally refers to communication to processes that have opened the checkpoint "unshared".
For example, it is possible to have a single checkpoint opened and shared by 3 processes and opened unshared by 2 processes all on the same node. In this case, there will be 3 copies of that checkpoint stored in RAM on the node; 1 copy located in shared memory and 2 copies in process local memory. As of the writing, this configuration's is mostly conceived for test and debugging, but the existence of this configuration proves that the code is written in a robust fashion.
Discovery
When a checkpoint is created, it will be opened in shared memory (if applicable). Data within the shared memory will be used to elect a "synchronization replica process" (see Election section). A process that opens an "unshared" checkpoint is defacto the "sync replica process" of its own checkpoint replica.
The job of the synchronization replica process is to keep the checkpoint synchronized with other replicas via the messaging interface. To do this, the synchronization replica process will register a new group with the Group service. It is identified by a well-known Handle or by the ClusterUniqueId returned by the Group registration. It may also be identified by a string entry in the Name service and all APIs that need a checkpoint will accept either a ClusterUniqueId or a string name.
So every checkpoint replica will have exactly one process that is a member of this checkpoint group. Each checkpoint will have a different group. So the Group service (and Name service) end up being implicitly responsible for ensuring that a particular checkpoint (identified by handle or name/handle) is shared across the cluster. In other words, Checkpoint replicas discover each other by joining the Checkpoint's group and iterating through the group members.
The active process designated in the checkpoint's group will be the checkpoint writer, in checkpoints that only allow a single writer. In this case, the process is called the "master replica".
Replication
Replication consists of copying the checkpoint and changes to the checkpoint to another node. There are several relevant cases:
- The replica has no data
- The replica has old data
- The replica is in sync
Efficient replication in the case of 2 and 3 requires that data already located in the replica be used, not retransmitted (delta replication). Two general approaches exist; differencing and time stamps. Differencing involves running a checksum over trees of data values and replicating portions with a checksum mismatch. This is a CPU-intensive approach that allows delta replication regardless of how old the local data is and simultaneously verifies correctness. It is most naturally implemented in a tree data structure, yet the fastest checkpoint lookup structure is a hash table. Timestamp replication annotates each change with a monotonically increasing change id located either in-data or in a separate log. This implementation uses timestamp replication to minimize CPU use and because verification of correctness is not necessary since nodes are trusted.
In general, there are 2 extremas of checkpoint table structure that may render replication strategies inefficient. The first use is "many small records", the second "few large records". The "many small record" extrema is space inefficient if replication requires per-record information. The "few large records" extrema is inefficient if record replication must occur atomically. But since the SA-Forum APIs allow partial record updates it is important for efficiency that a partial record update only transmit the changed portion of the record. However, identifying changes with a timestamp becomes problematic if sub-record delta replication must be supported because a record may have multiple change times.
The timestamp shall consist of 2 parts; a 4 byte "generation" and 4 byte change number stored in the checkpoint header. If the generation of the local replica does not match that of the master replica, all local replica data shall be invalidated. Each transaction that is written to the replica master shall increment the change number. The updated change number shall be written to the checkpoint header and associated with the data that was changed.
Two ways of associating change numbers to data are supported and can be selected by the user: change annotation and change logs. Change annotation allocates memory for the change number in each piece of data written. Change logs use a separate data structure -- a circular queue -- that associates the change number with the key and data sub-set that changed. Therefore, change logs are efficient for "few large partially-updated records", "rarely updated", or "many short records" checkpoint styles, whereas change annotation is efficient for rapidly updated records.
Replica Synchronization
The local replica shall begin replica synchronization as follows:
- Block all checkpoint read access.
- Subscribe to all change updates.
- Request all changes that have occurred after its change number.
- The master shall respond with all deltas between the passed change number and the master's current number. The master can respond with with a "too old" error if the generation (change annotation) does not match or the replica's change number is lower then the oldest change log record (change logs). In this case, the master shall also send the entire checkpoint data.
- The local replica shall apply changes to its database IF the record's local change number (if it exists) is less than the number passed in the synchronization message. If the record's local change number is greater, it must have been updated due to change update message.
- If in-data change numbers are used, the local replica shall apply change updates from 1 the moment they arrive. If a the change log technique is used, the local replica shall defer all updates until synchronization is complete.
Replica Updates
The local replica shall subscribe to checkpoint change updates (as described in 1 above). The master replica shall forward all changes to every subscribed replica, including the relevant change number. Replicas know that every change record has been received because the change number is monotonically increasing. If a replica receives a skipped or out-of-order change number, it can initiate synchronization as described above. At this point, it is not possible for the replica to request all change in a particular change number since a subsequent change may have overwritten some of the changes in this set (on the master). Therefore the master can only respond with all changes from a particular number to the latest (i.e. synchronization).
Replica synchronization may require a complete iteration through the keys of the checkpoint database, so effort should be made to not lose replica update messages.
