00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "aiomsg/msgimpl.h"
00029 #include "base/except.h"
00030 #include "base/string.hpp"
00031 #include "base/list.hpp"
00032 #include "base/map.hpp"
00033 #include "base/memory.hpp"
00034 #include "base/template.hpp"
00035 #include "base/number.hpp"
00036 #include "base/common.hpp"
00037
00038 #include "xml/xmlimpl.hpp"
00039 #include "xml/sxs.hpp"
00040
00041 #include "aiomsg/msg_user.h"
00042
// DTD text for a single <connection> element; aiomsg::add_connection() passes
// it as the DTD argument when loading the caller's XML description.
// Declared attributes: "type" (required, one of rpc | sock | http | p2p),
// "address" (required), and the optional port, network, ping, info, security
// and password.
// NOTE(review): "CTYPE vt_*" is not standard DTD syntax — presumably a typed
// attribute extension of the project's XML parser; confirm.
00043 const char* msg_connection_dtd = \
00044 "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
00045 <!ENTITY % kind \"(rpc | sock | http | p2p)\"> \
00046 <!ELEMENT connection EMPTY> \
00047 <!ATTLIST connection \
00048 type %kind; #REQUIRED \
00049 address CTYPE vt_guid #REQUIRED \
00050 port CTYPE vt_ub2 #IMPLIED \
00051 network CDATA #IMPLIED \
00052 ping CTYPE vt_ub4 #IMPLIED \
00053 info CDATA #IMPLIED \
00054 security CTYPE vt_bool #IMPLIED \
00055 password CDATA #IMPLIED >";
00056
// DTD text for a <listener> element; aiomsg::add_listener() passes it as the
// DTD argument when loading the caller's XML description.
// A listener may contain at most one <accept> or <reject> child, each holding
// one or more <peer> elements. A peer has a required "address" and optional
// security/password; the listener itself has only optional attributes
// (port, network, connections, ping, info, security, password).
// NOTE(review): "CTYPE vt_*" is not standard DTD syntax — presumably a typed
// attribute extension of the project's XML parser; confirm.
00057 const char* msg_listener_dtd = \
00058 "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
00059 <!ELEMENT listener (accept | reject)?> \
00060 <!ELEMENT reject (peer)+> \
00061 <!ELEMENT accept (peer)+> \
00062 <!ELEMENT peer EMPTY> \
00063 <!ATTLIST peer \
00064 address CTYPE vt_guid #REQUIRED \
00065 security CTYPE vt_bool #IMPLIED \
00066 password CDATA #IMPLIED > \
00067 <!ATTLIST listener \
00068 port CTYPE vt_ub2 #IMPLIED \
00069 network CDATA #IMPLIED \
00070 connections CTYPE vt_ub4 #IMPLIED \
00071 ping CTYPE vt_ub4 #IMPLIED \
00072 info CDATA #IMPLIED \
00073 security CTYPE vt_bool #IMPLIED \
00074 password CDATA #IMPLIED >";
00075
00076 aiomsgfactory::aiomsgfactory()
00077 {
00078 }
00079
00080
00081 aiomsgfactory::~aiomsgfactory()
00082 {
00083 }
00084
00085 terimber_aiomsg*
00086 aiomsgfactory::get_aiomsg(terimber_log* log)
00087 {
00088 terimber::aiomsg* obj = new terimber::aiomsg();
00089 if (obj)
00090 obj->log_on(log);
00091
00092 return obj;
00093 }
00094
00095
00096
00097 BEGIN_TERIMBER_NAMESPACE
00098 #pragma pack(4)
00099
00101 aiomsg::aiomsg() : _user_conn(0)
00102 {
00103 }
00104
00105
00106 aiomsg::~aiomsg()
00107 {
00108 stop();
00109 }
00110
00111 bool
00112 aiomsg::init(const char* info, const char* ini_key)
00113 {
00114 try
00115 {
00116 _communicator.init(info, ini_key);
00117 }
00118 catch (terimber::exception& x)
00119 {
00120 mutex_keeper keeper(_mtx);
00121 _error = x.what();
00122 return false;
00123 }
00124
00125 return true;
00126 }
00127
00128 bool
00129 aiomsg::init(const void* buffer, size_t len)
00130 {
00131
00132
00133 try
00134 {
00135
00136 _communicator.log_on(this);
00137 _communicator.init(buffer, len);
00138 }
00139 catch (terimber::exception& x)
00140 {
00141 mutex_keeper keeper(_mtx);
00142 _error = x.what();
00143 return false;
00144 }
00145
00146 return true;
00147 }
00148
00149 bool
00150 aiomsg::uninit()
00151 {
00152 try
00153 {
00154 stop();
00155 _communicator.uninit();
00156 _communicator.log_on(0);
00157 }
00158 catch (terimber::exception& x)
00159 {
00160 mutex_keeper keeper(_mtx);
00161 _error = x.what();
00162 return false;
00163 }
00164
00165 return true;
00166 }
00167
00168 bool
00169 aiomsg::start(msg_callback_notify* callback, size_t additional_thread_count)
00170 {
00171
00172 conf_connection atom;
00173 atom._ping = 1000;
00174 atom._address = _communicator.get_address();
00175 try
00176 {
00177 check_on();
00178 stop();
00179 _user_conn = msg_user_connection::connect(&_communicator, callback, atom, additional_thread_count);
00180 _user_conn->on();
00181 }
00182 catch (terimber::exception& x)
00183 {
00184 mutex_keeper keeper(_mtx);
00185 _error = x.what();
00186 return false;
00187 }
00188
00189 return true;
00190 }
00191
00192 bool
00193 aiomsg::stop()
00194 {
00195
00196 try
00197 {
00198
00199 if (_user_conn)
00200 {
00201 _user_conn->off();
00202 _communicator.shutdown_connection(_user_conn);
00203 _user_conn = 0;
00204 }
00205 return true;
00206 }
00207 catch (terimber::exception& x)
00208 {
00209 mutex_keeper keeper(_mtx);
00210 _error = x.what();
00211 return false;
00212 }
00213
00214 return true;
00215 }
00216
00218 const char*
00219 aiomsg::get_port_error() const
00220 {
00221 return _error;
00222 }
00223
00224 const guid_t&
00225 aiomsg::get_port_address() const
00226 {
00227 return _communicator.get_address();
00228 }
00229
00230 msg_t*
00231 aiomsg::construct(size_t size)
00232 {
00233 try
00234 {
00235 check_on();
00236 return _communicator.construct_msg(size);
00237 }
00238 catch (terimber::exception& x)
00239 {
00240 mutex_keeper keeper(_mtx);
00241 _error= x.what();
00242 return 0;
00243 }
00244 }
00245
00246 bool
00247 aiomsg::resize(msg_t* msg, size_t size)
00248 {
00249 try
00250 {
00251 check_on();
00252 return _communicator.resize_msg(_communicator.cast(msg), size);
00253 }
00254 catch (terimber::exception& x)
00255 {
00256 mutex_keeper keeper(_mtx);
00257 _error= x.what();
00258 return false;
00259 }
00260 }
00261
00262 bool
00263 aiomsg::destroy(msg_t* msg)
00264 {
00265 try
00266 {
00267 check_on();
00268 return _communicator.destroy_msg(_communicator.cast(msg));
00269 }
00270 catch (terimber::exception& x)
00271 {
00272 mutex_keeper keeper(_mtx);
00273 _error= x.what();
00274 return false;
00275 }
00276 }
00277
00278 size_t
00279 aiomsg::get_size(const msg_t* msg) const
00280 {
00281 try
00282 {
00283 check_on();
00284 const msg_cpp* msg_ = msg_communicator::cast(msg);
00285 return msg_ ? msg_->get_size() : 0;
00286 }
00287 catch (terimber::exception& x)
00288 {
00289 mutex_keeper keeper(_mtx);
00290 _error= x.what();
00291 return 0;
00292 }
00293 }
00294
00295 bool
00296 aiomsg::write_buffer(msg_t* msg, size_t offset, const void* buf, size_t len)
00297 {
00298 try
00299 {
00300 check_on();
00301 msg_cpp* msg_ = msg_communicator::cast(msg);
00302 if (!msg_ || msg_->get_size() < offset + len)
00303 return false;
00304
00305 memcpy(msg_->get_body() + offset, buf, len);
00306 return true;
00307 }
00308 catch (terimber::exception& x)
00309 {
00310 mutex_keeper keeper(_mtx);
00311 _error= x.what();
00312 return 0;
00313 }
00314 }
00315
00316 const void*
00317 aiomsg::get_buffer(const msg_t* msg) const
00318 {
00319 try
00320 {
00321 check_on();
00322 const msg_cpp* msg_ = msg_communicator::cast(msg);
00323 return msg_ ? msg_->get_body() : 0;
00324 }
00325 catch (terimber::exception& x)
00326 {
00327 mutex_keeper keeper(_mtx);
00328 _error= x.what();
00329 return 0;
00330 }
00331 }
00332
00333 bool
00334 aiomsg::set_receiver(msg_t* msg, const guid_t& receiver)
00335 {
00336 try
00337 {
00338 check_on();
00339 return msg_communicator::set_receiver(msg_communicator::cast(msg), receiver);
00340 }
00341 catch (terimber::exception& x)
00342 {
00343 mutex_keeper keeper(_mtx);
00344 _error= x.what();
00345 return false;
00346 }
00347 }
00348
00349 bool
00350 aiomsg::get_sender(const msg_t* msg, guid_t& sender) const
00351 {
00352 try
00353 {
00354 check_on();
00355 return msg_communicator::get_sender(msg_communicator::cast(msg), sender);
00356 }
00357 catch (terimber::exception& x)
00358 {
00359 mutex_keeper keeper(_mtx);
00360 _error= x.what();
00361 return false;
00362 }
00363 }
00364
00365 bool
00366 aiomsg::send(bool copy, msg_t* msg, msg_t** reply)
00367 {
00368 try
00369 {
00370 check_on();
00371 check_ident();
00372 msg_cpp* reply_ = 0;
00373
00374 if (!_user_conn->send(copy, msg_communicator::cast(msg), reply_))
00375 terimber::exception::_throw(_user_conn->get_last_error());
00376
00377
00378 if (reply)
00379 *reply = static_cast< msg_t* >(reply_);
00380 else if (reply_)
00381 _communicator.destroy_msg(reply_);
00382
00383 }
00384 catch (terimber::exception& x)
00385 {
00386 mutex_keeper keeper(_mtx);
00387 _error= x.what();
00388 return false;
00389 }
00390
00391 return true;
00392 }
00393
00394 guid_t
00395 aiomsg::send_async(bool copy, msg_t* msg)
00396 {
00397 guid_t retVal;
00398 try
00399 {
00400 check_on();
00401 check_ident();
00402
00403 if (null_uuid == (retVal = _user_conn->send_async(copy, _communicator.cast(msg))))
00404 terimber::exception::_throw(_user_conn->get_last_error());
00405 }
00406 catch (terimber::exception& x)
00407 {
00408 mutex_keeper keeper(_mtx);
00409 _error= x.what();
00410 }
00411
00412 return retVal;
00413 }
00414
00415 bool
00416 aiomsg::post(bool copy, msg_t* msg)
00417 {
00418 try
00419 {
00420 check_on();
00421 check_ident();
00422
00423 if (!_user_conn->post(copy, msg_communicator::cast(msg)))
00424 terimber::exception::_throw(_user_conn->get_last_error());
00425 }
00426 catch (terimber::exception& x)
00427 {
00428 mutex_keeper keeper(_mtx);
00429 _error= x.what();
00430 return false;
00431 }
00432
00433 return true;
00434 }
00435
00436 bool
00437 aiomsg::add_connection(const char* xml_description)
00438 {
00439 if (!xml_description)
00440 {
00441 _error = "Null pointer provided";
00442 return false;
00443 }
00444
00445
00446 xml_parser_creator nav_creator;
00447 xml_designer_keeper_t nav_keeper(nav_creator, 0);
00448 if (!nav_keeper->load(xml_description, strlen(xml_description), (const void*)msg_connection_dtd, strlen(msg_connection_dtd)))
00449 {
00450 _error = nav_keeper->error();
00451 return false;
00452 }
00453
00454
00455 try
00456 {
00457 check_on();
00458
00459
00460 nav_keeper->select_root();
00461 conf_connection atom;
00462
00463 msg_communicator::parse_connection(nav_keeper, atom);
00464 _communicator.add_connection_config(atom);
00465 }
00466 catch (terimber::exception& x)
00467 {
00468 mutex_keeper keeper(_mtx);
00469 _error= x.what();
00470 return false;
00471 }
00472
00473 return true;
00474 }
00475
00476 bool
00477 aiomsg::add_listener(const char* xml_description)
00478 {
00479 if (!xml_description)
00480 {
00481 _error = "Null pointer provided";
00482 return false;
00483 }
00484
00485
00486 xml_parser_creator nav_creator;
00487 xml_designer_keeper_t nav_keeper(nav_creator, 0);
00488 if (!nav_keeper->load(xml_description, strlen(xml_description), (const void*)msg_listener_dtd, strlen(msg_listener_dtd)))
00489 {
00490 _error = nav_keeper->error();
00491 return false;
00492 }
00493
00494
00495 try
00496 {
00497 check_on();
00498
00499
00500 nav_keeper->select_root();
00501 conf_listener atom;
00502
00503 msg_communicator::parse_listener(nav_keeper, atom);
00504
00505 _communicator.add_listener_config(atom, true);
00506 }
00507 catch (terimber::exception& x)
00508 {
00509 mutex_keeper keeper(_mtx);
00510 _error= x.what();
00511 return false;
00512 }
00513
00514 return true;
00515 }
00516
00517
00518
00519
00520 bool
00521 aiomsg::remove_connection(const guid_t& address)
00522 {
00523 try
00524 {
00525 check_on();
00526 _communicator.remove_connection_config(address);
00527 }
00528 catch (terimber::exception& x)
00529 {
00530 mutex_keeper keeper(_mtx);
00531 _error= x.what();
00532 return false;
00533 }
00534 return true;
00535 }
00536
00537
00538
00539 bool
00540 aiomsg::remove_listener(const char* type)
00541 {
00542 if (!type)
00543 {
00544 _error = "Null pointer provided";
00545 return false;
00546 }
00547
00548 try
00549 {
00550 check_on();
00551 transport_type type_ = sock;
00552 _communicator.remove_listener_config(type_);
00553 }
00554 catch (terimber::exception& x)
00555 {
00556 mutex_keeper keeper(_mtx);
00557 _error= x.what();
00558 return false;
00559 }
00560 return true;
00561 }
00562
00564 void
00565 aiomsg::check_on() const
00566 {
00567 if (!_communicator.is_on())
00568 terimber::exception::_throw("Communicator is not initialized");
00569 }
00570
00571 void
00572 aiomsg::check_ident() const
00573 {
00574 if (!_communicator.validate_connection(_user_conn))
00575 terimber::exception::_throw("Connection not found");
00576 }
00577
00579
00580 void
00581 aiomsg::doxray()
00582 {
00583 _communicator.doxray();
00584 }
00585
00586
00587 #pragma pack()
00588 END_TERIMBER_NAMESPACE