Differences between revisions 12 and 17 (spanning 5 versions)
Revision 12 as of 2014-06-04 06:38:44
Size: 1999
Editor: HungTa
Comment:
Revision 17 as of 2014-06-04 18:42:59
Size: 3361
Editor: AndrewStone
Comment:
Deletions are marked like this. Additions are marked like this.
Line 13: Line 13:
The thread pool provides APIs to add a "Poolable" or "Wakeable" object to an internal job list. When a thread in the pool becomes free, it takes a "Poolable" or "Wakeable" object off the job list and executes it within that thread.

To conserve resources, when there are many idle threads they gradually exit until only a configured minimum number remain. Conversely, if the job list grows beyond a configured threshold, a new thread is created, provided the configured maximum number of threads has not been reached.
Line 21: Line 25:
// Definition of user task
typedef uint32_t (*CallbackT) (void* invocation);
  // Definition of user task
  typedef void (*CallbackT) (void* invocation);
  typedef uint32_t (*UserCallbackT) (void* invocation);
Line 24: Line 29:
class Callable
  {
  public:
    CallbackT m_fn;
    void* m_cookie;
    
    Callable(CallbackT fn, void* cookie);
    void run();
    
  };

class Poolable: public Wakeable
  class Poolable: public Wakeable
Line 41: Line 35:
    Callable* m_preIdleFn;
    Callable* m_onDeckFn;
    bool deleteWhenComplete; /** Flag that indicates this object should be deleted when it is finished running */
Line 44: Line 37:
    Poolable(Callable* preIdlFn, Callable* onDeckFn);
    virtual void wake(int amt,void* cookie=NULL);
    Poolable(uint_t timeLimit=30000); /** by default allow a 30 second execution time */
    virtual void wake(int amt,void* cookie=NULL); /** Implement this virtual function to do the job */
    virtual ~Poolable();
Line 48: Line 42:
class ThreadPool: public Poolable
  {
  class ThreadPool
    {
Line 55: Line 49:
  public:   protected:
Line 61: Line 55:
    Wakeable preIdleFn;
    void* m_preIdleCookie;
    Wakeable onDeckFn;
    void* m_onDeckCookie;
Line 64: Line 62:
    ThreadPool(); /* Initialize pool; call createTask() */
    void run(); /* run task */
  public:
    ThreadPool(uint_t minThreads, uint_t maxThreads); /* Initialize pool */

    /** Starts the thread pool running */
    void start();
    /** Stops the thread pool. All currently running threads complete their job and then exit */
    void stop();
Line 67: Line 70:
    /** Adds a job into the thread pool. Poolable is NOT COPIED; do not delete or let it fall out of scope.
        A Poolable object is more sophisticated than a Wakeable; it measures how long it was run for, can track
        whether it should be deleted, and will ASSERT if it is running for longer than a configured limit (deadlock detector)
     */
    void run(Poolable* p,void* arg);

    /** Adds a job into the thread pool. Wakeable is NOT COPIED; do not delete or let it fall out of scope */
    void run(Wakeable* p,void* arg);
Line 70: Line 82:

The Thread Pool

Thread pools are useful when you need to limit the number of threads running in your application at the same time. Starting a new thread carries a performance overhead, and each thread is also allocated memory of its own (for its stack, for example).

Instead of starting a new thread for every task that should execute concurrently, the task can be passed to a thread pool. As soon as the pool has an idle thread, the task is assigned to it and executed.

Thread pools are often used in multithreaded servers. Each connection arriving at the server via the network is wrapped as a task and passed to a thread pool. The threads in the pool then process the requests on those connections concurrently.

Implementation

The Thread Pool is a client library that is linked with every component that uses it.

The thread pool provides APIs to add a "Poolable" or "Wakeable" object to an internal job list. When a thread in the pool becomes free, it takes a "Poolable" or "Wakeable" object off the job list and executes it within that thread.

To conserve resources, when there are many idle threads they gradually exit until only a configured minimum number remain. Conversely, if the job list grows beyond a configured threshold, a new thread is created, provided the configured maximum number of threads has not been reached.

#include <clThreadPool.hxx>

APIs

   1 namespace SAFplus  // Thread pool API: the Poolable job type and the ThreadPool that executes jobs
   2 {
   3   // Definition of user task: function-pointer types taking an opaque "invocation" pointer
   4   typedef void (*CallbackT) (void* invocation);  // returns nothing
   5   typedef uint32_t (*UserCallbackT) (void* invocation);  // returns a uint32_t; the value's meaning is not shown here
   6 
   7   class Poolable: public Wakeable  // A job for ThreadPool::run() that carries timing and deletion bookkeeping
   8   {
   9   public:    
  10     struct timespec m_startTime;  // presumably when the job began running -- TODO confirm
  11     struct timespec m_endTime;  // presumably when the job finished running -- TODO confirm
  12     uint32_t m_executionTimeLimit;  // allowed execution time; presumably milliseconds, since the default 30000 is described as 30 seconds -- TODO confirm
  13     bool deleteWhenComplete;  /** Flag that indicates this object should be deleted when it is finished running */
  14 
  15     Poolable(uint_t timeLimit=30000);  /** timeLimit: allowed execution time; by default allow a 30 second execution time */
  16     virtual void wake(int amt,void* cookie=NULL);  /** Implement this virtual function to do the job */
  17     virtual ~Poolable();
  18   };
  19 
  20   class ThreadPool  // Runs queued Poolable/Wakeable jobs on pool threads
  21     {
  22   protected:
  23     void createTask();  // NOTE(review): the purpose of these three helpers is not documented here
  24     void startNewTask();
  25     void taskEntry();
  26 
  27   protected:
  28     short m_minThread;  // presumably the configured minimum number of threads -- TODO confirm
  29     short m_maxThread;  // presumably the configured maximum number of threads -- TODO confirm
  30     short m_numIdleTasks;  // presumably the number of idle pool threads -- TODO confirm
  31     short m_flags;  // NOTE(review): flag values are not defined in this listing
  32     Mutex m_mutex;  // NOTE(review): what this mutex protects is not shown here
  33     Wakeable preIdleFn;  // NOTE(review): when preIdleFn/onDeckFn are woken, and with which cookie, is not shown here
  34     void* m_preIdleCookie;
  35     Wakeable onDeckFn;
  36     void* m_onDeckCookie;
  37     ThreadCondition m_cond;    
  38     uint32_t m_pendingJobs;  // presumably the number of jobs waiting to run -- TODO confirm
  39 
  40   public:
  41     ThreadPool(uint_t minThreads, uint_t maxThreads); /* Initialize pool; minThreads/maxThreads are presumably the thread-count bounds -- TODO confirm */
  42 
  43     /** Starts the thread pool running */
  44     void start();
  45     /** Stops the thread pool.  All currently running threads complete their job and then exit */
  46     void stop();
  47     
  48     /** Adds a job into the thread pool.  Poolable is NOT COPIED; do not delete or let it fall out of scope. 
  49         A Poolable object is more sophisticated than a Wakeable; it measures how long it was run for, can track
  50         whether it should be deleted, and will ASSERT if it is running for longer than a configured limit (deadlock detector)
  51      */
  52     void run(Poolable* p,void* arg);
  53 
  54     /** Adds a job into the thread pool.  Wakeable is NOT COPIED; do not delete or let it fall out of scope */
  55     void run(Wakeable* p,void* arg);
  56 
  57     ~ThreadPool(); /* Finalize pool */
  58     
  59   };
  60 }

SAFplus: ThreadPool (last edited 2014-06-04 18:42:59 by AndrewStone)