00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "aiomsg/msg_conn.h"
00028 #include "aiomsg/msg_lsnr.h"
00029 #include "base/list.hpp"
00030 #include "base/template.hpp"
00031 #include "base/memory.hpp"
00032 #include "base/common.hpp"
00033 #include "base/number.hpp"
00034
// Project macro; presumably opens the terimber namespace that END_TERIMBER_NAMESPACE
// closes at the bottom of this file.
00035 BEGIN_TERIMBER_NAMESPACE
// Pack structures on 4-byte boundaries until the matching "#pragma pack()" at the end of the file.
00036 #pragma pack(4)
00037
// Exception table declared here and defined elsewhere; this file passes its address to
// exception::_throw() together with the MSG_RESULT_* codes.
00038 extern exception_table msgMsgTable;
00039
00040 msg_connection::msg_connection(msg_communicator* communicator_, const conf_listener& linfo, const conf_connection& info_) :
00041 msg_queue_processor(communicator_),
00042 _info(info_),
00043 _linfo(linfo),
00044 _state(CONN_STATE_HANDSHAKE_RECEIVER),
00045 _rsa(rsa_key_size, info_._support_crypt)
00046 {
00047 if (_info._support_crypt)
00048 {
00049 _communicator->get_msg_key(_info._crypt_private, _info._crypt_external, _info._session);
00050 }
00051 }
00052
00053 msg_connection::msg_connection(msg_communicator* communicator_, const conf_connection& info_) :
00054 msg_queue_processor(communicator_),
00055 _info(info_),
00056 _linfo(),
00057 _state(CONN_STATE_HANDSHAKE_INITIATOR),
00058 _rsa(rsa_key_size, info_._support_crypt)
00059 {
00060 if (_info._support_crypt)
00061 {
00062 _communicator->get_msg_key(_info._crypt_private, _info._crypt_external, _info._session);
00063 }
00064 }
00065
00066
00067 msg_connection::~msg_connection()
00068 {
00069
00070 msg_cpp* msg = 0;
00071 while (pop(msg))
00072 {
00073
00074 _communicator->destroy_msg(msg);
00075 }
00076 }
00077
00078
00079 void msg_connection::ping_notify()
00080 {
00081
00082 msg_cpp* msg = 0;
00083 try
00084 {
00085 msg_creator creator(_communicator);
00086 msg_pointer_t msg_(creator);
00087
00088
00089 msg_ = _communicator->construct_ping();
00090
00091 msg_->_receiver = _info._address;
00092
00093 push_msg(msg_);
00094 msg_.detach();
00095
00096 set_last_activity();
00097 }
00098 catch (exception&)
00099 {
00100 }
00101 catch (...)
00102 {
00103 assert(false);
00104 }
00105 }
00106
00107
00108 void
00109 msg_connection::push_msg(msg_cpp* msg_)
00110 {
00111 msg_->_sessionid = _info._session;
00112 push(msg_);
00113 }
00114
00115
00116 void
00117 msg_connection::wakeup()
00118 {
00119 _communicator->get_thread_manager().borrow_thread(queue_thread_ident, 0, this, stay_on_alert_time);
00120 }
00121
00122 msg_cpp*
00123 msg_connection::prepare_handshake_msg()
00124 {
00125 msg_cpp* msg = _communicator->construct_handshake(get_rsa());
00126
00127 msg->_receiver = _info._address;
00128
00129 _info._session = msg->_sessionid;
00130
00131 return msg;
00132 }
00133
00134 msg_cpp*
00135 msg_connection::prepare_handshake_reply(msg_cpp* msg)
00136 {
00137
00138
00139
00140 msg_listener::accept_address(msg->_sender, _linfo, _info);
00141
00142 _info._session = msg->_sessionid;
00143
00144 _info._address = msg->_sender;
00145
00146 msg_cpp* reply = _communicator->reply_handshake(msg, _info._support_crypt ? &_info._crypt_private : 0);
00147
00148 _state = CONN_STATE_CONNECTED;
00149
00150 reply->timeout = msg->timeout;
00151
00152 return reply;
00153 }
00154
00155 void
00156 msg_connection::validate_handshake_reply(msg_cpp* msg)
00157 {
00158
00159 _communicator->check_handshake(_info._session, msg, get_rsa(), _info._crypt_private);
00160 _info._address = msg->_sender;
00161 _state = CONN_STATE_CONNECTED;
00162 }
00163
00164 void
00165 msg_connection::process_incoming_message(msg_cpp* msg)
00166 {
00167
00168
00169
00170
00171
00172
00173 switch (msg->_type)
00174 {
00175 case handshake_type:
00176
00177 switch (_state)
00178 {
00179 case CONN_STATE_HANDSHAKE_INITIATOR:
00180
00181 _communicator->check_handshake(_info._session, msg, get_rsa(), _info._crypt_private);
00182 _info._address = msg->_sender;
00183 _state = CONN_STATE_CONNECTED;
00184 wakeup();
00185 break;
00186 case CONN_STATE_HANDSHAKE_RECEIVER:
00187 {
00188
00189 msg_creator creator(_communicator);
00190 msg_pointer_t reply(creator);
00191
00192 guid_t old_address = _info._address;
00193
00194 msg_listener::accept_address(msg->_sender, _linfo, _info);
00195
00196 _info._session = msg->_sessionid;
00197
00198 _communicator->change_connection_address(old_address, this);
00199 reply = _communicator->reply_handshake(msg, _info._support_crypt ? &_info._crypt_private : 0);
00200
00201 reply->timeout = msg->timeout;
00202
00203
00204 _state = CONN_STATE_CONNECTED;
00205 push(reply);
00206 reply.detach();
00207 }
00208 break;
00209 default:
00210 exception::_throw(MSG_RESULT_INVALID_MSGFORMAT, &msgMsgTable);
00211 }
00212
00213
00214 _communicator->destroy_msg(msg);
00215
00216 break;
00217 case system_type:
00218
00219 if (_state == CONN_STATE_CONNECTED && msg->_sessionid == _info._session && msg->msgid == msg_id_ping)
00220 _communicator->destroy_msg(msg);
00221 else
00222 exception::_throw(MSG_RESULT_INVALID_MSGFORMAT, &msgMsgTable);
00223 break;
00224 case user_type_send:
00225 case user_type_send_async:
00226 case user_type_post:
00227 case user_type_reply:
00228 case user_type_reply_async:
00229
00230 if (_state != CONN_STATE_CONNECTED || msg->_sessionid != _info._session)
00231 exception::_throw(MSG_RESULT_INVALID_SESSION, &msgMsgTable);
00232
00233 _communicator->comm_msg(msg);
00234 break;
00235 default:
00236 exception::_throw(MSG_RESULT_INVALID_MSGFORMAT, &msgMsgTable);
00237 }
00238
00239 set_last_activity();
00240 }
00241
// Ends the 4-byte packing region started by "#pragma pack(4)" near the top of this file.
00242 #pragma pack()
// Project macro; presumably closes the namespace opened by BEGIN_TERIMBER_NAMESPACE.
00243 END_TERIMBER_NAMESPACE