00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "osdetect.h"
00029
00031 #if OS_TYPE == OS_WIN32
00032 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00033 #include <winsock2.h>
00034 #endif
00035 #endif
00036
00038 #include "base/list.hpp"
00039 #include "aiosock/aiosock.h"
00040 #include "base/map.hpp"
00041 #include "base/stack.hpp"
00042 #include "base/memory.hpp"
00043 #include "base/common.hpp"
00044
00045
00047 terimber_aiosock_factory::terimber_aiosock_factory()
00048 {
00049 }
00050
00052 terimber_aiosock_factory::~terimber_aiosock_factory()
00053 {
00054 }
00055
00057 terimber_aiosock*
00058 terimber_aiosock_factory::get_aiosock(terimber_log* log, size_t capacity, size_t deactivate_time_msec)
00059 {
00060
00061 terimber::aiosock* obj = new terimber::aiosock(capacity, deactivate_time_msec);
00062 if (obj)
00063 {
00064
00065 obj->log_on(log);
00066
00067 obj->on();
00068 }
00069
00070 return obj;
00071 }
00072
00073
00074 BEGIN_TERIMBER_NAMESPACE
00075 #pragma pack(4)
00076
00077
00078
00079 #if OS_TYPE == OS_WIN32
00080 static exception_item aiosockMsgs[] =
00081 {
00082 { WSAEACCES, "Permission denied" },
00083 { WSAEADDRINUSE, "Address already in use" },
00084 { WSAEADDRNOTAVAIL, "Cannot assign requested address" },
00085 { WSAEAFNOSUPPORT, "Address family not supported by protocol family" },
00086 { WSAEALREADY, "Operation already in progress" },
00087 { WSAECONNABORTED, "Software caused connection abort" },
00088 { WSAECONNREFUSED, "Connection refused" },
00089 { WSAECONNRESET, "Connection reset by peer" },
00090 { WSAEDESTADDRREQ, "Destination address required" },
00091 { WSAEFAULT, "Bad address" },
00092 { WSAEHOSTDOWN, "Host is down" },
00093 { WSAEHOSTUNREACH, "No route to host" },
00094 { WSAEINPROGRESS, "Operation now in progress" },
00095 { WSAEINTR, "Interrupted function call" },
00096 { WSAEINVAL, "Invalid argument" },
00097 { WSAEISCONN, "Socket is already connected" },
00098 { WSAEMFILE, "Too many open files" },
00099 { WSAEMSGSIZE, "Message too long" },
00100 { WSAENETDOWN, "Network is down" },
00101 { WSAENETRESET, "Network dropped connection on reset" },
00102 { WSAENETUNREACH, "Network is unreachable" },
00103 { WSAENOBUFS, "No buffer space available" },
00104 { WSAENOPROTOOPT, "Bad protocol option" },
00105 { WSAENOTCONN, "Socket is not connected" },
00106 { WSAENOTSOCK, "Socket operation on non-socket" },
00107 { WSAEOPNOTSUPP, "Operation not supported" },
00108 { WSAEPFNOSUPPORT, "Protocol family not supported" },
00109 { WSAEPROCLIM, "Too many processes" },
00110 { WSAEPROTONOSUPPORT, "Protocol not supported" },
00111 { WSAEPROTOTYPE, "Protocol wrong type of socket" },
00112 { WSAESHUTDOWN, "Cannot send after socket shutdown" },
00113 { WSAESOCKTNOSUPPORT, "Socket type not supported" },
00114 { WSAETIMEDOUT, "Connection timed out" },
00115 { WSAEWOULDBLOCK, "Resource temporarily unavailable" },
00116 { WSAHOST_NOT_FOUND, "Host not found" },
00117 { WSANOTINITIALISED, "Successful WSAStartup not yet performed" },
00118 { WSANO_DATA, "Valid name, no data record of requested type" },
00119 { WSANO_RECOVERY, "This is a non-recoverable error" },
00120 { WSASYSNOTREADY, "Network subsystem is unavailable" },
00121 { WSATRY_AGAIN, "Non-authoritative host not found" },
00122 { WSAVERNOTSUPPORTED, "WINSOCK.DLL version out of range" },
00123 { WSAEDISCON, "Graceful shutdown in progress" },
00124 { HOST_NOT_FOUND, "Host not found" },
00125 { NO_DATA, "Valid name, no data record of requested type" },
00126 { NO_RECOVERY, "This is a non-recoverable error" },
00127 { TRY_AGAIN, "Non-authoritative host not found" },
00128 { 0, 0 }
00129 };
00130 #else
00131 static exception_item aiosockMsgs[] =
00132 {
00133 { EACCES, "Permission denied" },
00134 { EADDRINUSE, "Address already in use" },
00135 { EADDRNOTAVAIL, "Cannot assign requested address" },
00136 { EAFNOSUPPORT, "Address family not supported by protocol family" },
00137 { EALREADY, "Operation already in progress" },
00138 { ECONNABORTED, "Software caused connection abort" },
00139 { ECONNREFUSED, "Connection refused" },
00140 { ECONNRESET, "Connection reset by peer" },
00141 { EDESTADDRREQ, "Destination address required" },
00142 { EFAULT, "Bad address" },
00143 { EHOSTDOWN, "Host is down" },
00144 { EHOSTUNREACH, "No route to host" },
00145 { EINPROGRESS, "Operation now in progress" },
00146 { EINTR, "Interrupted function call" },
00147 { EINVAL, "Invalid argument" },
00148 { EISCONN, "Socket is already connected" },
00149 { EMFILE, "Too many open files" },
00150 { EMSGSIZE, "Message too long" },
00151 { ENETDOWN, "Network is down" },
00152 { ENETRESET, "Network dropped connection on reset" },
00153 { ENETUNREACH, "Network is unreachable" },
00154 { ENOBUFS, "No buffer space available" },
00155 { ENOPROTOOPT, "Bad protocol option" },
00156 { ENOTCONN, "Socket is not connected" },
00157 { ENOTSOCK, "Socket operation on non-socket" },
00158 { EOPNOTSUPP, "Operation not supported" },
00159 { EPFNOSUPPORT, "Protocol family not supported" },
00160 { EPROTONOSUPPORT, "Protocol not supported" },
00161 { EPROTOTYPE, "Protocol wrong type of socket" },
00162 { ESHUTDOWN, "Cannot send after socket shutdown" },
00163 { ESOCKTNOSUPPORT, "Socket type not supported" },
00164 { ETIMEDOUT, "Connection timed out" },
00165 { EWOULDBLOCK, "Resource temporarily unavailable" },
00166 { 0, 0 }
00167 };
00168 #endif
00169
00170 exception_table aiosockTable(aiosockMsgs);
00171
00172
00174 const size_t aiosock_io_initiation_ident = 1;
00176 const size_t aiosock_io_initiation_thread_alert = 1000;
00178 const size_t aiosock_completion_io_port_ident = 2;
00180 const size_t aiosock_completion_io_port_thread_alert = INFINITE;
00182 const size_t aiosock_working_ident = 3;
00184 const size_t aiosock_working_thread_alert = 60000;
00185
00186 #if OS_TYPE == OS_WIN32
00188 typedef BOOL (PASCAL *PCONNECTEX)(SOCKET s,
00189 const struct sockaddr* name,
00190 int namelen,
00191 PVOID lpSendBuffer,
00192 DWORD dwSendDataLength,
00193 LPDWORD lpdwBytesSent,
00194 LPOVERLAPPED lpOverlapped);
00195
00196 typedef BOOL (PASCAL *PACCEPTEX)(SOCKET sListenSocket,
00197 SOCKET sAcceptSocket,
00198 PVOID lpOutputBuffer,
00199 DWORD dwReceiveDataLength,
00200 DWORD dwLocalAddressLength,
00201 DWORD dwRemoteAddressLength,
00202 LPDWORD lpdwBytesReceived,
00203 LPOVERLAPPED lpOverlapped);
00204
00205 typedef BOOL (PASCAL *PDISCONNECTEX)(SOCKET hSocket,
00206 LPOVERLAPPED lpOverlapped,
00207 DWORD dwFlags,
00208 DWORD reserved);
00209
00211 const GUID GUID_MSWSOCK_ACCEPTEX = {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}};
00212 const GUID GUID_MSWSOCK_CONNECTEX = {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}};
00213 const GUID GUID_MSWSOCK_DISCONNECTEX = {0x7fda2e11,0x8630,0x436f,{0xa0, 0x31, 0xf5, 0x36, 0xa6, 0xee, 0xc1, 0x57}};
00214
00215 #ifndef SO_UPDATE_ACCEPT_CONTEXT
00216 #define SO_UPDATE_ACCEPT_CONTEXT 0x700B
00217 #endif
00218
00219 #ifndef SO_UPDATE_CONNECT_CONTEXT
00220 #define SO_UPDATE_CONNECT_CONTEXT 0x7010
00221 #endif
00222
00223 #endif
00224
00226 aiosock_block::aiosock_block()
00227 {
00228 clear();
00229 }
00230
00231 aiosock_block::aiosock_block(const aiosock_block& x)
00232 {
00233 *this = x;
00234 }
00235
00236 aiosock_block&
00237 aiosock_block::operator=(const aiosock_block& x)
00238 {
00239 if (this != &x)
00240 {
00241 memcpy(this, &x, sizeof(aiosock_block));
00242 }
00243
00244 return *this;
00245 }
00246
00247 void
00248 aiosock_block::clear()
00249 {
00250 memset(this, 0, sizeof(aiosock_block));
00251 }
00252
00253 void
00254 aiosock_block::settimeout(size_t timeout)
00255 {
00256 if (timeout == INFINITE)
00257 {
00258 #if OS_TYPE == OS_WIN32
00259 _timeout = 0;
00260 #else
00261 _timeout.tv_sec = _timeout.tv_usec = 0;
00262 #endif
00263 _expired = 0;
00264 }
00265 else
00266 {
00267 #if OS_TYPE == OS_WIN32
00268 _timeout = timeout;
00269 #else
00270 _timeout.tv_sec = timeout / 1000;
00271 _timeout.tv_usec = (timeout % 1000) * 1000 + 1;
00272 #endif
00273 date now;
00274 _expired = (sb8_t)now + timeout;
00275 }
00276 }
00277
00279
00280 bool
00281 aiosock::resolve_sock_error_code(int err, char* buf, size_t len)
00282 {
00283 if (!buf || !len)
00284 return false;
00285
00286 const char* ret = 0;
00287 if ((ret = aiosockTable.get_error(err)))
00288 {
00289 TERIMBER::str_template::strcpy(buf, ret, len - 1);
00290 buf[len] = 0;
00291 }
00292 else
00293 {
00294 os_get_error(err, buf, len);
00295 }
00296
00297 return true;
00298 }
00299
00300
00301 aiosock::aiosock(size_t capacity, size_t deactivate_time_msec) :
00302 _socket_map(less< size_t >(), 64)
00303 ,_reverse_map(less< aio_sock_handle >(), 64)
00304 ,_socket_generator(64)
00305 ,_outgoing_list(64)
00306 ,_aiosock_io_handle(0)
00307 ,_thread_pool(capacity + 3, deactivate_time_msec)
00308 ,_capacity(capacity)
00309 ,_on(false)
00310 ,_flag_io_port(false)
00311 {
00312 }
00313
00314 aiosock::~aiosock()
00315 {
00316
00317 off();
00318 }
00319
00320 bool
00321 aiosock::on()
00322 {
00323 if (_on)
00324 {
00325 format_logging(0, __FILE__, __LINE__, en_log_error, "aiosock already started");
00326 return false;
00327 }
00328
00329 format_logging(0, __FILE__, __LINE__, en_log_info, "starting aiosock...");
00330
00331 if (_sockStartup())
00332 {
00333 format_logging(0, __FILE__, __LINE__, en_log_error, "can not initiate socket library");
00334 return false;
00335 }
00336
00337 #if OS_TYPE != OS_WIN32
00338 TERIMBER::SetLog(this);
00339 #endif
00340 _thread_pool.log_on(this);
00341
00342 if (!_thread_pool.on())
00343 {
00344 format_logging(0, __FILE__, __LINE__, en_log_error, "can not start thread pool");
00345 return false;
00346 }
00347
00348 format_logging(0, __FILE__, __LINE__, en_log_info, "init completion port");
00349 #if OS_TYPE == OS_WIN32
00350 _aiosock_io_handle = ::CreateIoCompletionPort((HANDLE)INVALID_SOCKET, 0, 0, 0);
00351 #else
00352 _aiosock_io_handle = TERIMBER::CreateIoCompletionPort((HANDLE)INVALID_SOCKET, 0, 0, TYPE_UNKNOWN);
00353 #endif
00354
00355
00356 if (!_aiosock_io_handle)
00357 {
00358 #if OS_TYPE == OS_WIN32
00359 ::CloseHandle((HANDLE)_aiosock_io_handle);
00360 #else
00361 TERIMBER::CloseHandle(_aiosock_io_handle);
00362 #endif
00363 format_logging(0, __FILE__, __LINE__, en_log_info, "can not initiate completion port");
00364 return false;
00365 }
00366
00367 _flag_io_port = true;
00368 _thread_pool.borrow_thread(aiosock_completion_io_port_ident, 0, this, aiosock_completion_io_port_thread_alert);
00369 _start_io_port.wait();
00370
00371
00372 _thread_pool.borrow_thread(aiosock_working_ident, 0, this, aiosock_working_thread_alert);
00373
00374
00375 _in_thread.start();
00376 job_task task(this, aiosock_io_initiation_ident, aiosock_io_initiation_thread_alert, 0);
00377 _in_thread.assign_job(task);
00378
00379 _on = true;
00380
00381 format_logging(0, __FILE__, __LINE__, en_log_info, "aio socket port is initialized");
00382
00383 return _aiosock_io_handle != 0;
00384 }
00385
00386
00387
00388 void
00389 aiosock::off()
00390 {
00391 if (!_on)
00392 {
00393 format_logging(0, __FILE__, __LINE__, en_log_error, "aiosock already stopped");
00394 return;
00395 }
00396
00397 format_logging(0, __FILE__, __LINE__, en_log_info, "stoping aiosock...");
00398 _in_thread.cancel_job();
00399 _in_thread.stop();
00400
00401
00402 format_logging(0, __FILE__, __LINE__, en_log_info, "send stop message to completion port");
00403 #if OS_TYPE == OS_WIN32
00404 ::PostQueuedCompletionStatus((HANDLE)_aiosock_io_handle, 0, 0, 0);
00405 #else
00406 TERIMBER::PostQueuedCompletionStatus(_aiosock_io_handle, 0, 0, 0);
00407 #endif
00408
00409 _stop_io_port.wait();
00410 format_logging(0, __FILE__, __LINE__, en_log_info, "completion port stopped");
00411
00412
00413 #if OS_TYPE == OS_WIN32
00414
00415 ::CloseHandle((HANDLE)_aiosock_io_handle);
00416 #else
00417 TERIMBER::CloseHandle(_aiosock_io_handle);
00418 #endif
00419
00420
00421 _aiosock_io_handle = 0;
00422
00423 format_logging(0, __FILE__, __LINE__, en_log_info, "Stoping thread pool");
00424 _thread_pool.revoke_client(this);
00425 _thread_pool.off();
00426 _thread_pool.log_on(0);
00427
00428 format_logging(0, __FILE__, __LINE__, en_log_info, "Close all sockets");
00429
00430 mutex_keeper guard(_mtx);
00431 for (aiosock_socket_map_iterator_t iter = _socket_map.begin(); iter != _socket_map.end(); ++iter)
00432 {
00433 iter->_incoming_list.erase(_incoming_list_allocator, iter->_incoming_list.begin(), iter->_incoming_list.end());
00434 _cancel_socket(iter->_handle);
00435 _close_socket(iter->_handle, iter->_tcp_udp);
00436 }
00437
00438
00439 format_logging(0, __FILE__, __LINE__, en_log_info, "cleans up resources");
00440
00441
00442 _socket_generator.clear();
00443 _socket_map.clear();
00444 _reverse_map.clear();
00445 _listeners_map.clear();
00446 _delay_key_map.clear();
00447
00448
00449 _clear_block_lists();
00450
00451
00452 _incoming_list_allocator.clear_extra();
00453 _block_allocator.clear_extra();
00454
00455
00456 #if OS_TYPE != OS_WIN32
00457 TERIMBER::SetLog(0);
00458 #endif
00459
00460
00461 _sockCleanup();
00462
00463
00464 _on = false;
00465
00466 format_logging(0, __FILE__, __LINE__, en_log_info, "aio socket port is uninitialized");
00467 }
00468
00469
00470 bool
00471 aiosock::v_has_job(size_t ident, void* data)
00472 {
00473
00474 if (_aiosock_io_handle == 0)
00475 return false;
00476
00477 switch (ident)
00478 {
00479 case aiosock_completion_io_port_ident:
00480 return _flag_io_port;
00481 case aiosock_io_initiation_ident:
00482 {
00483
00484 mutex_keeper guard(_mtx);
00485
00486
00487 if (!_initial_list.empty())
00488 return true;
00489
00490
00491 for (aiosock_listener_map_t::const_iterator iter = _listeners_map.begin(); iter != _listeners_map.end(); ++iter)
00492 {
00493 if (iter->_curr_count < iter->_max_count)
00494 return true;
00495 }
00496
00497
00498 date now;
00499 sb8_t unow = (sb8_t)now;
00500
00501 for (aiosock_socket_map_iterator_t iter_socket = _socket_map.begin(); iter_socket != _socket_map.end(); ++iter_socket)
00502 {
00503 for (aiosock_pblock_alloc_list_t::iterator iter_block = iter_socket->_incoming_list.begin();
00504 iter_block != iter_socket->_incoming_list.end(); ++iter_block)
00505 {
00506
00507 aiosock_block* block = *iter_block;
00508
00509 if (block->_expired != 0
00510 && unow >= block->_expired)
00511 {
00512 return true;
00513 }
00514 }
00515 }
00516
00517
00518 return false;
00519 }
00520 case aiosock_working_ident:
00521 default:
00522 {
00523
00524 mutex_keeper guard(_mtx);
00525
00526 return !_outgoing_list.empty();
00527 }
00528 }
00529
00530 return false;
00531 }
00532
00533 void
00534 aiosock::wait_for_io_completion()
00535 {
00536
00537 _start_io_port.signal();
00538
00539 while (true)
00540 {
00541 aiosock_block* ov = 0;
00542 size_t sock_key = os_minus_one;
00543 size_t num_bytes = 0;
00544 #if OS_TYPE == OS_WIN32
00545 bool bRes = (TRUE == ::GetQueuedCompletionStatus((HANDLE)_aiosock_io_handle,
00546 (DWORD*)&num_bytes,
00547 #if defined(_MSC_VER) && (_MSC_VER > 1200)
00548 (ULONG_PTR*)
00549 #else
00550 (DWORD*)
00551 #endif
00552 &sock_key,
00553 (LPOVERLAPPED*)&ov,
00554 INFINITE));
00555
00556 int cRes = bRes ? 0 : ::GetLastError();
00557 #else
00558 int cRes = TERIMBER::GetQueuedCompletionStatus(_aiosock_io_handle,
00559 &num_bytes,
00560 &sock_key,
00561 (LPOVERLAPPED*)&ov,
00562 INFINITE);
00563
00564 if (ov)
00565 {
00566 ov->_address = ov->remoteAddress;
00567 }
00568 #endif
00569 if (!sock_key)
00570 {
00571
00572 _flag_io_port = false;
00573
00574 _stop_io_port.signal();
00575
00576 break;
00577 }
00578
00579
00580 complete_block(sock_key, ov, cRes, num_bytes);
00581 }
00582 }
00583
00584 void
00585 aiosock::complete_block(size_t sock_key, aiosock_block* ov, int err, size_t processed)
00586 {
00587
00588 mutex_keeper guard(_mtx);
00589
00590 aiosock_socket_map_iterator_t iter_sock = _socket_map.find(sock_key);
00591
00592 if (iter_sock == _socket_map.end())
00593 {
00594 format_logging(0, __FILE__, __LINE__, en_log_error, "socket key %d not found", sock_key);
00595 return;
00596 }
00597
00598
00599 for (aiosock_pblock_alloc_list_t::iterator iter_block = iter_sock->_incoming_list.begin(); iter_block != iter_sock->_incoming_list.end(); ++iter_block)
00600 {
00601
00602 if (*iter_block != ov)
00603 continue;
00604
00605
00606 aiosock_block* block = *iter_block;
00607
00608 iter_sock->_incoming_list.erase(_incoming_list_allocator, iter_block);
00609
00610 block->_err = err;
00611
00612 block->_processed = processed;
00613
00614 _outgoing_list.push_back(block);
00615
00616 guard.unlock();
00617
00618 if (!_capacity || !_thread_pool.borrow_from_range(aiosock_working_ident, aiosock_working_ident + _capacity, 0, this, aiosock_working_thread_alert))
00619 _thread_pool.borrow_thread(aiosock_working_ident, 0, this, aiosock_working_thread_alert);
00620
00621 return;
00622 }
00623
00624 #if OS_TYPE == OS_WIN32
00625
00626
00627
00628 for (aiosock_pblock_alloc_list_t::iterator iter_abounded = _abounded_list.begin(); iter_abounded != _abounded_list.end(); ++iter_abounded)
00629 {
00630 if (*iter_abounded != ov)
00631 continue;
00632
00633
00634 aiosock_block* block = *iter_abounded;
00635
00636 _abounded_list.erase(iter_abounded);
00637
00638 _put_block(block);
00639 break;
00640 }
00641
00642 #endif
00643 }
00644
00645
00646 void
00647 aiosock::v_do_job(size_t ident, void* data)
00648 {
00649 switch (ident)
00650 {
00651 case aiosock_completion_io_port_ident:
00652 wait_for_io_completion();
00653 break;
00654 case aiosock_io_initiation_ident:
00655 {
00656
00657 mutex_keeper guard(_mtx);
00658
00659 if (_initial_list.empty())
00660 {
00661
00662
00663 guard.unlock();
00664
00665
00666 if (!process_accept_blocks())
00667 {
00668
00669 process_timeouted_blocks();
00670 }
00671
00672 return;
00673 }
00674
00675
00676 aiosock_block* block = _initial_list.front();
00677
00678 _initial_list.pop_front();
00679
00680
00681 if (int err = _process_block(block))
00682 {
00683
00684 block->_err = err;
00685
00686 _outgoing_list.push_back(block);
00687
00688 guard.unlock();
00689
00690 if (!_capacity || !_thread_pool.borrow_from_range(aiosock_working_ident, aiosock_working_ident + _capacity, 0, this, aiosock_working_thread_alert))
00691 _thread_pool.borrow_thread(aiosock_working_ident, 0, this, aiosock_working_thread_alert);
00692 }
00693 }
00694 break;
00695 case aiosock_working_ident:
00696 default:
00697 {
00698 aiosock_block* block = 0;
00699 aio_sock_handle handle = 0;
00700 aio_sock_handle accept_handle = 0;
00701 terimber_aiosock_callback* client_obj = 0;
00702
00703
00704 mutex_keeper guard(_mtx);
00705
00706 if (_outgoing_list.empty())
00707 return;
00708
00709
00710 block = _outgoing_list.front();
00711
00712 _outgoing_list.pop_front();
00713
00714
00715 aiosock_socket_map_t::iterator iter_sock = _socket_map.find(block->_socket_ident);
00716
00717 if (iter_sock == _socket_map.end())
00718 {
00719 format_logging(0, __FILE__, __LINE__, en_log_error, "socket key %d not found", block->_socket_ident);
00720
00721 _put_block(block);
00722 return;
00723 }
00724 else
00725 {
00726
00727 client_obj = iter_sock->_client_obj;
00728
00729 handle = iter_sock->_handle;
00730
00731 ++iter_sock->_callback_invoking;
00732
00733
00734 if (block->_type == AIOSOCK_ACCEPT)
00735 {
00736 #if OS_TYPE == OS_WIN32
00737
00738
00739 aiosock_socket_map_t::iterator iter_accept = _socket_map.find(block->_accept_ident);
00740 if (iter_accept != _socket_map.end())
00741 accept_handle = iter_accept->_handle;
00742 #endif
00743
00744 aiosock_listener_map_t::iterator iter = _listeners_map.find(block->_socket_ident);
00745 if (iter != _listeners_map.end())
00746 --iter->_curr_count;
00747 }
00748 }
00749
00750
00751 if (block->_err)
00752 {
00753
00754
00755 guard.unlock();
00756
00757
00758 if (block->_type == AIOSOCK_ACCEPT && block->_accept_ident)
00759 {
00760
00761 close(block->_accept_ident);
00762 block->_accept_ident = 0;
00763 }
00764
00765 try
00766 {
00767
00768 client_obj->v_on_error(block->_socket_ident, block->_err, block->_type, block->_userdata);
00769 }
00770 catch (...)
00771 {
00772 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_error exception for socket %d", block->_socket_ident);
00773 assert(false);
00774 }
00775 }
00776 else
00777 {
00778
00779
00780 switch (block->_type)
00781 {
00782 case AIOSOCK_CONNECT:
00783
00784 guard.unlock();
00785
00786 #if OS_TYPE == OS_WIN32
00787
00788 setsockopt(handle, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 0, 0);
00789 #endif
00790 try
00791 {
00792
00793 client_obj->v_on_connect(block->_socket_ident, block->_address, block->_userdata);
00794 }
00795 catch (...)
00796 {
00797 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_connect exception for socket %d", block->_socket_ident);
00798 assert(false);
00799 }
00800 break;
00801 case AIOSOCK_ACCEPT:
00802 #if OS_TYPE == OS_WIN32
00803 setsockopt(accept_handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&handle, sizeof(handle));
00804 #else
00805
00806 block->_accept_ident = _assign_socket(block->hAccept, client_obj, true);
00807 #endif
00808
00809 iter_sock = _socket_map.find(block->_accept_ident);
00810 if (iter_sock != _socket_map.end())
00811 {
00812
00813 terimber_aiosock_callback*& r_accept_callback = iter_sock->_client_obj;
00814
00815 guard.unlock();
00816 try
00817 {
00818
00819 client_obj->v_on_accept(block->_socket_ident, block->_accept_ident, r_accept_callback, block->_address, block->_userdata);
00820 }
00821 catch (...)
00822 {
00823 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_accept exception for socket %d", block->_socket_ident);
00824 assert(false);
00825 }
00826 }
00827 else
00828 {
00829
00830 guard.unlock();
00831 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", block->_socket_ident);
00832 }
00833
00834
00835 _in_thread.wakeup();
00836 break;
00837 case AIOSOCK_SEND:
00838
00839 guard.unlock();
00840
00841 try
00842 {
00843
00844 client_obj->v_on_send(block->_socket_ident, (void*)block->_buf, block->_len, block->_processed, block->_address, block->_userdata);
00845 }
00846 catch (...)
00847 {
00848 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_send exception for socket %d", block->_socket_ident);
00849 assert(false);
00850 }
00851 break;
00852 case AIOSOCK_RECV:
00853
00854 guard.unlock();
00855
00856 try
00857 {
00858
00859 client_obj->v_on_receive(block->_socket_ident, (void*)block->_buf, block->_len, block->_processed, block->_address, block->_userdata);
00860 }
00861 catch (...)
00862 {
00863 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_receive exception for socket %d", block->_socket_ident);
00864 assert(false);
00865 }
00866 break;
00867 default:
00868 assert(false);
00869
00870 }
00871 }
00872
00873
00874 guard.lock();
00875
00876
00877
00878 iter_sock = _socket_map.find(block->_socket_ident);
00879
00880
00881 if (iter_sock == _socket_map.end())
00882 {
00883
00884 aiosock_delay_key_t::iterator iter_delay = _delay_key_map.find(block->_socket_ident);
00885
00886 if (iter_delay != _delay_key_map.end())
00887 {
00888 if (--*iter_delay <= 0)
00889 {
00890
00891 _delay_key_map.erase(iter_delay);
00892
00893 _socket_generator.save(block->_socket_ident);
00894 }
00895 }
00896 }
00897 else
00898 {
00899
00900 --iter_sock->_callback_invoking;
00901 }
00902
00903
00904 _put_block(block);
00905 }
00906 break;
00907 }
00908 }
00909
00910
00911 size_t
00912 aiosock::create(terimber_aiosock_callback* callback, bool tcp_udp)
00913 {
00914 if (!_aiosock_io_handle)
00915 {
00916 format_logging(0, __FILE__, __LINE__, en_log_error, "aiosock is not initialized");
00917 return 0;
00918 }
00919
00920
00921 #if OS_TYPE == OS_WIN32
00922 aio_sock_handle handle = ::WSASocket(PF_INET, tcp_udp ? SOCK_STREAM : SOCK_DGRAM, tcp_udp ? IPPROTO_TCP : IPPROTO_UDP, 0, 0, WSA_FLAG_OVERLAPPED);
00923 #else
00924 aio_sock_handle handle = ::socket(PF_INET, tcp_udp ? SOCK_STREAM : SOCK_DGRAM, tcp_udp ? IPPROTO_TCP : IPPROTO_UDP);
00925 #endif
00926 if (handle == INVALID_SOCKET)
00927 {
00928 format_logging(0, __FILE__, __LINE__, en_log_error, "can not create socket");
00929 return 0;
00930 }
00931
00932
00933 mutex_keeper guard(_mtx);
00934
00935 return _assign_socket(handle, callback, tcp_udp);
00936 }
00937
00938
00939 void
00940 aiosock::close(size_t ident)
00941 {
00942 if (!_aiosock_io_handle)
00943 {
00944 format_logging(0, __FILE__, __LINE__, en_log_error, "aiosock is not initialized");
00945 return;
00946 }
00947
00948
00949 mutex_keeper guard(_mtx);
00950
00951 aiosock_socket_map_iterator_t iter = _socket_map.find(ident);
00952 if (iter == _socket_map.end())
00953 {
00954 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", ident);
00955 return;
00956 }
00957
00958 size_t handle = iter->_handle;
00959 bool tcp_udp = iter->_tcp_udp;
00960 bool delay_key = false;
00961
00962
00963 iter->_incoming_list.erase(_incoming_list_allocator, iter->_incoming_list.begin(), iter->_incoming_list.end());
00964
00965 if (iter->_callback_invoking)
00966 {
00967 delay_key = true;
00968 aiosock_delay_key_t::iterator iter_delay = _delay_key_map.find(ident);
00969 if (iter_delay != _delay_key_map.end())
00970 *iter_delay += iter->_callback_invoking;
00971 else
00972 _delay_key_map.insert(ident, iter->_callback_invoking);
00973 }
00974
00975 _reverse_map.erase(iter->_handle);
00976 _socket_map.erase(iter);
00977
00978
00979 for (aiosock_pblock_list_t::iterator in_iter = _initial_list.begin(); in_iter != _initial_list.end();)
00980 {
00981 aiosock_block* block = *in_iter;
00982 if (block->_socket_ident == ident)
00983 {
00984 _put_block(block);
00985 in_iter = _initial_list.erase(in_iter);
00986 }
00987 else
00988 ++in_iter;
00989 }
00990
00991 for (aiosock_pblock_list_t::iterator out_iter = _outgoing_list.begin(); out_iter != _outgoing_list.end();)
00992 {
00993 aiosock_block* block = *out_iter;
00994 if (block->_socket_ident == ident)
00995 {
00996 _put_block(block);
00997 out_iter = _outgoing_list.erase(out_iter);
00998 }
00999 else
01000 ++out_iter;
01001 }
01002
01003 _cancel_socket(handle);
01004
01005 #if OS_TYPE == OS_WIN32
01006
01007 for (aiosock_pblock_list_t::iterator iter_list = iter->_incoming_list.begin(); iter_list != iter->_incoming_list.end();)
01008 {
01009 _abounded_list.push_back(*iter_list);
01010 iter_list = iter->_incoming_list.erase(_incoming_list_allocator, iter_list);
01011 }
01012 #else
01013
01014 for (aiosock_pblock_list_t::iterator iter_list = iter->_incoming_list.begin(); iter_list != iter->_incoming_list.end();)
01015 {
01016 aiosock_block* block = *iter_list;
01017 _put_block(block);
01018 iter_list = iter->_incoming_list.erase(_incoming_list_allocator, iter_list);
01019 }
01020
01021 #endif
01022
01023 guard.unlock();
01024
01025 _close_socket(handle, tcp_udp);
01026
01027 guard.lock();
01028
01029 aiosock_listener_map_t::iterator iter_listener = _listeners_map.find(ident);
01030
01031 if (iter_listener != _listeners_map.end())
01032 _listeners_map.erase(iter_listener);
01033
01034 if (!delay_key)
01035
01036 _socket_generator.save(ident);
01037
01038 format_logging(0, __FILE__, __LINE__, en_log_info, "socket handle %u is closed", handle);
01039 }
01040
01041
01042
01043 int
01044 aiosock::bind(size_t ident, const char* address, unsigned short port)
01045 {
01046 sockaddr_in ipAddr;
01047 if (!resolve_socket_address(address, port, ipAddr))
01048 {
01049 format_logging(0, __FILE__, __LINE__, en_log_error, "can not resolve socket %d, address %s, port %hu", ident, address, port);
01050 return -1;
01051 }
01052
01053 aio_sock_handle handle = find_socket_handle(ident);
01054
01055 if (handle == INVALID_SOCKET)
01056 {
01057 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", ident);
01058 return -1;
01059 }
01060
01061
01062 if (::bind(handle, (struct sockaddr*)&ipAddr, sizeof(struct sockaddr_in)))
01063 {
01064 format_logging(0, __FILE__, __LINE__, en_log_error, "can not bind socket %d, address %s, port %hu", ident, address, port);
01065 return -1;
01066 }
01067
01068 return 0;
01069 }
01070
01071
01072
01073 int
01074 aiosock::send(size_t ident, const void* buf, size_t len, size_t timeout, const sockaddr_in* toaddr, void* userdata)
01075 {
01076
01077 mutex_keeper guard(_mtx);
01078
01079 aiosock_block* block = _get_block();
01080
01081 block->settimeout(timeout);
01082
01083 block->_type = AIOSOCK_SEND;
01084
01085 block->_userdata = userdata;
01086
01087 block->_buf = (char*)buf;
01088
01089 block->_len = len;
01090
01091 return _activate_block(ident, block, toaddr);
01092 }
01093
01094
01095
01096
01097 int
01098 aiosock::receive(size_t ident, void* buf, size_t len, size_t timeout, const sockaddr_in* fromaddr, void* userdata)
01099 {
01100
01101 mutex_keeper guard(_mtx);
01102
01103 aiosock_block* block = _get_block();
01104
01105 block->settimeout(timeout);
01106
01107 block->_type = AIOSOCK_RECV;
01108
01109 block->_userdata = userdata;
01110
01111 block->_buf = (char*)buf;
01112
01113 block->_len = len;
01114
01115 return _activate_block(ident, block, fromaddr);
01116 }
01117
01118
01119
01120 int
01121 aiosock::connect(size_t ident, const char* address, unsigned short port, size_t timeout, void* userdata)
01122 {
01123 sockaddr_in ipAddr;
01124 if (!resolve_socket_address(address, port, ipAddr))
01125 {
01126 format_logging(0, __FILE__, __LINE__, en_log_error, "can not resolve socket, address %s, port %hu", address, port);
01127 return -1;
01128 }
01129
01130
01131 mutex_keeper guard(_mtx);
01132
01133 aiosock_block* block = _get_block();
01134
01135 block->settimeout(timeout);
01136
01137 block->_address = ipAddr;
01138
01139 block->_type = AIOSOCK_CONNECT;
01140
01141 block->_userdata = userdata;
01142
01143 return _activate_block(ident, block, 0);
01144 }
01145
01146
01147
01148 int
01149 aiosock::listen(size_t ident, unsigned short port, size_t max_connection, const char* address, unsigned short accept_pool, void* userdata)
01150 {
01151
01152 sockaddr_in ipAddr;
01153 if (!resolve_socket_address(address, port, ipAddr))
01154 {
01155 format_logging(0, __FILE__, __LINE__, en_log_error, "can not resolve socket %d, address %s, port %hu", ident, address, port);
01156 return -1;
01157 }
01158
01159
01160 aio_sock_handle handle = find_socket_handle(ident);
01161
01162 if (handle == INVALID_SOCKET)
01163 {
01164 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", ident);
01165 return -1;
01166 }
01167
01168
01169 if (::bind(handle, (const sockaddr*)&ipAddr, sizeof(struct sockaddr_in)))
01170 {
01171 format_logging(0, __FILE__, __LINE__, en_log_error, "can not bind socket %d, address %s, port %hu", ident, address, port);
01172 return -1;
01173 }
01174
01175
01176 if (::listen(handle, (int)max_connection))
01177 {
01178 format_logging(0, __FILE__, __LINE__, en_log_error, "can not start listener %d, address %s, port %hu", ident, address, port);
01179 return -1;
01180 }
01181
01182
01183 mutex_keeper guard(_mtx);
01184
01185 aiosock_listener_map_t::iterator iter_listener = _listeners_map.find(ident);
01186 if (iter_listener == _listeners_map.end())
01187 {
01188
01189 listener_info dummy(0, accept_pool, userdata);
01190 _listeners_map.insert(ident, dummy);
01191 }
01192
01193
01194 guard.unlock();
01195
01196
01197 _in_thread.wakeup();
01198
01199 format_logging(0, __FILE__, __LINE__, en_log_info, "listener started, address %s, port %hu", address, port);
01200 return 0;
01201 }
01202
01203
01204
01205 int
01206 aiosock::getpeeraddr(size_t ident, sockaddr_in& addr)
01207 {
01208
01209 aio_sock_handle handle = find_socket_handle(ident);
01210
01211 if (handle == INVALID_SOCKET)
01212 {
01213 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", ident);
01214 return -1;
01215 }
01216
01217 #if OS_TYPE == OS_WIN32
01218 int
01219 #else
01220 socklen_t
01221 #endif
01222 len = sizeof(sockaddr_in);
01223 return ::getpeername(handle, (sockaddr*)&addr, &len);
01224 }
01225
01226
01227
01228 int
01229 aiosock::getsockaddr(size_t ident, sockaddr_in& addr)
01230 {
01231
01232 aio_sock_handle handle = find_socket_handle(ident);
01233
01234 if (handle == INVALID_SOCKET)
01235 {
01236 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", ident);
01237 return -1;
01238 }
01239
01240 #if OS_TYPE == OS_WIN32
01241 int
01242 #else
01243 socklen_t
01244 #endif
01245 len = sizeof(sockaddr_in);
01246 return ::getsockname(handle, (sockaddr*)&addr, &len);
01247 }
01248
01249
01250
01251 void
01252 aiosock::doxray()
01253 {
01254 mutex_keeper guard(_mtx);
01255
01256 size_t socks = _socket_map.size(),
01257 listeners = _listeners_map.size(),
01258 delay_actions = _delay_key_map.size(),
01259 initiated_actions = _initial_list.size(),
01260 completed_actions = _outgoing_list.size(),
01261 abounded_actions =
01262 #if OS_TYPE == OS_WIN32
01263 _abounded_list.size();
01264 #else
01265 0;
01266 #endif
01267
01268 guard.unlock();
01269
01270 format_logging(0, __FILE__, __LINE__, en_log_xray, "<aiosock socks=\"%d\" listeners = \"%d\" delayed=\"%d\" initiated=\"%d\" completed=\"%d\" abounded=\"%d\" />",
01271 socks, listeners, delay_actions, initiated_actions, completed_actions, abounded_actions);
01272
01273 #if OS_TYPE != OS_WIN32
01274 TERIMBER::DoXRay();
01275 #endif
01276 _thread_pool.doxray();
01277 }
01278
01279 size_t
01280 aiosock::_assign_socket(aio_sock_handle handle, terimber_aiosock_callback* callback, bool tcp_udp)
01281 {
01282
01283 aiosock_socket new_socket(tcp_udp, handle, callback);
01284
01285
01286 size_t ident = _socket_generator.generate();
01287
01288
01289 aiosock_socket_map_iterator_t iter_sock = _socket_map.insert(ident, new_socket).first;
01290 if (iter_sock == _socket_map.end())
01291 {
01292 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
01293 _close_socket(new_socket._handle, tcp_udp);
01294 _socket_generator.save(ident);
01295 return 0;
01296 }
01297
01298
01299 aiosock_reverse_map_iterator_t iter_reverse = _reverse_map.insert(handle, iter_sock).first;
01300 if (iter_reverse == _reverse_map.end())
01301 {
01302 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
01303 _close_socket(new_socket._handle, tcp_udp);
01304 _socket_map.erase(iter_sock);
01305 _socket_generator.save(ident);
01306 return 0;
01307 }
01308
01309
01310 if (tcp_udp)
01311 {
01312 linger lingerStruct;
01313
01314 lingerStruct.l_onoff = 0;
01315 lingerStruct.l_linger = 0;
01316 ::setsockopt(handle, SOL_SOCKET, SO_LINGER, (const char*)&lingerStruct, sizeof(lingerStruct));
01317
01318 int noDelay = 1;
01319 ::setsockopt(handle, IPPROTO_TCP, TCP_NODELAY, (const char*)&noDelay, sizeof(noDelay));
01320
01321 int keepAlive = 1;
01322 ::setsockopt(handle, SOL_SOCKET, SO_KEEPALIVE, (const char*)&keepAlive, sizeof(keepAlive));
01323 }
01324
01325
01326 int reUse = 1;
01327 ::setsockopt(handle, SOL_SOCKET, SO_REUSEADDR, (const char*)&reUse, sizeof(reUse));
01328
01329 #if OS_TYPE == OS_WIN32
01330
01331 if (!::CreateIoCompletionPort((HANDLE)new_socket._handle,
01332 (HANDLE)_aiosock_io_handle,
01333 (DWORD)ident,
01334 3))
01335 #else
01336
01337 if (!TERIMBER::CreateIoCompletionPort(new_socket._handle,
01338 _aiosock_io_handle,
01339 ident,
01340 tcp_udp ? TYPE_TCP : TYPE_UDP))
01341
01342 #endif
01343 {
01344 format_logging(0, __FILE__, __LINE__, en_log_error, "can not assign socket handle to completion port");
01345 _close_socket(new_socket._handle, true);
01346 _reverse_map.erase(iter_reverse);
01347 _socket_map.erase(iter_sock);
01348 _socket_generator.save(ident);
01349 return 0;
01350 }
01351
01352 format_logging(0, __FILE__, __LINE__, en_log_info, "assign socekt handle %u, ident %u is open", new_socket._handle, ident);
01353
01354 return ident;
01355 }
01356
01357 void
01358 aiosock::_close_socket(aio_sock_handle handle, bool tcp_udp)
01359 {
01360 static char rbuf[512];
01361
01362
01363 #if OS_TYPE == OS_WIN32
01364 if (tcp_udp)
01365 ::shutdown(handle, SD_SEND);
01366 #else
01367 if (tcp_udp)
01368 ::shutdown(handle, SHUT_WR);
01369 #endif
01370
01371 if (tcp_udp)
01372 {
01373
01374 fd_set recv_set;
01375 FD_ZERO(&recv_set);
01376 FD_SET(handle, &recv_set);
01377
01378 struct timeval timeout_val = {0, 1000};
01379 int res = 0;
01380
01381 do
01382 {
01383 res = ::select((int)handle + 1, &recv_set, 0, 0, &timeout_val);
01384
01385 if (res)
01386 {
01387 res = ::recv(handle, rbuf, 512, 0);
01388 if (res > 0 && res < 512)
01389 break;
01390 }
01391 }
01392 while (res > 0);
01393 }
01394
01395
01396 #if OS_TYPE == OS_WIN32
01397 if (tcp_udp)
01398 ::shutdown(handle, SD_RECEIVE);
01399
01400 ::closesocket(handle);
01401 #else
01402 if (tcp_udp)
01403 ::shutdown(handle, SHUT_RD);
01404
01405 ::close(handle);
01406 #endif
01407
01408 format_logging(0, __FILE__, __LINE__, en_log_info, "socket closed, handle: %d", handle);
01409 }
01410
01411 void
01412 aiosock::_cancel_socket(aio_sock_handle handle)
01413 {
01414
01415 #if OS_TYPE == OS_WIN32
01416 ::CancelIo((HANDLE)handle);
01417 #else
01418 TERIMBER::CancelIo(handle, 0);
01419 #endif
01420 }
01421
01422 #if OS_TYPE != OS_WIN32
01423
01425 void
01426 aiosock::_cancel_aio( aio_sock_handle handle,
01427 LPOVERLAPPED overlapped
01428 )
01429 {
01430 TERIMBER::CancelIo(handle, overlapped);
01431 }
01432
01433 #endif
01434
01435 aio_sock_handle
01436 aiosock::find_socket_handle(size_t ident)
01437 {
01438
01439 mutex_keeper guard(_mtx);
01440
01441 aiosock_socket_map_iterator_t iter = _socket_map.find(ident);
01442 return (iter != _socket_map.end()) ? iter->_handle : INVALID_SOCKET;
01443 }
01444
01445 bool
01446 aiosock::resolve_socket_address(const char* address, unsigned short port, sockaddr_in& addr)
01447 {
01448 in_addr ipAddr;
01449 memset(&addr, 0, sizeof(sockaddr_in));
01450
01451 if (address)
01452 {
01453 bool askDNS = false;
01454 for (const char* s = address; *s ; ++s)
01455 {
01456 if (!isdigit( *s ) && *s != '.')
01457 {
01458 askDNS = true;
01459 break;
01460 }
01461 }
01462
01463 if (askDNS)
01464 {
01465 struct hostent* host = gethostbyname(address);
01466 if (!host)
01467 return false;
01468
01469 memcpy(&ipAddr, host->h_addr, host->h_length);
01470 }
01471 else
01472 {
01473 ipAddr.s_addr = inet_addr(address);
01474 if (ipAddr.s_addr == INADDR_NONE)
01475 return false;
01476 }
01477 }
01478 else
01479 ipAddr.s_addr = htonl(INADDR_LOOPBACK);
01480
01481 addr.sin_family = AF_INET;
01482 addr.sin_port = htons(port);
01483 addr.sin_addr = ipAddr;
01484
01485 return true;
01486 }
01487
01488 int
01489 aiosock::_process_block(aiosock_block* block)
01490 {
01491 if (!_aiosock_io_handle)
01492 {
01493 format_logging(0, __FILE__, __LINE__, en_log_error, "aiosock is not initialized");
01494 return -1;
01495 }
01496
01497 aiosock_socket_map_iterator_t iter_sock = _socket_map.find(block->_socket_ident);
01498 if (iter_sock == _socket_map.end())
01499 {
01500 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", block->_socket_ident);
01501 return -1;
01502 }
01503
01504 aio_sock_handle handle = iter_sock->_handle;
01505 terimber_aiosock_callback* client = iter_sock->_client_obj;
01506
01507
01508 iter_sock->_incoming_list.push_back(_incoming_list_allocator, block);
01509
01510
01511
01512 int res = 0;
01513 switch (block->_type)
01514 {
01515 case AIOSOCK_CONNECT:
01516 res = _process_connect(handle, block);
01517 #if OS_TYPE == OS_WIN32
01518 #else
01519 if (res == EINPROGRESS)
01520 res = EWOULDBLOCK;
01521 #endif
01522 break;
01523 case AIOSOCK_ACCEPT:
01524
01525 res = _process_accept(handle, client, block);
01526 break;
01527 case AIOSOCK_SEND:
01528 res = _process_send(handle, block, iter_sock->_tcp_udp);
01529 break;
01530 case AIOSOCK_RECV:
01531 res = _process_recv(handle, block, iter_sock->_tcp_udp);
01532 break;
01533 }
01534
01535
01536
01537
01538
01539
01540 if (!res
01541 || res
01542 #if OS_TYPE == OS_WIN32
01543 && (
01544 res != ERROR_IO_PENDING
01545 && res != WSAEWOULDBLOCK
01546 )
01547 #else
01548 && res != EWOULDBLOCK
01549 #endif
01550 )
01551 {
01552
01553 iter_sock->_incoming_list.pop_back(_incoming_list_allocator);
01554
01555
01556 block->_err = res;
01557
01558
01559 if (!res)
01560 {
01561
01562 _outgoing_list.push_back(block);
01563
01564
01565 if (!_capacity || !_thread_pool.borrow_from_range(aiosock_working_ident, aiosock_working_ident + _capacity, 0, this, aiosock_working_thread_alert))
01566 _thread_pool.borrow_thread(aiosock_working_ident, 0, this, aiosock_working_thread_alert);
01567
01568 return 0;
01569 }
01570 else
01571 {
01572
01573 return res;
01574 }
01575 }
01576 else
01577 {
01578 block->_err = 0;
01579 block->_processed = 0;
01580 return 0;
01581 }
01582 }
01583
01584
01585 int
01586 aiosock::_activate_block(size_t ident, aiosock_block* block, const sockaddr_in* paddr)
01587 {
01588
01589 aiosock_socket_map_iterator_t iter_sock = _socket_map.find(ident);
01590 if (iter_sock == _socket_map.end())
01591 {
01592 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", ident);
01593 _put_block(block);
01594 return -1;
01595 }
01596
01597
01598 aio_sock_handle handle = iter_sock->_handle;
01599
01600 terimber_aiosock_callback* client = iter_sock->_client_obj;
01601
01602
01603 block->_socket_ident = ident;
01604
01605
01606 if (!iter_sock->_tcp_udp)
01607 {
01608 assert(paddr != 0);
01609 block->_address = *paddr;
01610 }
01611
01612
01613 _initial_list.push_back(block);
01614
01615
01616 _in_thread.wakeup();
01617
01618 return 0;
01619 }
01620
01621 int
01622 aiosock::_process_connect(aio_sock_handle handle, aiosock_block* block)
01623 {
01624
01625 ::setsockopt(handle, SOL_SOCKET, SO_SNDTIMEO, (const char*)&block->_timeout, sizeof(block->_timeout));
01626 #if OS_TYPE == OS_WIN32
01627
01628 GUID dummy_connect = GUID_MSWSOCK_CONNECTEX;
01629 PCONNECTEX _ConnectEx = 0;
01630 DWORD dwBytes = 0;
01631
01632 sockaddr_in ipAddr;
01633 memset(&ipAddr, 0, sizeof(sockaddr_in));
01634 ipAddr.sin_family = AF_INET;
01635 ipAddr.sin_port = htons(0);
01636
01637 if (SOCKET_ERROR == ::WSAIoctl(handle, SIO_GET_EXTENSION_FUNCTION_POINTER,
01638 &dummy_connect, sizeof(dummy_connect), &_ConnectEx, sizeof(_ConnectEx), &dwBytes, 0, 0)
01639 || SOCKET_ERROR == ::bind(handle, (const struct sockaddr*)&ipAddr, sizeof(struct sockaddr_in))
01640 )
01641 {
01642 return (block->_err = ::GetLastError());
01643 }
01644 else
01645 {
01646 return (*_ConnectEx)(handle,
01647 (const struct sockaddr*)&block->_address,
01648 sizeof(struct sockaddr_in),
01649 0,
01650 0,
01651 0,
01652 block) ?
01653 ERROR_IO_PENDING :
01654 (block->_err = ::GetLastError());
01655 }
01656
01657 #else
01658 return TERIMBER::ConnectEx(handle,
01659 &block->_address,
01660 block) ?
01661 EWOULDBLOCK :
01662 (block->_err = errno);
01663 #endif
01664 }
01665
01666 int
01667 aiosock::_process_accept(aio_sock_handle handle, terimber_aiosock_callback* client, aiosock_block* block)
01668 {
01669 #if OS_TYPE == OS_WIN32
01670
01671 aio_sock_handle accept_handle = ::socket(AF_INET, SOCK_STREAM, 0);
01672 if (accept_handle == INVALID_SOCKET)
01673 return (block->_err = ::WSAGetLastError());
01674
01675 block->_accept_ident = _assign_socket(accept_handle, client, true);
01676
01677 if (!block->_accept_ident)
01678 return WSAEINVAL;
01679
01680 GUID dummy_accept = GUID_MSWSOCK_ACCEPTEX;
01681 PACCEPTEX _AcceptEx = 0;
01682 DWORD dwBytes = 0;
01683
01684
01685 if (SOCKET_ERROR == ::WSAIoctl(handle, SIO_GET_EXTENSION_FUNCTION_POINTER,
01686 &dummy_accept, sizeof(dummy_accept), &_AcceptEx, sizeof(_AcceptEx), &dwBytes, 0, 0))
01687 {
01688 close(block->_accept_ident);
01689 return (block->_err = ::GetLastError());
01690 }
01691 else
01692 {
01693 if ((*_AcceptEx)(handle,
01694 accept_handle,
01695 block->_accept_buf,
01696 0,
01697 sizeof(struct sockaddr_in) + 16,
01698 sizeof(struct sockaddr_in) + 16,
01699 (LPDWORD)&block->_processed,
01700 block))
01701 {
01702 return ERROR_IO_PENDING;
01703 }
01704 else
01705 {
01706 if (ERROR_IO_PENDING != (block->_err = ::GetLastError()))
01707 close(block->_accept_ident);
01708
01709 return block->_err;
01710 }
01711 }
01712 #else
01713 return TERIMBER::AcceptEx(handle,
01714 block) ?
01715 EWOULDBLOCK :
01716 (block->_err = errno);
01717 #endif
01718 }
01719
01720 int
01721 aiosock::_process_send(aio_sock_handle handle, aiosock_block* block, bool tcp_udp)
01722 {
01723 int zero = (int)block->_len;
01724
01725
01726 if (tcp_udp)
01727 ::setsockopt(handle, SOL_SOCKET, SO_SNDBUF, (const char*)&zero, sizeof(zero));
01728
01729 ::setsockopt(handle, SOL_SOCKET, SO_SNDTIMEO, (const char*)&block->_timeout, sizeof(block->_timeout));
01730
01731
01732 #if OS_TYPE == OS_WIN32
01733 if (tcp_udp)
01734 {
01735
01736
01737 return ::WriteFile((HANDLE)handle,
01738 block->_buf,
01739 (DWORD)block->_len,
01740 (LPDWORD)&block->_processed,
01741 block) ?
01742 ERROR_IO_PENDING :
01743 (block->_err = ::GetLastError());
01744 }
01745 else
01746 {
01747 WSABUF buf;
01748 buf.buf = block->_buf;
01749 buf.len = (u_long)block->_len;
01750 return ::WSASendTo(handle, &buf, 1, (LPDWORD)&block->_processed, 0, (const sockaddr *)&block->_address, sizeof(block->_address), block, 0) ?
01751 ERROR_IO_PENDING :
01752 (block->_err = ::GetLastError());
01753 }
01754 #else
01755 return ((tcp_udp) ? TERIMBER::WSASend(handle, block->_buf, block->_len, block)
01756 : TERIMBER::WSASendTo(handle, block->_buf, block->_len, &block->_address, block)) ?
01757 EWOULDBLOCK :
01758 (block->_err = errno);
01759 #endif
01760 }
01761
01762 int
01763 aiosock::_process_recv(aio_sock_handle handle, aiosock_block* block, bool tcp_udp)
01764 {
01765 int zero = (int)block->_len;
01766
01767
01768 if (tcp_udp)
01769 ::setsockopt(handle, SOL_SOCKET, SO_RCVBUF, (char*)&zero, sizeof(zero));
01770
01771 ::setsockopt(handle, SOL_SOCKET, SO_RCVTIMEO, (const char*)&block->_timeout, sizeof(block->_timeout));
01772 #if OS_TYPE == OS_WIN32
01773
01774
01775 if (tcp_udp)
01776 {
01777 return ::ReadFile((HANDLE)handle,
01778 block->_buf,
01779 (DWORD)block->_len,
01780 (LPDWORD)&block->_processed,
01781 block) ?
01782 ERROR_IO_PENDING :
01783 (block->_err = ::GetLastError());
01784 }
01785 else
01786 {
01787 WSABUF buf;
01788 buf.buf = block->_buf;
01789 buf.len = (u_long)block->_len;
01790
01791 block->_accept_ident = sizeof(sockaddr_in);
01792 block->_flags = 0;
01793 return ::WSARecvFrom(handle, &buf, 1, (LPDWORD)&block->_processed, (LPDWORD)&block->_flags, (sockaddr *)&block->_address, (int*)&block->_accept_ident, block, 0) ?
01794 ERROR_IO_PENDING :
01795 (block->_err = ::GetLastError());
01796 }
01797 #else
01798 return ((tcp_udp) ? TERIMBER::WSARecv(handle, block->_buf, block->_len, block)
01799 : TERIMBER::WSARecvFrom(handle, block->_buf, block->_len, &block->_address, block)) ?
01800 EWOULDBLOCK :
01801 (block->_err = errno);
01802
01803 #endif
01804 }
01805
01806 void
01807 aiosock::process_timeouted_blocks()
01808 {
01809
01810 date now;
01811 sb8_t unow = (sb8_t)now;
01812
01813
01814 mutex_keeper guard(_mtx);
01815
01816 for (aiosock_socket_map_iterator_t iter_socket = _socket_map.begin(); iter_socket != _socket_map.end(); ++iter_socket)
01817 {
01818 size_t socket_key = iter_socket.key();
01819 for (aiosock_pblock_alloc_list_t::iterator iter_block = iter_socket->_incoming_list.begin();
01820 iter_block != iter_socket->_incoming_list.end();)
01821 {
01822
01823 aiosock_block* block = *iter_block;
01824
01825 if (block->_expired == 0
01826 || unow > block->_expired)
01827 {
01828 ++iter_block;
01829 continue;
01830 }
01831
01832
01833 aio_sock_handle handle = iter_socket->_handle;
01834 terimber_aiosock_callback* client_obj = iter_socket->_client_obj;
01835
01836
01837 iter_block = iter_socket->_incoming_list.erase(_incoming_list_allocator, iter_block);
01838
01839
01840 ++iter_socket->_callback_invoking;
01841
01842 guard.unlock();
01843
01844 format_logging(0, __FILE__, __LINE__, en_log_error, "timeouted %s action for socket %d",
01845 block->_type == AIOSOCK_CONNECT ? "connect" :
01846 (block->_type == AIOSOCK_ACCEPT ? "accept" :
01847 (block->_type == AIOSOCK_RECV ? "recv" : "send")),
01848 socket_key);
01849
01850
01851 try
01852 {
01853 client_obj->v_on_error(block->_socket_ident,
01854 #if OS_TYPE == OS_WIN32
01855 WSAETIMEDOUT
01856 #else
01857 ETIMEDOUT
01858 #endif
01859 , block->_type
01860 , block->_userdata);
01861 }
01862 catch (...)
01863 {
01864 assert(false);
01865 }
01866
01867
01868 guard.lock();
01869
01870
01871 aiosock_socket_map_t::iterator iter_find = _socket_map.find(block->_socket_ident);
01872
01873 if (iter_find == _socket_map.end())
01874 {
01875
01876 aiosock_delay_key_t::iterator iter_delay = _delay_key_map.find(block->_socket_ident);
01877
01878 if (iter_delay != _delay_key_map.end())
01879 {
01880 assert(*iter_delay > 0);
01881
01882 if (--*iter_delay <= 0)
01883 {
01884 _delay_key_map.erase(iter_delay);
01885 _socket_generator.save(block->_socket_ident);
01886 }
01887 }
01888
01889 }
01890 else
01891 {
01892
01893 assert(iter_find->_callback_invoking > 0);
01894 --iter_find->_callback_invoking;
01895 }
01896
01897
01898 #if OS_TYPE == OS_WIN32
01899
01900 _abounded_list.push_back(block);
01901 #else
01902 _cancel_aio(handle, block);
01903 _put_block(block);
01904 #endif
01905
01906 return;
01907 }
01908 }
01909 }
01910
01911 bool
01912 aiosock::process_accept_blocks()
01913 {
01914
01915 mutex_keeper guard(_mtx);
01916
01917
01918 for (aiosock_listener_map_t::iterator iter = _listeners_map.begin(); iter != _listeners_map.end(); ++iter)
01919 {
01920
01921 if (iter->_curr_count >= iter->_max_count)
01922 continue;
01923
01924
01925 aiosock_block* block = _get_block();
01926
01927 block->settimeout(INFINITE);
01928
01929 block->_type = AIOSOCK_ACCEPT;
01930
01931 block->_userdata = iter->_userdata;
01932
01933 if (!_activate_block(iter.key(), block, 0))
01934 {
01935
01936 ++iter->_curr_count;
01937 return true;
01938 }
01939 }
01940
01941 return false;
01942 }
01943
01944 #pragma pack()
01945 END_TERIMBER_NAMESPACE