00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "aiomsg/msg_comm.h"
00029 #include "aiomsg/msg_lsnr.h"
00030 #include "aiomsg/msg_conn.h"
00031 #include "aiomsg/msg_sock.h"
00032 #if OS_TYPE == OS_WIN32
00033 #include "aiomsg/msg_rpc.h"
00034 #endif
00035
00036 #include "base/list.hpp"
00037 #include "base/map.hpp"
00038 #include "base/template.hpp"
00039 #include "base/memory.hpp"
00040 #include "base/string.hpp"
00041 #include "base/common.hpp"
00042 #include "base/vector.hpp"
00043 #include "base/number.hpp"
00044 #include "base/keymaker.hpp"
00045
00046 #include "crypt/integer.hpp"
00047 #include "crypt/arithmet.hpp"
00048 #include "xml/xmlimpl.hpp"
00049 #include "xml/sxs.hpp"
00050
00051 #include "ossock.h"
00052
00053 BEGIN_TERIMBER_NAMESPACE
00054 #pragma pack(4)
00055
00056
00057 #define MSG_DEBUG_ID 0x80000003
00058
00059
00060
00061 extern exception_table aiosockTable;
00062 extern exception_table msgMsgTable;
00063
00064
00065 const ub8_t handshake_default_timeout = 3000;
00066
00067 const char* msg_xml_dtd = \
00068 "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
00069 <!ENTITY % kind \"(sock | rpc | p2p)\"> \
00070 <!ELEMENT msgPort (listeners?, connections?)> \
00071 <!ATTLIST msgPort \
00072 address CTYPE vt_guid #REQUIRED \
00073 debug CTYPE vt_guid #IMPLIED > \
00074 <!ELEMENT listeners (listener)*> \
00075 <!ELEMENT connections (connection)*> \
00076 <!ELEMENT listener (accept | reject)?> \
00077 <!ELEMENT reject (peer)+> \
00078 <!ELEMENT accept (peer)+> \
00079 <!ELEMENT peer EMPTY> \
00080 <!ATTLIST peer \
00081 address CTYPE vt_guid #REQUIRED \
00082 security CTYPE vt_bool #IMPLIED \
00083 password CDATA #IMPLIED > \
00084 <!ELEMENT connection EMPTY> \
00085 <!ATTLIST listener \
00086 type %kind; #REQUIRED \
00087 port CTYPE vt_ub2 #IMPLIED \
00088 network CDATA #IMPLIED \
00089 connections CTYPE vt_ub4 #IMPLIED \
00090 ping CTYPE vt_ub4 #IMPLIED \
00091 waste CTYPE vt_ub4 #IMPLIED \
00092 info CDATA #IMPLIED \
00093 security CTYPE vt_bool #IMPLIED \
00094 password CDATA #IMPLIED > \
00095 <!ATTLIST connection \
00096 type %kind; #REQUIRED \
00097 address CTYPE vt_guid #REQUIRED \
00098 port CTYPE vt_ub2 #IMPLIED \
00099 network CDATA #IMPLIED \
00100 ping CTYPE vt_ub4 #IMPLIED \
00101 waste CTYPE vt_ub4 #IMPLIED \
00102 info CDATA #IMPLIED \
00103 security CTYPE vt_bool #IMPLIED \
00104 password CDATA #IMPLIED >";
00105
00106 static size_t meesages_in_work = 0;
00108
00109
00110 keylocker msg_communicator::_this_access(1024);
00111
00112 msg_communicator::this_map_t msg_communicator::_this_map;
00113
00114 msg_communicator*
00115 msg_communicator::loan_communicator(const guid_t& addr)
00116 {
00117
00118 if (!_this_access.enter(1000))
00119 return false;
00120
00121 this_map_t::iterator iter = _this_map.find(addr);
00122 if (iter == _this_map.end())
00123 {
00124 _this_access.leave();
00125 return 0;
00126 }
00127 else
00128 return *iter;
00129 }
00130
00131 void
00132 msg_communicator::return_communicator(const guid_t& addr, msg_communicator* comm)
00133 {
00134
00135 if (comm)
00136 {
00137 this_map_t::iterator iter = _this_map.find(addr);
00138 assert (iter != _this_map.end() && *iter == comm);
00139 _this_access.leave();
00140 }
00141 }
00142
00144 msg_communicator::msg_communicator() :
00145 msg_queue_processor(this),
00146 _thread_manager(max_thread_capacity, 60000),
00147 _aio_port(0, 60000)
00148 {
00149 }
00150
00151 msg_communicator::~msg_communicator()
00152 {
00153
00154 try
00155 {
00156
00157 uninit();
00158 }
00159 catch (...)
00160 {
00161 assert(false);
00162 }
00163 }
00164
00165 void
00166 msg_communicator::_register_this()
00167 {
00168
00169 keylocker_server gkeeper(_this_access, 1000);
00170
00171 this_map_t::iterator iter = _this_map.find(_address);
00172 if (iter != _this_map.end())
00173
00174 exception::_throw("Dublicate communicator address");
00175 else
00176
00177 _this_map.insert(_address, this);
00178 }
00179
00180 void
00181 msg_communicator::_revoke_this()
00182 {
00183
00184 keylocker_server gkeeper(_this_access);
00185
00186 _this_map.erase(_address);
00187 }
00188
00189
00190 msg_cpp*
00191 msg_communicator::construct_msg(size_t size)
00192 {
00193 byte_allocator* all = 0;
00194 msg_cpp* msg = 0;
00195
00196 if ((all = _manager.loan_object((size_t)msg_cpp::estimate_size(size)))
00197 && (msg = msg_cpp::construct(all, size)))
00198
00199 {
00200
00201 return msg;
00202 }
00203 else
00204 {
00205
00206 if (msg)
00207 destroy_msg(msg);
00208 else if (all)
00209 {
00210
00211 _manager.return_object(all);
00212 }
00213
00214
00215 exception::_throw(MSG_RESULT_NOTMEMORY, &msgMsgTable);
00216 return 0;
00217 }
00218 }
00219
00220 bool
00221 msg_communicator::destroy_msg(msg_cpp* msg_)
00222 {
00223 if (!msg_)
00224 return false;
00225
00226
00227
00228
00229 byte_allocator* all = msg_cpp::destroy(msg_);
00230
00231 assert(all);
00232
00233 all->reset(true);
00234
00235 _manager.return_object(all);
00236
00237 return true;
00238 }
00239
00240 msg_cpp*
00241 msg_communicator::copy_msg(msg_cpp* msg_)
00242 {
00243
00244 size_t size = msg_->get_size();
00245
00246 msg_cpp* msg = construct_msg(size);
00247
00248
00249 msg->msgid = msg_->msgid;
00250 msg->majver = msg_->majver;
00251 msg->minver = msg_->minver;
00252 msg->priority = msg_->priority;
00253 msg->timeout = msg_->timeout;
00254
00255
00256 msg->_type = msg_->_type;
00257 msg->_timestamp = msg_->_timestamp;
00258 msg->_sender = msg_->_sender;
00259 msg->_receiver = msg_->_receiver;
00260 msg->_sessionid = msg_->_sessionid;
00261
00262
00263
00264 if (size)
00265 memcpy(msg->get_body(), msg_->get_body(), size);
00266
00267
00268 return msg;
00269 }
00270
00271 bool
00272 msg_communicator::resize_msg(msg_cpp* msg_, size_t size)
00273 {
00274
00275 return msg_->resize(size);
00276 }
00277
00278
00279 void
00280 msg_communicator::get_msg_key(room_byte_t& crypt_private, const room_byte_t& crypt_external, const guid_t& session) const
00281 {
00282
00283 integer first(crypt_private, (ub4_t)crypt_private.size());
00284 integer second(crypt_external, (ub4_t)crypt_external.size());
00285 integer third((const ub1_t*)&session, sizeof(guid_t));
00286 integer result;
00287
00288
00289 result += first;
00290 result += second;
00291 result += third;
00292
00293 crypt_private.reserve((ub4_t)result.min_encoded_size());
00294 result.encode(crypt_private, (ub4_t)crypt_private.size());
00295 }
00296
00297 msg_connection*
00298 msg_communicator::_connect(const conf_connection& info_, ub8_t timeout)
00299 {
00300
00301 switch (info_._type)
00302 {
00303 #if OS_TYPE == OS_WIN32
00304 case rpc:
00305
00306 return msg_rpc_connection::connect(_communicator, info_);
00307 #endif
00308 case sock:
00309
00310 return msg_sock_connection::connect(_communicator, info_);
00311 default:
00312 assert(false);
00313 }
00314
00315 return 0;
00316 }
00317
00318 msg_connection*
00319 msg_communicator::find_connection(const guid_t& addr_)
00320 {
00321
00322 mutex_keeper keeper(_mtx_conn);
00323
00324 connection_map_t::iterator iter = _connections.find(addr_);
00325
00326 return iter != _connections.end() ? *iter : 0;
00327 }
00328
00329 msg_listener*
00330 msg_communicator::find_listener(transport_type type_)
00331 {
00332
00333 mutex_keeper keeper(_mtx_listeners);
00334 for (listener_list_t::iterator iter = _listeners.begin(); iter != _listeners.end(); ++iter)
00335 if ((*iter)->get_type() == type_)
00336 return *iter;
00337
00338 return 0;
00339 }
00340
00341 void
00342 msg_communicator::init(const char* info_, const char* ini_key_)
00343 {
00344 xml_parser_creator nav_creator;
00345 xml_designer_keeper_t nav_keeper(nav_creator, 0);
00346
00347
00348 size_t ini_key_len = ini_key_ ? strlen(ini_key_) : 0;
00349 #ifndef MSG_PRODUCTION
00350 ini_key_len = 0;
00351 #endif
00352
00353 if (ini_key_len)
00354 {
00355 room_byte_t ini_key(ini_key_len);
00356 ini_key.copy((const ub1_t*)ini_key_, ini_key_len);
00357
00358
00359 size_t buf_len = 0;
00360 room_byte_t buf(buf_len);
00361
00362
00363 if (!nav_keeper->load(buf, buf.size(), (const void*)msg_xml_dtd, strlen(msg_xml_dtd)))
00364 exception::_throw(nav_keeper->error());
00365 }
00366 else
00367 {
00368
00369 if (!nav_keeper->load(info_, (const void*)msg_xml_dtd, strlen(msg_xml_dtd)))
00370 exception::_throw(nav_keeper->error());
00371 }
00372
00373 nav_keeper->select_root();
00374 if (!nav_keeper->validate(true))
00375 exception::_throw(nav_keeper->error());
00376
00377 init((xml_designer*)nav_keeper);
00378 }
00379
00380 void
00381 msg_communicator::init(const void* info_, size_t len)
00382 {
00383 xml_parser_creator nav_creator;
00384 xml_designer_keeper_t nav_keeper(nav_creator, 0);
00385
00386
00387
00388 if (!nav_keeper->load(info_, len, (const void*)msg_xml_dtd, strlen(msg_xml_dtd)))
00389 exception::_throw(nav_keeper->error());
00390
00391 nav_keeper->select_root();
00392 if (!nav_keeper->validate(true))
00393 exception::_throw(nav_keeper->error());
00394
00395 init((xml_designer*)nav_keeper);
00396 }
00397
00398
00399 void
00400 msg_communicator::init(xml_designer* nav)
00401 {
00402 if (is_block())
00403 exception::_throw("Communicator has been initialized");
00404
00405
00406 if (!nav->select_root())
00407 exception::_throw("Invalid XML structure");
00408
00409 if (!nav->has_attributes())
00410 exception::_throw("Invalid XML structure");
00411
00412
00413 nav->select_attribute_by_name("address");
00414 if (!string_to_guid(_address, nav->get_value()))
00415 exception::_throw("Invalid address attribute format");
00416
00417
00418 nav->select_parent();
00419
00420
00421
00422 if (nav->has_children())
00423 {
00424 nav->select_first_child();
00425
00426 do
00427 {
00428
00429 if (nav->get_type() != ELEMENT_NODE) continue;
00430
00431 if (!strcmp(nav->get_name(), "listeners"))
00432 {
00433
00434 if (nav->has_children())
00435 {
00436 nav->select_first_child();
00437
00438 do
00439 {
00440 if (nav->get_type() != ELEMENT_NODE) continue;
00441
00442 conf_listener atom;
00443 atom._address = _address;
00444 parse_listener(nav, atom);
00445 add_listener_config(atom, false);
00446 }
00447 while (nav->select_next_sibling());
00448
00449 nav->select_parent();
00450 }
00451 }
00452 else
00453 {
00454
00455 if (nav->has_children())
00456 {
00457 nav->select_first_child();
00458 do
00459 {
00460 if (nav->get_type() != ELEMENT_NODE) continue;
00461
00462
00463 conf_connection atom;
00464 parse_connection(nav, atom);
00465 add_connection_config(atom);
00466 }
00467 while (nav->select_next_sibling());
00468
00469 nav->select_parent();
00470 }
00471 }
00472 }
00473 while (nav->select_next_sibling());
00474 }
00475
00476
00477 _create_listeners();
00478
00479
00480 v_on();
00481 }
00482
00483 void
00484 msg_communicator::add_connection_config(const conf_connection& atom)
00485 {
00486 mutex_keeper keeper(_mtx_config);
00487
00488 for (msg_connection_list_t::const_iterator iter = _config._connections.begin(); iter != _config._connections.end(); ++iter)
00489 if (iter->_address == atom._address)
00490 exception::_throw("Dublicate connection address");
00491
00492
00493 _config._connections.push_back(atom);
00494 }
00495
00496 void
00497 msg_communicator::add_listener_config(const conf_listener& atom, bool start)
00498 {
00499 mutex_keeper keeper(_mtx_config);
00500
00501 for (msg_listener_list_t::const_iterator iter = _config._listeners.begin(); iter != _config._listeners.end(); ++iter)
00502 if (iter->_type == atom._type)
00503 exception::_throw("Dublicate listener type");
00504
00505
00506 if (start)
00507 _create_listener(atom);
00508
00509 _config._listeners.push_back(atom);
00510 }
00511
00512 void
00513 msg_communicator::remove_connection_config(const guid_t& address)
00514 {
00515 mutex_keeper keeper(_mtx_config);
00516
00517 for (msg_connection_list_t::iterator iter = _config._connections.begin(); iter != _config._connections.end(); ++iter)
00518 if (iter->_address == address)
00519 {
00520 _config._connections.erase(iter);
00521
00522 keeper.unlock();
00523
00524
00525 msg_connection* conn = find_connection(address);
00526 if (conn)
00527 shutdown_connection(conn);
00528
00529 break;
00530 }
00531 }
00532
00533 void
00534 msg_communicator::remove_listener_config(transport_type type)
00535 {
00536 mutex_keeper keeper(_mtx_config);
00537
00538 for (msg_listener_list_t::iterator iter = _config._listeners.begin(); iter != _config._listeners.end(); ++iter)
00539 if (iter->_type == type)
00540 {
00541 _config._listeners.erase(iter);
00542
00543 mutex_keeper keeper(_mtx_listeners);
00544 for (listener_list_t::iterator iter_listener = _listeners.begin(); iter_listener != _listeners.end(); ++iter_listener)
00545 {
00546 if ((*iter_listener)->get_type() == type)
00547 {
00548 msg_listener* obj = *iter_listener;
00549 _listeners.erase(iter_listener);
00550
00551 keeper.unlock();
00552
00553 delete obj;
00554 break;
00555 }
00556 }
00557
00558 break;
00559 }
00560 }
00561
00562
00563 void
00564 msg_communicator::uninit()
00565 {
00566
00567 if (is_block() || !is_on())
00568 return;
00569
00570 v_off();
00571
00572
00573 _config.clear();
00574
00575
00576 _destroy_listeners();
00577
00578
00579 _destroy_connections();
00580 }
00581
00582
00583 void
00584 msg_communicator::v_on()
00585 {
00586 if (is_on())
00587 return;
00588
00589 _thread_manager.log_on(this);
00590
00591 _register_this();
00592
00593 _thread_manager.on();
00594
00595 _aio_port.log_on(this);
00596 _aio_port.on();
00597
00598
00599 msg_queue_processor::v_on();
00600
00601
00602 unblock();
00603
00604
00605 _turn_on_listeners();
00606
00607
00608 _turn_on_connections();
00609
00610
00611 _ping_timer.activate(this, 0, 1000);
00612
00613 format_logging(0, __FILE__, __LINE__, en_log_info, "msg communicator started");
00614 }
00615
00616
00617 void
00618 msg_communicator::v_off()
00619 {
00620 if (!is_on())
00621 return;
00622
00623
00624 _ping_timer.deactivate();
00625
00626
00627 _turn_off_listeners();
00628
00629
00630 _turn_off_connections();
00631
00632
00633 block();
00634
00635
00636 msg_cpp* msg = 0;
00637 while (pop(msg))
00638 destroy_msg(msg);
00639
00640
00641 msg_queue_processor::v_off();
00642
00643 _aio_port.off();
00644 _aio_port.log_on(0);
00645
00646
00647 _thread_manager.off();
00648 _thread_manager.log_on(0);
00649
00650 format_logging(0, __FILE__, __LINE__, en_log_info, "msg communicator stooped");
00651 }
00652
00653 void
00654 msg_communicator::_turn_off_listeners()
00655 {
00656
00657 mutex_keeper keeper(_mtx_listeners);
00658 for (listener_list_t::iterator iter = _listeners.begin(); iter != _listeners.end(); ++iter)
00659 {
00660 (*iter)->off();
00661 (*iter)->log_on(0);
00662 }
00663 }
00664
00665 void
00666 msg_communicator::_turn_on_listeners()
00667 {
00668
00669 mutex_keeper keeper(_mtx_listeners);
00670 for (listener_list_t::iterator iter = _listeners.begin(); iter != _listeners.end(); ++iter)
00671 {
00672 (*iter)->log_on(this);
00673 (*iter)->on();
00674 }
00675 }
00676
00677 void
00678 msg_communicator::_destroy_listeners()
00679 {
00680
00681 mutex_keeper keeper(_mtx_listeners);
00682 for (listener_list_t::iterator iter = _listeners.begin(); iter != _listeners.end(); ++iter)
00683 delete *iter;
00684
00685 _listeners.clear();
00686 }
00687
00688 void
00689 msg_communicator::_create_listeners()
00690 {
00691
00692 mutex_keeper keeper(_mtx_config);
00693 for (msg_listener_list_t::const_iterator iter = _config._listeners.begin(); iter != _config._listeners.end(); ++iter)
00694 _create_listener(*iter);
00695 }
00696
00697 void
00698 msg_communicator::_create_listener(const conf_listener& atom)
00699 {
00700 mutex_keeper keeper(_mtx_listeners);
00701 switch (atom._type)
00702 {
00703 case sock:
00704
00705 _listeners.push_back(new msg_sock_listener(_communicator, atom));
00706 break;
00707 #if OS_TYPE == OS_WIN32
00708 case rpc:
00709
00710 _listeners.push_back(new msg_rpc_listener(_communicator, atom));
00711 break;
00712 #endif
00713 default:
00714 assert(false);
00715 }
00716 }
00717
00718 void
00719 msg_communicator::_turn_off_connections()
00720 {
00721
00722 mutex_keeper keeper(_mtx_conn);
00723
00724 for (connection_map_t::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
00725 {
00726 (*iter)->off();
00727 }
00728 }
00729
00730 void
00731 msg_communicator::_turn_on_connections()
00732 {
00733
00734 mutex_keeper keeper(_mtx_conn);
00735
00736 for (connection_map_t::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
00737 {
00738 (*iter)->on();
00739 }
00740 }
00741
00742 void
00743 msg_communicator::_destroy_connections()
00744 {
00745
00746 mutex_keeper keeper(_mtx_conn);
00747 for (connection_map_t::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
00748 delete *iter;
00749
00750 _connections.clear();
00751 }
00752
00753
00754 void
00755 msg_communicator::notify(size_t ident, size_t interval, size_t multiplier)
00756 {
00757
00758 mutex_keeper keeper(_mtx_conn);
00759 for (connection_map_t::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
00760 if (!(*iter)->is_block() && (*iter)->is_last_activity_timeout())
00761 (*iter)->ping_notify();
00762 }
00763
00764
00765 void
00766 msg_communicator::wakeup()
00767 {
00768 _thread_manager.borrow_thread(0, 0, this, stay_on_alert_time);
00769 }
00770
00771 bool
00772 msg_communicator::validate_connection(const msg_connection* connection_) const
00773 {
00774
00775 mutex_keeper keeper(_mtx_conn);
00776 connection_map_t::const_iterator first = _connections.begin();
00777 connection_map_t::const_iterator last = _connections.end();
00778
00779 for (; first != last; ++first)
00780 if (connection_ == *first)
00781 return true;
00782
00783 return false;
00784 }
00785
00786 void
00787 msg_communicator::add_connection(msg_connection* connection_)
00788 {
00789
00790 mutex_keeper keeper(_mtx_conn);
00791
00792 if (_connections.end() != _connections.find(connection_->get_info()._address))
00793 {
00794 exception::_throw(MSG_RESULT_ACCESS_DENIED, &msgMsgTable);
00795 }
00796
00797
00798
00799 _connections.insert(connection_->get_info()._address, connection_);
00800
00801 connection_->on();
00802 }
00803
00804 void
00805 msg_communicator::change_connection_address(const guid_t& old_address, msg_connection* connection_)
00806 {
00807
00808 mutex_keeper keeper(_mtx_conn);
00809
00810 connection_map_t::iterator iter = _connections.find(old_address);
00811 assert(iter != _connections.end());
00812 _connections.erase(iter);
00813
00814 if (_connections.end() != _connections.find(connection_->get_info()._address))
00815 {
00816 delete connection_;
00817 exception::_throw(MSG_RESULT_ACCESS_DENIED, &msgMsgTable);
00818 }
00819
00820
00821 _connections.insert(connection_->get_info()._address, connection_);
00822 }
00823
00824 void
00825 msg_communicator::shutdown_connection(msg_connection* connection_)
00826 {
00827 if (connection_->block())
00828 {
00829 format_logging(0, __FILE__, __LINE__, en_log_info, "shutdown conection: %x", connection_);
00830
00831
00832 connection_->off();
00833
00834 _thread_manager.revoke_client(connection_);
00835
00836 msg_creator creator(this);
00837 msg_pointer_t msg_(creator);
00838
00839 try
00840 {
00841
00842 msg_ = construct_msg(0);
00843 format_logging(0, __FILE__, __LINE__, en_log_info, "contruct message");
00844
00845 msg_->_type = system_type;
00846 msg_->msgid = msg_id_shutdown;
00847 msg_->_receiver = _address;
00848 msg_->_sender = connection_->get_info()._address;
00849 msg_->priority = MSG_PRIORITY_SYSTEM;
00850 push(msg_);
00851 msg_.detach();
00852 }
00853 catch (exception&)
00854 {
00855
00856
00857 }
00858 }
00859 }
00860
00861 void
00862 msg_communicator::_close_connection(const guid_t& addr)
00863 {
00864
00865 mutex_keeper keeper(_mtx_conn);
00866 connection_map_t::iterator iter = _connections.find(addr);
00867 if (iter != _connections.end())
00868 {
00869
00870 delete (*iter);
00871
00872 _connections.erase(iter);
00873 }
00874 }
00875
00876 void
00877 msg_communicator::comm_msg(msg_cpp* msg_)
00878 {
00879 if (is_block())
00880 exception::_throw("Communicator is blocked");
00881
00882
00883 if (msg_->_type == system_type && msg_->msgid == msg_id_ping)
00884 {
00885
00886 destroy_msg(msg_);
00887 return;
00888 }
00889
00890
00891
00892 msg_->_sessionid = null_uuid;
00893
00894 push(msg_);
00895 }
00896
00897
00898 void
00899 msg_communicator::v_do_job(size_t ident, void* user_data)
00900 {
00901 msg_cpp* msg = 0;
00902 if (!pop(msg))
00903 {
00904 assert(false);
00905 return;
00906 }
00907
00908 msg_creator creator(this);
00909 msg_pointer_t msg_(creator);
00910
00911 msg_ = msg;
00912
00913 try
00914 {
00915
00916
00917 if (msg_->msgid == msg_id_shutdown && msg_->_type == system_type)
00918 {
00919 _close_connection(msg_->_sender);
00920 return;
00921 }
00922
00923
00924 mutex_keeper keeper(_mtx_conn);
00925
00926 msg_connection* connection = 0;
00927 connection_map_t::iterator iter = _connections.find(msg_->_receiver);
00928 if (iter == _connections.end())
00929 {
00930 keeper.unlock();
00931
00932
00933 mutex_keeper guard(_mtx_config);
00934 msg_connection_list_t::const_iterator conf_iter = _config._connections.begin();
00935
00936 for (; conf_iter != _config._connections.end(); ++conf_iter)
00937 {
00938 if (conf_iter->_address == msg_->_receiver)
00939 break;
00940 }
00941
00942 if (conf_iter != _config._connections.end())
00943 {
00944 conf_connection cinfo(*conf_iter);
00945
00946 guard.unlock();
00947
00948
00949 connection = _connect(cinfo, __max(handshake_default_timeout, msg_->timeout));
00950 }
00951
00952 keeper.lock();
00953 }
00954 else
00955 connection = *iter;
00956
00957 if (!connection)
00958 exception::_throw(MSG_RESULT_UNKNOWN_DESTINATION, &msgMsgTable);
00959
00960
00961 connection->push_msg(msg_);
00962 msg_.detach();
00963 }
00964 catch (exception& x)
00965 {
00966 if (msg_->_type == user_type_send || msg_->_type == user_type_send_async)
00967 {
00968
00969 try
00970 {
00971 msg_pointer_t err_(creator, 0);
00972 msg_pack::make_reply_msg(msg_, err_);
00973 msg_pack::make_error_msg(err_, x.what());
00974
00975 push(err_);
00976 err_.detach();
00977 }
00978 catch (exception&)
00979 {
00980 }
00981 }
00982 }
00983 catch (...)
00984 {
00985 assert(false);
00986 }
00987 }
00988
00989 msg_cpp*
00990 msg_communicator::_pack_keys(const integer& n, const integer& e)
00991 {
00992 ub4_t l_n = n.min_encoded_size();
00993 ub4_t l_e = e.min_encoded_size();
00994
00995 msg_cpp* msg = construct_msg(l_n + l_e + 2 * sizeof(ub4_t));
00996
00997 ub1_t* buf = msg->get_body();
00998
00999
01000 *(ub4_t*)buf = htonl(l_n);
01001
01002 buf += sizeof(ub4_t);
01003
01004 n.encode(buf, l_n);
01005
01006 buf += l_n;
01007
01008 *(ub4_t*)buf = htonl(l_e);
01009
01010 buf += sizeof(ub4_t);
01011
01012 e.encode(buf, l_e);
01013
01014 return msg;
01015 }
01016
01017 void
01018 msg_communicator::_extract_keys(const msg_cpp* msg, integer& n, integer& e)
01019 {
01020
01021 const ub1_t* buf = msg->get_body();
01022 size_t size = msg->get_size();
01023
01024
01025 if (size < 2 * (sizeof(ub4_t) + 1))
01026 exception::_throw(MSG_RESULT_ACCESS_DENIED, &msgMsgTable);
01027
01028
01029
01030 ub4_t l_n = ntohl(*(ub4_t*)buf);
01031
01032 if (size < l_n + 2 * sizeof(ub4_t) + 1)
01033 exception::_throw(MSG_RESULT_ACCESS_DENIED, &msgMsgTable);
01034
01035
01036 buf += sizeof(ub4_t);
01037
01038 n.decode(buf, l_n);
01039
01040 buf += l_n;
01041
01042
01043 ub4_t l_e = ntohl(*(ub4_t*)buf);
01044
01045 if (size != l_e + l_n + 2 * sizeof(ub4_t))
01046 exception::_throw(MSG_RESULT_ACCESS_DENIED, &msgMsgTable);
01047
01048
01049 buf += sizeof(ub4_t);
01050
01051 e.decode(buf, l_e);
01052 }
01053
01054 msg_cpp*
01055 msg_communicator::construct_handshake(const rsa* rsa)
01056 {
01057 msg_cpp* msg = rsa ? _pack_keys(rsa->get_n(), rsa->get_e()) : construct_msg(0);
01058
01059 msg->priority = MSG_PRIORITY_SYSTEM;
01060 msg->_type = handshake_type;
01061 msg->msgid = msg_id_handshake_request;
01062 msg->timeout = handshake_default_timeout;
01063 msg->_sessionid = uuid_gen();
01064 msg->_sender = _address;
01065 return msg;
01066 }
01067
01068 msg_cpp*
01069 msg_communicator::reply_handshake(const msg_cpp* msg_, room_byte_t* symetric_private_key_)
01070 {
01071 if (msg_->_type != handshake_type
01072 || msg_->msgid != msg_id_handshake_request
01073 || msg_->_receiver != _address)
01074 exception::_throw(MSG_RESULT_ACCESS_DENIED, &msgMsgTable);
01075
01076 msg_cpp* reply = 0;
01077
01078 if (symetric_private_key_)
01079 {
01080 integer n, e;
01081 _extract_keys(msg_, n, e);
01082 rsa rsa(e, n);
01083 reply = _generate_crypt_private_key(rsa, *symetric_private_key_);
01084 }
01085 else
01086 {
01087 reply = construct_msg(0);
01088 }
01089
01090
01091 reply->_sender = _address;
01092
01093 reply->_receiver = msg_->_sender;
01094 reply->priority = MSG_PRIORITY_SYSTEM;
01095 reply->_type = handshake_type;
01096 reply->msgid = msg_id_handshake_reply;
01097 reply->timeout = handshake_default_timeout;
01098 reply->_sessionid = msg_->_sessionid;
01099
01100 return reply;
01101 }
01102
01103 void
01104 msg_communicator::check_handshake(const guid_t& sessionid, const msg_cpp* reply_, const rsa* rsa, room_byte_t& symetric_private_key_)
01105 {
01106 if (reply_->_type != handshake_type
01107 || reply_->msgid != msg_id_handshake_reply
01108
01109 || reply_->_receiver != _address
01110 || reply_->_sessionid != sessionid)
01111 exception::_throw(MSG_RESULT_ACCESS_DENIED, &msgMsgTable);
01112
01113 if (rsa)
01114 _decrypt_private_key(reply_, *rsa, symetric_private_key_);
01115 }
01116
01117 msg_cpp*
01118 msg_communicator::_generate_crypt_private_key(const rsa& rsa, room_byte_t& symetric_private_key_)
01119 {
01120
01121 random_generator rng;
01122 integer in_key(rng, rsa_key_size / 16);
01123 symetric_private_key_.reserve(in_key.min_encoded_size());
01124 in_key.encode(symetric_private_key_, in_key.min_encoded_size());
01125
01126 integer out_key;
01127 rsa.encode(in_key, out_key);
01128 ub4_t l_key = out_key.min_encoded_size();
01129
01130 msg_cpp* msg = construct_msg(l_key + sizeof(ub4_t));
01131
01132 ub1_t* buf = msg->get_body();
01133 *(ub4_t*)buf = htonl(l_key);
01134 out_key.encode(buf + sizeof(ub4_t), l_key);
01135
01136 return msg;
01137 }
01138
01139 void
01140 msg_communicator::_decrypt_private_key(const msg_cpp* reply_, const rsa& rsa, room_byte_t& symetric_private_key_)
01141 {
01142 const ub1_t* buf_reply = reply_->get_body();
01143 size_t size = reply_->get_size();
01144
01145 if (size < sizeof(ub4_t) + 1)
01146 exception::_throw(MSG_RESULT_ACCESS_DENIED, &msgMsgTable);
01147
01148 ub4_t l_key = ntohl(*(ub4_t*)buf_reply);
01149
01150 if (size != l_key + sizeof(ub4_t))
01151 exception::_throw(MSG_RESULT_ACCESS_DENIED, &msgMsgTable);
01152
01153 buf_reply += sizeof(ub4_t);
01154
01155 integer in_key, out_key;
01156 in_key.decode(buf_reply, l_key);
01157 rsa.decode(in_key, out_key);
01158 symetric_private_key_.reserve(out_key.min_encoded_size());
01159 out_key.encode(symetric_private_key_, out_key.min_encoded_size());
01160 }
01161
01162 msg_cpp*
01163 msg_communicator::construct_ping()
01164 {
01165 msg_cpp* msg = construct_msg(0);
01166
01167 msg->_type = system_type;
01168 msg->msgid = msg_id_ping;
01169 msg->_sender = _address;
01170 return msg;
01171 }
01172
01173
01174 aiosock&
01175 msg_communicator::get_aiosock()
01176 {
01177 return _aio_port;
01178 }
01179
01180
01181 threadpool&
01182 msg_communicator::get_thread_manager()
01183 {
01184 return _thread_manager;
01185 }
01186
01187
01188 event_pool_t&
01189 msg_communicator::get_event_pool()
01190 {
01191 return _event_pool;
01192 }
01193
01194 void
01195 msg_communicator::doxray()
01196 {
01197 mutex_keeper lguard(_mtx_listeners);
01198 size_t listeners = _listeners.size();
01199 lguard.unlock();
01200
01201 mutex_keeper cguard(_mtx_conn);
01202 size_t connections = _connections.size();
01203 cguard.unlock();
01204
01205 size_t free_all = 0, busy_all = 0;
01206 _manager.get_stats(free_all, busy_all);
01207 size_t free_ev = 0, busy_ev = 0;
01208 _event_pool.get_stats(free_ev, busy_ev);
01209
01210 format_logging(0, __FILE__, __LINE__, en_log_xray, "<aiomsg listeners=\"%d\" connections = \"%d\" free_all = \"%d\" busy_all = \"%d\" free_ev = \"%d\" busy_ev = \"%d\" />",
01211 listeners, connections, free_all, busy_all, free_ev, busy_ev);
01212
01213 _thread_manager.doxray();
01214 _aio_port.doxray();
01215 }
01216
01217 void
01218 msg_communicator::log_msg(const msg_cpp* msg)
01219 {
01220 date timestamp(msg->_timestamp);
01221 char sbuf[sizeof(guid_t)*2 + 1];
01222 char rbuf[sizeof(guid_t)*2 + 1];
01223 char tbuf[25];
01224
01225 #if OS_TYPE == OS_WIN32
01226 format_logging(0, __FILE__, __LINE__, en_log_info, "msgid=%d majver=%d minver=%d sender=%s receiver=%s timeout=%I64u, size=%u timestamp=%s",
01227 #else
01228 format_logging(0, __FILE__, __LINE__, en_log_info, "msgid=%d majver=%d minver=%d sender=%s receiver=%s timeout=%llu, size=%u timestamp=%s",
01229 #endif
01230
01231 msg->msgid, msg->majver, msg->minver,
01232 guid_to_string(sbuf, msg->_sender),
01233 guid_to_string(rbuf, msg->_receiver),
01234 msg->timeout,
01235 msg->get_size(),
01236 timestamp.get_date(tbuf)
01237 );
01238 }
01239
01241
01242 void
01243 msg_communicator::parse_connection(xml_designer* nav, conf_connection& atom)
01244 {
01245
01246
01247
01248
01249
01250
01251
01252
01253
01254
01255
01256
01257
01258 nav->select_attribute_by_name("type");
01259 if (!str_template::strcmp(nav->get_value(), "rpc", os_minus_one))
01260 atom._type = rpc;
01261 else if (!str_template::strcmp(nav->get_value(), "sock", os_minus_one))
01262 atom._type = sock;
01263
01264
01265
01266 nav->select_parent();
01267
01268 nav->select_attribute_by_name("address");
01269 if (!string_to_guid(atom._address, nav->get_value()))
01270 exception::_throw("Invalid parent attribute format");
01271 nav->select_parent();
01272
01273 if (nav->select_attribute_by_name("port"))
01274 {
01275 str_template::strscan(nav->get_value(), 32, "%hu", &atom._port);
01276 nav->select_parent();
01277 }
01278 else
01279 atom._port = 3337;
01280
01281 if (nav->select_attribute_by_name("network"))
01282 {
01283 atom._network = nav->get_value();
01284 nav->select_parent();
01285 }
01286 else
01287 atom._network = "localhost";
01288
01289 if (nav->select_attribute_by_name("ping"))
01290 {
01291 str_template::strscan(nav->get_value(), 32, "%u", &atom._ping);
01292 nav->select_parent();
01293 }
01294 else
01295 atom._ping = 1000;
01296
01297
01298 if (nav->select_attribute_by_name("info"))
01299 {
01300 atom._info = nav->get_value();
01301 nav->select_parent();
01302 }
01303
01304 if (nav->select_attribute_by_name("security"))
01305 {
01306 atom._support_crypt = strcmp(nav->get_value(), "1") == 0;
01307 nav->select_parent();
01308
01309 if (atom._support_crypt && nav->select_attribute_by_name("password"))
01310 {
01311 const char* password = nav->get_value();
01312 size_t len = password ? strlen(password) : 0;
01313 if (len)
01314 atom._crypt_external.reserve(len).copy((const ub1_t*)password, len);
01315
01316 nav->select_parent();
01317 }
01318 }
01319 else
01320 atom._support_crypt = false;
01321
01322 #ifndef MSG_PRODUCTION
01323 atom._support_crypt = false;
01324 #endif
01325 }
01326
01327
01328
01329 void
01330 msg_communicator::parse_listener(xml_designer* nav, conf_listener& atom)
01331 {
01332
01333
01334
01335
01336
01337
01338
01339
01340
01341
01342
01343 nav->select_attribute_by_name("type");
01344 if (!str_template::strcmp(nav->get_value(), "rpc", os_minus_one))
01345 atom._type = rpc;
01346 else if (!str_template::strcmp(nav->get_value(), "sock", os_minus_one))
01347 atom._type = sock;
01348
01349
01350
01351 nav->select_parent();
01352
01353 if (nav->select_attribute_by_name("port"))
01354 {
01355 str_template::strscan(nav->get_value(), 32, "%hu", &atom._port);
01356 nav->select_parent();
01357 }
01358 else
01359 atom._port = 0;
01360
01361 if (nav->select_attribute_by_name("network"))
01362 {
01363 atom._network = nav->get_value();
01364 nav->select_parent();
01365 }
01366 else
01367 atom._network = "localhost";
01368
01369 if (nav->select_attribute_by_name("connections"))
01370 {
01371 str_template::strscan(nav->get_value(), 32, "%u", &atom._connections);
01372 if (!atom._connections)
01373 atom._connections = 1;
01374 nav->select_parent();
01375 }
01376
01377 if (nav->select_attribute_by_name("ping"))
01378 {
01379 str_template::strscan(nav->get_value(), 32, "%u", &atom._ping);
01380 nav->select_parent();
01381 }
01382 else
01383 atom._ping = 1000;
01384
01385 if (nav->select_attribute_by_name("info"))
01386 {
01387 atom._info = nav->get_value();
01388 nav->select_parent();
01389 }
01390
01391 if (nav->select_attribute_by_name("security"))
01392 {
01393 atom._support_crypt = strcmp(nav->get_value(), "1") == 0;
01394 nav->select_parent();
01395
01396 if (atom._support_crypt && nav->select_attribute_by_name("password"))
01397 {
01398 const char* password = nav->get_value();
01399 size_t len = password ? strlen(password) : 0;
01400 if (len)
01401 atom._crypt_accept.reserve(len).copy((const ub1_t*)password, len);
01402
01403 nav->select_parent();
01404 }
01405 }
01406 else
01407 atom._support_crypt = false;
01408
01409
01410
01411
01412 if (nav->has_children())
01413 {
01414 nav->select_first_child();
01415 do
01416 {
01417
01418 if (nav->get_type() != ELEMENT_NODE) continue;
01419 bool accept = strcmp(nav->get_name(), "accept") == 0;
01420
01421
01422 if (nav->select_first_child())
01423 {
01424 do
01425 {
01426 if (nav->get_type() != ELEMENT_NODE) continue;
01427
01428
01429
01430 conf_peer peer;
01431
01432 nav->select_attribute_by_name("address");
01433 if (!string_to_guid(peer._address, nav->get_value()))
01434 exception::_throw("Invalid peer address format");
01435 nav->select_parent();
01436
01437 if (nav->select_attribute_by_name("security"))
01438 {
01439 peer._support_crypt = strcmp(nav->get_value(), "1") == 0;
01440 nav->select_parent();
01441
01442 if (peer._support_crypt && nav->select_attribute_by_name("password"))
01443 {
01444 const char* password = nav->get_value();
01445 size_t len = password ? strlen(password) : 0;
01446 if (len)
01447 peer._crypt.reserve(len).copy((const ub1_t*)password, len);
01448
01449 nav->select_parent();
01450 }
01451 }
01452 else if (accept)
01453 {
01454 peer._support_crypt = atom._support_crypt;
01455 peer._crypt = atom._crypt_accept;
01456 }
01457
01458 #ifndef MSG_PRODUCTION
01459 peer._support_crypt = false;
01460 #endif
01461 accept ? atom._accept.push_back(peer) : atom._reject.push_back(peer);
01462
01463 } while (nav->select_next_sibling());
01464
01465
01466 nav->select_parent();
01467 }
01468 } while (nav->select_next_sibling());
01469
01470 nav->select_parent();
01471 }
01472
01473
01474 #ifndef MSG_PRODUCTION
01475 atom._support_crypt = false;
01476 #endif
01477 }
01478
01479 #pragma pack()
01480 END_TERIMBER_NAMESPACE