SAFplus 7 Feature Discussion
SAFplus Messaging
Advanced Socket Layer
The socket abstraction (MsgSocket class) presents a simple read/write API for multiple scatter-gather messages, consisting of receive, send, and flush APIs. At the message transport layer, an instance of the socket object communicates directly with the Linux-level socket via system calls. However, additional socket objects can be created that implement extra functionality, such as traffic shaping, message segmentation and reassembly, and reliable messaging. These message sockets can modify the messages sent and received and then call a lower-level MsgSocket instance.
Functionality can therefore be selected at socket construction time by instantiation of a stack or tree of MsgSocket-derived classes, and a single implementation of an abstract concept (such as traffic shaping) can be applied to any message transport type.
After socket construction, application code is unaware that it is accessing layered sockets rather than a direct socket implementation, making it easy to add, remove, or change functionality as application requirements change.
The diagram above shows 3 applications with different socket stacks. Application 1 uses UDP transport with traffic shaping and allows large messages (segmentation and reassembly). Application 2 does not need large messages; it just uses traffic shaping. Application 3 shows a complex configuration. Its node supports both UDP and TIPC transports, so the first layer above the transports ("splitter") uses the destination node ID to determine which transport to route each message to. Large messages are supported, so the "Segmentation And Reassembly" object comes next. After that, the application may need both reliable and unreliable messages, so a "Joiner" object is added that provides 2 independent MsgSocket interfaces and combines them into one (by prefixing a "message type" character to the front of every message, for example). Finally, a "reliable message" MsgSocket object provides reliable messaging. The application code sees 2 MsgSocket interfaces: one for unreliable and one for reliable messages.
Note that it is expected that the receiving application will have the same socket stack in cases where the MsgSocket objects modify message content.
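As a rough illustration of this layering (the class names and method signatures below are assumptions made for this sketch, not the actual SAFplus 7 MsgSocket API; only the layering idea comes from the text above):

  // Hedged sketch only: TrafficShapingSocket, NullTransportSocket and the
  // MsgSocket signatures shown here are illustrative assumptions.
  #include <memory>
  #include <utility>

  class MsgSocket                       // the abstract socket concept
  {
  public:
    virtual ~MsgSocket() {}
    virtual void send(const char* data, int length) = 0;
    virtual int  receive(char* buffer, int maxLength) = 0;
    virtual void flush() = 0;
  };

  class NullTransportSocket : public MsgSocket   // stand-in for a real transport (e.g. UDP)
  {
  public:
    void send(const char*, int) override {}
    int  receive(char*, int) override { return 0; }
    void flush() override {}
  };

  class TrafficShapingSocket : public MsgSocket  // a layer that forwards to a lower MsgSocket
  {
  public:
    explicit TrafficShapingSocket(std::unique_ptr<MsgSocket> lowerSock) : lower(std::move(lowerSock)) {}
    void send(const char* data, int length) override
    {
      // A real implementation would queue or delay here to enforce a rate limit.
      lower->send(data, length);
    }
    int  receive(char* buffer, int maxLength) override { return lower->receive(buffer, maxLength); }
    void flush() override { lower->flush(); }
  private:
    std::unique_ptr<MsgSocket> lower;
  };

  // Functionality is selected at construction time; afterwards the application
  // only sees the MsgSocket interface, however many layers sit below it.
  std::unique_ptr<MsgSocket> makeShapedSocket()
  {
    return std::make_unique<TrafficShapingSocket>(std::make_unique<NullTransportSocket>());
  }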
SCTP transport plugin
SCTP stands for Stream Control Transmission Protocol. It is a transport protocol similar to TCP, and it has some notable features:
- Data transfer is reliable
- Multi-stream support
- Multi-homing support
- Ordered delivery is not strictly enforced
Because SCTP is connection-oriented, the server and the client open their sockets differently (a minimal code sketch follows the two lists below):
- Server:
- open a socket
- set sctp socket options
- bind the opened socket with IP address and port
- listen for connections
- accept connections (optional)
- receive/send from/to clients
- Client:
- open a socket
- set sctp socket options
- connect to the listening server socket
- send/receive to/from server socket
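A minimal, hedged sketch of these two open sequences using the POSIX socket API in one-to-one style (it assumes the lksctp-tools header <netinet/sctp.h> is installed; the port, address, and stream counts are placeholders, and error handling is reduced to early returns):

  // Hedged sketch of SCTP one-to-one server and client open sequences.
  #include <netinet/in.h>
  #include <netinet/sctp.h>
  #include <sys/socket.h>
  #include <arpa/inet.h>
  #include <cstring>
  #include <unistd.h>

  int openSctpServer(uint16_t port)
  {
    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP);       // open a socket
    if (fd < 0) return -1;

    sctp_initmsg init;                                          // set SCTP socket options
    memset(&init, 0, sizeof(init));
    init.sinit_num_ostreams  = 4;
    init.sinit_max_instreams = 4;
    setsockopt(fd, IPPROTO_SCTP, SCTP_INITMSG, &init, sizeof(init));

    sockaddr_in addr;                                           // bind to IP address and port
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);
    if (bind(fd, (sockaddr*)&addr, sizeof(addr)) < 0) { close(fd); return -1; }

    if (listen(fd, 8) < 0) { close(fd); return -1; }            // listen for connections
    return fd;   // accept() and recv()/send() happen later, on demand
  }

  int openSctpClient(const char* serverIp, uint16_t port)
  {
    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP);        // open a socket
    if (fd < 0) return -1;

    sockaddr_in server;                                         // connect to the listening server socket
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(port);
    inet_pton(AF_INET, serverIp, &server.sin_addr);
    if (connect(fd, (sockaddr*)&server, sizeof(server)) < 0) { close(fd); return -1; }
    return fd;   // send()/recv() to/from the server can now be used
  }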
However, to implement the peer-to-peer model using SCTP, the model has to be refined:
- Each client supports its own listen socket derived from the "port" supplied to the messaging transport, so that another client can connect directly to its peer. For example, assume there are 4 nodes: A(s1), B(s2), C(s3), D(s4). s1, s2, s3, s4 are the opened listen sockets (on the specified port) on each node. Assuming A wants to send messages to B, C and D, we have to open 3 client sockets (on the same port as the servers, of course) that connect to s2, s3, s4 respectively. After that, from A, messages can be sent to s2, s3, s4 as well as received from them. Communication sockets are opened on an as-needed basis. That is, the socket from A to C is not opened until a message is sent from A to C or from C to A. The communication socket is then left open and reused for the lifetime of the application.
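As a rough sketch of this as-needed connection reuse (the PeerConnections class is invented for illustration; openSctpClient() refers to the sketch above, and nodeIdToIpAddress() to the translation function described under Network Address Mapping below):

  // Hedged sketch of per-peer connection caching for the SCTP transport.
  #include <cstdint>
  #include <map>
  #include <string>

  int openSctpClient(const char* serverIp, uint16_t port);      // from the sketch above
  std::string nodeIdToIpAddress(unsigned int nodeId);           // assumed helper, see Network Address Mapping

  class PeerConnections
  {
  public:
    explicit PeerConnections(uint16_t port) : port_(port) {}

    // Return the socket connected to the given node, opening it on first use.
    // The socket is then kept open and reused for the lifetime of the application.
    int get(uint32_t nodeId)
    {
      auto it = sockets_.find(nodeId);
      if (it != sockets_.end()) return it->second;
      int fd = openSctpClient(nodeIdToIpAddress(nodeId).c_str(), port_);
      sockets_[nodeId] = fd;
      return fd;
    }

  private:
    uint16_t port_;
    std::map<uint32_t, int> sockets_;   // node ID -> connected SCTP socket
  };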
* Node IP addresses
- The initial implementation uses the last 8 bits (D) of the A.B.C.D IP address as the node ID (see the UDP example). Isolate the code that transforms node IDs into IP addresses into a function call so that we can easily change the node-to-IP-address translation algorithm for "cloud mode" applications (see below).
Network Address Mapping
There are two techniques to convert node IDs into network addresses: "direct mapping" and "cloud mode".
- The direct mapping technique is useful in a private backplane network. In this case, the network address is algorithmically derived from the node ID and some statically configured environment variable data. For example, for IP addresses the environment variable is defined:
SAFPLUS_BACKPLANE_INTERFACE=eth0:1
and network and node ID are optionally defined:
SAFPLUS_BACKPLANE_NETWORK=169.254.26/9
SAFPLUS_NODE_ID=10
If SAFPLUS_BACKPLANE_NETWORK and SAFPLUS_NODE_ID are defined, the system derives an IP address from this pair (in this case, 169.254.26.10) and assigns it to the interface, overriding any existing IP address assignment.
If these variables are not defined, the interface $SAFPLUS_BACKPLANE_INTERFACE MUST already have an IP address assigned (otherwise throw an exception). That IP address and netmask are read, and SAFPLUS_BACKPLANE_NETWORK and SAFPLUS_NODE_ID are extracted from them.
So given a node ID (in this example, 10), any node in the cluster can construct the IP address of that node (in this example, 169.254.26.10).
How does the node discover its own ID? It uses the SAFPLUS_NODE_ID environment variable.
[Hung] Do you mean SAFPLUS_BACKPLANE_NETWORK is the same value throughout the cluster? If so, we can retrieve the IP address of any node by this method. Using the example above, let's say we're on node 1 (node ID 1) and we want to retrieve the IP address of node 7 (node ID 7). From node 1 we also get the value of SAFPLUS_BACKPLANE_NETWORK (= 169.254.26/9), so the IP address of node 7 should be 169.254.26.7. Is this correct?
[stone] Yes, but note that SAFPLUS_BACKPLANE_NETWORK may either be an environment variable OR derived from an existing IP address assignment (see the revised description above). msgUdp DOES NOT do this correctly right now... can you implement the above logic for all IP based transports: UDP, SCTP and TCP...
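A small sketch of the direct-mapping translation, isolated behind a single function as requested above (the function name and the simple "network prefix + node ID" string handling are illustrative; the fallback of reading the interface's existing address is omitted, and error handling is reduced to one exception):

  // Hedged sketch: direct-mapping translation of a node ID to an IPv4 address.
  #include <cstdlib>
  #include <stdexcept>
  #include <string>

  std::string nodeIdToIpAddress(unsigned int nodeId)
  {
    const char* net = getenv("SAFPLUS_BACKPLANE_NETWORK");   // e.g. "169.254.26/9"
    if (!net) throw std::runtime_error("SAFPLUS_BACKPLANE_NETWORK is not defined (interface-derived fallback omitted in this sketch)");

    std::string prefix(net);
    std::string::size_type slash = prefix.find('/');         // drop the netmask length, keep the network prefix
    if (slash != std::string::npos) prefix.erase(slash);

    // Append the node ID as the host part: node 10 on 169.254.26/xx -> 169.254.26.10
    return prefix + "." + std::to_string(nodeId);
  }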
- Cloud mode assumes that network addresses are arbitrary and contain more bits than the node ID. Therefore it is impossible to algorithmically transform a node ID into a network address.
In this mode, every node is configured via an environment variable with the IP addresses of "well known" nodes in the cluster. Best practice is to list at least 2, for example: SAFPLUS_CLOUD_PEERS=54.23.54.13,mycluster.mycompany.com (note that domain names are allowed). The new node will contact one of these nodes to request:
- A node ID (the new node can supply a preferred ID and indicate whether it MUST have this ID -- i.e., return an error if the ID is already taken -- or whether it is merely a preference)
- the mapping between node IDs and addresses.
Upon each new registration, the cloud mode server will notify all other nodes of the existence of this node.
This is a generic mapping service, so it can be implemented once for all transports. However, the service should itself use the underlying transport (perhaps via a special port/socket), so that part may be tricky :-).
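To make the registration exchange concrete, here is a rough sketch of the data a new node might exchange with a well-known peer. The struct names and field types are illustrative assumptions; only the behavior (preferred ID, "must have" flag, returned ID-to-address mapping, and notification of other nodes) comes from the description above.

  // Hedged sketch of the cloud-mode registration exchange. All names here
  // (NodeIdRequest, NodeIdReply, field names) are assumptions for illustration.
  #include <cstdint>
  #include <map>
  #include <string>

  struct NodeIdRequest
  {
    uint32_t preferredNodeId;   // 0 means "no preference"
    bool     mustHaveThisId;    // true: return an error if the ID is already taken
  };

  struct NodeIdReply
  {
    bool     ok;                                    // false if mustHaveThisId was set and the ID was taken
    uint32_t assignedNodeId;                        // the ID the cluster granted
    std::map<uint32_t, std::string> nodeAddresses;  // current node ID -> network address mapping
  };

  // Upon each successful registration, the serving node would also push the new
  // (nodeId, address) pair to every other known node so all maps stay in sync.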
SAFplus Management
Management Access APIs
We need a C++ API to issue management object sets and gets from any application. This will allow the management data to be manipulated generically, so we will not need to support many application-specific APIs such as log stream creation or dynamic HA operations.
Applications can also use these APIs to implement a new northbound protocol (such as SNMP or NETCONF) if they need to.
Proposal:
void mgtSet(const string& pathSpec, const string& value, Transaction& t);
string mgtGet(const string& pathSpec);
void mgtCreate(const string& pathSpec);
void mgtDelete(const string& pathSpec);
All APIs raise an exception if they cannot complete.
For example:
mgtSet("/Log/stream["mystream"]/filename", "/var/log/myapplog")
Management CLI Directory
The current management CLI's "directory" structure from the root looks like:
*--- root
     |--- Application
     |--- ServiceUnit    # this is ServiceUnit["su0"] but there's no way to determine this
     |--- ServiceUnit    # this is ServiceUnit["su1"]
The corresponding YANG (see SAFplusAMF.yang) is:
module SAFplusAMF { list ServiceUnit { } }
- It would be great if a CLI "plugin" could reorganise the directory structure. That is, it should be able to create new subdirectories and take any existing subdirectory and move it somewhere else. In other words, the directory structure that the CLI presents does not need to have any relationship with the actual YANG structure. The first step here is to abstract the CLI directory structure from the YANG one with a translation layer and a nice API to move things around (a rough sketch of such an API follows the default layout below).
- The current "default" directory structure is confusing. Instead the default should be:
*--- root
     |--- SAFplusAMF          # This is the 'prefix' field defined in the module, or the name of the module if prefix is not provided.
          |--- Application
          |--- ServiceUnit    # A "list" in YANG creates a directory and the list's elements are items in the directory
               |--- su0
               |--- su1
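A rough sketch of what that translation layer's API might look like (every name below -- CliDirTree, mkdir, move, toYangPath -- is invented for illustration; nothing here is an existing SAFplus interface):

  // Hedged sketch of a CLI directory translation layer.
  #include <map>
  #include <string>

  class CliDirTree
  {
  public:
    // Create a new virtual subdirectory, e.g. mkdir("/amf/apps").
    void mkdir(const std::string& virtualPath);

    // Map an existing YANG node into the virtual tree, or relocate a virtual
    // directory, e.g. move("/SAFplusAMF/ServiceUnit", "/amf/serviceUnits").
    void move(const std::string& fromPath, const std::string& toPath);

    // Resolve a path typed at the CLI back to the underlying YANG path.
    std::string toYangPath(const std::string& virtualPath) const;

  private:
    std::map<std::string, std::string> virtualToYang;  // virtual path -> YANG path
  };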
Standard YANG Implementations
SAFplus Management will provide reference implementations for many standard YANG modules.
- Each module will be implemented as a separate shared library that can be linked into an application that needs to provide that YANG data.
- All modules will have the same basic initialization and finalization APIs, for example:
xxxInitialize();
xxxFinalize();
where xxx is the name of the YANG module.
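For instance, an application linking a module's shared library might do the following (a hedged illustration; the SAFplusAMF name is borrowed from earlier in this document purely as a placeholder, and the signatures are assumptions that simply follow the convention above):

  // Hedged illustration of the xxxInitialize()/xxxFinalize() convention.
  extern void SAFplusAMFInitialize();
  extern void SAFplusAMFFinalize();

  int main()
  {
    SAFplusAMFInitialize();   // register this module's YANG data with management
    // ... application work that reads or writes the module's managed data ...
    SAFplusAMFFinalize();     // unregister and release the module's resources
    return 0;
  }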
""Note, it may make sense to make these YANG modules dynamically loadable plugins. In that case they will follow the SAFplus plugin architecture""
- Modules will be aware of active/standby designations and work properly with other instances running on the system.
- Modules will be written with the expectation that customers will modify the source code to implement proprietary features, so the code must be clearly documented and well commented.
- Where appropriate, modules will communicate with the underlying Linux OS to implement features.
Features that cannot be implemented by OpenClovis will be clearly marked by the DbgNotImplemented("description") macro.
YANG modules
SAFplus Checkpoint
Checkpoint deletion
When all processes have closed a checkpoint, wait N (configurable) seconds and then delete it from memory. This N is the checkpoint's "retention time".
Q&A:
[Hung] Do you mean that when every process (including SAFplus processes and user processes) using the checkpoint has closed it, the checkpoint data stored in shared memory must be deleted? Can you give a more specific example?
[Andrew] Yes. Let's say processes A and B have a checkpoint open which is configured for a 5 minute retention time. A exits. The checkpoint stays "alive". B exits. Now no process has the checkpoint open. The system does NOT delete the checkpoint data. 3 minutes after there were no users, process C opens the checkpoint. It finds the original checkpoint data because it was retained for that time. Now process C closes the checkpoint. Again no process has it open, so the timer starts. After 5 minutes the data is deleted from shared memory.
For persistent checkpoints, we should have another field "persistentRetentionTime" that configures how long the data is retained on disk.
The purpose of these retention times is to clean up unused resources, but not so quickly that a failure and restart will cause the data to be deleted.
[Hung] Thanks for the clarification. This concept basically comes from Checkpoint 6: each checkpoint is given a retentionDuration argument when it is opened. When the last checkpoint close call is performed, a timer is started with retentionDuration, and when the timer expires the checkpoint's data is deleted from memory.
To implement this, we need a checkpoint server which receives checkpoint open and checkpoint close notifications (we may already have a checkpoint server handler class named CkptServerMsgHandler which may receive and handle incoming messages -- I'm not sure, I need to check). It also starts or stops the timer as needed.
So, implementation for this feature should be:
- Add retentionDuration to the checkpoint constructor
- When a checkpoint is opened, its init() function is called and sends a "CKPT_OPEN" notification
- On receiving a "CKPT_OPEN" notification, the server checks whether the timer has been started and has not yet expired; if so, it stops the timer and increments the checkpoint's open count
- When a checkpoint is closed (the owning process exits), the checkpoint destructor sends a "CKPT_CLOSE" notification
- On receiving a "CKPT_CLOSE" notification, the server decrements the open count; if this was the last close, it starts the timer. If the timer expires with no "CKPT_OPEN" received within retentionDuration, the checkpoint data is deleted from memory.
[Andrew] This strategy will not work because a process can be killed. If it is killed, it will not be able to send the CKPT_CLOSE notification. The same problem exists with CKPT_OPEN: if SAFplus is killed and restarted, we need to start a retention timer for a checkpoint that may not be opened. Instead, do it this way (a rough code sketch of the client-side changes and the cleanup pass follows at the end of this section):
- Changes to the checkpoint clients:
- When a checkpoint is opened, store the "retention time" in the checkpoint shared memory header (store it as milliseconds for consistency with other stored durations). If multiple processes open the same checkpoint, use the maximum retention time (compare your value with the one in shared memory and overwrite it if yours is larger).
- Also create a uint64 in the checkpoint header called "openCount". This will track the # of times the checkpoint has been opened.
- Also create a SAFplus::ProcSem whose ID is based on the checkpoint handle. "Unlock" this ProcSem (adds 1) when opening the checkpoint. If the process dies for any reason, the ProcSem will be "locked" (subtracts 1).
- Create a CheckpointCleanup class (which will run in safplus_amf, or any other process -- one per node).
- Every 30 (configurable) seconds: For all checkpoints in /dev/shm (their name follows a particular format):
- Read the checkpoint handle from the checkpoint
- Create a SAFplus::ProcSem based on that handle
- If the value of the ProcSem is zero, start a timer (boost has timers, IIRC) based on the RetentionTime, and remember a local copy of openCount (localOpenCount)
- When the timer expires, if openCount == localOpenCount (that is, nobody opened the checkpoint during the RetentionTime duration), delete the checkpoint's shared memory
This approach also has the advantage that no messaging is needed.
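To make this concrete, here is a rough sketch of the client-side open path under these rules. The CkptShmHeader layout and the ProcSem interface shown are assumptions made for illustration; only the elements named above (a retention time stored in milliseconds, an openCount, and an unlock on open) come from the proposal.

  // Hedged sketch of the checkpoint client's open path.
  #include <cstdint>

  struct CkptShmHeader              // assumed layout of the checkpoint shared memory header
  {
    uint64_t handle;                // checkpoint handle (used to derive the ProcSem ID)
    uint64_t retentionTimeMs;       // retention time, stored in milliseconds
    uint64_t openCount;             // number of times this checkpoint has been opened
  };

  class ProcSem                     // stand-in for SAFplus::ProcSem (assumed interface)
  {
  public:
    explicit ProcSem(uint64_t id);
    void unlock();                  // +1; automatically undone (-1) by the OS if the process dies
  };

  void onCheckpointOpen(CkptShmHeader* hdr, uint64_t myRetentionTimeMs)
  {
    // Keep the maximum retention time requested by any opener.
    if (myRetentionTimeMs > hdr->retentionTimeMs) hdr->retentionTimeMs = myRetentionTimeMs;

    hdr->openCount++;               // in practice this update should be done under the checkpoint's lock

    ProcSem liveness(hdr->handle);  // semaphore ID derived from the checkpoint handle
    liveness.unlock();              // value > 0 means at least one live process has the checkpoint open
  }

And a similarly hedged sketch of one CheckpointCleanup scan pass. The "SAFplusCkpt_" name prefix, the header-reading helpers, and procSemValue() are assumptions; a real implementation would reuse the actual SAFplus header and ProcSem classes and an asynchronous (e.g. boost) timer instead of this simplified blocking wait.

  // Hedged sketch of the periodic cleanup scan over /dev/shm.
  #include <chrono>
  #include <cstdint>
  #include <filesystem>
  #include <string>
  #include <thread>

  struct CkptShmHeader { uint64_t handle; uint64_t retentionTimeMs; uint64_t openCount; };  // same layout as above

  CkptShmHeader readHeader(const std::filesystem::path& shmFile);   // assumed: map the file and copy its header
  int procSemValue(uint64_t checkpointHandle);                      // assumed: current value of the liveness ProcSem
  void deleteCheckpointShm(const std::filesystem::path& shmFile);   // assumed: unlink the shared memory segment

  void checkpointCleanupPass()
  {
    for (const auto& entry : std::filesystem::directory_iterator("/dev/shm"))
    {
      const std::string name = entry.path().filename().string();
      if (name.rfind("SAFplusCkpt_", 0) != 0) continue;             // skip files that are not SAFplus checkpoints (assumed name format)

      CkptShmHeader hdr = readHeader(entry.path());
      if (procSemValue(hdr.handle) != 0) continue;                  // some live process still has it open

      uint64_t localOpenCount = hdr.openCount;                      // snapshot before waiting out the retention time
      std::this_thread::sleep_for(std::chrono::milliseconds(hdr.retentionTimeMs));

      CkptShmHeader after = readHeader(entry.path());
      if (after.openCount == localOpenCount)                        // nobody opened it during the retention window
        deleteCheckpointShm(entry.path());
    }
  }

  // checkpointCleanupPass() is invoked every 30 (configurable) seconds by the
  // hosting process (safplus_amf or any other single process -- one per node).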