Ion
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
workerpool.cc
Go to the documentation of this file.
1 
18 #include "ion/base/workerpool.h"
19 
20 #include "ion/base/lockguards.h"
21 
22 namespace ion {
23 namespace base {
24 
26  : worker_(CHECK_NOTNULL(worker)),
27  threads_(GetNonNullAllocator()),
28  suspended_(true),
29  killing_(false),
30  slow_path_(false),
31  spawn_func_([this](){ this->ThreadEntryPoint(); return true; }) {}
32 
34  ion::base::LockGuard lock(&mutex_);
35  KillAllThreads();
36 }
37 
38 void WorkerPool::ResizeThreadPool(size_t thread_count) {
39  ion::base::LockGuard lock(&mutex_);
40 
42  if (thread_count < threads_.size()) {
46  KillAllThreads();
47  }
48 
50  while (thread_count > threads_.size()) {
51  threads_.insert(port::SpawnThreadStd(&spawn_func_));
52  if (!suspended_)
53  Post(&active_threads_sema_);
54  }
55 }
56 
57 void WorkerPool::Suspend() {
58  ion::base::LockGuard lock(&mutex_);
59  if (!suspended_) {
60  suspended_ = true;
63  slow_path_ = true;
64  for (size_t i = 0; i < threads_.size(); ++i) {
65  Wait(&active_threads_sema_);
66  }
67  slow_path_ = false;
68  }
69 }
70 
71 void WorkerPool::Resume() {
72  ion::base::LockGuard lock(&mutex_);
73  if (suspended_) {
74  suspended_ = false;
76  for (size_t i = 0; i < threads_.size(); ++i) {
77  Post(&active_threads_sema_);
78  }
79  }
80 }
81 
82 bool WorkerPool::IsSuspended() const {
83  ion::base::LockGuard lock(&mutex_);
84  return suspended_;
85 }
86 
90 
91  while (true) {
92  while (slow_path_) {
95  if (killing_) {
98  return;
99  } else {
103  }
104  }
105 
108  Wait(&work_sema_);
109  Wait(&active_threads_sema_);
110  worker_->DoWork();
111  Post(&active_threads_sema_);
112  }
113 }
114 
115 void WorkerPool::KillAllThreads() {
116  DCHECK(mutex_.IsLocked());
117 
120  killing_ = true;
121  slow_path_ = true;
122  for (size_t i = 0; i < threads_.size(); ++i) {
124  Post(&work_sema_);
125  Post(&active_threads_sema_);
126  }
127 
129  for (const auto& thread_id : threads_) {
130  bool join_succeeded = port::JoinThread(thread_id);
131  DCHECK(join_succeeded);
132  }
133  threads_.clear();
134  slow_path_ = false;
135  killing_ = false;
136 
139  while (active_threads_sema_.TryWait()) {}
140 }
141 
142 void WorkerPool::Wait(port::Semaphore* sema) {
143  bool semaphore_wait_succeeded = sema->Wait();
144  DCHECK(semaphore_wait_succeeded);
145 }
146 
147 void WorkerPool::Post(port::Semaphore* sema) {
148  bool semaphore_post_succeeded = sema->Post();
149  DCHECK(semaphore_post_succeeded);
150 }
151 
152 } // namespace base
153 } // namespace ion
#define DCHECK(expr)
Definition: logging.h:331
void YieldThread()
Causes the calling thread to relinquish the CPU if there are other threads waiting to execute...
Definition: threadutils.cc:359
Interface to enable pluggable worker behavior.
Definition: workerpool.h:51
bool IsLocked()
Returns whether the Mutex is currently locked. Does not block.
Definition: mutex.cc:83
bool JoinThread(ThreadId id)
Windows-specific public functions.
Definition: threadutils.cc:322
A LockGuard locks a mutex when created, and unlocks it when destroyed.
Definition: lockguards.h:90
ThreadId SpawnThreadStd(const ThreadStdFunc *func)
Definition: threadutils.cc:200
void ResizeThreadPool(size_t thread_count)
Changes the number of threads in the pool.
#define CHECK_NOTNULL(val)
Check that the input is not NULL.
Definition: logging.h:382
void Suspend()
Suspends all threads until Resume() is called.
void Resume()
Resumes all threads.
WorkerPool(Worker *worker)
Constructor/destructor.
Definition: workerpool.cc:25
bool IsThreadNamingSupported()
Thread naming functions.
Definition: threadutils.cc:213
bool IsSuspended() const
Return true if pool's threads are suspended.
bool SetThreadName(const std::string &name)
Sets the name of the current thread.
Definition: threadutils.cc:340
bool TryWait()
Does not block.
Definition: semaphore.cc:107
virtual void DoWork()=0
Called repeatedly in worker thread loop, whenever GetWorkSemaphore() is signaled to indicate that the...
const std::string & GetName() const
Gets a descriptive name for the pool from the worker.
Definition: workerpool.h:79
virtual void ThreadEntryPoint()
ThreadEntryPoint() is run on each created thread.