00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "threadpool/threadpool.h"
00029 #include "base/map.hpp"
00030 #include "base/list.hpp"
00031 #include "base/template.hpp"
00032 #include "base/common.hpp"
00033 #include "base/memory.hpp"
00034
00035 static const size_t housekeeper_timeout = 10000;
00036
00037 terimber_threadpool_factory::terimber_threadpool_factory()
00038 {
00039 }
00040
00041 terimber_threadpool_factory::~terimber_threadpool_factory()
00042 {
00043 }
00044
00045
00046 terimber_threadpool*
00047 terimber_threadpool_factory::get_thread_pool(terimber_log* log, size_t capacity, size_t deactivate_time_msec)
00048 {
00049 TERIMBER::threadpool* obj = new TERIMBER::threadpool(capacity, deactivate_time_msec);
00050 if (obj)
00051 {
00052 obj->log_on(log);
00053 obj->on();
00054 }
00055
00056 return obj;
00057 }
00058
00059
00060 BEGIN_TERIMBER_NAMESPACE
00061 #pragma pack(4)
00062
00064 threadpool::threadpool(size_t capacity, size_t deactivate_time_msec) :
00065 _on(false),
00066 _capacity(capacity),
00067 _thread_in_use(0),
00068 _thread_pool(thread_creator::static_constructor(), capacity),
00069 _deactivate_time_msec(deactivate_time_msec)
00070 {
00071 }
00072
00073 threadpool::~threadpool()
00074 {
00075 off();
00076 }
00077
00078 bool
00079 threadpool::on()
00080 {
00081 if (_on)
00082 return false;
00083
00084 format_logging(0, __FILE__, __LINE__, en_log_info, "starting thread pool...");
00085
00086
00087 job_task job(this, 0, housekeeper_timeout, 0);
00088 _housekeeper.start();
00089 _on = true;
00090 _housekeeper.assign_job(job);
00091 format_logging(0, __FILE__, __LINE__, en_log_info, "thread pool started");
00092 return true;
00093 }
00094
00095 void
00096 threadpool::off()
00097 {
00098 if (!_on)
00099 return;
00100
00101 format_logging(0, __FILE__, __LINE__, en_log_info, "stopping thread pool...");
00102
00103
00104 mutex_keeper guard(_clients_mtx);
00105
00106 _on = false;
00107
00108
00109 guard.unlock();
00110
00111
00112 _housekeeper.cancel_job();
00113 _housekeeper.stop();
00114
00115 guard.lock();
00116
00117
00118 _clean_up_clients(0);
00119
00120
00121 while (!_dispose_queue.empty())
00122 {
00123 thread* obj = _dispose_queue.front()._obj;
00124 _dispose_queue.pop_front();
00125 guard.unlock();
00126 _thread_pool.return_object(obj);
00127 guard.lock();
00128 }
00129
00130 guard.unlock();
00131
00132
00133 _thread_pool.clear();
00134
00135 format_logging(0, __FILE__, __LINE__, en_log_info, "thread pool stopped");
00136 }
00137
00138
00139
00140 bool
00141 threadpool::v_has_job(size_t ident, void* user_data)
00142 {
00143
00144 mutex_keeper guard(_clients_mtx);
00145
00146 if (!_on)
00147 {
00148 format_logging(0, __FILE__, __LINE__, en_log_error, "thread pool is not activated");
00149 return false;
00150 }
00151
00152
00153 if (user_data == 0)
00154 {
00155
00156 if (!_dispose_queue.empty())
00157 return true;
00158
00159 guard.unlock();
00160
00161 _thread_pool.deactivate(_deactivate_time_msec);
00162 return false;
00163 }
00164
00165
00166
00167 terimber_thread_employer* client = (terimber_thread_employer*)user_data;
00168
00169 client_info_map_t::iterator it_client;
00170 ident_info_map_t::iterator it_info;
00171
00172
00173 if (!_validate_client(ident, client, it_client, it_info))
00174 return false;
00175
00176
00177 assert(it_info->_client == client);
00178 assert(it_info->_ident == ident);
00179
00180
00181 it_info->_wasted_calls = 0;
00182
00183
00184 client_thread_info rinfo(*it_info);
00185
00186
00187 guard.unlock();
00188
00189
00190 if (rinfo._client->v_has_job(rinfo._ident, rinfo._data))
00191 return true;
00192
00193
00194
00195
00196 guard.lock();
00197
00198 if (!_validate_client(ident, client, it_client, it_info))
00199 return false;
00200
00201
00202 assert(it_info->_client == client);
00203 assert(it_info->_ident == ident);
00204
00205
00206 if (it_info->_wasted_calls == 1)
00207 {
00208
00209 _dispose_queue.push_back(*it_info);
00210
00211 it_client->erase(_map_allocator, it_info);
00212
00213 if (it_client->empty())
00214 _clients_map.erase(it_client);
00215 }
00216 else
00217 {
00218 ++it_info->_wasted_calls;
00219 }
00220
00221 return false;
00222 }
00223
00224
00225 void
00226 threadpool::v_do_job(size_t ident, void* user_data)
00227 {
00228 mutex_keeper guard(_clients_mtx);
00229
00230 if (!_on)
00231 {
00232 format_logging(0, __FILE__, __LINE__, en_log_error, "thread pool is not activated");
00233 return;
00234 }
00235
00236
00237 if (user_data == 0)
00238 {
00239 if (_dispose_queue.empty())
00240 return;
00241
00242
00243 client_thread_info info(_dispose_queue.front());
00244
00245 _dispose_queue.pop_front();
00246
00247
00248 assert(_thread_in_use > 0);
00249
00250
00251 --_thread_in_use;
00252
00253
00254 guard.unlock();
00255
00256
00257 _thread_pool.return_object(info._obj);
00258 format_logging(0, __FILE__, __LINE__, en_log_info, "return thread %d for client %d back to pool", ident, info._client);
00259 }
00260 else
00261 {
00262
00263 terimber_thread_employer* client = (terimber_thread_employer*)user_data;
00264
00265 client_info_map_t::iterator it_client;
00266 ident_info_map_t::iterator it_info;
00267
00268 if (!_validate_client(ident, client, it_client, it_info))
00269 return;
00270
00271
00272 assert(it_info->_client == client);
00273 assert(it_info->_ident == ident);
00274
00275
00276 client_thread_info rinfo(*it_info);
00277
00278
00279 guard.unlock();
00280
00281
00282 rinfo._client->v_do_job(rinfo._ident, rinfo._data);
00283
00284 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "execute v_do_job thread %d for client %d", ident, rinfo._client);
00285 }
00286 }
00287
00288 bool
00289 threadpool::_validate_client(size_t ident, terimber_thread_employer* obj, client_info_map_t::iterator& it_client, ident_info_map_t::iterator& it_info)
00290 {
00291
00292 return ((it_client = _clients_map.find(obj)) == _clients_map.end()) ? false : ((it_info = it_client->find(ident)) != it_client->end());
00293 }
00294
00295
00296 bool
00297 threadpool::borrow_thread(size_t ident, void* data, terimber_thread_employer* client, size_t stay_on_alert_time_msec)
00298 {
00299
00300 if (!client)
00301 {
00302 assert(false);
00303 format_logging(0, __FILE__, __LINE__, en_log_error, "null pointer for user callback");
00304 return false;
00305 }
00306
00307
00308 mutex_keeper guard(_clients_mtx);
00309
00310 if (!_on)
00311 {
00312 format_logging(0, __FILE__, __LINE__, en_log_error, "thread pool is not activated");
00313 return false;
00314 }
00315
00316 client_info_map_t::iterator it_client;
00317 ident_info_map_t::iterator it_info;
00318
00319
00320 if (_validate_client(ident, client, it_client, it_info))
00321 {
00322
00323 assert(it_info->_obj);
00324
00325
00326 it_info->_wasted_calls = 0;
00327
00328
00329 client_thread_info rinfo(*it_info);
00330
00331
00332 guard.unlock();
00333
00334
00335 rinfo._obj->wakeup();
00336
00337 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "wakeup thread %d for client %d", ident, client);
00338 }
00339 else
00340 {
00341
00342 if (_thread_in_use == _capacity)
00343 {
00344 format_logging(0, __FILE__, __LINE__, en_log_error, "no threads are available, max capacity is reached");
00345 return false;
00346 }
00347
00348 try
00349 {
00350
00351 ident_info_map_t dummy;
00352 client_info_map_t::pairib_t it_client = _clients_map.insert(client, dummy);
00353
00354
00355 client_thread_info info(0, ident, data, client);
00356
00357
00358 job_task task(this, ident, stay_on_alert_time_msec, client);
00359
00360 info._obj = _thread_pool.loan_object(task);
00361
00362 assert(info._obj);
00363
00364 try
00365 {
00366
00367 it_client.first->insert(_map_allocator, ident, info);
00368 }
00369 catch (exception&)
00370 {
00371 _clients_map.erase(it_client.first);
00372 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00373 return false;
00374 }
00375 }
00376 catch (exception&)
00377 {
00378 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00379 return false;
00380 }
00381
00382
00383
00384 ++_thread_in_use;
00385
00386
00387 guard.unlock();
00388
00389 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "open new thread %d for client %d", ident, client);
00390 }
00391
00392 return true;
00393 }
00394
00395
00396 bool
00397 threadpool::borrow_from_range(size_t from, size_t to, void* data, terimber_thread_employer* client, size_t stay_on_alert_time_msec)
00398 {
00399
00400 if (!client || from > to)
00401 {
00402 assert(false);
00403 format_logging(0, __FILE__, __LINE__, en_log_error, "null pointer for user callback or invalid range");
00404 return false;
00405 }
00406
00407
00408 size_t from_ = from, to_ = to;
00409
00410 mutex_keeper guard(_clients_mtx);
00411
00412 if (!_on)
00413 {
00414 format_logging(0, __FILE__, __LINE__, en_log_error, "thread pool is not activated");
00415 return false;
00416 }
00417
00418
00419 try
00420 {
00421 ident_info_map_t dummy;
00422 client_info_map_t::pairib_t it_client = _clients_map.insert(client, dummy);
00423
00424 if (!it_client.second)
00425 {
00426 bool hole_flag = false;
00427
00428 ident_info_map_t::iterator it_info = it_client.first->lower_bound(from);
00429
00430
00431 while (it_info != it_client.first->end()
00432 && it_info.key() <= to)
00433 {
00434 if (from != it_info.key())
00435 break;
00436
00437 if (it_info->_obj->get_state() == THREAD_SLEEPING)
00438 {
00439
00440 it_info->_wasted_calls = 0;
00441
00442
00443 client_thread_info rinfo(*it_info);
00444
00445
00446 guard.unlock();
00447
00448
00449 rinfo._obj->wakeup();
00450
00451 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "wakeup thread %d for client %d", from, rinfo._client);
00452
00453 return true;
00454 }
00455 else
00456 {
00457
00458 ++from;
00459 ++it_info;
00460 }
00461 }
00462
00463
00464 if (from > to || _thread_in_use == _capacity)
00465 {
00466 format_logging(0, __FILE__, __LINE__, en_log_info, "no threads are available within the range [%d, %d] for client %d", from_, to_, client);
00467
00468 return false;
00469 }
00470 }
00471 else
00472 {
00473
00474 if (_thread_in_use == _capacity)
00475 {
00476 _clients_map.erase(it_client.first);
00477 format_logging(0, __FILE__, __LINE__, en_log_error, "no threads are available, max capacity is reached");
00478 return false;
00479 }
00480 }
00481
00482
00483 client_thread_info info(0, from, data, client);
00484
00485
00486 job_task task(this, from, stay_on_alert_time_msec, client);
00487
00488 info._obj = _thread_pool.loan_object(task);
00489
00490 assert(info._obj);
00491
00492 try
00493 {
00494
00495 it_client.first->insert(_map_allocator, from, info);
00496 }
00497 catch (exception&)
00498 {
00499 _clients_map.erase(it_client.first);
00500 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00501 return false;
00502 }
00503
00504
00505 ++_thread_in_use;
00506 }
00507 catch (exception&)
00508 {
00509 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00510 return false;
00511 }
00512
00513
00514 guard.unlock();
00515
00516 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "open new thread %d for client %d", from, client);
00517
00518 return true;
00519 }
00520
00521
00522 void
00523 threadpool::revoke_client(terimber_thread_employer* client)
00524 {
00525 if (client)
00526 {
00527
00528 mutex_keeper guard(_clients_mtx);
00529 _clean_up_clients(client);
00530 format_logging(0, __FILE__, __LINE__, en_log_info, "revoke client %d", client);
00531 }
00532 }
00533
00534 void
00535 threadpool::_clean_up_clients(terimber_thread_employer* client)
00536 {
00537
00538 if (client)
00539 {
00540 client_info_map_t::iterator it_client = _clients_map.find(client);
00541
00542 if (it_client != _clients_map.end())
00543 {
00544 while (!it_client->empty())
00545 {
00546 ident_info_map_t::iterator it_info = it_client->begin();
00547 _dispose_queue.push_back(*it_info);
00548 it_client->erase(_map_allocator, it_info);
00549 }
00550
00551 _clients_map.erase(it_client);
00552 }
00553
00554 _housekeeper.wakeup();
00555 }
00556 else
00557 {
00558 while (!_clients_map.empty())
00559 {
00560 client_info_map_t::iterator it_client = _clients_map.begin();
00561
00562 while (!it_client->empty())
00563 {
00564 ident_info_map_t::iterator it_info = it_client->begin();
00565 _dispose_queue.push_back(*it_info);
00566 it_client->erase(_map_allocator, it_info);
00567 }
00568
00569 _clients_map.erase(it_client);
00570 }
00571 }
00572 }
00573
00574
00575 void
00576 threadpool::doxray()
00577 {
00578
00579 mutex_keeper guard(_clients_mtx);
00580
00581 size_t threads = _thread_in_use,
00582 capacity = _capacity,
00583 clients = _clients_map.size(),
00584 disposal = _dispose_queue.size();
00585
00586 guard.unlock();
00587
00588 format_logging(0, __FILE__, __LINE__, en_log_xray, "<threadpool threads=\"%d\" capacity=\"%d\" clients=\"%d\" disposal=\"%d\" />",
00589 threads, capacity, clients, disposal);
00590 }
00591
00592 #pragma pack()
00593 END_TERIMBER_NAMESPACE
00594