Differences between revisions 2 and 17 (spanning 15 versions)
Revision 2 as of 2014-06-03 06:31:25
Size: 5891
Editor: HungTa
Comment:
Revision 17 as of 2014-06-04 18:42:59
Size: 3361
Editor: AndrewStone
Comment:
Deletions are marked like this. Additions are marked like this.
Line 5: Line 5:
Instead of starting a new thread for every task to execute concurrently, the task can be passed to a thread pool. As soon as the pool has any idle threads the task is assigned to one of them and executed. Internally, the thread pool handle has to be created first, then the tasks are inserted into this handle and executed. Instead of starting a new thread for every task to execute concurrently, the task can be passed to a thread pool. As soon as the pool has any idle threads the task is assigned to one of them and executed.
Line 11: Line 11:
The Name Service is a client library that is linked with every component that uses it.  It opens a cluster-wide, shared memory, non-persistent checkpoint using the name service's well-known [[Handle]]. The Thread Pool is a client library that is linked with every component that uses it.
Line 13: Line 13:
The name service uses this checkpoint to resolve string names into objects. The thread pool contains APIs to add a "Poolable" or "Wakeable" object into an internal job list. When a thread in the pool frees up, it grabs a "Poolable" or "Wakeable" object off the job list and executes it within the thread.

To preserve resources, if there are many idle threads they will slowly exit down to a configured minimum number. But if the job list exceeds a configured threshold, a new thread will be created up to a configured maximum.
Line 16: Line 18:
#include <clNameApi.hxx> #include <clThreadPool.hxx>
Line 20: Line 22:
class NameException: public std::exception
namespace SAFplus
Line 22: Line 25:
  ...
};
  // Definition of user task
  typedef void (*CallbackT) (void* invocation);
  typedef uint32_t (*UserCallbackT) (void* invocation);
Line 25: Line 29:
class NameRegistrar   class Poolable: public Wakeable
Line 27: Line 31:
  public:
    struct timespec m_startTime;
    struct timespec m_endTime;
    uint32_t m_executionTimeLimit;
    bool deleteWhenComplete; /** Flag that indicates this object should be deleted when it is finished running */

    Poolable(uint_t timeLimit=30000); /** by default allow a 30 second execution time */
    virtual void wake(int amt,void* cookie=NULL); /** Implement this virtual function to do the job */
    virtual ~Poolable();
  };

  class ThreadPool
    {
  protected:
    void createTask();
    void startNewTask();
    void taskEntry();

  protected:
    short m_minThread;
    short m_maxThread;
    short m_numIdleTasks;
    short m_flags;
    Mutex m_mutex;
    Wakeable preIdleFn;
    void* m_preIdleCookie;
    Wakeable onDeckFn;
    void* m_onDeckCookie;
    ThreadCondition m_cond;
    uint32_t m_pendingJobs;
Line 28: Line 63:
    ThreadPool(uint_t minThreads, uint_t maxThreads); /* Initialize pool */
Line 29: Line 65:
    typedef enum {
         MODE_REDUNDANCY,
         MODE_ROUND_ROBIN,
         MODE_PREFER_LOCAL/* return a handle that points to this process (if it exists)
                             if it does not exist return a handle that points to this NODE.
                             if that does not exist, return any handle
                             So choose "closer" handles over ones that are far away
                             this mode makes communications more efficient
                          */
         MODE_NO_CHANGE, /* This is not a mode; it tells the API to not change the existing mode (or use MODE_REDUNDANCY if creating) */
    /** Starts the thread pool running */
    void start();
    /** Stops the thread pool. All currently running threads complete their job and then exit */
    void stop();
    
    /** Adds a job into the thread pool. Poolable is NOT COPIED; do not delete or let it fall out of scope.
        A Poolable object is more sophisticated than a Wakeable; it measures how long it was run for, can track
        whether it should be deleted, and will ASSERT if it is running for longer than a configured limit (deadlock detector)
     */
    void run(Poolable* p,void* arg);
Line 40: Line 76:
         } MappingMode;
    /* For more convenience and logical, should move mapping mode setting to set functions
       For more clearance on this parameter, see the comments of the functions
    */
    //void setMode(const char* name, MappingMode m);
     
    /** Adds a job into the thread pool. Wakeable is NOT COPIED; do not delete or let it fall out of scope */
    void run(Wakeable* p,void* arg);
Line 47: Line 79:
    /* Associate a name with a handle and pointer and associate a handle with a pointer.
       If the name does not exist, it is created. If it exists, it is overwritten.
       If the handle format contains a node or process designator, then this mapping will be removed when the node/process fails.

       the void* object pointer is local to this process; it does not need to be part of the checkpoint.
       
       This association is valid for all SAFplus API name lookups, and for AMF entity names.
       The mapping mode parameter is mandatory and specifies what the mapping mode is, based on it, the get functions retrieves appropriate handles
     */
    void set(const char* name, SAFplus::Handle handle, MappingMode m, void* object=NULL);
    void set(const std::string& name, SAFplus::Handle handle, MappingMode m, void* object=NULL);
    

     /* Associate a name with a handle and pointer and associate a handle with a pointer (if object != NULL).
        If the name does not exist, it is created. If the name exists, this mapping is appended (the original mapping is not removed).
        If there are multiple associations, the first association will always be returned.
        If the handle format contains a node or process designator, then this mapping will be removed when the node/process fails.
        If the name has more than one mapping another mapping will become the default response for this name.

        This association is valid for all SAFplus API name lookups, and for AMF entity names.
        The mapping mode parameter is mandatory and specifies what the mapping mode is, based on it, the get functions look up appropriate handle: if it's different from MappingMode::MODE_NO_CHANGE, the mapping mode that was set for this name before will be replaced with it, otherwise (omitted), the mapping mode that was set for this name before will be kept as it was (no change)
      */
    void append(const char* name, SAFplus::Handle handle, MappingMode m, void* object=NULL);
    void append(const std::string& name, SAFplus::Handle handle, MappingMode m, void* object=NULL);
    // Associate name with arbitrary data. A copy of the data is made.
    void set(const char* name, const void* data, int length);
    void set(const std::string& name, const void* data, int length);

    // Associate name with arbitrary data. A copy of the data is NOT made; this call transfers the reference count (ownership) to the callee.
    void set(const char* name, SAFplus::Buffer*);
    void set(const std::string& name, SAFplus::Buffer*);

    // Get a handle associated with the data
    // The SAFplus APIs use these calls to resolve names to handles or objects.
    std::pair<SAFplus::Handle&,void* object> get(const char* name) throws(NameException&);
    std::pair<SAFplus::Handle&,void* object> get(const std::string& name) throws(NameException&);

    SAFplus::Handle& get(const char* name) throws(NameException&);
    SAFplus::Handle& get(const std::string& name) throws(NameException&);
 
    // Get object based on handle
    void* get(const SAFplus::Handle&) throws(NameException&);

    
    // Get a handle associated with the data
    // The SAFplus APIs use these calls to resolve names to handles or objects.
    // Do not free the returned buffer, call Buffer.decRef();
    SAFplus::Buffer& get(const char* name) throws(NameException&);
    SAFplus::Buffer& get(const std::string& name) throws(NameException&);
    ~ThreadPool(); /* Finalize pool */
Line 99: Line 82:

// Name is a singleton class; only one per process.
extern NameRegistrar name;
}

The Thread Pool

Thread pools are useful when you need to limit the number of threads running in your application at the same time. Starting a new thread carries a performance overhead, and each thread is also allocated memory of its own (for example, for its stack).

Instead of starting a new thread for every task to execute concurrently, the task can be passed to a thread pool. As soon as the pool has any idle threads the task is assigned to one of them and executed.

Thread pools are often used in multithreaded servers. Each connection arriving at the server via the network is wrapped as a task and passed on to a thread pool. The threads in the thread pool then process the requests on those connections concurrently.

Implementation

The Thread Pool is a client library that is linked with every component that uses it.

The thread pool contains APIs to add a "Poolable" or "Wakeable" object into an internal job list. When a thread in the pool frees up, it grabs a "Poolable" or "Wakeable" object off the job list and executes it within the thread.

To conserve resources, when there are many idle threads they gradually exit until only a configured minimum number remain. Conversely, if the job list exceeds a configured threshold, new threads are created, up to a configured maximum.

#include <clThreadPool.hxx>

APIs

   1 namespace SAFplus
   2 {
   3   // Callback signatures for user tasks: both take a single opaque void* "invocation" argument; UserCallbackT additionally returns a uint32_t.
   4   typedef void (*CallbackT) (void* invocation);
   5   typedef uint32_t (*UserCallbackT) (void* invocation);
   6 
   7   class Poolable: public Wakeable  // A job object that can be handed to ThreadPool::run(Poolable*, void*)
   8   {
   9   public:    
  10     struct timespec m_startTime;  // NOTE(review): presumably the time at which the job started running -- confirm
  11     struct timespec m_endTime;  // NOTE(review): presumably the time at which the job finished running -- confirm
  12     uint32_t m_executionTimeLimit;  // NOTE(review): presumably initialized from the constructor's timeLimit argument -- confirm
  13     bool deleteWhenComplete;  /** Flag indicating that this object should be deleted once it has finished running */
  14 
  15     Poolable(uint_t timeLimit=30000);  /** timeLimit defaults to 30000, documented as a 30 second execution time (which implies the unit is milliseconds) */
  16     virtual void wake(int amt,void* cookie=NULL);  /** Override this virtual function to perform the job */
  17     virtual ~Poolable();
  18   };
  19 
  20   class ThreadPool
  21     {
  22   protected:  // Helper methods, not part of the public API
  23     void createTask();
  24     void startNewTask();
  25     void taskEntry();
  26 
  27   protected:  // Pool state
  28     short m_minThread;  // NOTE(review): presumably the configured minimum thread count; the constructor takes uint_t but this field is short -- confirm the narrowing is intended
  29     short m_maxThread;  // NOTE(review): presumably the configured maximum thread count; same uint_t-to-short question as m_minThread
  30     short m_numIdleTasks;
  31     short m_flags;
  32     Mutex m_mutex;
  33     Wakeable preIdleFn;  // NOTE(review): held by value rather than by pointer -- confirm that copying a Wakeable here is intended
  34     void* m_preIdleCookie;
  35     Wakeable onDeckFn;  // NOTE(review): held by value rather than by pointer -- confirm that copying a Wakeable here is intended
  36     void* m_onDeckCookie;
  37     ThreadCondition m_cond;    
  38     uint32_t m_pendingJobs;
  39 
  40   public:
  41     ThreadPool(uint_t minThreads, uint_t maxThreads); /* Initialize the pool with the given minimum and maximum thread counts */
  42 
  43     /** Starts the thread pool running */
  44     void start();
  45     /** Stops the thread pool.  Every thread that is currently running a job completes that job and then exits */
  46     void stop();
  47     
  48     /** Adds a job to the thread pool.  The Poolable is NOT COPIED; do not delete it or let it fall out of scope.
  49         A Poolable object is more sophisticated than a Wakeable: it measures how long it ran, can track
  50         whether it should be deleted, and will ASSERT if it runs for longer than a configured limit (deadlock detector)
  51      */
  52     void run(Poolable* p,void* arg);
  53 
  54     /** Adds a job to the thread pool.  The Wakeable is NOT COPIED; do not delete it or let it fall out of scope */
  55     void run(Wakeable* p,void* arg);
  56 
  57     ~ThreadPool(); /* Finalize the pool */
  58     
  59   };
  60 }

SAFplus: ThreadPool (last edited 2014-06-04 18:42:59 by AndrewStone)