00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "aiomsg/msg_user.h"
00029 #include "base/map.hpp"
00030 #include "base/list.hpp"
00031 #include "base/memory.hpp"
00032 #include "base/string.hpp"
00033 #include "base/common.hpp"
00034 #include "base/template.hpp"
00035 #include "base/stack.hpp"
00036 #include "base/number.hpp"
00037
00038 BEGIN_TERIMBER_NAMESPACE
00039 #pragma pack(4)
00040
// Job identifiers used by this connection when borrowing threads from the
// thread manager and when dispatching in v_has_job()/v_do_job():
//   callbacks_thread_ident - deliver received asynchronous replies
//   timeouts_thread_ident  - report expired asynchronous requests
const size_t callbacks_thread_ident = 0;
const size_t timeouts_thread_ident = 1;

// Body text of the error message handed to the asynchronous callback
// when a pending asynchronous request expires.
static const char* s_err_text = "Asyncronic sending timeout occured";
00049
00050 msg_wait_reply::msg_wait_reply(event* event_) :
00051 _event(event_), _reply(0)
00052 {
00053 assert(_event);
00054 }
00055
00056 msg_wait_async_reply::msg_wait_async_reply(const guid_t& ident, ub8_t timeout_) :
00057 _ident(ident), _reply(0)
00058 {
00059 date date_;
00060 date_ += timeout_;
00061 _expired = date_ ;
00062 }
00063
00065 msg_user_connection::msg_user_connection(msg_communicator* communicator_, msg_callback_notify* callback_, const conf_connection& info_, size_t additional_threads) :
00066 msg_connection(communicator_, info_), _callback(callback_),
00067 _additional_threads(additional_threads)
00068 {
00069 }
00070
00071
00072 msg_user_connection::~msg_user_connection()
00073 {
00074
00075 mutex_keeper keeper(_mtx_wait);
00076 for (reply_map_t::iterator iter = _map.begin(); iter != _map.end(); ++iter)
00077 {
00078 msg_cpp* msg = iter->get_reply();
00079 if (msg)
00080 _communicator->destroy_msg(msg);
00081
00082 iter->set_reply(0);
00083 }
00084
00085 _map.clear();
00086
00087
00088 mutex_keeper keeper_async(_mtx_async_wait);
00089 for (reply_async_map_t::iterator iter_async = _async_map.begin(); iter_async != _async_map.end(); ++iter_async)
00090 {
00091 msg_cpp* msg = iter_async->get_reply();
00092 _communicator->destroy_msg(msg);
00093 }
00094
00095 _async_map.clear();
00096
00097
00098 for (reply_async_list_t::iterator iter_list = _async_list.begin(); iter_list != _async_list.end(); ++iter_list)
00099 {
00100 msg_cpp* msg = iter_list->get_reply();
00101 _communicator->destroy_msg(msg);
00102 }
00103
00104 _async_list.clear();
00105
00106 }
00107
00108
00109
00110 void
00111 msg_user_connection::ping_notify()
00112 {
00113 if (peek_async_timeouted())
00114
00115 _communicator->get_thread_manager().borrow_thread(timeouts_thread_ident, 0, this, stay_on_alert_time);
00116
00117 set_last_activity();
00118 }
00119
00120
00121 void
00122 msg_user_connection::wakeup()
00123 {
00124 if (!_additional_threads || !_communicator->get_thread_manager().borrow_from_range(queue_thread_ident, queue_thread_ident + _additional_threads, 0, this, stay_on_alert_time))
00125 _communicator->get_thread_manager().borrow_thread(queue_thread_ident, 0, this, stay_on_alert_time);
00126 }
00127
00129
00130 void
00131 msg_user_connection::v_off()
00132 {
00133 mutex_keeper keeper(_mtx_wait);
00134 for (reply_map_t::iterator iter = _map.begin(); iter != _map.end(); ++iter)
00135 {
00136 msg_cpp* msg = iter->get_reply();
00137 if (msg)
00138 _communicator->destroy_msg(msg);
00139
00140 iter->set_reply(0);
00141 }
00142
00143
00144 msg_queue_processor::v_off();
00145 }
00146
00147 void
00148 msg_user_connection::push_msg(msg_cpp* msg_)
00149 {
00150
00151 switch (msg_->_type)
00152 {
00153 case user_type_reply:
00154 {
00155
00156 mutex_keeper keeper(_mtx_wait);
00157 reply_map_t::iterator iter = _map.find(msg_->_marker);
00158 if (iter != _map.end())
00159 iter->set_reply(msg_);
00160 else
00161 _communicator->destroy_msg(msg_);
00162 }
00163 break;
00164 case user_type_reply_async:
00165 {
00166
00167 mutex_keeper keeper_async(_mtx_async_wait);
00168
00169 reply_async_map_t::iterator iter_async = _async_map.find(msg_->_marker);
00170 if (iter_async != _async_map.end())
00171 {
00172
00173
00174
00175 iter_async->set_reply(msg_);
00176
00177 _async_list.push_back(*iter_async);
00178 _async_map.erase(iter_async);
00179
00180 _communicator->get_thread_manager().borrow_thread(callbacks_thread_ident, 0, this, stay_on_alert_time);
00181 }
00182 else
00183 {
00184 _communicator->destroy_msg(msg_);
00185 _communicator->get_thread_manager().borrow_thread(timeouts_thread_ident, 0, this, stay_on_alert_time);
00186 }
00187 }
00188 break;
00189 default:
00190
00191 msg_connection::push_msg(msg_);
00192 break;
00193 }
00194 }
00195
00196 bool
00197 msg_user_connection::send(bool copy, msg_cpp* msg_, msg_cpp*& reply_)
00198 {
00199 if (!msg_ || msg_->_receiver == _info._address || msg_->_receiver == null_uuid)
00200 {
00201 _error = "Invalid receiver address";
00202 return false;
00203 }
00204
00205 if (msg_->priority != MSG_PRIORITY_HIGH
00206 || msg_->priority != MSG_PRIORITY_NORMAL)
00207 {
00208 msg_->priority = MSG_PRIORITY_NORMAL;
00209 }
00210
00211 if (msg_->msgid & 0x80000000)
00212 {
00213 msg_->msgid &= 0x7FFFFFFF;
00214 }
00215
00216 pool_object_keeper< event_pool_t > ekeeper(&_communicator->get_event_pool(), 0, (size_t)msg_->timeout);
00217 if (!ekeeper)
00218 {
00219 _error = "no enough event resources";
00220 return false;
00221 }
00222
00223 ekeeper->nonsignal();
00224
00225 msg_wait_reply _wait(ekeeper);
00226
00227 ub8_t timeout = msg_->timeout;
00228 guid_t marker = uuid_gen();
00229
00230 {
00231 mutex_keeper keeper(_mtx_wait);
00232 _map.insert(marker, _wait);
00233 }
00234
00235 msg_creator creator(_communicator);
00236 msg_pointer_t msg__(creator);
00237
00238 try
00239 {
00240
00241 msg__ = copy ? _communicator->copy_msg(msg_) : msg_;
00242
00243 msg__->_sender = _info._address;
00244 msg__->_marker = marker;
00245
00246 msg__->_type = user_type_send;
00247
00248 _communicator->comm_msg(msg__);
00249 msg__.detach();
00250 }
00251 catch (exception& err)
00252 {
00253
00254 if (!copy)
00255 msg__.detach();
00256
00257
00258 _error = err.what();
00259
00260 mutex_keeper keeper(_mtx_wait);
00261 _map.erase(marker);
00262
00263 return false;
00264 }
00265
00266
00267 if (WAIT_OBJECT_0 == ekeeper->wait((size_t)timeout))
00268 {
00269
00270 mutex_keeper keeper(_mtx_wait);
00271 reply_map_t::iterator iter = _map.find(marker);
00272 if (iter != _map.end())
00273 {
00274
00275 reply_ = iter->get_reply();
00276
00277 _map.erase(iter);
00278
00279 return true;
00280 }
00281 }
00282 else
00283 {
00284
00285 mutex_keeper keeper(_mtx_wait);
00286 _map.erase(marker);
00287 }
00288
00289
00290 _error = "Timeout occured";
00291
00292 try
00293 {
00294 reply_ = _communicator->construct_msg(_error.length());
00295
00296 reply_->msgid = MSG_ERROR_ID;
00297 memcpy(reply_->get_body(), (const char*)_error, _error.length());
00298 }
00299 catch (exception&)
00300 {
00301 }
00302
00303 return true;
00304 }
00305
00306 guid_t
00307 msg_user_connection::send_async(bool copy, msg_cpp* msg_)
00308 {
00309 if (!msg_ || msg_->_receiver == _info._address|| msg_->_receiver == null_uuid)
00310 {
00311 _error = "Invalid receiver address";
00312 return null_uuid;
00313 }
00314
00315 if (msg_->priority != MSG_PRIORITY_HIGH
00316 || msg_->priority != MSG_PRIORITY_NORMAL)
00317 {
00318 msg_->priority = MSG_PRIORITY_NORMAL;
00319 }
00320
00321 if (msg_->msgid & 0x80000000)
00322 {
00323 msg_->msgid &= 0x7FFFFFFF;
00324 }
00325
00326
00327 guid_t marker = uuid_gen();
00328
00329 msg_wait_async_reply _wait(marker, msg_->timeout);
00330
00331 msg_creator creator(_communicator);
00332 msg_pointer_t msg__(creator);
00333
00334 {
00335 mutex_keeper keeper(_mtx_async_wait);
00336 _async_map.insert(marker, _wait);
00337 }
00338
00339
00340 try
00341 {
00342
00343 msg__ = copy ? _communicator->copy_msg(msg_) : msg_;
00344
00345 msg__->_sender = _info._address;
00346 msg__->_marker = marker;
00347
00348 msg__->_type = user_type_send_async;
00349
00350 _communicator->comm_msg(msg__);
00351 msg__.detach();
00352 }
00353 catch (exception& err)
00354 {
00355
00356 if (!copy)
00357 msg__.detach();
00358
00359
00360 _error = err.what();
00361
00362 return null_uuid;
00363 }
00364
00365 return marker;
00366 }
00367
00368 bool
00369 msg_user_connection::post(bool copy, msg_cpp* msg_)
00370 {
00371 if (!msg_ || msg_->_receiver == _info._address || msg_->_receiver == null_uuid)
00372 {
00373 _error = "Invalid receiver address";
00374 return false;
00375 }
00376
00377 if (msg_->priority != MSG_PRIORITY_HIGH
00378 || msg_->priority != MSG_PRIORITY_NORMAL)
00379 {
00380 msg_->priority = MSG_PRIORITY_NORMAL;
00381 }
00382
00383 if (msg_->msgid & 0x80000000)
00384 {
00385 msg_->msgid &= 0x7FFFFFFF;
00386 }
00387
00388 msg_creator creator(_communicator);
00389 msg_pointer_t msg__(creator);
00390
00391 try
00392 {
00393
00394 msg__ = copy ? _communicator->copy_msg(msg_) : msg_;
00395
00396 msg__->_sender = _info._address;
00397
00398 msg__->_type = user_type_post;
00399
00400 _communicator->comm_msg(msg__);
00401 msg__.detach();
00402 }
00403 catch (exception& err)
00404 {
00405
00406 if (!copy)
00407 msg__.detach();
00408
00409 _error = err.what();
00410 return false;
00411 }
00412
00413 return true;
00414 }
00415
00416
00417 bool
00418 msg_user_connection::v_has_job(size_t ident, void* user_data)
00419 {
00420 if (!msg_base::is_on())
00421 return false;
00422
00423 switch (ident)
00424 {
00425 case callbacks_thread_ident:
00426 return peek_async();
00427 case timeouts_thread_ident:
00428 return peek_async_timeouted();
00429 case queue_thread_ident:
00430 default:
00431 return msg_queue_processor::v_has_job(ident, user_data);
00432 }
00433 }
00434
00435 void
00436 msg_user_connection::v_do_job(size_t ident, void* user_data)
00437 {
00438 switch (ident)
00439 {
00440 case callbacks_thread_ident:
00441 {
00442 msg_creator creator(_communicator);
00443 msg_cpp* msg_ = 0;
00444 guid_t marker;
00445 if (pop_async(msg_, marker))
00446 {
00447 msg_pointer_t msg(creator);
00448 msg = msg_;
00449
00450 if (_callback)
00451 {
00452 try
00453 {
00454 if (_callback->async_callback(msg, marker))
00455 msg.detach();
00456 }
00457 catch (...)
00458 {
00459 }
00460 }
00461 }
00462 }
00463 break;
00464 case timeouts_thread_ident:
00465 {
00466 guid_t marker;
00467 while (pop_async_timeouted(marker))
00468 {
00469 if (_callback)
00470 {
00471 msg_creator creator(_communicator);
00472 msg_pointer_t err(creator, 0);
00473 msg_pack::make_error_msg(err, s_err_text);
00474
00475 err->_receiver = err->_sender = _info._address;
00476 try
00477 {
00478 if (_callback->async_callback(err, marker))
00479 err.detach();
00480 }
00481 catch (...)
00482 {
00483 }
00484 }
00485 }
00486 }
00487 break;
00488 case queue_thread_ident:
00489 default:
00490 process_income_message();
00491 }
00492 }
00493
00494 void
00495 msg_user_connection::process_income_message()
00496 {
00497 msg_creator creator(_communicator);
00498 msg_cpp* msg_ = 0;
00499
00500 if (pop(msg_))
00501 {
00502 msg_pointer_t msg(creator);
00503 msg = msg_;
00504
00505 if (_additional_threads && peek())
00506 {
00507 if (!_additional_threads || !_communicator->get_thread_manager().borrow_from_range(queue_thread_ident, queue_thread_ident + _additional_threads, 0, this, stay_on_alert_time))
00508 _communicator->get_thread_manager().borrow_thread(queue_thread_ident, 0, this, stay_on_alert_time);
00509 }
00510
00511
00512 if (_callback)
00513 {
00514 try
00515 {
00516
00517 if (msg->_type == user_type_send || msg->_type == user_type_send_async)
00518 {
00519 msg_pointer_t reply(creator);
00520 reply = _communicator->construct_msg(0);
00521
00522
00523 sb8_t old_timestamp = msg->_timestamp;
00524 ub8_t old_timeout = msg->timeout;
00525 guid_t old_sender = msg_->_sender;
00526
00527 msg_pack::make_reply_msg(msg, reply);
00528
00529 if (_callback->incoming_callback(msg, reply))
00530 msg.detach();
00531
00532
00533
00534 reply->_timestamp = old_timestamp;
00535 reply->timeout = old_timeout;
00536 reply->_receiver = old_sender;
00537
00538
00539 _communicator->comm_msg(reply);
00540 reply.detach();
00541 }
00542 else if (msg->_type == user_type_post)
00543 {
00544 if (_callback->incoming_callback(msg, 0))
00545 msg.detach();
00546 }
00547 else
00548 assert(false);
00549 }
00550 catch (exception&)
00551 {
00552 }
00553 }
00554
00555 }
00556 }
00557
00558
00559 bool
00560 msg_user_connection::pop_async(msg_cpp*& msg_, guid_t& ident)
00561 {
00562 mutex_keeper keeper(_mtx_async_wait);
00563 if (_async_list.empty()) return false;
00564 msg_ = _async_list.front().get_reply_data(ident);
00565 _async_list.pop_front();
00566 return true;
00567 }
00568
00569 bool
00570 msg_user_connection::peek_async()
00571 {
00572 mutex_keeper keeper(_mtx_async_wait);
00573 return !_async_list.empty();
00574 }
00575
00576 bool
00577 msg_user_connection::pop_async_timeouted(guid_t& ident)
00578 {
00579 mutex_keeper keeper(_mtx_async_wait);
00580 date now;
00581 for (reply_async_map_t::iterator iter = _async_map.begin(); iter != _async_map.end(); ++iter)
00582 if (iter->is_expired(now))
00583 {
00584 iter->get_reply_data(ident);
00585 _async_map.erase(iter);
00586 return true;
00587 }
00588
00589 return false;
00590 }
00591
00592 bool
00593 msg_user_connection::peek_async_timeouted()
00594 {
00595 mutex_keeper keeper(_mtx_async_wait);
00596 date now;
00597 for (reply_async_map_t::iterator iter = _async_map.begin(); iter != _async_map.end(); ++iter)
00598 if (iter->is_expired(now))
00599 return true;
00600
00601 return false;
00602 }
00603
00604
00605 msg_user_connection*
00606 msg_user_connection::connect(msg_communicator* communicator_, msg_callback_notify* callback_, const conf_connection& info_, size_t additional_thread)
00607 {
00608
00609 msg_user_connection* connection = new msg_user_connection(communicator_, callback_, info_, additional_thread);
00610 connection->_state = CONN_STATE_CONNECTED;
00611
00612 try
00613 {
00614
00615 communicator_->add_connection(connection);
00616 }
00617 catch (exception& x)
00618 {
00619 delete connection;
00620 throw x;
00621 }
00622
00623
00624 return connection;
00625 }
00626
00627 #pragma pack()
00628 END_TERIMBER_NAMESPACE