Compared with Revision 83 as of 2015-09-18 02:36:06 (size 15526; later revision size 27693).
Checkpoint
The checkpoint entity forms the backbone of the coordination of information between nodes in the cluster. Abstractly, it is a "dictionary", "map", or database table data structure -- that is, a user provides an arbitrary data "key" that returns an arbitrary data "value". However a Checkpoint differs from these structures because it exists in all processes that are interested in it. A checkpoint is fully replicated to all nodes that are interested in it -- it is not a "distributed" dictionary where every node has partial data.
Uses
The primary use for checkpoint is synchronization of state between redundant programs. The "active" software writes a checkpoint, and the "standby" software reads it. This is more useful than a simple message interface because the checkpoint abstraction simultaneously presents the total state and the incremental state changes. Total state access is required when a "cold" standby is first started, when the standby fails and is restarted, and when the active fails over if the "warm" standby has not been actively tracking state changes. Incremental state changes (deltas) are required so a "hot" standby can continually update its program state in response to the active software's actions.
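The difference between total state and incremental state can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `SketchCheckpoint`, `totalState()` and `changesSince()` are hypothetical names invented for illustration and are not part of the SAFplus API, and no replication is performed.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for a checkpoint: a key/value table plus a change list.
class SketchCheckpoint
{
public:
  typedef std::pair<std::string, std::string> Change;

  // The active writes a record; the change is also appended to the change list.
  void write(const std::string& key, const std::string& value)
  {
    table[key] = value;
    changes.push_back(Change(key, value));
  }

  // Total state: what a cold or restarted standby reads once.
  const std::map<std::string, std::string>& totalState() const { return table; }

  // Incremental state: the changes made after the first `seen` writes,
  // which a hot standby applies as they arrive.
  std::vector<Change> changesSince(std::size_t seen) const
  {
    if (seen >= changes.size()) return std::vector<Change>();
    return std::vector<Change>(changes.begin() + seen, changes.end());
  }

private:
  std::map<std::string, std::string> table;
  std::vector<Change> changes;
};
```

A cold standby would call `totalState()` once at startup, while a hot standby would remember how many changes it has applied and ask only for the newer ones.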
Checkpoint is also used whenever commonly-used data needs to be distributed throughout the cluster. Checkpoints can be single-writer, multiple-reader (more efficient), or multiple-writer, multiple-reader.
A Checkpoint is inappropriate when the data is not often used. Although a checkpoint may be written to disk for recovery during a catastrophic failure event, the entire checkpoint data set is stored in RAM. Therefore a traditional replicated database is more appropriate for a large and/or rarely used data set.
Major Features
Most major features can be selected at object creation time to optimize for speed or for utility:
- Replicated: Replicated efficiently to multiple nodes 
- Nested: Checkpoint values can be unique identifiers that automatically resolve-and-lookup in another Checkpoint 
- Persistent: Checkpoints can automatically store themselves to disk (Persistence) 
- Notifiable: You can subscribe to get notified of changes to Checkpoints (Event) 
- Shared memory: For efficiency, a single service per node can maintain coherency (with the rest of the cluster) of a checkpoint. To do this the checkpoint is stored in shared memory 
- Transactional: Checkpoint operations can be wrapped in a cluster-wide Transaction 
- Partial record updates: A partial record update occurs when a checkpoint entry (key, value) exists and an application writes a subset of the value, for example 100 bytes at offsets 10000100 to 10000200. Partial record updates become compelling when the value of the checkpoint entry is very long, which is why the example uses such large offsets. 
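A partial record update can be pictured as an in-place overwrite of a byte range inside an existing value. The helper below is a minimal sketch: `writePartial` is a hypothetical name, and rejecting a range that does not fit inside the existing value is an assumption made for the example, not documented SAFplus behavior.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: overwrite part of an existing value in place.
// Returns false when the range [offset, offset + bytes.size()) does not fit
// inside the existing value (an assumption made for this sketch).
bool writePartial(std::string& value, std::size_t offset, const std::string& bytes)
{
  if (offset > value.size() || bytes.size() > value.size() - offset) return false;
  value.replace(offset, bytes.size(), bytes);
  return true;
}
```

Only the written bytes need to be sent to the replicas, which is where the saving comes from when the value is very long.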
Design
In this document the term "Checkpoint" will be used to refer to the entire replicated checkpoint abstraction. The term "local replica" will be used to refer to a particular copy of the checkpoint data. "Shared replica" refers to a process that only accesses the checkpoint via shared memory.
Process Access
A Checkpoint can be located in process private memory or in shared memory based on an option when the Checkpoint is created.
A Checkpoint that is used by multiple processes on the same node should be located in shared memory for efficiency.
Checkpoint Creation
A checkpoint is always identified by a Handle. At the API layer, a string name can be used. If the latter, this name will be registered with the Name service using the checkpoint's Handle as the value.
Checkpoint Retention Timer and Deletion
Introduction
When all processes have closed a checkpoint, the system waits N (configurable) seconds and then deletes it from memory. This is the checkpoint "retention time".
Each checkpoint is given a retentionDuration argument when it is opened. When the last process closes the checkpoint, a timer is started with retentionDuration, and when the timer expires the data of this checkpoint is deleted from memory. Let's say processes A and B have a checkpoint open which is configured for a 5 minute retention time. A exits. The checkpoint stays "alive". B exits. Now no process has the checkpoint open. The system does NOT delete the checkpoint immediately; the retention timer starts instead. Three minutes after the last user left, process C opens the checkpoint. It opens the original checkpoint data because it was retained for that time. Now process C closes the checkpoint. Again no process has it open, so the timer starts. After 5 minutes the data is deleted from shared memory.
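The A, B, C scenario above can be replayed with a small state model. This is a sketch under stated assumptions: `RetentionModel` and its methods are hypothetical, time is passed in explicitly as seconds, and a periodic `tick()` stands in for the real timer.

```cpp
#include <cstdint>

// Hypothetical model of one checkpoint's retention state. Times are in seconds.
class RetentionModel
{
public:
  explicit RetentionModel(uint64_t retentionDuration)
    : duration(retentionDuration), openCount(0), timerRunning(false), timerStart(0), deleted(false) {}

  // A process opens the checkpoint. Returns false if the data was already deleted.
  bool open()
  {
    if (deleted) return false;
    ++openCount;
    timerRunning = false; // the checkpoint is in use again, so cancel the timer
    return true;
  }

  // A process closes the checkpoint (or exits). The timer starts on the last close.
  void close(uint64_t now)
  {
    if (openCount == 0) return;
    if (--openCount == 0)
    {
      timerRunning = true;
      timerStart = now;
    }
  }

  // Periodic check: delete the data once the timer has run for the full duration.
  void tick(uint64_t now)
  {
    if (timerRunning && now - timerStart >= duration)
    {
      deleted = true;
      timerRunning = false;
    }
  }

  bool isDeleted() const { return deleted; }

private:
  uint64_t duration;
  unsigned openCount;
  bool timerRunning;
  uint64_t timerStart;
  bool deleted;
};
```

With a duration of 300 seconds, a reopen 180 seconds after the last close cancels the timer, and the data is only deleted 300 seconds after the final close.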
For persistent checkpoints, we should have another field "persistentRetentionTime" that configures how long the data is retained on disk.
The purpose of these retention times is to clean up unused resources, but not so quickly that a failure and restart will cause the data to be deleted.
Implementation (retention timer is a separate process)
Each checkpoint is stored in a file in shared memory, so the retention process iterates over these files to determine whether each checkpoint is still in use. To do this, we add a "lastUsed" field to each checkpoint header. lastUsed stores a timestamp and is updated whenever one of the following operations is invoked: init checkpoint, read checkpoint, or write checkpoint. When the retention timer for a checkpoint starts, a local copy of its lastUsed value is stored. When the retention timer expires, we compare this local copy with the value read from the shared memory header: if they differ, the checkpoint has been used in the meantime, so the retention timer is restarted; otherwise the checkpoint is deleted.
The implementation periodically rescans the checkpoint shared memory to pick up newly added checkpoints and changed parameters. To achieve this, we need a process (named ckptretention) that monitors all created checkpoints as described above. This process should be started along with the other SAFplus servers (safplus_amf, safplus_log, ...).
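The decision taken when a retention timer expires can be sketched as a single comparison. The names `RetentionAction` and `onRetentionExpired` are hypothetical, and `std::chrono` is used here in place of `boost::posix_time::ptime` only to keep the example free of external dependencies.

```cpp
#include <chrono>

typedef std::chrono::system_clock::time_point TimePoint;

enum RetentionAction { RestartTimer, DeleteCheckpoint };

// localLastUsed:  the copy taken when the retention timer was started
// headerLastUsed: the value currently stored in the shared memory header
RetentionAction onRetentionExpired(TimePoint& localLastUsed, TimePoint headerLastUsed)
{
  if (headerLastUsed != localLastUsed)
  {
    // The checkpoint was used while the timer was running:
    // remember the new value and run another retention cycle.
    localLastUsed = headerLastUsed;
    return RestartTimer;
  }
  // Nobody touched the checkpoint for a whole retention period.
  return DeleteCheckpoint;
}
```

Because only a timestamp in the header is compared, the retention process does not need to know which processes have the checkpoint open.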
clCustomization.hxx
namespace SAFplusI
{
  enum
  {
    CkptUpdateDuration = 60, /* Configured interval, in seconds, at which the program rescans the checkpoints to pick up changed parameters (such as the last used time) and newly added checkpoints */
    CkptRetentionDurationDefault = 28800, /* Default retention duration, in seconds, after which the retention timer deletes unused checkpoint data from memory */
  };
  //...
};
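clCkptIpi
class CkptBufferHeader
{
public:
  //...
  uint64_t retentionDuration; // how long (in seconds) checkpoint data should be retained on disk if there is no call to checkpoint open
  boost::posix_time::ptime lastUsed; // the last time the checkpoint was used by a process
  //...
};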
clCkptApi.hxx
In each Checkpoint constructor, add an argument named "retentionDuration", like this:
Checkpoint(const Handle& handle, uint_t flags, uint64_t retentionDuration, uint_t size=0, uint_t rows=0);
Checkpoint(uint_t flags, uint64_t retentionDuration, uint_t size=0, uint_t rows=0);
Checkpoint(); // The default constructor: no argument is supplied, so retentionDuration uses the default value from clCustomization.hxx
clckpt.cxx
/* On checkpoint init, the shared memory file is currently named in the format ckpt_handle.id[0]:handle.id[1]. The retention timer cannot obtain the retentionDuration directly because the checkpoint belongs to another process. So, as a first suggestion, retentionDuration should be appended to the shared memory file name so that the RetentionTimer process can parse the file names and read it. Consequently, the new format should be: ckpt_handle.id[0]:handle.id[1]:retentionDuration */
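Parsing the proposed file name could look like the sketch below. The text does not say how the handle ids and the duration are encoded, so this example assumes the two ids are written in hexadecimal and retentionDuration in decimal seconds; `ParsedCkptName` and `parseCkptFileName` are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical result of parsing "ckpt_<id0>:<id1>:<retentionDuration>".
struct ParsedCkptName
{
  uint64_t id0;
  uint64_t id1;
  uint64_t retentionDuration; // seconds
};

// Assumption: ids are hexadecimal and the duration is decimal.
// Returns false when the name does not match the format exactly.
bool parseCkptFileName(const std::string& name, ParsedCkptName& out)
{
  unsigned long long id0 = 0, id1 = 0, duration = 0;
  int consumed = 0;
  int fields = std::sscanf(name.c_str(), "ckpt_%llx:%llx:%llu%n", &id0, &id1, &duration, &consumed);
  if (fields != 3 || static_cast<std::size_t>(consumed) != name.size()) return false;
  out.id0 = id0;
  out.id1 = id1;
  out.retentionDuration = duration;
  return true;
}
```

A file that still uses the old two-field format is rejected, so the retention process can skip it or fall back to the default retention duration.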
clCkptRetention.cxx
#include <stddef.h>
#include <stdint.h>
#include <unistd.h> // unlink()
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/unordered_map.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
// The SAFplus headers that declare SAFplus::Handle, logDebug and SAFplusI::CkptUpdateDuration are also needed

/* Global variables */
boost::asio::io_service io; // io service shared by all deadline timers: here, ckptUpdateTimer and the retention timers
const char* const SharedMemDir = "/dev/shm/"; // where the checkpoint shared memory files live
const char* const PersistentDir = NULL; // TODO: directory that holds the persistent checkpoint files

/* The callback type for timer handlers. boost::function is used so that member functions can be bound, too */
typedef boost::function<void (const boost::system::error_code&, void*)> TimeoutCb;

/* this class wraps the boost deadline timer so that we can operate boost deadline timer via an object */
class Timer
{
protected:
  boost::asio::io_service& iosvc;
  uint64_t waitDuration; // in seconds
  TimeoutCb fpOnTimeout;
  boost::asio::deadline_timer timer;
public:
  Timer(uint64_t wd, boost::asio::io_service& _iosvc, TimeoutCb cb): iosvc(_iosvc), waitDuration(wd), fpOnTimeout(cb), timer(_iosvc, boost::posix_time::seconds(wd))
  {
  }
  /* Wait asynchronously: fpOnTimeout(error, arg) is called when the timer expires or is cancelled */
  void wait(void* arg)
  {
    timer.async_wait(boost::bind(fpOnTimeout, boost::asio::placeholders::error, arg));
  }
  /* Reset the expiry to waitDuration seconds from now. This cancels any pending wait; it does not fire the handler immediately */
  void expire()
  {
    timer.expires_from_now(boost::posix_time::seconds(waitDuration));
  }
  /* Cancel the timer immediately, the timer no longer runs */
  void cancel()
  {
    timer.cancel();
  }
  ~Timer()
  {
  }
};

/* this struct contains the timer and the retention bookkeeping data for one checkpoint */
struct RetentionTimerData
{
  boost::shared_ptr<Timer> timer; // held by pointer because a deadline_timer cannot be copied
  //uint64_t retentionDuration; // in second (obsolete)
  // And maybe a ProcGate associated with this checkpoint used to check its status
  boost::posix_time::ptime lastUsed; // local copy of lastUsed, taken when the timer starts
  bool isRunning; // states whether this timer is running or not
  RetentionTimerData(boost::shared_ptr<Timer> t, boost::posix_time::ptime _lastUsed, bool running): timer(t), lastUsed(_lastUsed), isRunning(running)
  {
  }
};

typedef boost::unordered_map<SAFplus::Handle, RetentionTimerData> CkptTimerMap; // a map contains ckptHandle as a key and struct RetentionTimerData as a value
typedef CkptTimerMap::value_type CkptTimerMapPair;

/* Retention timer class: use this class for retention timer for both shared memory files and persistent files on disk */
class RetentionTimer
{
public:
  CkptTimerMap ckptTimerMap;
  virtual ~RetentionTimer()
  {
  }
  // the onTimeout function for ckptRetentionTimer
  virtual void onRetentionTimeout(const boost::system::error_code& e, void* arg)=0;

  // an entry for starting retention timer
  void startTimer(CkptTimerMap::iterator& iter)
  {
    if (!iter->second.isRunning) // do not queue a second wait on a timer that is already running
    {
      iter->second.timer->wait((void*)&iter->first); // the handler receives a pointer to the checkpoint handle
      iter->second.isRunning = true;
    }
  }
  // an entry for stopping retention timer
  void stopTimer(CkptTimerMap::iterator& iter)
  {
    if (iter->second.isRunning)
    {
      iter->second.timer->cancel();
      iter->second.isRunning = false;
    }
  }
  // Fill out the retentionTimerMap, also update in case there is a new checkpoint created
  virtual void updateRetentionTimerMap(const char* path)=0;
};

class SharedMemFileTimer: public RetentionTimer
{
public:
  virtual void updateRetentionTimerMap(const char* path)
  {
    /* TODO: using boost library to walk thru the /dev/shm/ to get all the checkpoint shared memory files following the format: ckpt_handle.id[0]:handle.id[1]:retentionDuration
     Loop thru all files to get ckptHandle and retentionDuration from the file name and lastUsed from the checkpoint header.
     The statements below belong inside that loop; ckptHandle, retentionDuration and lastUsed are the values obtained for the current file.
    */
    // Add item to map
    boost::shared_ptr<Timer> timer(new Timer(retentionDuration, io, boost::bind(&SharedMemFileTimer::onRetentionTimeout, this, _1, _2)));
    RetentionTimerData rt(timer, lastUsed, false);
    CkptTimerMapPair value(ckptHandle, rt);
    ckptTimerMap.insert(value); // insert() leaves an existing entry untouched
  }
  virtual void onRetentionTimeout(const boost::system::error_code& e, void* arg)
  {
    if (e != boost::asio::error::operation_aborted)
    {
      // Timer was not cancelled: it ran for the whole retentionDuration, so delete data
      SAFplus::Handle ckptHandle = *(SAFplus::Handle*)arg; // copy: erase() below destroys the map key that arg points to
      char sharedMemFile[256] = {0};
      // TODO: construct the checkpoint name based on its handle and retention duration
      // TODO: compare the local lastUsed with the one in the shared memory header and restart the timer instead of deleting if they differ
      boost::interprocess::shared_memory_object::remove(sharedMemFile);
      // Remove this item from the map, too
      CkptTimerMap::iterator contents = ckptTimerMap.find(ckptHandle);
      if (contents != ckptTimerMap.end())
      {
        ckptTimerMap.erase(contents);
      }
    }
    else
    {
      logDebug("CKPSVR", "TIMEOUT", "timer was cancelled because the specified checkpoint is opened");
    }
  }
};

class PersistentFileTimer: public RetentionTimer
{
public:
  virtual void updateRetentionTimerMap(const char* path)
  {
    // TODO: same as SharedMemFileTimer, but for the persistent files under path
  }
  virtual void onRetentionTimeout(const boost::system::error_code& e, void* arg)
  {
    if (e != boost::asio::error::operation_aborted)
    {
      // Timer was not cancelled: it ran for the whole retentionDuration, so delete data
      SAFplus::Handle ckptHandle = *(SAFplus::Handle*)arg; // copy: erase() below destroys the map key that arg points to
      char path[256] = {0};
      // TODO: construct the checkpoint name based on its handle and retention duration
      unlink(path); // remove file on disk
      // Remove this item from the map, too
      CkptTimerMap::iterator contents = ckptTimerMap.find(ckptHandle);
      if (contents != ckptTimerMap.end())
      {
        ckptTimerMap.erase(contents);
      }
    }
    else
    {
      logDebug("CKPSVR", "TIMEOUT", "timer was cancelled because the specified checkpoint is opened");
    }
  }
};

/* struct definition for argument for timer handler */
struct TimerArg
{
  SharedMemFileTimer* shmTimer;
  PersistentFileTimer* perstTimer;
  Timer* ckptUpdateTimer; // needed by the handler to start the next update cycle
  TimerArg(SharedMemFileTimer* _shmTimer, PersistentFileTimer* _perstTimer, Timer* _ckptUpdateTimer):shmTimer(_shmTimer), perstTimer(_perstTimer), ckptUpdateTimer(_ckptUpdateTimer)
  {
  }
};

/* Global functions */

// the onTimeout function for ckptUpdateTimer
void onCkptUpdateTimeout(const boost::system::error_code& e, void* arg);
// update timer ckpt retention data and start or stop timer accordingly
void updateTimer(RetentionTimer* timer, const char* path);
// Run the io service
void runIoService();

int main()
{
  // Add work to io service so that io.run() does not return while no timer is pending
  boost::asio::io_service::work work(io);

  SharedMemFileTimer sharedMemFilesTimer;
  PersistentFileTimer persistentFilesTimer;
  // start the ckpt update timer
  Timer ckptUpdateTimer(SAFplusI::CkptUpdateDuration, io, onCkptUpdateTimeout);
  TimerArg timerArg(&sharedMemFilesTimer, &persistentFilesTimer, &ckptUpdateTimer);
  ckptUpdateTimer.wait(&timerArg);
  // run the io service
  runIoService();  // The program control blocks here
  return 0;
}

void onCkptUpdateTimeout(const boost::system::error_code& e, void* arg)
{
  if (e == boost::asio::error::operation_aborted) return; // the update timer was cancelled
  // get the RetentionTimer objects from arg (both sharedMemfiles and persistent files)
  TimerArg* timerArg = (TimerArg*)arg;
  updateTimer(timerArg->shmTimer, SharedMemDir);
  updateTimer(timerArg->perstTimer, PersistentDir);
  // start a ckpt update cycle: re-arm first, otherwise the already expired timer would fire at once
  timerArg->ckptUpdateTimer->expire();
  timerArg->ckptUpdateTimer->wait(arg);
}

void updateTimer(RetentionTimer* timer, const char* path)
{
  timer->updateRetentionTimerMap(path);
  // Loop thru the map and start the retention timer of each checkpoint that does not have one running
  for(CkptTimerMap::iterator iter = timer->ckptTimerMap.begin(); iter != timer->ckptTimerMap.end(); iter++)
  {
    timer->startTimer(iter);
  }
}

void runIoService()
{
  io.run();
}
Implementation (retention timer is in a calling process)
Instead of running the retention timer in a separate process, this method integrates it into the calling process. The retention timer is implemented in the Checkpoint object (clckpt.cxx): each checkpoint object has its own retention timer, which is activated when the calling process uses the checkpoint object APIs. However, this method may not solve the problem when the process itself fails: the timer lives inside that process, so it stops when the process does.
Persistent checkpoint Retention Timer and Deletion
Introduction
A persistent checkpoint is a checkpoint whose data is stored in files managed by the database access layer (DBAL). Deleting these files works like deleting a checkpoint from shared memory: a persistentRetentionDuration is provided for the checkpoint, and if no process accesses the files within this time, they are deleted from disk.
Persistent checkpoint
Introduction
Checkpoint data is stored in the shared memory of a node. This data is available while the node is alive, but it is lost when the node is powered off. To make it available regardless of node state (on or off), it needs to be stored on disk permanently. However, writing data to disk is quite slow, so not all checkpoint data is written. Checkpoint data is written to disk if:
- the checkpoint type is PERSISTENT, and
- the flush() function of the checkpoint is called
This action has some requirements:
- For writing: make sure that the data in shared memory and the data on disk are synchronized. For example, checkpoint write may have been called many times, adding new records (new key, new value) and updating old ones (old key, new value). When flush() is called, all of those changes must be reflected on disk.
- For reading: read data from shared memory first and, if it is empty, read from disk. No special handling is needed for this, because shared memory and disk are kept synchronized, so reading from either gives the same result.
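The writing requirement can be met by remembering which keys changed since the last flush. The sketch below is a minimal model under stated assumptions: `PersistentSketch` is a hypothetical class, a `std::map` stands in for both shared memory and the DBAL database table, and record deletion is not handled.

```cpp
#include <cstddef>
#include <map>
#include <set>
#include <string>

// Hypothetical stand-in: `disk` plays the role of the checkpoint's database table.
class PersistentSketch
{
public:
  explicit PersistentSketch(std::map<std::string, std::string>& diskTable) : disk(diskTable) {}

  // A write only touches memory and marks the key as changed.
  void write(const std::string& key, const std::string& value)
  {
    memory[key] = value;
    dirty.insert(key);
  }

  // Copy every record changed since the last flush to disk.
  // Returns the number of records written.
  std::size_t flush()
  {
    std::size_t written = 0;
    for (std::set<std::string>::const_iterator it = dirty.begin(); it != dirty.end(); ++it)
    {
      disk[*it] = memory[*it];
      ++written;
    }
    dirty.clear();
    return written;
  }

private:
  std::map<std::string, std::string> memory;
  std::set<std::string> dirty;
  std::map<std::string, std::string>& disk;
};
```

A key that is written several times between two flushes is stored once, with its latest value, which keeps the number of slow disk writes down.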
Implementation
clCkptApi.hxx
- Each checkpoint, identified by its handle, has a corresponding table in the database (if it is PERSISTENT), so add a member variable named dbHandle that refers to its database
- Add a flush() member function that reflects all changes from checkpoint shared memory to the checkpoint database table
class Checkpoint
{
protected:
  ClDBHandleT dbHandle; // Database handle used to access the database

public:
  void flush(); // Reflects all changes from shared memory to the database

protected:
  // Used when the user opens a checkpoint that does not exist in shared memory
  // but whose data is available on disk: copies the data from the database
  // into the checkpoint so that the two are synchronized.
  void syncFromDisk();

};
clckpt.cxx
void SAFplus::Checkpoint::init(const Handle& hdl, uint_t _flags, uint64_t retentionDuration, uint_t size, uint_t rows, SAFplus::Wakeable& execSemantics)
{
  //...
  if (ckptNotExists) // this checkpoint does not exist in shared memory
  {
    // check whether a database for this checkpoint already exists
    rc = clDbalOpen(tempStr, tempStr, CL_DB_OPEN, maxKeySize, maxRecordSize, &dbHandle);
    if (rc == CL_OK)
    {
      // launch a new thread that copies the database contents into this checkpoint;
      // syncFromDisk is a member function, so it must be bound to this object
      boost::thread syncThread(&SAFplus::Checkpoint::syncFromDisk, this);
      syncThread.detach();
    }
    else
    {
      if (flags&PERSISTENT)
      {
        // create a new database
        rc = clDbalOpen(tempStr, tempStr, CL_DB_CREATE, maxKeySize, maxRecordSize, &dbHandle);
      }
    }
  }
  //...
}

void SAFplus::Checkpoint::flush()
{
  if (flags&PERSISTENT)
  {
    // TODO: read the data from shared memory and synchronize it with the data in the database
  }
  else
  {
    clDbgNotImplemented("%s is not supported with this checkpoint type", __FUNCTION__);
  }
}

void SAFplus::Checkpoint::syncFromDisk()
{
  // TODO: read the data from the database and write it to the checkpoint data
}
DBAL checkpoint
Intra-Node and Inter-Node Replication and Communication
There are two ways that processes can share checkpoint data: via shared memory or via messaging. Typically, all processes on the same node use shared memory to share checkpoint data (intra-node), and a designated process per node handles communication with other nodes (inter-node). However, if a process opens the checkpoint without the shared memory flag, it can only use the messaging communication mechanism. In essence, it behaves as if it were running isolated on its own node. When this document refers to inter-node communication, messaging communication, a process being a "node leader", etc., it may refer to actual communication across nodes, but it equally refers to communication with processes that have opened the checkpoint "unshared".
For example, it is possible to have a single checkpoint opened shared by 3 processes and opened unshared by 2 processes, all on the same node. In this case, there will be 3 copies of that checkpoint stored in RAM on the node: 1 copy in shared memory and 2 copies in process-local memory. At the time of writing, this configuration is mostly intended for testing and debugging, but supporting it shows that the code is written in a robust fashion.
Discovery
When a checkpoint is created, it will be opened in shared memory (if applicable). Data within the shared memory will be used to elect a "synchronization replica process" (see Election section). A process that opens an "unshared" checkpoint is de facto the "sync replica process" of its own checkpoint replica.
The job of the synchronization replica process is to keep the checkpoint synchronized with other replicas via the messaging interface. To do this, the synchronization replica process will register a new group with the Group service. It is identified by a well-known Handle or by the ClusterUniqueId returned by the Group registration. It may also be identified by a string entry in the Name service and all APIs that need a checkpoint will accept either a ClusterUniqueId or a string name.
So every checkpoint replica will have exactly one process that is a member of this checkpoint group. Each checkpoint will have a different group. So the Group service (and Name service) end up being implicitly responsible for ensuring that a particular checkpoint (identified by handle or name/handle) is shared across the cluster. In other words, Checkpoint replicas discover each other by joining the Checkpoint's group and iterating through the group members.
The active process designated in the checkpoint's group will be the checkpoint writer, in checkpoints that only allow a single writer. In this case, the process is called the "master replica".
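The discovery scheme can be pictured with a toy model of the Group service. This is a sketch under assumptions, not the SAFplus Group API: GroupServiceSketch, join() and peers() are hypothetical names, and checkpoints and processes are identified by plain integers for brevity.

```cpp
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical stand-in for the Group service: one group per checkpoint,
// one member (the synchronization replica process) per checkpoint replica.
class GroupServiceSketch
{
public:
  // Called by the synchronization replica process of a replica.
  void join(std::uint64_t checkpointId, int processId)
  {
    groups_[checkpointId].push_back(processId);
  }

  // Discovers the other replicas by iterating through the group members.
  std::vector<int> peers(std::uint64_t checkpointId, int self) const
  {
    std::vector<int> result;
    auto it = groups_.find(checkpointId);
    if (it == groups_.end()) return result;
    for (int member : it->second)
      if (member != self) result.push_back(member);
    return result;
  }

private:
  std::map<std::uint64_t, std::vector<int>> groups_;
};
```

Each replica that joins the group of checkpoint 7, for instance, sees every other replica of checkpoint 7 through peers(), and none of the replicas of other checkpoints.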
Replication
Replication consists of copying the checkpoint and changes to the checkpoint to another node. There are several relevant cases:
- The replica has no data
- The replica has old data
- The replica is in sync
Efficient replication in the second and third cases requires that data already located in the replica be used, not retransmitted (delta replication). Two general approaches exist: differencing and timestamps. Differencing involves running a checksum over trees of data values and replicating the portions with a checksum mismatch. This is a CPU-intensive approach that allows delta replication regardless of how old the local data is and simultaneously verifies correctness. It is most naturally implemented in a tree data structure, yet the fastest checkpoint lookup structure is a hash table. Timestamp replication annotates each change with a monotonically increasing change id located either in-data or in a separate log. This implementation uses timestamp replication to minimize CPU use and because verification of correctness is not necessary, since nodes are trusted.
In general, there are 2 extremes of checkpoint table structure that may render replication strategies inefficient. The first is "many small records", the second "few large records". The "many small records" extreme is space inefficient if replication requires per-record information. The "few large records" extreme is inefficient if record replication must occur atomically. Since the SA-Forum APIs allow partial record updates, it is important for efficiency that a partial record update transmit only the changed portion of the record. However, identifying changes with a timestamp becomes problematic if sub-record delta replication must be supported, because a record may have multiple change times.
The timestamp shall consist of 2 parts: a 4-byte "generation" and a 4-byte change number stored in the checkpoint header. If the generation of the local replica does not match that of the master replica, all local replica data shall be invalidated. Each transaction that is written to the replica master shall increment the change number. The updated change number shall be written to the checkpoint header and associated with the data that was changed.
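The two-part timestamp might be laid out as follows. This is an illustrative sketch only: the type and function names (ChangeStamp, CheckpointHeaderSketch, mustInvalidate, recordTransaction) are hypothetical and the real header contains other fields.

```cpp
#include <cstdint>

// 4-byte generation plus 4-byte change number, as described above.
struct ChangeStamp
{
  std::uint32_t generation;
  std::uint32_t changeNum;
};
static_assert(sizeof(ChangeStamp) == 8, "timestamp is expected to occupy 8 bytes");

// Hypothetical slice of the checkpoint header holding the current timestamp.
struct CheckpointHeaderSketch
{
  ChangeStamp stamp;
};

// A generation mismatch means all local replica data must be invalidated.
inline bool mustInvalidate(const ChangeStamp& local, const ChangeStamp& master)
{
  return local.generation != master.generation;
}

// Each transaction written to the master increments the change number.
// The returned value is the number to associate with the changed data.
inline std::uint32_t recordTransaction(CheckpointHeaderSketch& header)
{
  return ++header.stamp.changeNum;
}
```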
Two ways of associating change numbers to data are supported and can be selected by the user: change annotation and change logs. Change annotation allocates memory for the change number in each piece of data written. Change logs use a separate data structure -- a circular queue -- that associates the change number with the key and data sub-set that changed. Therefore, change logs are efficient for "few large partially-updated records", "rarely updated", or "many short records" checkpoint styles, whereas change annotation is efficient for rapidly updated records.
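A change log kept as a circular queue could look roughly like this. The sketch is an assumption-laden illustration, not the SAFplus data structure: ChangeLogSketch and its methods are hypothetical, only the key is logged (the changed data sub-set is omitted), and a replica is treated as "too old" when its change number is lower than the oldest retained log record.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

class ChangeLogSketch
{
public:
  explicit ChangeLogSketch(std::size_t capacity)
    : ring_(capacity ? capacity : 1), start_(0), count_(0) {}

  // Records that `key` changed in transaction `changeNum`.
  // When the queue is full, the oldest entry is overwritten.
  void append(std::uint32_t changeNum, const std::string& key)
  {
    ring_[(start_ + count_) % ring_.size()] = Entry{changeNum, key};
    if (count_ < ring_.size()) ++count_;
    else start_ = (start_ + 1) % ring_.size();
  }

  // Fills `keys` with the keys changed after change number `after`.
  // Returns false ("too old") if `after` is lower than the oldest retained record.
  bool changesAfter(std::uint32_t after, std::vector<std::string>& keys) const
  {
    keys.clear();
    if (count_ == 0) return true;
    if (after < ring_[start_].changeNum) return false;
    for (std::size_t i = 0; i < count_; ++i)
    {
      const Entry& e = ring_[(start_ + i) % ring_.size()];
      if (e.changeNum > after) keys.push_back(e.key);
    }
    return true;
  }

private:
  struct Entry
  {
    std::uint32_t changeNum;
    std::string key;
  };
  std::vector<Entry> ring_;
  std::size_t start_;  // index of the oldest entry
  std::size_t count_;  // number of valid entries
};
```

The log costs memory proportional to its capacity rather than to the number of records, which is why it suits checkpoints with many short or rarely updated records.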
Replica Synchronization
The local replica shall begin replica synchronization as follows:
- Block all checkpoint read access.
- Subscribe to all change updates.
- Request all changes that have occurred after its change number.
- The master shall respond with all deltas between the passed change number and the master's current number. The master can respond with a "too old" error if the generation (change annotation) does not match or the replica's change number is lower than the oldest change log record (change logs). In this case, the master shall also send the entire checkpoint data.
- The local replica shall apply changes to its database IF the record's local change number (if it exists) is less than the number passed in the synchronization message. If the record's local change number is greater, it must have been updated due to a change update message.
- If in-data change numbers are used, the local replica shall apply change updates from the subscription above the moment they arrive. If the change log technique is used, the local replica shall defer all updates until synchronization is complete.
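The rule for applying synchronization records when in-data change numbers are used can be sketched as below. This is a minimal illustration with hypothetical names (Record, Table, applySyncRecord), using a std::map in place of the checkpoint table.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Change annotation: every record carries the change number of its last update.
struct Record
{
  std::string value;
  std::uint32_t changeNum;
};

using Table = std::map<std::string, Record>;

// Applies one record from a synchronization message.
// The record is applied only if the key is absent locally or the local change
// number is less than the incoming one; otherwise the local record was already
// refreshed by a change update message and is kept. Returns true if applied.
inline bool applySyncRecord(Table& local, const std::string& key, const Record& incoming)
{
  auto it = local.find(key);
  if (it != local.end() && it->second.changeNum >= incoming.changeNum)
    return false;
  local[key] = incoming;
  return true;
}
```

Because the comparison is made per record, change updates that arrive during synchronization can be applied immediately without being overwritten by older synchronization data.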
Replica Updates
The local replica shall subscribe to checkpoint change updates (as described in the synchronization steps above). The master replica shall forward all changes to every subscribed replica, including the relevant change number. Replicas know that every change record has been received because the change number is monotonically increasing. If a replica receives a skipped or out-of-order change number, it can initiate synchronization as described above. At this point, it is not possible for the replica to request all changes in a particular change number, since a subsequent change may have overwritten some of the changes in this set (on the master). Therefore the master can only respond with all changes from a particular number to the latest (i.e. synchronization).
Replica synchronization may require a complete iteration through the keys of the checkpoint database, so effort should be made to not lose replica update messages.
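Detecting a skipped or out-of-order update could be done as in the following sketch. It assumes that the change number grows by exactly one per transaction, which follows from the increment rule above but is not spelled out for the update messages; UpdateTracker and UpdateResult are hypothetical names.

```cpp
#include <cstdint>

enum class UpdateResult
{
  Applied,   // the update is the next expected one
  NeedsSync  // a change number was skipped or arrived out of order
};

class UpdateTracker
{
public:
  explicit UpdateTracker(std::uint32_t lastSeen) : last_(lastSeen) {}

  // Checks the change number carried by an incoming change update.
  UpdateResult onUpdate(std::uint32_t changeNum)
  {
    if (changeNum == last_ + 1)
    {
      last_ = changeNum;
      return UpdateResult::Applied;
    }
    return UpdateResult::NeedsSync;  // caller should start replica synchronization
  }

  std::uint32_t last() const { return last_; }

private:
  std::uint32_t last_;
};
```

On NeedsSync the replica would request all changes after last(), which is the synchronization procedure described earlier.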
