00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "threadpool/thread.h"
00029
00030 BEGIN_TERIMBER_NAMESPACE
00031 #pragma pack(4)
00032
00034
00035 #if OS_TYPE == OS_WIN32
00036 unsigned int __stdcall
00037 #else
00038 void*
00039 #endif
00040 thread::start_thread(void* data)
00041 {
00042
00043 thread* _this = (thread*)data;
00044
00045 _this->change_state(THREAD_RUNNING);
00046
00047 _this->_ev_start.signal();
00048
00049 while (_this->sleep())
00050 {
00051
00052 while (_this->execute());
00053 }
00054
00055
00056
00057 _this->_ev_end.signal();
00058
00059 return 0;
00060 }
00061
00062 thread::thread() :
00063 _job_task(0, 0, INFINITE, 0),
00064 _state(THREAD_CLOSE),
00065 _handle(0)
00066 {
00067 }
00068
00069 thread::~thread()
00070 {
00071 if (_handle)
00072 stop();
00073 }
00074
00075 thread_state
00076 thread::get_state() const
00077 {
00078
00079 mutex_keeper keeper(_mtx);
00080 return _state;
00081 }
00082
00083 bool
00084 thread::change_state(thread_state new_state)
00085 {
00086
00087 mutex_keeper keeper(_mtx);
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106 switch (_state)
00107 {
00108 case THREAD_CLOSE:
00109 return (new_state == THREAD_STARTING) ? (_state = new_state, true) : false;
00110 case THREAD_STARTING:
00111 return (new_state == THREAD_RUNNING
00112 || new_state == THREAD_CLOSE
00113 ) ? (_state = new_state, true) : false;
00114 case THREAD_SLEEPING:
00115 return (new_state == THREAD_RUNNING
00116 || new_state == THREAD_STOPPING
00117 ) ? (_state = new_state, true) : false;
00118 case THREAD_RUNNING:
00119 return (new_state == THREAD_SLEEPING
00120 || new_state == THREAD_STOPPING
00121 ) ? (_state = new_state, true) : false;
00122 case THREAD_STOPPING:
00123 return (new_state == THREAD_CLOSE) ? (_state = new_state, true) : false;
00124 default:
00125 assert(false);
00126 }
00127
00128 return false;
00129 }
00130
00131 bool
00132 thread::inside_thread() const
00133 {
00134
00135 #if OS_TYPE == OS_WIN32
00136 return ::GetCurrentThread() == _handle;
00137 #else
00138 return pthread_self() == _handle;
00139 #endif
00140 }
00141
00142 bool
00143 thread::start()
00144 {
00145
00146 if (inside_thread())
00147 return false;
00148
00149
00150
00151 if (!change_state(THREAD_STARTING))
00152 return false;
00153
00154 _ev_start.nonsignal();
00155 _ev_end.nonsignal();
00156
00157 #if OS_TYPE == OS_WIN32
00158
00159 unsigned int threadID = 0;
00160 if (!(_handle = (void*)_beginthreadex(0, 0, start_thread, this, 0, &threadID)))
00161 {
00162 change_state(THREAD_CLOSE);
00163 return false;
00164 }
00165
00166 #else
00167 if (pthread_create(&_handle, 0, start_thread, this))
00168 return false;
00169 #endif
00170
00171
00172 _ev_start.wait();
00173 return true;
00174 }
00175
00176 bool
00177 thread::assign_job(const job_task& job_task)
00178 {
00179
00180 if (inside_thread())
00181 return false;
00182
00183
00184 mutex_keeper keeper(_mtx_job);
00185 switch (_state)
00186 {
00187 case THREAD_SLEEPING:
00188 case THREAD_RUNNING:
00189 _job_task = job_task;
00190 break;
00191 default:
00192 return false;
00193 }
00194
00195 keeper.unlock();
00196
00197 _ev_wakeup.signal();
00198 return true;
00199 }
00200
00201 bool
00202 thread::cancel_job()
00203 {
00204
00205 if (inside_thread())
00206 return false;
00207
00208
00209 mutex_keeper keeper(_mtx_job);
00210 switch (_state)
00211 {
00212 case THREAD_SLEEPING:
00213 case THREAD_RUNNING:
00214 _job_task.clear();
00215 break;
00216 default:
00217 return false;
00218 }
00219
00220 keeper.unlock();
00221
00222
00223 _ev_wakeup.signal();
00224 return true;
00225 }
00226
00227 bool
00228 thread::stop()
00229 {
00230
00231 if (inside_thread())
00232 return false;
00233
00234
00235 if (!change_state(THREAD_STOPPING))
00236 return false;
00237
00238
00239 _ev_wakeup.signal();
00240
00241 _ev_end.wait();
00242 #if OS_TYPE == OS_WIN32
00243
00244 if (WAIT_TIMEOUT == WaitForSingleObject(_handle, 1000))
00245
00246 TerminateThread(_handle, 0);
00247
00248
00249 CloseHandle(_handle);
00250
00251 #else
00252 if (pthread_detach(_handle) != 0)
00253 pthread_cancel(_handle);
00254 #endif
00255
00256 _handle = 0;
00257
00258
00259 if (!change_state(THREAD_CLOSE))
00260 {
00261 assert(false);
00262 return false;
00263 }
00264
00265 return true;
00266 }
00267
00268 void
00269 thread::wakeup() const
00270 {
00271 _ev_wakeup.signal();
00272 }
00273
00274 bool
00275 thread::execute()
00276 {
00277 mutex_keeper keeper(_mtx_job);
00278 if (_state != THREAD_RUNNING
00279 || !_job_task._employer
00280 )
00281 return false;
00282
00283
00284 job_task task = _job_task;
00285
00286 keeper.unlock();
00287
00288
00289 try
00290 {
00291 if (!task._employer->v_has_job(task._ident, task._user_data))
00292 return false;
00293
00294 task._employer->v_do_job(task._ident, task._user_data);
00295 }
00296 catch (...)
00297 {
00298
00299
00300
00301 assert(false);
00302 }
00303
00304 return true;
00305 }
00306
00307 bool
00308 thread::sleep()
00309 {
00310
00311 if (!change_state(THREAD_SLEEPING))
00312 return false;
00313
00314
00315 mutex_keeper keeper(_mtx_job);
00316 size_t wait_timeout = _job_task._timeout;
00317 keeper.unlock();
00318
00319
00320 _ev_wakeup.wait(wait_timeout);
00321
00322
00323 return change_state(THREAD_RUNNING);
00324 }
00325
00327
00328 thread*
00329 thread_creator::create(const job_task& task)
00330 {
00331 thread* obj = new thread;
00332 if (obj)
00333 obj->start();
00334 return obj;
00335 }
00336
00337
00338 void
00339 thread_creator::activate(thread* obj, const job_task& task)
00340 {
00341
00342 if (obj->get_state() == THREAD_CLOSE)
00343 obj->start();
00344 obj->assign_job(task);
00345 }
00346
00347
00348 void
00349 thread_creator::back(thread* obj, const job_task&)
00350 {
00351 obj->cancel_job();
00352 }
00353
00354
00355 void
00356 thread_creator::destroy(thread* obj, const job_task& task)
00357 {
00358 deactivate(obj, task);
00359 delete obj;
00360 }
00361
00362
00363 void
00364 thread_creator::deactivate(thread* obj, const job_task& task)
00365 {
00366 back(obj, task);
00367 obj->stop();
00368 }
00369
00370
00371 #pragma pack()
00372 END_TERIMBER_NAMESPACE