Checkpoint
The checkpoint entity forms the backbone of information coordination between nodes in the cluster. Abstractly, it is a "dictionary", "map", or database table data structure -- that is, a user provides an arbitrary data "key" that returns an arbitrary data "value". However, a Checkpoint differs from these structures because it exists in every process that is interested in it. A checkpoint is fully replicated to all interested nodes -- it is not a "distributed" dictionary where each node holds only partial data.
Uses
The primary use for checkpoint is synchronization of state between redundant programs. The "active" software writes a checkpoint, and the "standby" software reads it. This is more useful than a simple message interface because the checkpoint abstraction simultaneously presents the total state and the incremental state changes. Total state access is required when a "cold" standby is first started, when the standby fails and is restarted, and when the active fails over if the "warm" standby has not been actively tracking state changes. Incremental state changes (deltas) are required so a "hot" standby can continually update its program state in response to the active software's actions.
Checkpoint is also used whenever commonly-used data needs to be distributed throughout the cluster. Checkpoints can be single-writer, multiple-reader (more efficient), or multiple-writer, multiple-reader.
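As a minimal sketch of the active/standby pattern, assuming the constructor described later in this document plus illustrative flag names and string-based read/write overloads (the exact signatures live in clCkptApi.hxx and may differ):

#include <clCkptApi.hxx>

using namespace SAFplus;

void activeAndStandby(const Handle& ckptHandle)
{
  // Active side: open (or create) the replicated checkpoint and record one state change.
  Checkpoint active(ckptHandle, Checkpoint::REPLICATED | Checkpoint::SHARED, 300 /*retentionDuration, seconds*/);
  active.write("session:42", "serialized-session-state");   // an incremental delta, replicated to readers

  // Standby side: a cold standby reads the full state at startup; a hot standby additionally
  // subscribes to change notifications (see the Notifiable feature) and applies deltas as they arrive.
  Checkpoint standby(ckptHandle, Checkpoint::REPLICATED | Checkpoint::SHARED, 300);
  const Buffer& current = standby.read("session:42");
  (void)current; // apply to program state
}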
A Checkpoint is inappropriate when the data is not often used. Although a checkpoint may be written to disk for recovery during a catastrophic failure event, the entire checkpoint data set is stored in RAM. Therefore a traditional replicated database is more appropriate for a large and/or rarely used data set.
Major Features
Most major features can be selected at object creation time to optimize for speed or for utility.
Replicated: Replicated efficiently to multiple nodes
Nested: Checkpoint values can be unique identifiers that automatically resolve-and-lookup in another Checkpoint
Persistent: Checkpoints can automatically store themselves to disk (Persistence)
Notifiable: You can subscribe to get notified of changes to Checkpoints (Event)
Shared memory: For efficiency, a single service per node can maintain coherency (with the rest of the cluster) of a checkpoint. To do this the checkpoint is stored in shared memory
Transactional: Checkpoint operations can be wrapped in a cluster-wide Transaction
Partial record updates: A partial record update occurs when a checkpoint entry (key, value) already exists and an application writes only a subset of the value -- for example, writing 100 bytes at offsets 10000100 through 10000199. Partial updates are most valuable when the value is very large, as in that example, because only the changed portion of the record needs to be replicated, as sketched below.
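For instance, with a multi-megabyte value only the changed byte range should be sent to replicas. The writePartial() call below is purely hypothetical and only illustrates the bookkeeping involved:

// Hypothetical partial-update call (writePartial is an illustrative name, not an existing API):
// only the changed 100-byte range of a very large value is transmitted to the replicas.
void updateRegion(SAFplus::Checkpoint& ckpt)
{
  char newBytes[100] = {0};          // new contents of the changed region
  uint_t offset = 10000100;          // first changed byte within the (very large) value
  ckpt.writePartial("bigRecord", offset, newBytes, sizeof(newBytes));
}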
Design
In this document the term "Checkpoint" will be used to refer to the entire replicated checkpoint abstraction. The term "local replica" will be used to refer to a particular copy of the checkpoint data. "Shared replica" refers to a process that only accesses the checkpoint via shared memory.
Process Access
A Checkpoint can be located in process private memory or in shared memory based on an option when the Checkpoint is created.
A Checkpoint that is used by multiple processes on the same node should be located in shared memory for efficiency.
Checkpoint Creation
A checkpoint is always identified by a Handle. At the API layer, a string name can be used. If the latter, this name will be registered with the Name service using the checkpoint's Handle as the value.
Checkpoint Retention Timer and Deletion
Introduction
When all processes have closed a checkpoint, the system waits N (configurable) seconds and then deletes it from memory. This interval is the checkpoint "retention time".
Each checkpoint is given a retentionDuration argument when it is opened. When the last close of the checkpoint is performed, a timer is started with retentionDuration; when the timer expires, the checkpoint's data is deleted from memory. For example, suppose processes A and B have a checkpoint open that is configured with a 5 minute retention time. A exits; the checkpoint stays "alive". B exits; now no process has the checkpoint open, but the system does NOT delete it. Three minutes later, process C opens the checkpoint and sees the original data, because it was retained for that time. Process C then closes the checkpoint; again no process has it open, so the timer starts. After 5 minutes the data is deleted from shared memory.
For persistent checkpoints, we should have another field "persistentRetentionTime" that configures how long the data is retained on disk.
The purpose of these retention times is to clean up unused resources, but not so quickly that a failure and restart will cause the data to be deleted.
Implementation (retention timer is a separate process)
Because each checkpoint is stored in a file in shared memory, we can iterate over all of these files to determine whether any process has opened or closed a checkpoint. To do this, a "lastUsed" field is added to each checkpoint header. lastUsed stores the time of the most recent use and is updated whenever one of the following operations is invoked: checkpoint init, checkpoint read, or checkpoint write. A local copy of lastUsed for each checkpoint is saved when its retention timer starts. When the retention timer expires, the saved value is compared with the value read from the shared memory header: if they differ, the checkpoint is still being used, so the retention timer is restarted; otherwise the checkpoint is deleted.
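A sketch of the expiry check described above; the CkptHeader structure and its lastUsed field are assumptions for illustration.

#include <boost/date_time/posix_time/posix_time.hpp>

// Assumed for illustration: the checkpoint's shared memory header exposes a lastUsed timestamp.
struct CkptHeader { boost::posix_time::ptime lastUsed; /* ... other header fields ... */ };

// Decision made when a retention timer expires: compare the lastUsed snapshot taken when the
// timer was started against the current value in the shared memory header.
bool shouldDeleteOnExpiry(const CkptHeader* hdr, const boost::posix_time::ptime& lastUsedAtTimerStart)
{
  if (hdr->lastUsed != lastUsedAtTimerStart)
  {
    return false; // the checkpoint was used while the timer ran: snapshot the new value and restart the timer
  }
  return true;    // no activity for the whole retention duration: delete the shared memory data
}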
The implementation must also periodically rescan the checkpoint shared memory to pick up newly added checkpoints and refresh the lastUsed values. To achieve this, a dedicated process (named ckptretention) monitors all created checkpoints as described above. This process should be started along with the other SAFplus servers (safplus_amf, safplus_log, ...).
clCustomization.hxx
namespace SAFplusI
{
  enum
  {
    CkptUpdateDuration = 60, /* Interval in seconds at which the retention process rescans the checkpoints, picking up newly added checkpoints and changed parameters such as the last used time */
    CkptRetentionDurationDefault = 28800, /* Default retention duration in seconds used by the retention timer to decide when a checkpoint's data is deleted from memory */
  };
  //...
};
clCkptIpi
clCkptApi.hxx
In each Checkpoint constructor, add a new argument named "retentionDuration", like this:
Checkpoint(const Handle& handle, uint_t flags, uint64_t retentionDuration, uint_t size=0, uint_t rows=0);
Checkpoint(uint_t flags, uint64_t retentionDuration, uint_t size=0, uint_t rows=0);
Checkpoint(); // The default constructor: no arguments supplied, so retentionDuration uses the default value from clCustomization.hxx
clckpt.cxx
/* On checkpoint init, the shared memory file is currently named in the format ckpt_handle.id[0]:handle.id[1]. The retention timer runs in a separate process, so it has no way to learn the retentionDuration. To make it available, the retentionDuration is appended to the shared memory file name so that the retention process can parse it from the file name. The new format is: ckpt_handle.id[0]:handle.id[1]:retentionDuration */
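For illustration, building and parsing such a file name might look like the sketch below; the exact numeric formatting of the handle fields is an assumption.

#include <cstdio>
#include <cinttypes>

// Build the shared memory file name: ckpt_<handle.id[0]>:<handle.id[1]>:<retentionDuration>
void buildCkptShmName(char* out, size_t outLen, uint64_t id0, uint64_t id1, uint64_t retentionDuration)
{
  snprintf(out, outLen, "ckpt_%" PRIx64 ":%" PRIx64 ":%" PRIu64, id0, id1, retentionDuration);
}

// Parse it back (e.g. in the ckptretention process while scanning /dev/shm)
bool parseCkptShmName(const char* name, uint64_t* id0, uint64_t* id1, uint64_t* retentionDuration)
{
  return sscanf(name, "ckpt_%" SCNx64 ":%" SCNx64 ":%" SCNu64, id0, id1, retentionDuration) == 3;
}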
clCkptRetention.cxx
#include <unistd.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/unordered_map.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <clHandleApi.hxx>
#include <clLogApi.hxx>
#include <clCustomization.hxx>

/* The callback type for the timer handler. A boost::function (rather than a plain function pointer) is used so that member functions can be bound as callbacks. */
typedef boost::function<void (const boost::system::error_code&, void*)> TimeoutCb;

/* This class wraps the boost deadline timer so that we can operate the timer via an object */
class Timer
{
protected:
  boost::asio::deadline_timer timer;
  uint64_t waitDuration;
  boost::asio::io_service& iosvc;
  TimeoutCb fpOnTimeout;
public:
  Timer(uint64_t wd, boost::asio::io_service& _iosvc, TimeoutCb cb): timer(_iosvc, boost::posix_time::seconds(wd)), waitDuration(wd), iosvc(_iosvc), fpOnTimeout(cb)
  {
  }
  /* Wait asynchronously for the configured duration */
  void wait(void* arg)
  {
    timer.async_wait(boost::bind(fpOnTimeout, boost::asio::placeholders::error, arg));
  }
  /* Reschedule the timer to expire waitDuration seconds from now (cancels any pending wait) */
  void expire()
  {
    timer.expires_from_now(boost::posix_time::seconds(waitDuration));
  }
  /* Cancel the timer immediately; the timer no longer runs */
  void cancel()
  {
    timer.cancel();
  }
  ~Timer()
  {
  }
};

/* This struct holds the timer and the bookkeeping needed to decide whether a checkpoint is still in use */
struct RetentionTimerData
{
  boost::shared_ptr<Timer> timer; // deadline timers are not copyable, so the wrapper is held by pointer
  // A ProcGate associated with this checkpoint could also be stored here to check its status
  boost::posix_time::ptime lastUsed; // snapshot of the checkpoint's lastUsed field, taken when the timer starts
  bool isRunning; // states whether this timer is running or not
  RetentionTimerData(boost::shared_ptr<Timer> t, boost::posix_time::ptime _lastUsed, bool running): timer(t), lastUsed(_lastUsed), isRunning(running)
  {
  }
};

typedef std::pair<const SAFplus::Handle, RetentionTimerData> CkptTimerMapPair;
typedef boost::unordered_map<SAFplus::Handle, RetentionTimerData> CkptTimerMap; // maps a ckptHandle (key) to its RetentionTimerData (value)

/* Retention timer base class: used both for shared memory files and for persistent files on disk */
class RetentionTimer
{
public:
  CkptTimerMap ckptTimerMap;
  // The onTimeout function for the checkpoint retention timer
  virtual void onRetentionTimeout(const boost::system::error_code& e, void* arg) = 0;
  // Fill out ckptTimerMap; also called periodically so that newly created checkpoints are picked up
  virtual void updateRetentionTimerMap(const char* path) = 0;
  // An entry for starting a retention timer
  void startTimer(CkptTimerMap::iterator& iter)
  {
    // Pass the address of the map key (the checkpoint handle) to the timeout handler
    iter->second.timer->wait((void*)&iter->first);
    iter->second.isRunning = true;
  }
  // An entry for stopping a retention timer
  void stopTimer(CkptTimerMap::iterator& iter)
  {
    if (iter->second.isRunning)
    {
      iter->second.timer->cancel();
      iter->second.isRunning = false;
    }
  }
};

class SharedMemFileTimer: public RetentionTimer
{
public:
  virtual void updateRetentionTimerMap(const char* path)
  {
    /* TODO: use the boost library to walk through /dev/shm/ and collect all checkpoint shared memory files following the format ckpt_handle.id[0]:handle.id[1]:retentionDuration.
       Loop through the files to get the ckpt handle, the retention duration and handle.id[1] (usable as a ProcGate semId).
       For each checkpoint not yet in the map, add an entry, for example:
         boost::shared_ptr<Timer> timer(new Timer(retentionDuration, io, boost::bind(&SharedMemFileTimer::onRetentionTimeout, this, _1, _2)));
         RetentionTimerData rt(timer, lastUsedFromHeader, false);
         ckptTimerMap.insert(CkptTimerMapPair(ckptHandle, rt));
    */
  }
  virtual void onRetentionTimeout(const boost::system::error_code& e, void* arg)
  {
    if (e != boost::asio::error::operation_aborted)
    {
      // The timer was not cancelled: the checkpoint was unused for the whole retention duration, so delete its data.
      // TODO: per the design above, first compare the saved lastUsed value against the shared memory header and
      // restart the timer instead of deleting if they differ.
      SAFplus::Handle* ckptHandle = (SAFplus::Handle*)arg;
      char sharedMemFile[256] = "";
      // TODO: construct the shared memory file name from the handle and retention duration
      boost::interprocess::shared_memory_object::remove(sharedMemFile);
      // Remove this item from the map, too
      CkptTimerMap::iterator contents = ckptTimerMap.find(*ckptHandle);
      if (contents != ckptTimerMap.end())
      {
        ckptTimerMap.erase(contents);
      }
    }
    else
    {
      logDebug("CKPSVR", "TIMEOUT", "timer was cancelled because the specified checkpoint was opened again");
    }
  }
};

class PersistentFileTimer: public RetentionTimer
{
public:
  virtual void updateRetentionTimerMap(const char* path)
  {
    // TODO: walk through the persistent checkpoint files on disk and fill the map, as above
  }
  virtual void onRetentionTimeout(const boost::system::error_code& e, void* arg)
  {
    if (e != boost::asio::error::operation_aborted)
    {
      // The timer was not cancelled: the checkpoint was unused for the whole retention duration, so delete its data
      SAFplus::Handle* ckptHandle = (SAFplus::Handle*)arg;
      char filePath[256] = "";
      // TODO: construct the persistent file name from the handle and retention duration
      unlink(filePath); // remove the file on disk
      // Remove this item from the map, too
      CkptTimerMap::iterator contents = ckptTimerMap.find(*ckptHandle);
      if (contents != ckptTimerMap.end())
      {
        ckptTimerMap.erase(contents);
      }
    }
    else
    {
      logDebug("CKPSVR", "TIMEOUT", "timer was cancelled because the specified checkpoint was opened again");
    }
  }
};

/* Argument passed to the checkpoint update timer handler */
struct TimerArg
{
  SharedMemFileTimer* shmTimer;
  PersistentFileTimer* perstTimer;
  TimerArg(SharedMemFileTimer* _shmTimer, PersistentFileTimer* _perstTimer): shmTimer(_shmTimer), perstTimer(_perstTimer)
  {
  }
};

/* Global variables and functions */

boost::asio::io_service io; // io service associated with all the deadline timers (ckptUpdateTimer and the retention timers)
Timer* ckptUpdateTimer = NULL; // the periodic timer that rescans the checkpoints; created in main()
// The onTimeout function for ckptUpdateTimer
void onCkptUpdateTimeout(const boost::system::error_code& e, void* arg);
// Update the retention data held by a RetentionTimer object and start or stop its timers accordingly
void updateTimer(RetentionTimer* timer, const char* path);
// Run the io service
void runIoService();

int main()
{
  // Add work to the io service so that it keeps running between timer expirations
  boost::asio::io_service::work work(io);

  SharedMemFileTimer sharedMemFilesTimer;
  PersistentFileTimer persistentFilesTimer;
  TimerArg timerArg(&sharedMemFilesTimer, &persistentFilesTimer);
  // Start the ckpt update timer
  ckptUpdateTimer = new Timer(SAFplusI::CkptUpdateDuration, io, onCkptUpdateTimeout);
  ckptUpdateTimer->wait(&timerArg);
  // Run the io service
  runIoService(); // The program blocks here
}

void onCkptUpdateTimeout(const boost::system::error_code& e, void* arg)
{
  // Recover the RetentionTimer objects (both shared memory files and persistent files) from arg
  TimerArg* timerArg = (TimerArg*)arg;
  updateTimer(timerArg->shmTimer, "/dev/shm"); // shared memory checkpoint files
  updateTimer(timerArg->perstTimer, NULL);     // TODO: pass the directory holding the persistent checkpoint files
  // Start a new ckpt update cycle
  ckptUpdateTimer->expire();
  ckptUpdateTimer->wait(arg);
}

void updateTimer(RetentionTimer* timer, const char* path)
{
  timer->updateRetentionTimerMap(path);
  // Loop through the map and start the retention timer for each checkpoint whose timer is not yet running
  for (CkptTimerMap::iterator iter = timer->ckptTimerMap.begin(); iter != timer->ckptTimerMap.end(); iter++)
  {
    if (!iter->second.isRunning) timer->startTimer(iter);
  }
}

void runIoService()
{
  io.run();
}
Implementation (retention timer is in a calling process)
Instead of running the retention timer in a separate process, this approach integrates it into each calling process. The retention timer is implemented in the Checkpoint object (clckpt.cxx): each checkpoint object owns a retention timer, which is activated whenever the calling process uses the checkpoint APIs. However, this approach may not handle the case where the process itself fails, since the timer dies with the process and nothing remains to clean up the data.
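A minimal sketch of this alternative, assuming each Checkpoint owns a retention helper that is re-armed on every API call (the class and member names below are illustrative):

#include <stdint.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

// Illustrative only: a per-Checkpoint retention helper that the Checkpoint object could own.
// Every checkpoint API call would call touch() to push the deletion deadline out again.
class CkptRetentionGuard
{
public:
  CkptRetentionGuard(boost::asio::io_service& io, uint64_t retentionSeconds): timer(io), retentionSeconds(retentionSeconds)
  {
  }
  // Called from init()/read()/write(): postpone deletion by another retentionSeconds
  void touch()
  {
    timer.expires_from_now(boost::posix_time::seconds(retentionSeconds));
    timer.async_wait(boost::bind(&CkptRetentionGuard::onExpired, this, boost::asio::placeholders::error));
  }
private:
  void onExpired(const boost::system::error_code& e)
  {
    if (e != boost::asio::error::operation_aborted)
    {
      // No checkpoint API call for retentionSeconds: remove the shared memory data here.
      // Note: this cleanup never runs if the owning process crashes, which is the weakness noted above.
    }
  }
  boost::asio::deadline_timer timer;
  uint64_t retentionSeconds;
};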
Persistent checkpoint Retention Timer and Deletion
Introduction
A persistent checkpoint is a checkpoint whose data is also stored in files managed by the database access layer (DBAL). Deleting these files works the same way as deleting a checkpoint from shared memory: a persistentRetentionDuration is provided for the checkpoint, and if no process has accessed the files for that long, they are deleted from disk.
Persistent checkpoint
Introduction
Checkpoint data is stored in a node's shared memory, so it is available only while the node is up; when the node is powered off, the data is lost. To make it available regardless of node state (on or off), it must also be stored permanently on disk. However, writing data to disk is quite slow, so not all checkpoint data is written. Checkpoint data is written to disk only if:
- the checkpoint type is PERSISTENT, and
- the checkpoint's flush() function is called
This operation has some requirements:
- For writing: the data in shared memory and the data on disk must be kept synchronized. For example, checkpoint write may have been called many times, adding new (key, value) pairs and updating existing keys with new values; when flush() is called, all of those changes must be reflected on disk.
- For reading: data is read from shared memory first; if it is empty, it is read from disk. No special handling is needed here, because shared memory and disk are kept synchronized, so reading from either yields the same result.
Implementation
clCkptApi.hxx
- Each PERSISTENT checkpoint, identified by its handle, has a corresponding table in the database, so a member variable named dbHandle is added that refers to that database
- A flush() member function is added that reflects all changes from the checkpoint's shared memory to its database table
class Checkpoint
{
protected:
  ClDBHandleT dbHandle; // Database handle used to access this checkpoint's database table

public:
  void flush(); // Reflects all changes from shared memory to the database

protected:
  void syncFromDisk(); // If a user opens a checkpoint that does not exist in shared memory but whose data is available on disk, this function copies the data from the database into the checkpoint so that the two are synchronized

};
clckpt.cxx
void SAFplus::Checkpoint::init(const Handle& hdl, uint_t _flags, uint64_t retentionDuration, uint_t size, uint_t rows, SAFplus::Wakeable& execSemantics)
{
  //...
  if (ckptNotExists) // this checkpoint does not exist in shared memory
  {
    // Check whether a database for this checkpoint already exists
    rc = clDbalOpen(tempStr, tempStr, CL_DB_OPEN, maxKeySize, maxRecordSize, &dbHandle);
    if (rc == CL_OK)
    {
      // Launch a thread that synchronizes the data from the database into this checkpoint
      boost::thread(boost::bind(&SAFplus::Checkpoint::syncFromDisk, this)).detach();
    }
    else
    {
      if (flags & PERSISTENT)
      {
        // Create a new database
        rc = clDbalOpen(tempStr, tempStr, CL_DB_CREATE, maxKeySize, maxRecordSize, &dbHandle);
      }
    }
  }
  //...
}

void SAFplus::Checkpoint::flush()
{
  if (flags & PERSISTENT)
  {
    // TODO: read the data from shared memory and synchronize it with the data in the database
  }
  else
  {
    clDbgNotImplemented("%s is not supported with this checkpoint type", __FUNCTION__);
  }
}

void SAFplus::Checkpoint::syncFromDisk()
{
  // TODO: read the data from the database and write it into this checkpoint's data
}
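As a sketch only, the PERSISTENT branch of flush() above might be filled in as follows. The record iteration (Checkpoint::Iterator, i->first/i->second as Buffer pointers) and the clDbalRecordReplace() call are assumptions about the surrounding APIs, not confirmed signatures.

// Sketch of flush(): push every (key, value) pair from shared memory into the DBAL table.
// Checkpoint::Iterator, the Buffer accessors, and clDbalRecordReplace() are assumed here.
void SAFplus::Checkpoint::flush()
{
  if (flags & PERSISTENT)
  {
    for (Checkpoint::Iterator i = begin(); i != end(); i++)
    {
      const Buffer* key = i->first;   // assumed accessor
      const Buffer* val = i->second;  // assumed accessor
      // Insert-or-replace so that both new keys and updated values reach the on-disk table
      clDbalRecordReplace(dbHandle, (ClDBKeyT)key->data, key->len(), (ClDBRecordT)val->data, val->len());
    }
  }
}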
DBAL checkpoint
Intra-Node and Inter-Node Replication and Communication
There are two ways that processes can share checkpoint data: via shared memory or via messaging. Typically, all processes on the same node use shared memory to share checkpoint data (intra-node), and a designated process per node handles communication with other nodes (inter-node). However, if a process opens the checkpoint without the shared memory flag, it can only use the messaging communication mechanism. It is in essence behaving as if it were running isolated on its own node. When this document refers to inter-node communication, messaging communication, a process being a "node leader", etc., it may refer to actual communication across nodes, but it equally refers to communication with processes that have opened the checkpoint "unshared".
For example, it is possible to have a single checkpoint opened and shared by 3 processes and opened unshared by 2 processes, all on the same node. In this case, there will be 3 copies of that checkpoint stored in RAM on the node: 1 copy located in shared memory and 2 copies in process local memory. As of this writing, this configuration is mostly intended for test and debugging, but its existence demonstrates that the code is written in a robust fashion.
Discovery
When a checkpoint is created, it will be opened in shared memory (if applicable). Data within the shared memory will be used to elect a "synchronization replica process" (see Election section). A process that opens an "unshared" checkpoint is de facto the "sync replica process" of its own checkpoint replica.
The job of the synchronization replica process is to keep the checkpoint synchronized with other replicas via the messaging interface. To do this, the synchronization replica process will register a new group with the Group service. It is identified by a well-known Handle or by the ClusterUniqueId returned by the Group registration. It may also be identified by a string entry in the Name service and all APIs that need a checkpoint will accept either a ClusterUniqueId or a string name.
So every checkpoint replica will have exactly one process that is a member of this checkpoint group. Each checkpoint will have a different group. So the Group service (and Name service) end up being implicitly responsible for ensuring that a particular checkpoint (identified by handle or name/handle) is shared across the cluster. In other words, Checkpoint replicas discover each other by joining the Checkpoint's group and iterating through the group members.
The active process designated in the checkpoint's group will be the checkpoint writer, in checkpoints that only allow a single writer. In this case, the process is called the "master replica".
Replication
Replication consists of copying the checkpoint and changes to the checkpoint to another node. There are several relevant cases:
- The replica has no data
- The replica has old data
- The replica is in sync
Efficient replication in the last two cases requires that data already located in the replica be used rather than retransmitted (delta replication). Two general approaches exist: differencing and timestamps. Differencing involves running a checksum over trees of data values and replicating portions with a checksum mismatch. This is a CPU-intensive approach that allows delta replication regardless of how old the local data is and simultaneously verifies correctness. It is most naturally implemented in a tree data structure, yet the fastest checkpoint lookup structure is a hash table. Timestamp replication annotates each change with a monotonically increasing change id located either in-data or in a separate log. This implementation uses timestamp replication to minimize CPU use, and because verification of correctness is not necessary since nodes are trusted.
In general, there are two extremes of checkpoint table structure that may render replication strategies inefficient: "many small records" and "few large records". The "many small records" extreme is space-inefficient if replication requires per-record information. The "few large records" extreme is inefficient if record replication must occur atomically. Since the SA-Forum APIs allow partial record updates, it is important for efficiency that a partial record update only transmit the changed portion of the record. However, identifying changes with a timestamp becomes problematic if sub-record delta replication must be supported, because a record may then have multiple change times.
The timestamp shall consist of two parts: a 4-byte "generation" and a 4-byte change number, stored in the checkpoint header. If the generation of the local replica does not match that of the master replica, all local replica data shall be invalidated. Each transaction written to the master replica shall increment the change number. The updated change number shall be written to the checkpoint header and associated with the data that was changed.
Two ways of associating change numbers to data are supported and can be selected by the user: change annotation and change logs. Change annotation allocates memory for the change number in each piece of data written. Change logs use a separate data structure -- a circular queue -- that associates the change number with the key and data sub-set that changed. Therefore, change logs are efficient for "few large partially-updated records", "rarely updated", or "many short records" checkpoint styles, whereas change annotation is efficient for rapidly updated records.
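To make the two schemes concrete, the following illustrative layouts (the struct names are assumptions, not the actual implementation types) show the replication header, a change-annotated record, and a change-log entry:

#include <stdint.h>

// Checkpoint header fields used for timestamp replication (4-byte generation + 4-byte change number)
struct CkptReplicationHeader
{
  uint32_t generation;    // bumped when the whole data set must be invalidated and rebuilt
  uint32_t changeNumber;  // incremented by every write transaction on the master
};

// Change annotation: every stored record carries the change number of its last write.
struct AnnotatedRecord
{
  uint32_t changeNumber;  // change number of the last write to this record
  // key and value bytes follow
};

// Change log: a circular-queue entry describing what changed, which supports sub-record deltas.
struct ChangeLogEntry
{
  uint32_t changeNumber;  // which transaction produced this change
  uint32_t keyLen;        // identifies the record that changed
  uint32_t offset;        // start of the changed byte range within the value
  uint32_t length;        // length of the changed byte range
  // key bytes (and optionally the changed data) follow
};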
Replica Synchronization
The local replica shall begin replica synchronization as follows:
- Block all checkpoint read access.
- Subscribe to all change updates.
- Request all changes that have occurred after its change number.
- The master shall respond with all deltas between the passed change number and the master's current number. The master can respond with a "too old" error if the generation does not match (change annotation) or the replica's change number is lower than the oldest change log record (change logs). In this case, the master shall also send the entire checkpoint data.
- The local replica shall apply a change to its database IF the record's local change number (if it exists) is less than the number passed in the synchronization message. If the record's local change number is greater, it must have been updated by a change update message.
- If in-data change numbers are used, the local replica shall apply change updates from the moment they arrive. If the change log technique is used, the local replica shall defer all updates until synchronization is complete. A sketch of this flow follows the list.
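A condensed sketch of the synchronization flow above; the message and record types are illustrative stand-ins, not the actual implementation types:

#include <stdint.h>
#include <string>
#include <vector>
#include <map>

// Illustrative message/record types; the real ones would come from the checkpoint implementation.
struct SyncDelta { std::string key; uint32_t changeNumber; std::string bytes; };
struct SyncResponse { bool tooOld; uint32_t masterChangeNumber; std::vector<SyncDelta> deltas; };

// localChangeNumbers: per-record change numbers held by the local replica.
// Returns the replica's new change number after applying the response.
uint32_t applySyncResponse(const SyncResponse& resp, std::map<std::string, uint32_t>& localChangeNumbers)
{
  if (resp.tooOld)
  {
    // Generation mismatch, or the requested change number is older than the oldest change-log
    // entry: the master also sent the full checkpoint, so the local data is replaced wholesale.
    localChangeNumbers.clear();
  }
  for (size_t i = 0; i < resp.deltas.size(); i++)
  {
    const SyncDelta& d = resp.deltas[i];
    uint32_t& local = localChangeNumbers[d.key];  // 0 if the record does not exist locally yet
    // Apply only if the local record is older; a larger local number means a live change-update
    // message already delivered this (or a later) write.
    if (local < d.changeNumber)
    {
      local = d.changeNumber;
      // ... write d.bytes into the local record ...
    }
  }
  return resp.masterChangeNumber;
}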
Replica Updates
The local replica shall subscribe to checkpoint change updates (as described in the Replica Synchronization steps above). The master replica shall forward all changes to every subscribed replica, including the relevant change number. Replicas know that every change record has been received because the change number is monotonically increasing. If a replica receives a skipped or out-of-order change number, it can initiate synchronization as described above. At this point, it is not possible for the replica to request the changes belonging to one particular change number, since a subsequent change may have overwritten some of the changes in that set (on the master). Therefore the master can only respond with all changes from a particular number to the latest (i.e. synchronization).
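For example, gap detection on a replica can be as simple as the following sketch:

#include <stdint.h>

// Sketch: detect a missed change update by its monotonically increasing change number.
// Returns true if the update should be applied; false if the caller must initiate synchronization.
bool onChangeUpdate(uint32_t msgChangeNumber, uint32_t& lastAppliedChangeNumber)
{
  if (msgChangeNumber != lastAppliedChangeNumber + 1)
  {
    // A change message was skipped or reordered: fall back to full synchronization from
    // lastAppliedChangeNumber (the master replays everything up to its latest number).
    return false;
  }
  lastAppliedChangeNumber = msgChangeNumber;
  return true;
}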
Replica synchronization may require a complete iteration through the keys of the checkpoint database, so effort should be made to not lose replica update messages.