00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "aiomsg/msg_sock.h"
00029
00030 #include "base/list.hpp"
00031 #include "base/map.hpp"
00032 #include "base/memory.hpp"
00033 #include "base/string.hpp"
00034 #include "base/template.hpp"
00035 #include "base/except.h"
00036 #include "base/number.hpp"
00037 #include "base/common.hpp"
00038
00039 #include <oserror.h>
00040
00041 BEGIN_TERIMBER_NAMESPACE
00042 #pragma pack(4)
00043
00044 extern exception_table msgMsgTable;
00045 extern exception_table aiosockTable;
00046
00047
00048 void sockStatus(aiosock& port, int status_)
00049 {
00050 if (status_)
00051 {
00052 int code = 0;
00053 char err[1024];
00054 err[0] = 0;
00055 port.get_error_description(
00056 #if OS_TYPE == OS_WIN32
00057 (code = WSAGetLastError()),
00058 #else
00059 (code = errno),
00060 #endif
00061 err, sizeof(err) - 1);
00062 exception::_throw(err);
00063 }
00064 }
00065
00067 msg_sock_connection::msg_sock_connection(msg_communicator* communicator_, const conf_connection& info_) :
00068 msg_connection(communicator_, info_),
00069 _msg_send(0),
00070 _offset_send(0),
00071 _msg_recv(0),
00072 _size_recv(0),
00073 _offset_recv(0)
00074 {
00075 _handle = _communicator->get_aiosock().create(this, true);
00076 }
00077
00078 msg_sock_connection::msg_sock_connection(msg_communicator* communicator_, size_t handle, const conf_listener& linfo, const conf_connection& info) :
00079 msg_connection(communicator_, linfo, info),
00080 _msg_send(0),
00081 _offset_send(0),
00082 _msg_recv(0),
00083 _size_recv(0),
00084 _offset_recv(0)
00085 {
00086 _handle = handle;
00087 }
00088
00089 msg_sock_connection::~msg_sock_connection()
00090 {
00091
00092 if (_handle)
00093 {
00094 _communicator->get_aiosock().close(_handle);
00095 _handle = 0;
00096 }
00097
00098 if (_msg_send)
00099 {
00100 _communicator->destroy_msg(_msg_send);
00101 _msg_send = 0;
00102 }
00103
00104 if (_msg_recv)
00105 {
00106 _communicator->destroy_msg(_msg_recv);
00107 _msg_recv = 0;
00108 _size_recv = 0;
00109 }
00110 }
00111
00112
00113
00114 void
00115 msg_sock_connection::v_on_error(size_t handle, int err, aiosock_type mask, void* userdata)
00116 {
00117 assert(handle == _handle);
00118
00119 _communicator->format_logging(0, __FILE__, __LINE__, en_log_info, "shutdown conection, error: %x, mask: %d", err, mask);
00120 _communicator->shutdown_connection(this);
00121 }
00122
00123
00124
00125 void
00126 msg_sock_connection::v_on_connect(size_t handle, const sockaddr_in& peeraddr, void* userdata)
00127 {
00128 assert(handle == _handle);
00129
00130 msg_creator creator(_communicator);
00131 msg_pointer_t msg(creator);
00132
00133 try
00134 {
00135 msg = prepare_handshake_msg();
00136
00137 push(msg);
00138 msg.detach();
00139
00140 int err = _communicator->get_aiosock().receive(_handle, &_size_recv, sizeof(ub4_t), INFINITE, 0, 0);
00141 sockStatus(_communicator->get_aiosock(), err);
00142 }
00143 catch (exception& x)
00144 {
00145 _communicator->format_logging(0, __FILE__, __LINE__, en_log_info, "shutdown conection, error: %s", x.what());
00146 _communicator->shutdown_connection(this);
00147 }
00148 }
00149
00150
00151
00152 void
00153 msg_sock_connection::v_on_send(size_t handle, void* buf, size_t requested, size_t processed, const sockaddr_in& peeraddr, void* userdata)
00154 {
00155 assert(handle == _handle);
00156
00157 if (requested != processed)
00158 {
00159 assert(requested < processed);
00160 _offset_send += (ub4_t)processed;
00161 int err = _communicator->get_aiosock().send(_handle, _msg_send->get_block() + _offset_send, requested - processed, (size_t)_msg_send->timeout, 0, 0);
00162
00163 try
00164 {
00165 sockStatus(_communicator->get_aiosock(), err);
00166 }
00167 catch (...)
00168 {
00169
00170 _communicator->format_logging(0, __FILE__, __LINE__, en_log_info, "shutdown conection, error: %x", err);
00171 _communicator->shutdown_connection(this);
00172 }
00173 }
00174 else
00175 {
00176
00177 _communicator->destroy_msg(_msg_send);
00178 _msg_send = 0;
00179 _offset_send = 0;
00180 wakeup();
00181 }
00182 }
00183
00184
00185
00186 void
00187 msg_sock_connection::v_on_receive(size_t handle, void* buf, size_t requested, size_t processed, const sockaddr_in& peeraddr, void* userdata)
00188 {
00189 assert(handle == _handle);
00190
00191 if (!_msg_recv)
00192 {
00193 try
00194 {
00195 assert(requested == sizeof(ub4_t));
00196 assert(buf == &_size_recv);
00197
00198 if (buf != &_size_recv || requested != sizeof(ub4_t) || processed != sizeof(ub4_t))
00199 exception::_throw(MSG_RESULT_INVALID_MSGFORMAT, &aiosockTable);
00200
00201 size_t sizeh = ntohl(_size_recv);
00202 int body_size = (int)sizeh - (int)msg_cpp::block_size(0);
00203 if (body_size < 0)
00204 exception::_throw(MSG_RESULT_INVALID_MSGFORMAT, &msgMsgTable);
00205
00206
00207
00208 _msg_recv = _communicator->construct_msg(body_size);
00209
00210 _offset_recv = sizeof(ub4_t);
00211
00212 int err = _communicator->get_aiosock().receive(_handle, _msg_recv->get_block() + _offset_recv, sizeh - sizeof(ub4_t), msg_default_timeout, 0, 0);
00213 sockStatus(_communicator->get_aiosock(), err);
00214 }
00215 catch (exception& x)
00216 {
00217
00218 _communicator->format_logging(0, __FILE__, __LINE__, en_log_info, "shutdown conection, error: %s", x.what());
00219 _communicator->shutdown_connection(this);
00220 }
00221 }
00222 else if (requested != processed)
00223 {
00224 assert(requested > processed);
00225 _offset_recv += (ub4_t)processed;
00226 int err = _communicator->get_aiosock().receive(_handle, _msg_recv->get_block() + _offset_recv, requested - processed, msg_default_timeout, 0, 0);
00227 try
00228 {
00229 sockStatus(_communicator->get_aiosock(), err);
00230 }
00231 catch (...)
00232 {
00233
00234 _communicator->format_logging(0, __FILE__, __LINE__, en_log_info, "shutdown conection, error: %x", err);
00235 _communicator->shutdown_connection(this);
00236 }
00237 }
00238 else
00239 {
00240 try
00241 {
00242
00243 _msg_recv->unpack_msg(get_crypt_key());
00244
00245 process_incoming_message(_msg_recv);
00246 _msg_recv = 0;
00247 _offset_recv = 0;
00248
00249
00250 int err = _communicator->get_aiosock().receive(_handle, &_size_recv, sizeof(ub4_t), INFINITE, 0, 0);
00251 sockStatus(_communicator->get_aiosock(), err);
00252 }
00253 catch (exception& x)
00254 {
00255 _communicator->format_logging(0, __FILE__, __LINE__, en_log_info, "shutdown conection, error: %s", x.what());
00256 _communicator->shutdown_connection(this);
00257 }
00258 }
00259 }
00260
00261
00262
00263 void
00264 msg_sock_connection::v_on_accept(size_t handle, size_t handle_accepted, terimber_aiosock_callback*& callback, const sockaddr_in& peeraddr, void* userdata)
00265 {
00266 assert(false);
00267 }
00268
00269
00270 bool
00271 msg_sock_connection::v_has_job(size_t ident, void* user_data)
00272 {
00273 size_t top_priority = 0;
00274
00275 switch (ident)
00276 {
00277 case queue_thread_ident:
00278 return (msg_connection::v_has_job(ident, user_data)
00279 && _msg_send == 0
00280 && (_state == CONN_STATE_CONNECTED
00281 || ((_state == CONN_STATE_HANDSHAKE_INITIATOR
00282 || _state == CONN_STATE_HANDSHAKE_RECEIVER)
00283 && touch(top_priority)
00284 && top_priority == MSG_PRIORITY_SYSTEM
00285 )
00286 )
00287 );
00288 default:
00289 assert(false);
00290 }
00291
00292 return false;
00293 }
00294
00295
00296 void
00297 msg_sock_connection::v_do_job(size_t ident, void* user_data)
00298 {
00299 msg_creator creator(_communicator);
00300
00301 switch (ident)
00302 {
00303 case queue_thread_ident:
00304 {
00305 assert(_msg_send == 0);
00306 if (!pop(_msg_send))
00307 {
00308 assert(false);
00309 return;
00310 }
00311
00312
00313 try
00314 {
00315
00316 if (_msg_send->_type != handshake_type)
00317 _msg_send->_sessionid = _info._session;
00318
00319
00320
00321 _msg_send->pack_msg((_msg_send->_type & user_type_mask) ? get_crypt_key() : 0);
00322 int err = _communicator->get_aiosock().send(_handle, _msg_send->get_block(), ntohl(*(ub4_t*)_msg_send->get_block()), (size_t)_msg_send->timeout, 0, 0);
00323 sockStatus(_communicator->get_aiosock(), err);
00324
00325
00326 set_last_activity();
00327 }
00328 catch (exception& err)
00329 {
00330 if (_msg_send->_type == user_type_send || _msg_send->_type == user_type_send_async)
00331 {
00332 try
00333 {
00334
00335 msg_pointer_t reply(creator, 0);
00336
00337 msg_pack::make_reply_msg(_msg_send, reply);
00338 msg_pack::make_error_msg(reply, err.what());
00339
00340 _communicator->comm_msg(reply);
00341 reply.detach();
00342 }
00343 catch (exception&)
00344 {
00345 }
00346 catch (...)
00347 {
00348 assert(false);
00349 }
00350 }
00351
00352
00353 _communicator->format_logging(0, __FILE__, __LINE__, en_log_info, "shutdown conection, error: %s", err.what());
00354 _communicator->shutdown_connection(this);
00355 }
00356 catch (...)
00357 {
00358 assert(false);
00359 }
00360 }
00361 break;
00362 default:
00363 assert(false);
00364 }
00365 }
00366
00367
00368 msg_sock_connection*
00369 msg_sock_connection::connect(msg_communicator* communicator_, const conf_connection& info_)
00370 {
00371
00372 msg_sock_connection* connection = new msg_sock_connection(communicator_, info_);
00373
00374 if (!connection)
00375 exception::_throw(MSG_RESULT_NOTMEMORY, &msgMsgTable);
00376
00377
00378 if (!connection->_handle)
00379 {
00380 delete connection;
00381 #if OS_TYPE == OS_WIN32
00382 exception::_throw(WSAEFAULT, &aiosockTable);
00383 #else
00384 exception::_throw(EFAULT, &aiosockTable);
00385 #endif
00386 }
00387
00388
00389 int err = 0;
00390
00391 try
00392 {
00393 if (err = communicator_->get_aiosock().connect(connection->_handle, info_._network, info_._port, INFINITE, 0))
00394 {
00395 delete connection;
00396 sockStatus(communicator_->get_aiosock(), err);
00397 }
00398
00399 communicator_->add_connection(connection);
00400 }
00401 catch (exception& x)
00402 {
00403 delete connection;
00404 throw x;
00405 }
00406
00407 return connection;
00408 }
00409
00410
00411 void
00412 msg_sock_connection::accept(size_t handle, msg_communicator* communicator_, const conf_listener& info_, terimber_aiosock_callback*& callback)
00413 {
00414 conf_connection atom;
00415 atom._address = uuid_gen();
00416
00417 msg_sock_connection* connection = new msg_sock_connection(communicator_, handle, info_, atom);
00418
00419 if (!connection)
00420 {
00421 communicator_->get_aiosock().close(handle);
00422 exception::_throw(MSG_RESULT_NOTMEMORY, &msgMsgTable);
00423 }
00424
00425 callback = connection;
00426
00427 try
00428 {
00429
00430
00431
00432 int err = communicator_->get_aiosock().receive(connection->_handle, &connection->_size_recv, sizeof(ub4_t), INFINITE, 0, 0);
00433 sockStatus(communicator_->get_aiosock(), err);
00434 connection->v_on();
00435 communicator_->add_connection(connection);
00436 }
00437 catch (exception& err)
00438 {
00439 delete connection;
00440 throw err;
00441 }
00442 }
00444 msg_sock_listener::msg_sock_listener(msg_communicator* communicator_, const conf_listener& info_) :
00445 msg_listener(communicator_, info_), _handle(0)
00446 {
00447 }
00448
00449
00450 msg_sock_listener::~msg_sock_listener()
00451 {
00452 if (_handle)
00453 {
00454 _communicator->get_aiosock().close(_handle);
00455 _handle = 0;
00456 }
00457 }
00458
00459
00460 void
00461 msg_sock_listener::v_on()
00462 {
00463 if (is_on())
00464 return;
00465
00466 _handle = _communicator->get_aiosock().create(this, true);
00467
00468 if (!_handle)
00469 exception::_throw("Can't create the socket handler");
00470
00471 int err = _communicator->get_aiosock().listen(_handle, _info._port, _info._connections, _info._network, 64, 0);
00472 sockStatus(_communicator->get_aiosock(), err);
00473
00474 msg_base::v_on();
00475 }
00476
00477
00478 void
00479 msg_sock_listener::v_off()
00480 {
00481 if (!is_on())
00482 return;
00483
00484 if (_handle)
00485 {
00486 _communicator->get_aiosock().close(_handle);
00487 _handle = 0;
00488 }
00489
00490 msg_base::v_off();
00491 }
00492
00493
00494
00495 void
00496 msg_sock_listener::v_on_error(size_t handle, int err, aiosock_type mask, void* userdata)
00497 {
00498 assert(handle == _handle);
00499 }
00500
00501
00502
00503 void
00504 msg_sock_listener::v_on_connect(size_t handle, const sockaddr_in& peeraddr, void* userdata)
00505 {
00506 assert(false);
00507 }
00508
00509
00510
00511 void
00512 msg_sock_listener::v_on_send(size_t handle, void* buf, size_t requested, size_t processed, const sockaddr_in& peeraddr, void* userdata)
00513 {
00514 assert(false);
00515 }
00516
00517
00518
00519 void
00520 msg_sock_listener::v_on_receive(size_t handle, void* buf, size_t requested, size_t processed, const sockaddr_in& peeraddr, void* userdata)
00521 {
00522 assert(false);
00523 }
00524
00525
00526
00527 void
00528 msg_sock_listener::v_on_accept(size_t handle, size_t handle_accepted, terimber_aiosock_callback*& callback, const sockaddr_in& peeraddr, void* userdata)
00529 {
00530 assert(handle == _handle);
00531 try
00532 {
00533
00534 msg_sock_connection::accept(handle_accepted, _communicator, _info, callback);
00535 }
00536 catch (exception&)
00537 {
00538 }
00539 catch (...)
00540 {
00541 assert(false);
00542 }
00543 }
00544
00545
00546 #pragma pack()
00547 END_TERIMBER_NAMESPACE