00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 #include "aiomsg/msgimpl.h"
00029 #include "base/except.h"
00030 #include "base/string.hpp"
00031 #include "base/list.hpp"
00032 #include "base/map.hpp"
00033 #include "base/memory.hpp"
00034 #include "base/template.hpp"
00035 #include "base/number.hpp"
00036 #include "base/common.hpp"
00037 
00038 #include "xml/xmlimpl.hpp"
00039 #include "xml/sxs.hpp"
00040 
00041 #include "aiomsg/msg_user.h"
00042 
00043 const char* msg_connection_dtd = \
00044 "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
00045 <!ENTITY % kind \"(rpc | sock | http | p2p)\"> \
00046 <!ELEMENT connection EMPTY> \
00047 <!ATTLIST connection \
00048 type %kind; #REQUIRED \
00049 address CTYPE vt_guid #REQUIRED \
00050 port CTYPE vt_ub2 #IMPLIED \
00051 network CDATA #IMPLIED \
00052 ping CTYPE vt_ub4 #IMPLIED \
00053 info CDATA #IMPLIED \
00054 security CTYPE vt_bool #IMPLIED \
00055 password CDATA #IMPLIED >";
00056 
00057 const char* msg_listener_dtd = \
00058 "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \
00059 <!ELEMENT listener (accept | reject)?> \
00060 <!ELEMENT reject (peer)+> \
00061 <!ELEMENT accept (peer)+> \
00062 <!ELEMENT peer EMPTY> \
00063 <!ATTLIST peer \
00064 address CTYPE vt_guid #REQUIRED \
00065 security CTYPE vt_bool #IMPLIED \
00066 password CDATA #IMPLIED > \
00067 <!ATTLIST listener \
00068 port CTYPE vt_ub2 #IMPLIED \
00069 network CDATA #IMPLIED \
00070 connections CTYPE vt_ub4 #IMPLIED \
00071 ping CTYPE vt_ub4 #IMPLIED \
00072 info CDATA #IMPLIED \
00073 security CTYPE vt_bool #IMPLIED \
00074 password CDATA #IMPLIED >";
00075 
00076 aiomsgfactory::aiomsgfactory()
00077 {
00078 }
00079 
00080 
00081 aiomsgfactory::~aiomsgfactory()
00082 {
00083 }
00084 
00085 terimber_aiomsg*
00086 aiomsgfactory::get_aiomsg(terimber_log* log)
00087 {
00088         terimber::aiomsg* obj = new terimber::aiomsg();
00089         if (obj)
00090                 obj->log_on(log);
00091 
00092         return obj;
00093 }
00094 
00095 
00096 
00097 BEGIN_TERIMBER_NAMESPACE
00098 #pragma pack(4)
00099 
00101 aiomsg::aiomsg() : _user_conn(0)
00102 {
00103 }
00104 
00105 
00106 aiomsg::~aiomsg() 
00107 {
00108         stop();
00109 }
00110 
00111 bool 
00112 aiomsg::init(const char* info, const char* ini_key)
00113 {
00114         try
00115         {
00116         _communicator.init(info, ini_key);
00117         }
00118         catch (terimber::exception& x)
00119         {
00120                 mutex_keeper keeper(_mtx);
00121                 _error = x.what();
00122                 return false;
00123         }
00124 
00125         return true;
00126 }
00127 
00128 bool 
00129 aiomsg::init(const void* buffer, size_t len)
00130 {
00131         
00132 
00133         try
00134         {
00135                 
00136                 _communicator.log_on(this);
00137         _communicator.init(buffer, len);
00138         }
00139         catch (terimber::exception& x)
00140         {
00141                 mutex_keeper keeper(_mtx);
00142                 _error = x.what();
00143                 return false;
00144         }
00145 
00146         return true;
00147 }
00148 
00149 bool
00150 aiomsg::uninit() 
00151 {
00152         try
00153         {
00154                 stop();
00155                 _communicator.uninit();
00156                 _communicator.log_on(0);
00157         }
00158         catch (terimber::exception& x)
00159         {
00160                 mutex_keeper keeper(_mtx);
00161                 _error = x.what();
00162                 return false;
00163         }
00164 
00165         return true;
00166 }
00167 
00168 bool 
00169 aiomsg::start(msg_callback_notify* callback, size_t additional_thread_count) 
00170 {
00171         
00172         conf_connection atom;
00173         atom._ping = 1000; 
00174         atom._address = _communicator.get_address();
00175         try
00176         {
00177                 check_on();
00178                 stop();
00179                 _user_conn = msg_user_connection::connect(&_communicator, callback, atom, additional_thread_count);
00180                 _user_conn->on();
00181         }
00182         catch (terimber::exception& x)
00183         {
00184                 mutex_keeper keeper(_mtx);
00185                 _error = x.what();
00186                 return false;
00187         }
00188 
00189         return true;
00190 }
00191 
00192 bool
00193 aiomsg::stop() 
00194 {
00195 
00196         try
00197         {
00198                 
00199                 if (_user_conn)
00200                 {
00201                         _user_conn->off();
00202                         _communicator.shutdown_connection(_user_conn);
00203                         _user_conn = 0;
00204                 }
00205                 return true;
00206         }
00207         catch (terimber::exception& x)
00208         {
00209                 mutex_keeper keeper(_mtx);
00210                 _error = x.what();
00211                 return false;
00212         }
00213 
00214         return true;
00215 }
00216 
00218 const char*
00219 aiomsg::get_port_error() const 
00220 {
00221         return _error; 
00222 }
00223 
00224 const guid_t&
00225 aiomsg::get_port_address() const 
00226 {
00227         return _communicator.get_address();
00228 }
00229 
00230 msg_t*
00231 aiomsg::construct(size_t size) 
00232 {
00233         try
00234         {
00235                 check_on();
00236                 return _communicator.construct_msg(size);
00237         }
00238         catch (terimber::exception& x)
00239         {
00240                 mutex_keeper keeper(_mtx);
00241                 _error= x.what();
00242                 return 0;
00243         }
00244 }
00245 
00246 bool    
00247 aiomsg::resize(msg_t* msg, size_t size) 
00248 {
00249         try
00250         {
00251                 check_on();
00252                 return _communicator.resize_msg(_communicator.cast(msg), size); 
00253         }
00254         catch (terimber::exception& x)
00255         {
00256                 mutex_keeper keeper(_mtx);
00257                 _error= x.what();
00258                 return false;
00259         }
00260 }
00261 
00262 bool    
00263 aiomsg::destroy(msg_t* msg) 
00264 {
00265         try
00266         {
00267                 check_on();
00268                 return _communicator.destroy_msg(_communicator.cast(msg)); 
00269         }
00270         catch (terimber::exception& x)
00271         {
00272                 mutex_keeper keeper(_mtx);
00273                 _error= x.what();
00274                 return false;
00275         }
00276 }
00277 
00278 size_t
00279 aiomsg::get_size(const msg_t* msg) const 
00280 {
00281         try
00282         {
00283                 check_on();
00284                 const msg_cpp* msg_ = msg_communicator::cast(msg);
00285                 return msg_ ? msg_->get_size() : 0;
00286         }
00287         catch (terimber::exception& x)
00288         {
00289                 mutex_keeper keeper(_mtx);
00290                 _error= x.what();
00291                 return 0;
00292         }
00293 }
00294 
00295 bool 
00296 aiomsg::write_buffer(msg_t* msg, size_t offset, const void* buf, size_t len)
00297 {
00298         try
00299         {
00300                 check_on();
00301                 msg_cpp* msg_ = msg_communicator::cast(msg);
00302                 if (!msg_ || msg_->get_size() < offset + len)
00303                         return false;
00304                 
00305                 memcpy(msg_->get_body() + offset, buf, len);
00306                 return true;
00307         }
00308         catch (terimber::exception& x)
00309         {
00310                 mutex_keeper keeper(_mtx);
00311                 _error= x.what();
00312                 return 0;
00313         }
00314 }
00315 
00316 const void*
00317 aiomsg::get_buffer(const msg_t* msg) const 
00318 {
00319         try
00320         {
00321                 check_on();
00322                 const msg_cpp* msg_ = msg_communicator::cast(msg);
00323                 return msg_ ? msg_->get_body() : 0;
00324         }
00325         catch (terimber::exception& x)
00326         {
00327                 mutex_keeper keeper(_mtx);
00328                 _error= x.what();
00329                 return 0;
00330         }
00331 }
00332 
00333 bool    
00334 aiomsg::set_receiver(msg_t* msg, const guid_t& receiver) 
00335 {
00336         try
00337         {
00338                 check_on();
00339                 return msg_communicator::set_receiver(msg_communicator::cast(msg), receiver); 
00340         }
00341         catch (terimber::exception& x)
00342         {
00343                 mutex_keeper keeper(_mtx);
00344                 _error= x.what();
00345                 return false;
00346         }
00347 }
00348 
00349 bool 
00350 aiomsg::get_sender(const msg_t* msg, guid_t& sender) const 
00351 {
00352         try
00353         {
00354                 check_on();
00355                 return msg_communicator::get_sender(msg_communicator::cast(msg), sender); 
00356         }
00357         catch (terimber::exception& x)
00358         {
00359                 mutex_keeper keeper(_mtx);
00360                 _error= x.what();
00361                 return false;
00362         }
00363 }
00364 
00365 bool    
00366 aiomsg::send(bool copy, msg_t* msg, msg_t** reply) 
00367 {
00368         try
00369         {
00370                 check_on();
00371                 check_ident();
00372                 msg_cpp* reply_ = 0;
00373 
00374                 if (!_user_conn->send(copy, msg_communicator::cast(msg), reply_))
00375                         terimber::exception::_throw(_user_conn->get_last_error());
00376 
00377                 
00378                 if (reply) 
00379                         *reply = static_cast< msg_t* >(reply_);
00380                 else if (reply_)
00381                         _communicator.destroy_msg(reply_);
00382 
00383         }
00384         catch (terimber::exception& x)
00385         {
00386                 mutex_keeper keeper(_mtx);
00387                 _error= x.what();
00388                 return false;
00389         }
00390 
00391         return true;
00392 }
00393 
00394 guid_t
00395 aiomsg::send_async(bool copy, msg_t* msg) 
00396 {
00397         guid_t retVal;
00398         try
00399         {
00400                 check_on();
00401                 check_ident();
00402 
00403                 if (null_uuid == (retVal = _user_conn->send_async(copy, _communicator.cast(msg))))
00404                         terimber::exception::_throw(_user_conn->get_last_error());
00405         }
00406         catch (terimber::exception& x)
00407         {
00408                 mutex_keeper keeper(_mtx);
00409                 _error= x.what();
00410         }
00411 
00412         return retVal;
00413 }
00414 
00415 bool    
00416 aiomsg::post(bool copy, msg_t* msg) 
00417 {
00418         try
00419         {
00420                 check_on();
00421                 check_ident();
00422 
00423                 if (!_user_conn->post(copy, msg_communicator::cast(msg)))
00424                         terimber::exception::_throw(_user_conn->get_last_error());
00425         }
00426         catch (terimber::exception& x)
00427         {
00428                 mutex_keeper keeper(_mtx);
00429                 _error= x.what();
00430                 return false;
00431         }
00432 
00433         return true;
00434 }
00435 
00436 bool 
00437 aiomsg::add_connection(const char* xml_description)
00438 {
00439         if (!xml_description)
00440         {
00441                 _error = "Null pointer provided";
00442                 return false;
00443         }
00444 
00445         
00446         xml_parser_creator nav_creator;
00447         xml_designer_keeper_t nav_keeper(nav_creator, 0);
00448         if (!nav_keeper->load(xml_description, strlen(xml_description), (const void*)msg_connection_dtd, strlen(msg_connection_dtd)))
00449         {
00450                 _error = nav_keeper->error();
00451                 return false;
00452         }
00453 
00454 
00455         try
00456         {
00457                 check_on();
00458 
00459                 
00460                 nav_keeper->select_root();
00461                 conf_connection atom;
00462                 
00463                 msg_communicator::parse_connection(nav_keeper, atom);
00464                 _communicator.add_connection_config(atom);
00465         }
00466         catch (terimber::exception& x)
00467         {
00468                 mutex_keeper keeper(_mtx);
00469                 _error= x.what();
00470                 return false;
00471         }
00472 
00473         return true;
00474 }
00475 
00476 bool 
00477 aiomsg::add_listener(const char* xml_description)
00478 {
00479         if (!xml_description)
00480         {
00481                 _error = "Null pointer provided";
00482                 return false;
00483         }
00484 
00485         
00486         xml_parser_creator nav_creator;
00487         xml_designer_keeper_t nav_keeper(nav_creator, 0);
00488         if (!nav_keeper->load(xml_description, strlen(xml_description), (const void*)msg_listener_dtd, strlen(msg_listener_dtd)))
00489         {
00490                 _error = nav_keeper->error();
00491                 return false;
00492         }
00493 
00494 
00495         try
00496         {
00497                 check_on();
00498 
00499                 
00500                 nav_keeper->select_root();
00501                 conf_listener atom;
00502                 
00503                 msg_communicator::parse_listener(nav_keeper, atom);
00504                 
00505                 _communicator.add_listener_config(atom, true);
00506         }
00507         catch (terimber::exception& x)
00508         {
00509                 mutex_keeper keeper(_mtx);
00510                 _error= x.what();
00511                 return false;
00512         }
00513 
00514         return true; 
00515 }
00516 
00517 
00518 
00519 
00520 bool 
00521 aiomsg::remove_connection(const guid_t& address)
00522 {
00523         try
00524         {
00525                 check_on();
00526                 _communicator.remove_connection_config(address);
00527         }
00528         catch (terimber::exception& x)
00529         {
00530                 mutex_keeper keeper(_mtx);
00531                 _error= x.what();
00532                 return false;
00533         }
00534         return true;
00535 }
00536 
00537 
00538 
00539 bool 
00540 aiomsg::remove_listener(const char* type)
00541 {
00542         if (!type)
00543         {
00544                 _error = "Null pointer provided";
00545                 return false;
00546         }
00547 
00548         try
00549         {
00550                 check_on();
00551                 transport_type type_ = sock;
00552                 _communicator.remove_listener_config(type_);
00553         }
00554         catch (terimber::exception& x)
00555         {
00556                 mutex_keeper keeper(_mtx);
00557                 _error= x.what();
00558                 return false;
00559         }
00560         return true;
00561 }
00562 
00564 void 
00565 aiomsg::check_on() const
00566 {
00567         if (!_communicator.is_on())
00568                 terimber::exception::_throw("Communicator is not initialized");
00569 }
00570 
00571 void 
00572 aiomsg::check_ident() const
00573 {
00574         if (!_communicator.validate_connection(_user_conn))
00575                 terimber::exception::_throw("Connection not found");
00576 }
00577 
00579 
00580 void
00581 aiomsg::doxray()
00582 {
00583         _communicator.doxray();
00584 }
00585 
00586 
00587 #pragma pack()
00588 END_TERIMBER_NAMESPACE