Message Service
The Message Service comprises the fundamental message transport, plus the infrastructure and classes designed to make common messaging paradigms simple.
The SAFplus7 messaging layer has three underlying transports:
- IOC messaging C API
- This is a thin layer on top of IOC messaging itself, which can run over multiple transports; TIPC and UDP are currently supported. Applications can create their own message server listening on any IOC port.
- SAFplus messaging.
- All SAFplus components and user applications need an IOC message server to carry all SAFplus library communications. The IOC port involved is well-known for SAFplus services and is either well-known or dynamically assigned for user applications. Because this single port handles multiple protocols (each SAFplus library speaks its own protocol), messages are wrapped in an outer protocol that identifies the contained protocol. User applications can register their own sub-protocol and receive notifications when messages arrive. In this manner, applications can take advantage of much of the SAFplus message infrastructure. (In SAFplus 6.1, this capability is called the "EO"; SAFplus7 will not use the 6.1 "EO".)
- SA-Forum Message Queues
- See the Service Availability Forum documentation
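The sub-protocol multiplexing described for SAFplus messaging can be pictured with a small sketch. This is not the SAFplus implementation: the one-byte protocol ID at the front of each message, the `SubProtocolMux` class, its `registerHandler` and `dispatch` methods, and the `wrap` helper are all hypothetical names chosen for illustration.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical envelope: byte 0 identifies the contained protocol,
// the remaining bytes are that protocol's own message.
using Handler = std::function<void(const std::uint8_t* payload, std::size_t length)>;

class SubProtocolMux
{
public:
    // Associate a handler with a protocol ID; a later registration replaces an earlier one.
    void registerHandler(std::uint8_t protocolId, Handler handler)
    {
        assert(handler != nullptr);
        handlers[protocolId] = std::move(handler);
    }

    // Examine the envelope and notify the matching handler.
    // Returns false if the message is empty or no handler is registered for its ID.
    bool dispatch(const std::vector<std::uint8_t>& message) const
    {
        if (message.empty()) return false;
        const Handler& handler = handlers[message[0]];
        if (!handler) return false;
        handler(message.data() + 1, message.size() - 1);
        return true;
    }

private:
    std::array<Handler, 256> handlers;  // one slot per possible ID value
};

// Wrap a payload in the envelope by prefixing the protocol ID.
std::vector<std::uint8_t> wrap(std::uint8_t protocolId, const std::string& payload)
{
    std::vector<std::uint8_t> message;
    message.push_back(protocolId);
    message.insert(message.end(), payload.begin(), payload.end());
    return message;
}
```

A single receive loop on the shared port would call `dispatch` for every incoming message, so each library or application only ever sees its own protocol's payloads.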
Synchronous and Asynchronous
Message receipt can occur either synchronously or asynchronously using the "Wakeable" feature of the SAFplus thread semaphore system.
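A minimal sketch of how one Wakeable interface might serve both modes follows. The real SAFplus Wakeable signature is not shown in this document, so `wake(void* cookie)`, `CallbackWakeable`, and `QueueWakeable` are assumptions made for illustration only.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Assumed interface: the messaging layer calls wake() when a message arrives.
class Wakeable
{
public:
    virtual ~Wakeable() = default;
    virtual void wake(void* cookie) = 0;
};

// Asynchronous receipt: the message is handled inline, in the thread that calls wake().
class CallbackWakeable : public Wakeable
{
public:
    explicit CallbackWakeable(std::function<void(void*)> fn) : callback(std::move(fn))
    {
        assert(callback != nullptr);
    }
    void wake(void* cookie) override { callback(cookie); }

private:
    std::function<void(void*)> callback;
};

// Synchronous receipt: wake() enqueues the cookie and a receiver blocks in pop().
class QueueWakeable : public Wakeable
{
public:
    void wake(void* cookie) override
    {
        {
            std::lock_guard<std::mutex> lock(guard);
            items.push_back(cookie);
        }
        ready.notify_one();
    }

    // Blocks until at least one cookie has been enqueued, then returns the oldest.
    void* pop()
    {
        std::unique_lock<std::mutex> lock(guard);
        ready.wait(lock, [this] { return !items.empty(); });
        void* cookie = items.front();
        items.pop_front();
        return cookie;
    }

private:
    std::mutex guard;
    std::condition_variable ready;
    std::deque<void*> items;
};
```

The sender of a message does not need to know which kind of Wakeable it was given; the choice between blocking and callback-style receipt is made entirely by the receiver.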
Use Cases
- Simple message server
- Calls Wakeable.wake() whenever a message is received. This Wakeable can be a callback or a message queue.
- SAFplus message server
- Register sub-protocols by passing a well-known ID (256 bytes). This ID is examined in every received message and the appropriate Wakeable is called (from an array of them).
- Pooled SAFplus or Simple message server
- Add a thread pool to the above and the server does the call-backs in multiple threads.
- Message client with multithreaded sync/async send/reply
- Client is capable of issuing multiple "sends" and then waiting for the multiple replies. For a particular reply, it figures out which send it pairs to and "wake"s that entity.
// Multi-threaded synchronous

for (int i=0;i<maxNodes;i++) // Sending the same query to every node, one thread per node
{
  threadCreate(doit,i);
}

void doit(int node)
{
  msg* buffer = msgClient.getBuffer();
  <fill up buffer>;
  // No Wakeable passed, so the call is synchronous and returns the reply
  msg* packet = msgClient.sendReply(node_index_to_address(node),buffer,buffer_ownership_transferred);
  <handle the reply>
}

// Simultaneous synchronous
queue replyQueue; // replyQueue is a Wakeable that implements wake to put the reply (the passed cookie) onto the queue.
msg* buffer = msgClient.getBuffer();
<fill up buffer>;
for (int i=0;i<maxNodes;i++) // Sending the same query to every node
{
  msgClient.sendReply(node_index_to_address(i),buffer,no_ownership_transfer, replyQueue);
}
msgClient.release(buffer);

for (int received=0;received<maxNodes;received++) // Expect one reply per node
{
  msg* reply = replyQueue.pop(); // Assumed to block until a reply is available
  <handle the reply>
}

// Asynchronous
MsgHandler handleReply; // handleReply is a Wakeable that handles the message inline.
msg* buffer = msgClient.getBuffer();
<fill up buffer>;
for (int i=0;i<maxNodes;i++) // Sending the same query to every node
{
  msgClient.sendReply(node_index_to_address(i),buffer,no_ownership_transfer, handleReply);
}
msgClient.release(buffer);
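How the client pairs a reply with its send is not specified in this document. One common approach, sketched here under the assumption that each request carries an ID which the responder echoes back, is a table of pending requests. `ReplyMatcher`, `expectReply`, and `onReply` are hypothetical names, and a plain callback stands in for the Wakeable.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Stand-in for a Wakeable: invoked with the reply that matches a given send.
using ReplyHandler = std::function<void(const std::string& reply)>;

class ReplyMatcher
{
public:
    // Record who is waiting and return the ID to place in the outgoing request.
    std::uint64_t expectReply(ReplyHandler handler)
    {
        assert(handler != nullptr);
        std::lock_guard<std::mutex> lock(guard);
        const std::uint64_t id = nextId++;
        pending.emplace(id, std::move(handler));
        return id;
    }

    // Called by the receive path. Wakes the waiter paired with this ID.
    // Returns false for an unknown or already-answered ID.
    bool onReply(std::uint64_t id, const std::string& reply)
    {
        ReplyHandler handler;
        {
            std::lock_guard<std::mutex> lock(guard);
            auto it = pending.find(id);
            if (it == pending.end()) return false;
            handler = std::move(it->second);
            pending.erase(it);
        }
        handler(reply);  // invoked outside the lock so the handler may issue new sends
        return true;
    }

private:
    std::mutex guard;
    std::uint64_t nextId = 1;
    std::unordered_map<std::uint64_t, ReplyHandler> pending;
};
```

Because the table is keyed by ID rather than by arrival order, replies from different nodes can come back in any order and still reach the right waiter.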
For performance, it is important to use as few buffer copy operations as possible. This is why the API specifies whether the buffer's "ownership" is passed to the messaging layer or retained. If ownership is passed, the buffer can be used directly in the IOC layer. But I don't like this implementation; is there a cleaner way to communicate this information?
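One possible answer to the question above, offered only as a sketch and not as the SAFplus API, is to let the argument type carry the ownership decision instead of a flag. Here `MsgBuffer`, `SketchClient`, and its `send` overloads are hypothetical, and an internal vector stands in for the IOC layer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

using MsgBuffer = std::vector<std::uint8_t>;  // hypothetical buffer type

class SketchClient
{
public:
    // Ownership transferred: the layer keeps the caller's buffer, so no copy is made.
    void send(std::unique_ptr<MsgBuffer> buffer)
    {
        assert(buffer != nullptr);
        outbound.push_back(std::move(buffer));
    }

    // Ownership retained: the layer copies, and the caller may reuse the buffer.
    void send(const MsgBuffer& buffer)
    {
        outbound.push_back(std::unique_ptr<MsgBuffer>(new MsgBuffer(buffer)));
        ++copies;
    }

    std::size_t queued() const { return outbound.size(); }
    std::size_t copiesMade() const { return copies; }

private:
    std::vector<std::unique_ptr<MsgBuffer>> outbound;  // stands in for the IOC layer
    std::size_t copies = 0;
};
```

With this shape, a call site reads `client.send(std::move(buffer))` to hand the buffer over or `client.send(buffer)` to keep it, and the compiler rejects any later use of a buffer through a pointer that was moved away only at the cost of a null check, not a silent double free.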
Classes and Objects
MsgServerI
This is an abstract class defining the interface of a message server. It contains a function to get the address of the server, along with functions to send and receive messages and to start, stop, and quiesce the server:
/** Send a message
 * @param msgtype The destination message handler
 * @param destination Address of the destination node/process
 * @param buffer Your data
 * @param length Your data length
 * Raises the "Error" Exception if something goes wrong, or if the destination queue does not exist.
 */
void SendMsg(ClIocAddressT destination, void* buffer, ClWordT length,ClWordT msgtype=0);

/** Start the server */
void Start();

/** Stop message processing right away
 * Messages waiting in the queue are not dropped
 */

/**
 * This function stops accepting new messages right away, but does not return until all enqueued messages have been processed, and all processing threads are stopped.
 */
void Quiesce();

void RecvMsg
