SAFplus

The Thread Pool

Thread pools are useful when you need to limit the number of threads running in your application at the same time. Starting a new thread carries a performance overhead, and each thread is allocated memory of its own, such as its stack.

Instead of starting a new thread for every task that must run concurrently, the task can be passed to a thread pool. As soon as the pool has an idle thread, the task is assigned to it and executed.

Thread pools are often used in multithreaded servers. Each connection arriving at the server over the network is wrapped as a task and passed to a thread pool. The threads in the pool then process the requests on those connections concurrently.
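The idea described above can be sketched with nothing but the C++11 standard library. This is a minimal illustration of the general technique, not the SAFplus implementation: the class name SimplePool, its submit() method, and the helper handleRequests() are all hypothetical, and the pool size is fixed rather than adjustable.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-size pool (NOT the SAFplus ThreadPool): workers block on a
// condition variable until a task is queued or the pool is shutting down.
class SimplePool
{
public:
  explicit SimplePool(std::size_t numThreads)
  {
    for (std::size_t i = 0; i < numThreads; ++i)
      workers.emplace_back([this] { workerLoop(); });
  }

  // Lets the workers drain the remaining tasks, then joins every worker.
  ~SimplePool()
  {
    {
      std::lock_guard<std::mutex> lock(mutex);
      stopping = true;
    }
    cond.notify_all();
    for (std::thread& t : workers) t.join();
  }

  // Queue a task; an idle worker picks it up.
  void submit(std::function<void()> task)
  {
    {
      std::lock_guard<std::mutex> lock(mutex);
      tasks.push(std::move(task));
    }
    cond.notify_one();
  }

private:
  void workerLoop()
  {
    for (;;)
    {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex);
        cond.wait(lock, [this] { return stopping || !tasks.empty(); });
        if (tasks.empty()) return;  // only reached when stopping
        task = std::move(tasks.front());
        tasks.pop();
      }
      task();  // run outside the lock so other workers can dequeue
    }
  }

  std::mutex mutex;
  std::condition_variable cond;
  std::queue<std::function<void()>> tasks;
  std::vector<std::thread> workers;
  bool stopping = false;
};

// Usage: 100 simulated "requests" handled by 4 threads; returns the number handled.
int handleRequests()
{
  std::atomic<int> handled(0);
  {
    SimplePool pool(4);
    for (int i = 0; i < 100; ++i)
      pool.submit([&handled] { ++handled; });
  }  // the destructor returns only after all queued tasks have run
  return handled.load();
}
```

Only four threads are ever created here, regardless of how many tasks are submitted, which is the resource limit the pool is meant to provide.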

Implementation

The Thread Pool is a client library that is linked with every component that uses it.

The thread pool provides APIs to add a "Poolable" or "Wakeable" object to an internal job list. When a thread in the pool becomes free, it takes a "Poolable" or "Wakeable" object off the job list and executes it within that thread.

To conserve resources, when many threads are idle they exit gradually until only a configured minimum number remains. Conversely, if the job list grows beyond a configured threshold, new threads are created, up to a configured maximum.
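The sizing rule above might be expressed as two small decision functions. This is only a sketch of the policy as described: the PoolState struct, its field names, the idle-timeout flag, and the requirement that the job list be empty before a thread exits are all assumptions made for illustration and are not taken from the SAFplus source.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical snapshot of pool state; field names are illustrative only.
struct PoolState
{
  uint32_t numThreads;     // threads currently alive
  uint32_t minThreads;     // configured minimum
  uint32_t maxThreads;     // configured maximum
  uint32_t pendingJobs;    // jobs waiting in the job list
  uint32_t growThreshold;  // assumed backlog size that triggers growth
};

// True when the backlog exceeds the threshold and the pool is below its maximum.
bool shouldSpawnThread(const PoolState& s)
{
  return s.pendingJobs > s.growThreshold && s.numThreads < s.maxThreads;
}

// True when a thread that has idled long enough may exit without taking the
// pool below its minimum. The empty-job-list check is an added assumption.
bool shouldExitIdleThread(const PoolState& s, bool idleTimeoutExpired)
{
  return idleTimeoutExpired && s.pendingJobs == 0 && s.numThreads > s.minThreads;
}
```

Keeping the policy in pure functions like these makes the grow and shrink conditions easy to check in isolation, separately from the threading code that acts on them.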

#include <clThreadPool.hxx>

APIs

namespace SAFplus
{
  // Definition of user task
  typedef void (*CallbackT) (void* invocation);
  typedef uint32_t (*UserCallbackT) (void* invocation);

  class Poolable: public Wakeable
  {
  public:
    struct timespec m_startTime;
    struct timespec m_endTime;
    uint32_t m_executionTimeLimit;
    bool deleteWhenComplete;  /** Flag that indicates this object should be deleted when it is finished running */

    Poolable(uint_t timeLimit=30000);  /** by default allow a 30 second execution time */
    virtual void wake(int amt,void* cookie=NULL);  /** Implement this virtual function to do the job */
    virtual ~Poolable();
  };

  class ThreadPool
    {
  protected:
    void createTask();
    void startNewTask();
    void taskEntry();

  protected:
    short m_minThread;
    short m_maxThread;
    short m_numIdleTasks;
    short m_flags;
    Mutex m_mutex;
    Wakeable preIdleFn;
    void* m_preIdleCookie;
    Wakeable onDeckFn;
    void* m_onDeckCookie;
    ThreadCondition m_cond;
    uint32_t m_pendingJobs;

  public:
    ThreadPool(uint_t minThreads, uint_t maxThreads); /* Initialize pool */

    /** Starts the thread pool running */
    void start();
    /** Stops the thread pool.  All currently running threads complete their job and then exit */
    void stop();

    /** Adds a job into the thread pool.  Poolable is NOT COPIED; do not delete or let it fall out of scope.
        A Poolable object is more sophisticated than a Wakeable; it measures how long it was run for, can track
        whether it should be deleted, and will ASSERT if it is running for longer than a configured limit (deadlock detector)
     */
    void run(Poolable* p,void* arg);

    /** Adds a job into the thread pool.  Wakeable is NOT COPIED; do not delete or let it fall out of scope */
    void run(Wakeable* p,void* arg);

    ~ThreadPool(); /* Finalize pool */

  };
}
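Based on the declarations above, a job would presumably be defined by subclassing Poolable and overriding wake(). The sketch below shows that shape. So that it compiles without SAFplus, it declares small stand-ins for Wakeable, Poolable, and ThreadPool that copy only the signatures listed above. The stand-in run() executes the job synchronously and passes arg to wake() as the cookie, both of which are assumptions and not documented behavior of the real pool. SumJob and runSumJob() are hypothetical names.

```cpp
#include <cassert>

// --- Stand-ins so this sketch compiles without SAFplus. With the real library,
// --- this namespace block would be replaced by: #include <clThreadPool.hxx>
namespace SAFplus
{
  class Wakeable
  {
  public:
    virtual void wake(int amt, void* cookie = nullptr) = 0;
    virtual ~Wakeable() {}
  };

  class Poolable : public Wakeable
  {
  public:
    bool deleteWhenComplete;
    Poolable() : deleteWhenComplete(false) {}
  };

  // Stand-in only: runs the job synchronously on the calling thread.
  // ASSUMPTION: the real pool forwards `arg` to wake() as the cookie.
  class ThreadPool
  {
  public:
    ThreadPool(unsigned int /*minThreads*/, unsigned int /*maxThreads*/) {}
    void start() {}
    void stop() {}
    void run(Poolable* p, void* arg) { p->wake(1, arg); }
  };
}

// A job: subclass Poolable and do the work in wake().
class SumJob : public SAFplus::Poolable
{
public:
  int result = 0;

  // Sums 1..limit, where limit is read from the cookie.
  void wake(int /*amt*/, void* cookie) override
  {
    int limit = *static_cast<int*>(cookie);
    for (int i = 1; i <= limit; ++i) result += i;
  }
};

int runSumJob()
{
  SAFplus::ThreadPool pool(2, 8);  // minimum 2 threads, maximum 8
  pool.start();

  int limit = 10;
  SumJob job;  // the pool does not copy the job, so it must stay alive
  pool.run(&job, &limit);

  pool.stop();
  return job.result;
}
```

With the real library the job runs on a pool thread, and the text above does not say whether stop() waits for queued jobs, so production code would need its own completion signal before reading the result or letting the job go out of scope.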

SAFplus: ThreadPool (last edited 2014-06-04 18:42:59 by AndrewStone)