00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 #include "threadpool/thread.h"
00029 
00030 BEGIN_TERIMBER_NAMESPACE
00031 #pragma pack(4)
00032 
00034 
00035 #if OS_TYPE == OS_WIN32
00036 unsigned int __stdcall 
00037 #else
00038 void* 
00039 #endif
00040 thread::start_thread(void* data)
00041 {
00042         
00043         thread* _this = (thread*)data;
00044         
00045         _this->change_state(THREAD_RUNNING);
00046         
00047         _this->_ev_start.signal();
00048         
00049         while (_this->sleep())
00050         {
00051                 
00052                 while (_this->execute());
00053         }
00054 
00055         
00056         
00057         _this->_ev_end.signal();
00058         
00059         return 0;
00060 }
00061 
00062 thread::thread() :
00063         _job_task(0, 0, INFINITE, 0),
00064         _state(THREAD_CLOSE),
00065         _handle(0)
00066 {
00067 }
00068         
00069 thread::~thread() 
00070 {
00071         if (_handle)
00072                 stop();
00073 }
00074 
00075 thread_state 
00076 thread::get_state() const
00077 {
00078         
00079         mutex_keeper keeper(_mtx);
00080         return _state;
00081 }
00082 
00083 bool 
00084 thread::change_state(thread_state new_state)
00085 {
00086         
00087         mutex_keeper keeper(_mtx);
00088         
00089         
00090 
00091 
00092 
00093 
00094 
00095 
00096 
00097 
00098 
00099         
00100 
00101 
00102 
00103 
00104 
00105 
00106         switch (_state)
00107         {
00108                 case THREAD_CLOSE:
00109                         return (new_state == THREAD_STARTING) ? (_state = new_state, true) : false;
00110                 case THREAD_STARTING:
00111                         return (new_state == THREAD_RUNNING 
00112                                         || new_state == THREAD_CLOSE
00113                                         ) ? (_state = new_state, true) : false;
00114                 case THREAD_SLEEPING:
00115                         return (new_state == THREAD_RUNNING 
00116                                         || new_state == THREAD_STOPPING
00117                                         ) ? (_state = new_state, true) : false;
00118                 case THREAD_RUNNING:
00119                         return (new_state == THREAD_SLEEPING 
00120                                         || new_state == THREAD_STOPPING
00121                                         ) ? (_state = new_state, true) : false;
00122                 case THREAD_STOPPING:
00123                         return (new_state == THREAD_CLOSE) ? (_state = new_state, true) : false;
00124                 default:
00125                         assert(false);
00126         }
00127 
00128         return false;
00129 }
00130 
00131 bool 
00132 thread::inside_thread() const
00133 {
00134         
00135 #if OS_TYPE == OS_WIN32
00136         return ::GetCurrentThread() == _handle;
00137 #else
00138         return pthread_self() == _handle;
00139 #endif
00140 }
00141 
00142 bool 
00143 thread::start()
00144 {
00145         
00146         if (inside_thread())
00147                 return false;
00148 
00149         
00150         
00151         if (!change_state(THREAD_STARTING))
00152                 return false;
00153         
00154         _ev_start.nonsignal();
00155         _ev_end.nonsignal();
00156         
00157 #if OS_TYPE == OS_WIN32
00158         
00159         unsigned int threadID = 0;
00160         if (!(_handle = (void*)_beginthreadex(0, 0, start_thread, this, 0, &threadID)))
00161         {
00162                 change_state(THREAD_CLOSE);
00163                 return false;
00164         }
00165 
00166 #else
00167         if (pthread_create(&_handle, 0, start_thread, this))
00168                 return false;
00169 #endif
00170 
00171         
00172         _ev_start.wait();
00173         return true;
00174 }
00175 
00176 bool 
00177 thread::assign_job(const job_task& job_task)
00178 {
00179         
00180         if (inside_thread())
00181                 return false;
00182 
00183         
00184         mutex_keeper keeper(_mtx_job);
00185         switch (_state)
00186         {
00187                 case THREAD_SLEEPING:
00188                 case THREAD_RUNNING:
00189                         _job_task = job_task;
00190                         break;
00191                 default:
00192                         return false;
00193         }
00194 
00195         keeper.unlock();
00196         
00197         _ev_wakeup.signal();
00198         return true;
00199 }
00200 
00201 bool 
00202 thread::cancel_job()
00203 {       
00204         
00205         if (inside_thread())
00206                 return false;
00207 
00208         
00209         mutex_keeper keeper(_mtx_job);
00210         switch (_state)
00211         {
00212                 case THREAD_SLEEPING:
00213                 case THREAD_RUNNING:
00214                         _job_task.clear();
00215                         break;
00216                 default:
00217                         return false;
00218         }
00219 
00220         keeper.unlock();
00221 
00222         
00223         _ev_wakeup.signal();
00224         return true;
00225 }
00226 
00227 bool 
00228 thread::stop()
00229 {
00230         
00231         if (inside_thread())
00232                 return false;
00233 
00234         
00235         if (!change_state(THREAD_STOPPING))
00236                 return false;
00237 
00238         
00239         _ev_wakeup.signal();
00240         
00241         _ev_end.wait();
00242 #if OS_TYPE == OS_WIN32
00243         
00244         if (WAIT_TIMEOUT == WaitForSingleObject(_handle, 1000))
00245                 
00246                 TerminateThread(_handle, 0);
00247 
00248         
00249         CloseHandle(_handle);
00250 
00251 #else
00252         if (pthread_detach(_handle) != 0)
00253                 pthread_cancel(_handle);
00254 #endif
00255 
00256         _handle = 0;
00257 
00258         
00259         if (!change_state(THREAD_CLOSE))
00260         {
00261                 assert(false);
00262                 return false;
00263         }
00264 
00265         return true;
00266 }
00267 
00268 void 
00269 thread::wakeup() const
00270 {
00271         _ev_wakeup.signal();
00272 }
00273 
00274 bool 
00275 thread::execute()
00276 {
00277         mutex_keeper keeper(_mtx_job);
00278         if (_state != THREAD_RUNNING 
00279                 || !_job_task._employer 
00280                 )
00281                 return false;
00282 
00283         
00284         job_task task = _job_task;
00285         
00286         keeper.unlock();
00287 
00288 
00289         try
00290         {
00291                 if (!task._employer->v_has_job(task._ident, task._user_data)) 
00292                         return false;
00293 
00294                 task._employer->v_do_job(task._ident, task._user_data);
00295         }
00296         catch (...)
00297         {
00298                 
00299                 
00300                 
00301                 assert(false);
00302         }
00303 
00304         return true;
00305 }
00306 
00307 bool 
00308 thread::sleep()
00309 {
00310         
00311         if (!change_state(THREAD_SLEEPING))
00312                 return false;
00313 
00314         
00315         mutex_keeper keeper(_mtx_job);
00316         size_t wait_timeout = _job_task._timeout;
00317         keeper.unlock();
00318 
00319         
00320         _ev_wakeup.wait(wait_timeout);
00321 
00322         
00323         return change_state(THREAD_RUNNING);
00324 }
00325 
00327 
00328 thread* 
00329 thread_creator::create(const job_task& task)
00330 {
00331         thread* obj = new thread;
00332         if (obj)
00333                 obj->start(); 
00334         return obj;
00335 }
00336 
00337 
00338 void 
00339 thread_creator::activate(thread* obj, const job_task& task)
00340 {
00341         
00342         if (obj->get_state() == THREAD_CLOSE)
00343                 obj->start();
00344         obj->assign_job(task);
00345 }
00346 
00347 
00348 void 
00349 thread_creator::back(thread* obj, const job_task&)
00350 {
00351         obj->cancel_job(); 
00352 }
00353 
00354 
00355 void 
00356 thread_creator::destroy(thread* obj, const job_task& task)
00357 {
00358         deactivate(obj, task);
00359         delete obj;
00360 }
00361 
00362 
00363 void 
00364 thread_creator::deactivate(thread* obj, const job_task& task)
00365 {
00366         back(obj, task);
00367         obj->stop();
00368 }
00369 
00370 
00371 #pragma pack()
00372 END_TERIMBER_NAMESPACE