Differences between revisions 2 and 16 (spanning 14 versions)
Revision 2 as of 2014-06-03 06:31:25
Size: 5891
Editor: HungTa
Comment:
Revision 16 as of 2014-06-04 07:45:22
Size: 2137
Editor: HungTa
Comment:
Deletions are marked like this. Additions are marked like this.
Line 5: Line 5:
Instead of starting a new thread for every task to execute concurrently, the task can be passed to a thread pool. As soon as the pool has any idle threads the task is assigned to one of them and executed. Internally, the thread pool handle has to be created first, then the tasks are inserted into this handle and executed. Instead of starting a new thread for every task to execute concurrently, the task can be passed to a thread pool. As soon as the pool has any idle threads the task is assigned to one of them and executed.
Line 11: Line 11:
The Name Service is a client library that is linked with every component that uses it. It opens a cluster-wide, shared memory, non-persistent checkpoint using the name service's well-known [[Handle]].

The name service uses this checkpoint to resolve string names into objects.
The Thread Pool is a client library that is linked with every component that uses it.
Line 16: Line 14:
#include <clNameApi.hxx> #include <clThreadPool.hxx>
Line 20: Line 18:
class NameException: public std::exception
namespace SAFplus
Line 22: Line 21:
  ...
};
  // Definition of user task
  typedef void (*CallbackT) (void* invocation);
  typedef uint32_t (*UserCallbackT) (void* invocation);
Line 25: Line 25:
class NameRegistrar   class Callable
Line 28: Line 28:

    typedef enum {
         MODE_REDUNDANCY,
         MODE_ROUND_ROBIN,
         MODE_PREFER_LOCAL/* return a handle that points to this process (if it exists)
                             if it does not exist return a handle that points to this NODE.
                             if that does not exist, return any handle
                             So choose "closer" handles over ones that are far away
                             this mode makes communications more efficient
                          */
         MODE_NO_CHANGE, /* This is not a mode; it tells the API to not change the existing mode (or use MODE_REDUNDANCY if creating) */

         } MappingMode;
    /* For more convenience and logical, should move mapping mode setting to set functions
       For more clearance on this parameter, see the comments of the functions
    */
    //void setMode(const char* name, MappingMode m);
     

    /* Associate a name with a handle and pointer and associate a handle with a pointer.
       If the name does not exist, it is created. If it exists, it is overwritten.
       If the handle format contains a node or process designator, then this mapping will be removed when the node/process fails.

       the void* object pointer is local to this process; it does not need to be part of the checkpoint.
       
       This association is valid for all SAFplus API name lookups, and for AMF entity names.
       The mapping mode parameter is mandatory and specifies what the mapping mode is, based on it, the get functions retrieves appropriate handles
     */
    void set(const char* name, SAFplus::Handle handle, MappingMode m, void* object=NULL);
    void set(const std::string& name, SAFplus::Handle handle, MappingMode m, void* object=NULL);
    CallbackT m_fn;
    void* m_cookie;
Line 59: Line 31:

     /* Associate a name with a handle and pointer and associate a handle with a pointer (if object != NULL).
        If the name does not exist, it is created. If the name exists, this mapping is appended (the original mapping is not removed).
        If there are multiple associations, the first association will always be returned.
        If the handle format contains a node or process designator, then this mapping will be removed when the node/process fails.
        If the name has more than one mapping another mapping will become the default response for this name.

        This association is valid for all SAFplus API name lookups, and for AMF entity names.
        The mapping mode parameter is mandatory and specifies what the mapping mode is, based on it, the get functions look up appropriate handle: if it's different from MappingMode::MODE_NO_CHANGE, the mapping mode that was set for this name before will be replaced with it, otherwise (omitted), the mapping mode that was set for this name before will be kept as it was (no change)
      */
    void append(const char* name, SAFplus::Handle handle, MappingMode m, void* object=NULL);
    void append(const std::string& name, SAFplus::Handle handle, MappingMode m, void* object=NULL);
    // Associate name with arbitrary data. A copy of the data is made.
    void set(const char* name, const void* data, int length);
    void set(const std::string& name, const void* data, int length);

    // Associate name with arbitrary data. A copy of the data is NOT made; this call transfers the reference count (ownership) to the callee.
    void set(const char* name, SAFplus::Buffer*);
    void set(const std::string& name, SAFplus::Buffer*);

    // Get a handle associated with the data
    // The SAFplus APIs use these calls to resolve names to handles or objects.
    std::pair<SAFplus::Handle&,void* object> get(const char* name) throws(NameException&);
    std::pair<SAFplus::Handle&,void* object> get(const std::string& name) throws(NameException&);

    SAFplus::Handle& get(const char* name) throws(NameException&);
    SAFplus::Handle& get(const std::string& name) throws(NameException&);
 
    // Get object based on handle
    void* get(const SAFplus::Handle&) throws(NameException&);

    
    // Get a handle associated with the data
    // The SAFplus APIs use these calls to resolve names to handles or objects.
    // Do not free the returned buffer, call Buffer.decRef();
    SAFplus::Buffer& get(const char* name) throws(NameException&);
    SAFplus::Buffer& get(const std::string& name) throws(NameException&);
    Callable(CallbackT fn, void* cookie);
    void run();
Line 100: Line 36:
// Name is a singleton class; only one per process.
extern NameRegistrar name;
  class Poolable: public Wakeable
  {
  public:
    struct timespec m_startTime;
    struct timespec m_endTime;
    uint32_t m_executionTimeLimit;
    Callable* m_clb;

    Poolable(Callable* clb);
    virtual void wake(int amt,void* cookie=NULL);
  };

  class ThreadPool: public Poolable
  {
  protected:
    void createTask();
    void startNewTask();
    void taskEntry();

  public:
    short m_minThread;
    short m_maxThread;
    short m_numIdleTasks;
    short m_flags;
    Mutex m_mutex;
    UserCallbackT m_preIdleFn;
    void* m_preIdleCookie;
    UserCallbackT m_onDeckFn;
    void* m_onDeckCookie;
    ThreadCondition m_cond;
    uint32_t m_pendingJobs;

    ThreadPool(Callable* clb); /* Initialize pool; call createTask() */
    void run(); /* run task */
    
    ~ThreadPool(); /* Finalize pool */
    
  };
}

The Thread Pool

Thread Pools are useful when you need to limit the number of threads running in your application at the same time. There is a performance overhead associated with starting a new thread, and each thread is also allocated some memory for its stack etc.

Instead of starting a new thread for every task to execute concurrently, the task can be passed to a thread pool. As soon as the pool has any idle threads the task is assigned to one of them and executed.

Thread pools are often used in multi threaded servers. Each connection arriving at the server via the network is wrapped as a task and passed on to a thread pool. The threads in the thread pool will process the requests on the connections concurrently.

Implementation

The Thread Pool is a client library that is linked with every component that uses it.

#include <clThreadPool.hxx>

APIs

   1 namespace SAFplus
   2 {
   3   // Definition of user task
   4   typedef void (*CallbackT) (void* invocation);
   5   typedef uint32_t (*UserCallbackT) (void* invocation);
   6 
   7   class Callable
   8   {
   9   public:
  10     CallbackT m_fn;
  11     void* m_cookie;
  12     
  13     Callable(CallbackT fn, void* cookie);
  14     void run();
  15     
  16   };
  17 
  18   class Poolable: public Wakeable
  19   {
  20   public:    
  21     struct timespec m_startTime;
  22     struct timespec m_endTime;
  23     uint32_t m_executionTimeLimit;    
  24     Callable* m_clb;
  25 
  26     Poolable(Callable* clb);
  27     virtual void wake(int amt,void* cookie=NULL);
  28   };
  29 
  30   class ThreadPool: public Poolable
  31   {
  32   protected:
  33     void createTask();
  34     void startNewTask();
  35     void taskEntry();
  36 
  37   public:
  38     short m_minThread;
  39     short m_maxThread;
  40     short m_numIdleTasks;
  41     short m_flags;
  42     Mutex m_mutex;
  43     UserCallbackT m_preIdleFn;
  44     void* m_preIdleCookie;
  45     UserCallbackT m_onDeckFn;
  46     void* m_onDeckCookie;
  47     ThreadCondition m_cond;    
  48     uint32_t m_pendingJobs;
  49 
  50     ThreadPool(Callable* clb); /* Initialize pool; call createTask() */
  51     void run();   /* run task */
  52     
  53     ~ThreadPool(); /* Finalize pool */
  54     
  55   };
  56 }

SAFplus: ThreadPool (last edited 2014-06-04 18:42:59 by AndrewStone)