00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "aiogate/aiogate.h"
00029 #include "base/map.hpp"
00030 #include "base/list.hpp"
00031 #include "base/memory.hpp"
00032 #include "base/common.hpp"
00033
00034
00035 terimber_aiogate_factory::terimber_aiogate_factory()
00036 {
00037 }
00038
00039
00040
00041
00042 terimber_aiogate*
00043 terimber_aiogate_factory::get_terimber_aiogate(terimber_log* log, size_t addional_working_threads, size_t mem_usage_level)
00044 {
00045
00046 TERIMBER::aiogate* obj = new TERIMBER::aiogate(addional_working_threads, mem_usage_level);
00047 if (obj)
00048 {
00049
00050 obj->log_on(log);
00051
00052 obj->on();
00053 }
00054
00055 return obj;
00056 }
00057
00058 BEGIN_TERIMBER_NAMESPACE
00059 #pragma pack(4)
00060
00061
00062 aiogate::aiogate(size_t addional_working_threads, size_t mem_usage) :
00063 _on(false),
00064 _pin_port(addional_working_threads, 60000),
00065 _pin_allocator(__max(__min(mem_usage / BUFFER_CHUNK, (size_t)64), (size_t)(1024*1024 / BUFFER_CHUNK)))
00066 {
00067 }
00068
00069
00070 aiogate::~aiogate()
00071 {
00072
00073 off();
00074 }
00075
00076
00077
00078 void
00079 aiogate::v_on_error(size_t handle, int err, aiosock_type mask, void* userdata)
00080 {
00081 if (mask == AIOSOCK_ACCEPT)
00082 return;
00083
00084
00085 mutex_keeper keeper(_pin_mtx);
00086
00087
00088 pin_map_t::iterator it_pin = _pin_map.find(handle);
00089 if (it_pin == _pin_map.end()
00090 || !it_pin->_still_alive)
00091 {
00092 format_logging(0, __FILE__, __LINE__, en_log_warning, "pin %d not found", handle);
00093 return;
00094 }
00095
00096
00097 switch ((size_t)userdata)
00098 {
00099 case aiogate_accept_mask:
00100 assert(mask == AIOSOCK_ACCEPT);
00101 it_pin->_in_progress_mask &= ~aiogate_accept_mask;
00102 format_logging(0, __FILE__, __LINE__, en_log_info, "pin %d closed on accept error %d", handle, err);
00103 break;
00104 case aiogate_connect_mask:
00105 assert(mask == AIOSOCK_CONNECT);
00106 it_pin->_in_progress_mask &= ~aiogate_connect_mask;
00107 format_logging(0, __FILE__, __LINE__, en_log_info, "pin %d closed on connect error %d", handle, err);
00108 break;
00109 case aiogate_send_mask:
00110 assert(mask == AIOSOCK_SEND);
00111 it_pin->_in_progress_mask &= ~aiogate_send_mask;
00112 format_logging(0, __FILE__, __LINE__, en_log_info, "pin %d closed on send error %d", handle, err);
00113 break;
00114 case aiogate_recv_mask:
00115 assert(mask == AIOSOCK_RECV);
00116 it_pin->_in_progress_mask &= ~aiogate_recv_mask;
00117 format_logging(0, __FILE__, __LINE__, en_log_info, "pin %d closed on receive error %d", handle, err);
00118 break;
00119 default:
00120 break;
00121 }
00122
00123
00124 keeper.unlock();
00125
00126 initiate_close(handle, (size_t)userdata, true);
00127
00128 }
00129
00130 bool
00131 aiogate::on()
00132 {
00133 if (_on)
00134 {
00135 format_logging(0, __FILE__, __LINE__, en_log_error, "aiogate is already on");
00136 return false;
00137 }
00138
00139
00140 _pin_port.log_on(this);
00141
00142 _pin_port.on();
00143
00144 job_task task(this, 0, INFINITE, 0);
00145
00146 _pin_thread.start();
00147 _pin_thread.assign_job(task);
00148
00149 format_logging(0, __FILE__, __LINE__, en_log_info, "aiogate is initialized");
00150
00151
00152 _on = true;
00153 return true;
00154 }
00155
00156 void
00157 aiogate::off()
00158 {
00159 if (!_on)
00160 {
00161 format_logging(0, __FILE__, __LINE__, en_log_error, "aiogate is already off");
00162 return;
00163 }
00164
00165
00166 _pin_thread.cancel_job();
00167 _pin_thread.stop();
00168
00169
00170 _pin_port.off();
00171
00172 _pin_port.log_on(0);
00173
00174
00175 mutex_keeper keeper(_pin_mtx);
00176
00177
00178 for (pin_map_t::iterator iter = _pin_map.begin(); iter != _pin_map.end();)
00179 {
00180 iter->_factory->destroy(iter->_pin);
00181 iter = _pin_map.erase(iter);
00182 }
00183
00184
00185 while (!_pin_list.empty())
00186 {
00187 pin_info_extra& info = _pin_list.front();
00188 info._factory->destroy(info._pin);
00189 _pin_list.pop_front();
00190 }
00191
00192
00193 _pin_allocator.clear_all();
00194
00195
00196 _on = false;
00197
00198 format_logging(0, __FILE__, __LINE__, en_log_info, "aiogate is uninitialized");
00199 }
00200
00201
00202
00203 void
00204 aiogate::v_on_connect(size_t handle, const sockaddr_in& peeraddr, void* userdata)
00205 {
00206
00207 mutex_keeper keeper(_pin_mtx);
00208
00209 pin_map_t::iterator it_pin = _pin_map.find(handle);
00210 if (it_pin == _pin_map.end()
00211 || !it_pin->_still_alive)
00212 {
00213 format_logging(0, __FILE__, __LINE__, en_log_warning, "pin %d not found", handle);
00214 return;
00215 }
00216
00217
00218 sockaddr_in local, remote;
00219 if (_pin_port.getsockaddr(handle, local)
00220 || _pin_port.getpeeraddr(handle, remote)
00221 )
00222 {
00223 format_logging(0, __FILE__, __LINE__, en_log_error, "can not get peer/sock address for pin %d", handle);
00224
00225 it_pin->_in_progress_mask &= ~aiogate_connect_mask;
00226
00227
00228 keeper.unlock();
00229
00230 initiate_close(handle, aiogate_connect_mask, true);
00231 return;
00232 }
00233
00234
00235 terimber_aiogate_pin* pin = it_pin->_pin;
00236
00237 lock_pin(keeper, *it_pin, aiogate_connect_mask);
00238 try
00239 {
00240
00241 pin->on_connect(local, remote, handle, this);
00242 }
00243 catch (...)
00244 {
00245 format_logging(0, __FILE__, __LINE__, en_log_error, "on_connect callback exception, handle %d", handle);
00246 assert(false);
00247 }
00248
00249 unlock_pin(keeper, handle, aiogate_connect_mask, true);
00250 format_logging(0, __FILE__, __LINE__, en_log_info, "connect initiated for pin %d", handle);
00251 }
00252
00253
00254
00255 void
00256 aiogate::v_on_send(size_t handle, void* buf, size_t requested, size_t processed, const sockaddr_in& peeraddr, void* userdata)
00257 {
00258
00259 mutex_keeper keeper(_pin_mtx);
00260
00261 pin_map_t::iterator it_pin = _pin_map.find(handle);
00262 if (it_pin == _pin_map.end()
00263 || !it_pin->_still_alive)
00264 {
00265 format_logging(0, __FILE__, __LINE__, en_log_warning, "pin %d not found", handle);
00266 return;
00267 }
00268
00269 pin_info& r_info = *it_pin;
00270
00271
00272 assert(r_info._shead->_ptr + r_info._shead->_begin == buf);
00273
00274
00275
00276 r_info._shead->_begin += (r_info._tcp_udp ? processed : requested);
00277
00278
00279 if (r_info._shead->_begin == r_info._shead->_end)
00280 {
00281
00282 fixed_size_buffer* next = r_info._shead->_next;
00283
00284 _pin_allocator.deallocate(r_info._shead);
00285
00286
00287 if (next)
00288 r_info._shead = next;
00289 else
00290 r_info._shead = r_info._stail = 0;
00291 }
00292
00293
00294 if (r_info._shead)
00295 {
00296 sockaddr_in toaddr;
00297 size_t requested = 0;
00298
00299 if (!r_info._tcp_udp)
00300 {
00301 udp_header* uheader= (udp_header*)(r_info._shead->_ptr + r_info._shead->_begin);
00302 requested = uheader->_payload;
00303 toaddr = uheader->_addr;
00304
00305 r_info._shead->_begin += sizeof(udp_header);
00306 }
00307 else
00308 {
00309 requested = r_info._shead->_end - r_info._shead->_begin;
00310 }
00311
00312 if (_pin_port.send(handle, r_info._shead->_ptr + r_info._shead->_begin, requested, r_info._send_timeout, r_info._tcp_udp ? 0 : &toaddr, (void*)(size_t)aiogate_send_mask))
00313 {
00314 format_logging(0, __FILE__, __LINE__, en_log_error, "can not initiate send for pin %d", handle);
00315
00316 r_info._in_progress_mask &= ~aiogate_send_mask;
00317
00318 keeper.unlock();
00319
00320 initiate_close(handle, aiogate_send_mask, true);
00321 return;
00322 }
00323
00324 format_logging(0, __FILE__, __LINE__, en_log_info, "send initiated for pin %d", handle);
00325 }
00326 else
00327 {
00328
00329
00330 terimber_aiogate_pin* pin = r_info._pin;
00331
00332
00333 lock_pin(keeper, r_info, aiogate_send_mask);
00334 try
00335 {
00336 pin->on_send(peeraddr);
00337 }
00338 catch (...)
00339 {
00340 format_logging(0, __FILE__, __LINE__, en_log_error, "on_send callback exception, handle %d", handle);
00341 assert(false);
00342 }
00343
00344 unlock_pin(keeper, handle, aiogate_send_mask, true);
00345
00346
00347
00348
00349 it_pin = _pin_map.find(handle);
00350
00351 if (it_pin == _pin_map.end()
00352 || !it_pin->_still_alive)
00353 {
00354
00355 return;
00356 }
00357
00358
00359 if (it_pin->_shead && !(it_pin->_in_progress_mask & aiogate_send_mask))
00360 {
00361 it_pin->_in_progress_mask |= aiogate_send_mask;
00362
00363 sockaddr_in toaddr = peeraddr;
00364 size_t requested = 0;
00365
00366 if (!r_info._tcp_udp)
00367 {
00368 udp_header* uheader= (udp_header*)(r_info._shead->_ptr + r_info._shead->_begin);
00369 requested = uheader->_payload;
00370 toaddr = uheader->_addr;
00371
00372 r_info._shead->_begin += sizeof(udp_header);
00373 }
00374 else
00375 {
00376 requested = r_info._shead->_end - r_info._shead->_begin;
00377 }
00378
00379 if (_pin_port.send(handle, it_pin->_shead->_ptr + it_pin->_shead->_begin, requested, it_pin->_send_timeout, &toaddr, (void*)(size_t)aiogate_send_mask))
00380 {
00381 format_logging(0, __FILE__, __LINE__, en_log_error, "can not initiate send for pin %d", handle);
00382
00383 it_pin->_in_progress_mask &= ~aiogate_send_mask;
00384
00385 keeper.unlock();
00386 initiate_close(handle, aiogate_send_mask, true);
00387 return;
00388 }
00389
00390 format_logging(0, __FILE__, __LINE__, en_log_info, "send initiated for pin %d", handle);
00391 }
00392 }
00393
00394 format_logging(0, __FILE__, __LINE__, en_log_info, "send processed pin %d", handle);
00395 }
00396
00397
00398
00399 void
00400 aiogate::v_on_receive(size_t handle, void* buf, size_t requested, size_t processed, const sockaddr_in& peeraddr, void* userdata)
00401 {
00402
00403 mutex_keeper keeper(_pin_mtx);
00404
00405 pin_map_t::iterator it_pin = _pin_map.find(handle);
00406 if (it_pin == _pin_map.end()
00407 || !it_pin->_still_alive)
00408 {
00409 format_logging(0, __FILE__, __LINE__, en_log_warning, "pin %d not found", handle);
00410 return;
00411 }
00412
00413 if (!processed
00414 && it_pin->_tcp_udp
00415 )
00416 {
00417 format_logging(0, __FILE__, __LINE__, en_log_info, "receive zero byte - initiate pin closure %d", handle);
00418
00419 keeper.unlock();
00420 initiate_close(handle, aiogate_recv_mask, true);
00421 return;
00422 }
00423
00424 pin_info& r_info = *it_pin;
00425
00426
00427 assert(r_info._rbuf && r_info._rbuf->_ptr == buf || !r_info._rbuf && &r_info._leader == buf);
00428
00429
00430 terimber_aiogate_pin* pin = r_info._pin;
00431
00432
00433 lock_pin(keeper, r_info, aiogate_recv_mask);
00434 bool expected_more = false;
00435
00436 bool recv_continue = false;
00437
00438 try
00439 {
00440 recv_continue = pin->on_recv((const ub1_t*)buf, processed, peeraddr, expected_more);
00441 }
00442 catch (...)
00443 {
00444 format_logging(0, __FILE__, __LINE__, en_log_error, "on_recv callback exception, handle %d", handle);
00445 assert(false);
00446 }
00447
00448 bool alive = unlock_pin(keeper, handle, aiogate_recv_mask, true);
00449
00450 keeper.unlock();
00451
00452 if (recv_continue && alive)
00453 recv(handle, expected_more, &peeraddr);
00454
00455 format_logging(0, __FILE__, __LINE__, en_log_info, "receive processed pin %d", handle);
00456 }
00457
00458
00459
00460
00461 void
00462 aiogate::v_on_accept(size_t handle, size_t handle_accepted, terimber_aiosock_callback*& callback, const sockaddr_in& peeraddr, void* userdata)
00463 {
00464
00465 mutex_keeper keeper(_pin_mtx);
00466
00467 listener_map_t::iterator it_listener = _listener_map.find(handle);
00468 if (it_listener == _listener_map.end())
00469 {
00470 format_logging(0, __FILE__, __LINE__, en_log_error, "listener %d not found", handle);
00471
00472 _pin_port.close(handle_accepted);
00473 return;
00474 }
00475
00476
00477 terimber_aiogate_pin* pin = it_listener->_factory->create(it_listener->_arg);
00478 if (!pin)
00479 {
00480 format_logging(0, __FILE__, __LINE__, en_log_error, "can not create pin on accept, listener %d", handle);
00481
00482 _pin_port.close(handle_accepted);
00483 return;
00484 }
00485
00486
00487
00488 pin_info info;
00489 info._factory = it_listener->_factory;
00490 info._pin = pin;
00491 info._tcp_udp = true;
00492
00493 pin_map_t::pairib_t it_pin;
00494 try
00495 {
00496 it_pin = _pin_map.insert(handle_accepted, info);
00497 }
00498 catch (exception&)
00499 {
00500 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00501
00502
00503 _pin_port.close(handle_accepted);
00504
00505 it_listener->_factory->destroy(pin);
00506 return;
00507 }
00508
00509
00510 sockaddr_in local, remote;
00511 if (_pin_port.getsockaddr(handle_accepted, local)
00512 || _pin_port.getpeeraddr(handle_accepted, remote)
00513 )
00514 {
00515 format_logging(0, __FILE__, __LINE__, en_log_error, "can not get peer/sock address for pin %d", handle);
00516
00517 _pin_map.erase(it_pin.first);
00518 _pin_port.close(handle_accepted);
00519 it_listener->_factory->destroy(pin);
00520 return;
00521 }
00522
00523
00524 lock_pin(keeper, *it_pin.first, aiogate_accept_mask);
00525 try
00526 {
00527 pin->on_accept(local, remote, handle_accepted, this);
00528 }
00529 catch (...)
00530 {
00531 format_logging(0, __FILE__, __LINE__, en_log_error, "on_accept callback exception, handle %d", handle_accepted);
00532 assert(false);
00533 }
00534 unlock_pin(keeper, handle_accepted, aiogate_accept_mask, false);
00535
00536 format_logging(0, __FILE__, __LINE__, en_log_info, "accept processed pin %d", handle);
00537 }
00538
00540 void
00541 aiogate::lock_pin(mutex_keeper& mtx, pin_info& info, ub4_t mask)
00542 {
00543 assert(mtx);
00544
00545 assert(!(info._callback_invoking_mask & mask));
00546
00547 info._callback_invoking_mask |= mask;
00548 mtx.unlock();
00549 }
00550
00551 bool
00552 aiogate::unlock_pin(mutex_keeper& mtx, size_t handle, ub4_t mask, bool unmask_in_progress)
00553 {
00554 assert(!mtx);
00555
00556 mtx.lock();
00557
00558
00559 pin_map_t::iterator it_pin = _pin_map.find(handle);
00560
00561 if (it_pin == _pin_map.end())
00562 {
00563 format_logging(0, __FILE__, __LINE__, en_log_error, "pin %d not found", handle);
00564 return false;
00565 }
00566
00567 assert(it_pin->_callback_invoking_mask & mask);
00568
00569
00570 it_pin->_callback_invoking_mask &= ~mask;
00571
00572
00573 if (unmask_in_progress)
00574 it_pin->_in_progress_mask &= ~mask;
00575
00576 if (it_pin->_callback_invoking_mask)
00577 return true;
00578
00579
00580 if (!it_pin->_still_alive)
00581 {
00582
00583 mtx.unlock();
00584 initiate_close(handle, mask, true);
00585 return false;
00586 }
00587
00588 return true;
00589 }
00590
00592
00593
00594
00595 size_t
00596 aiogate::listen( const char* address,
00597 unsigned short port,
00598 size_t max_connection,
00599 unsigned short buffered_acceptors,
00600 terimber_aiogate_pin_factory* factory,
00601 void* arg)
00602 {
00603 if (!factory)
00604 {
00605 format_logging(0, __FILE__, __LINE__, en_log_error, "factory pointer is null");
00606 return 0;
00607 }
00608
00609 size_t ident = _pin_port.create(this, true);
00610 if (!ident)
00611 {
00612 format_logging(0, __FILE__, __LINE__, en_log_error, "aiosock can not create socket");
00613 return 0;
00614 }
00615
00616
00617
00618 mutex_keeper keeper(_pin_mtx);
00619
00620 listener_info info(factory, arg);
00621 listener_map_t::iterator it_listener = _listener_map.insert(ident, info).first;
00622
00623 if (it_listener == _listener_map.end())
00624 {
00625
00626
00627 _pin_port.close(ident);
00628 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00629 return 0;
00630 }
00631
00632
00633 if (_pin_port.listen(ident, port, max_connection, address, buffered_acceptors, (void*)(size_t)aiogate_accept_mask))
00634 {
00635 format_logging(0, __FILE__, __LINE__, en_log_error, "can not start listener on address %d and port %hu", address, port);
00636
00637 _listener_map.erase(it_listener);
00638
00639 _pin_port.close(ident);
00640 return 0;
00641 }
00642
00643
00644 format_logging(0, __FILE__, __LINE__, en_log_info, "listener started on address %d and port %hu", address, port);
00645 return ident;
00646 }
00647
00648
00649
00650 void
00651 aiogate::deaf(size_t ident)
00652 {
00653
00654 mutex_keeper keeper(_pin_mtx);
00655
00656
00657 listener_map_t::iterator it_listener = _listener_map.find(ident);
00658 if (it_listener != _listener_map.end())
00659 {
00660
00661 _listener_map.erase(it_listener);
00662
00663 _pin_port.close(ident);
00664
00665 format_logging(0, __FILE__, __LINE__, en_log_info, "listener %d stopped", ident);
00666 }
00667 }
00668
00669
00670
00671
00672
00673
00674 size_t
00675 aiogate::connect( const char* remote,
00676 unsigned short rport,
00677 const char* local,
00678 unsigned short lport,
00679 size_t timeout,
00680 terimber_aiogate_pin_factory* factory,
00681 void* arg)
00682 {
00683 if (!factory)
00684 {
00685 format_logging(0, __FILE__, __LINE__, en_log_error, "factory pointer is null");
00686 return 0;
00687 }
00688
00689 size_t ident = _pin_port.create(this, true);
00690 if (!ident)
00691 {
00692 format_logging(0, __FILE__, __LINE__, en_log_error, "can not create socket");
00693 return 0;
00694 }
00695
00696
00697
00698 terimber_aiogate_pin* pin = factory->create(arg);
00699 if (!pin)
00700 {
00701 format_logging(0, __FILE__, __LINE__, en_log_error, "can not create pin for argument %d", (size_t)arg);
00702
00703 _pin_port.close(ident);
00704 return 0;
00705 }
00706
00707
00708
00709
00710 mutex_keeper keeper(_pin_mtx);
00711
00712
00713 pin_info info;
00714 info._factory = factory;
00715 info._pin = pin;
00716 info._tcp_udp = true;
00717
00718 pin_map_t::iterator it_pin = _pin_map.insert(ident, info).first;
00719 if (it_pin == _pin_map.end())
00720 {
00721 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00722
00723 _pin_port.close(ident);
00724 factory->destroy(pin);
00725 return 0;
00726 }
00727
00728
00729
00730 if (local && lport && _pin_port.bind(ident, local, lport))
00731 {
00732 format_logging(0, __FILE__, __LINE__, en_log_error, "can not bind specified local host %s and port %hu", local, lport);
00733 _pin_map.erase(it_pin);
00734
00735 _pin_port.close(ident);
00736 factory->destroy(pin);
00737 return 0;
00738 }
00739
00740 it_pin->_in_progress_mask |= aiogate_connect_mask;
00741
00742 if (_pin_port.connect(ident, remote, rport, timeout, (void*)(size_t)aiogate_connect_mask))
00743 {
00744 format_logging(0, __FILE__, __LINE__, en_log_error, "can not initiate connect to host %s and port %hu", remote, rport);
00745
00746 it_pin->_in_progress_mask &= ~aiogate_connect_mask;
00747
00748 _pin_map.erase(it_pin);
00749
00750 _pin_port.close(ident);
00751 factory->destroy(pin);
00752 return 0;
00753 }
00754
00755 format_logging(0, __FILE__, __LINE__, en_log_info, "connect initiated to host %s and port %hu", remote, rport);
00756 return ident;
00757 }
00758
00759
00760
00761
00762 size_t
00763 aiogate::bind(const char* address,
00764 unsigned short port,
00765 terimber_aiogate_pin_factory* factory,
00766 void* arg
00767 )
00768 {
00769 if (!factory)
00770 {
00771 format_logging(0, __FILE__, __LINE__, en_log_error, "factory pointer is null");
00772 return 0;
00773 }
00774
00775 size_t ident = _pin_port.create(this, false);
00776 if (!ident)
00777 {
00778 format_logging(0, __FILE__, __LINE__, en_log_error, "can not create socket");
00779 return 0;
00780 }
00781
00782
00783
00784 terimber_aiogate_pin* pin = factory->create(arg);
00785 if (!pin)
00786 {
00787 format_logging(0, __FILE__, __LINE__, en_log_error, "can not create pin for argument %d", (size_t)arg);
00788
00789 _pin_port.close(ident);
00790 return 0;
00791 }
00792
00793
00794
00795
00796 mutex_keeper keeper(_pin_mtx);
00797
00798
00799 pin_info info;
00800 info._factory = factory;
00801 info._pin = pin;
00802 info._tcp_udp = false;
00803
00804 pin_map_t::iterator it_pin = _pin_map.insert(ident, info).first;
00805 if (it_pin == _pin_map.end())
00806 {
00807 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00808
00809 _pin_port.close(ident);
00810 factory->destroy(pin);
00811 return 0;
00812 }
00813
00814 if (_pin_port.bind(ident, address, port))
00815 {
00816 format_logging(0, __FILE__, __LINE__, en_log_error, "can not bind specified host %s and port %hu", address, port);
00817 _pin_map.erase(it_pin);
00818
00819 _pin_port.close(ident);
00820 factory->destroy(pin);
00821 return 0;
00822 }
00823
00824
00825 sockaddr_in local;
00826 if (_pin_port.getsockaddr(ident, local))
00827 {
00828 format_logging(0, __FILE__, __LINE__, en_log_error, "can not get peer/sock address for pin %d", ident);
00829 _pin_map.erase(it_pin);
00830
00831 _pin_port.close(ident);
00832 factory->destroy(pin);
00833 return 0;
00834 }
00835
00836
00837
00838 lock_pin(keeper, *it_pin, aiogate_bind_mask);
00839 try
00840 {
00841
00842 pin->on_bind(local, ident, this);
00843 }
00844 catch (...)
00845 {
00846 format_logging(0, __FILE__, __LINE__, en_log_error, "on_bind callback exception, handle %d", ident);
00847 assert(false);
00848 }
00849 unlock_pin(keeper, ident, aiogate_bind_mask, true);
00850
00851 format_logging(0, __FILE__, __LINE__, en_log_info, "bind processed pin %d", ident);
00852
00853 return ident;
00854 }
00855
00856
00857
00858
00859
00860 bool
00861 aiogate::send( size_t ident,
00862 const void* buf,
00863 size_t len,
00864 const sockaddr_in* toaddr
00865 )
00866 {
00867 if (!buf || !len)
00868 return false;
00869
00870 terimber_aiogate_buffer bulk;
00871 bulk.buf = buf;
00872 bulk.len = len;
00873 return send_bulk(ident, &bulk, 1, toaddr);
00874 }
00875
00876
00877 bool
00878 aiogate::send_bulk( size_t ident,
00879 const terimber_aiogate_buffer* bulk,
00880 size_t count,
00881 const sockaddr_in* toaddr
00882 )
00883 {
00884 if (!count || !bulk)
00885 {
00886 format_logging(0, __FILE__, __LINE__, en_log_error, "buffer pointer is null or count is zero, ident %d", ident);
00887 return false;
00888 }
00889
00890 mutex_keeper keeper(_pin_mtx);
00891 pin_map_t::iterator it_pin = _pin_map.find(ident);
00892 if (it_pin == _pin_map.end())
00893 {
00894 format_logging(0, __FILE__, __LINE__, en_log_error, "pin %d not found", ident);
00895 return false;
00896 }
00897
00898 if (!it_pin->_tcp_udp
00899 && !toaddr)
00900 {
00901 format_logging(0, __FILE__, __LINE__, en_log_error, "pin %d is UDP connection - address is required", ident);
00902 return false;
00903 }
00904
00905
00906 for (size_t index = 0; index < count; ++index)
00907 {
00908 const void* buf = bulk[index].buf;
00909 size_t len = bulk[index].len;
00910
00911 if (!buf || !len)
00912 continue;
00913
00914
00915 if (!it_pin->_tcp_udp && len > BUFFER_CHUNK + sizeof(udp_header))
00916 {
00917 format_logging(0, __FILE__, __LINE__, en_log_error, "pin %d is UDP connection - chunk size exceeeds the allowed maximum", ident);
00918 return false;
00919 }
00920
00921 size_t offset = 0;
00922
00923 while (len)
00924 {
00925 if (!it_pin->_stail)
00926 {
00927 fixed_size_buffer* p = _pin_allocator.allocate();
00928
00929 if (!p)
00930 {
00931 format_logging(0, __FILE__, __LINE__, en_log_error, "no enough memory");
00932
00933 return false;
00934 }
00935 else
00936 p = new (p) fixed_size_buffer();
00937
00938
00939 it_pin->_shead = it_pin->_stail = p;
00940 }
00941
00942 if (it_pin->_tcp_udp
00943 && it_pin->_stail->_end < BUFFER_CHUNK
00944 || !it_pin->_tcp_udp
00945 && (BUFFER_CHUNK - it_pin->_stail->_end) >= (len + sizeof(udp_header))
00946 )
00947 {
00948
00949 size_t clen = __min(BUFFER_CHUNK - it_pin->_stail->_end, len);
00950
00951
00952 if (it_pin->_tcp_udp)
00953 {
00954 memcpy(it_pin->_stail->_ptr + it_pin->_stail->_end, (const ub1_t*)buf + offset, clen);
00955
00956 it_pin->_stail->_end += clen;
00957 }
00958 else
00959 {
00960
00961 udp_header h;
00962 h._addr = *toaddr;
00963 h._payload = (ub4_t)len;
00964 memcpy(it_pin->_stail->_ptr + it_pin->_stail->_end, &h, sizeof(h));
00965 it_pin->_stail->_end += sizeof(h);
00966 memcpy(it_pin->_stail->_ptr + it_pin->_stail->_end, (const ub1_t*)buf + offset, clen);
00967 it_pin->_stail->_end += clen;
00968 }
00969
00970 len -= clen;
00971 offset += clen;
00972 }
00973 else
00974 {
00975
00976 fixed_size_buffer* p = _pin_allocator.allocate();
00977
00978 if (!p)
00979 {
00980 format_logging(0, __FILE__, __LINE__, en_log_error, "no enough memory");
00981
00982 return false;
00983 }
00984 else
00985 p = new(p) fixed_size_buffer();
00986
00987 it_pin->_stail->_next = p;
00988 it_pin->_stail = p;
00989 }
00990 }
00991 }
00992
00993 if (!(it_pin->_in_progress_mask & aiogate_send_mask))
00994 {
00995 it_pin->_in_progress_mask |= aiogate_send_mask;
00996
00997 sockaddr_in peeraddr;
00998 size_t requested = 0;
00999
01000 if (!it_pin->_tcp_udp)
01001 {
01002 udp_header* uheader= (udp_header*)(it_pin->_shead->_ptr + it_pin->_shead->_begin);
01003 requested = uheader->_payload;
01004 peeraddr = uheader->_addr;
01005
01006 it_pin->_shead->_begin += sizeof(udp_header);
01007 }
01008 else
01009 {
01010 requested = it_pin->_shead->_end - it_pin->_shead->_begin;
01011 }
01012
01013 if (_pin_port.send(ident, it_pin->_shead->_ptr + it_pin->_shead->_begin, requested, INFINITE, it_pin->_tcp_udp ? 0 : &peeraddr, (void*)(size_t)aiogate_send_mask))
01014 {
01015 format_logging(0, __FILE__, __LINE__, en_log_error, "can not initiate send for pin %d", ident);
01016 it_pin->_in_progress_mask &= ~aiogate_send_mask;
01017 return false;
01018 }
01019 }
01020
01021 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "send bulk initiated for pin %d", ident);
01022 return true;
01023 }
01024
01025
01026
01027
01028
01029
01030 bool
01031 aiogate::recv( size_t ident,
01032 bool expect_delivery,
01033 const sockaddr_in* fromaddr
01034 )
01035 {
01036
01037 mutex_keeper keeper(_pin_mtx);
01038 pin_map_t::iterator it_pin = _pin_map.find(ident);
01039 if (it_pin == _pin_map.end())
01040 {
01041 format_logging(0, __FILE__, __LINE__, en_log_error, "pin %d not found", ident);
01042 return false;
01043 }
01044
01045 if (it_pin->_in_progress_mask & aiogate_recv_mask)
01046 {
01047 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "pin %d is already in receive mode", ident);
01048 return true;
01049 }
01050
01051 if (expect_delivery)
01052 {
01053 if (!it_pin->_rbuf)
01054 {
01055 it_pin->_rbuf = _pin_allocator.allocate();
01056 if (!it_pin->_rbuf)
01057 {
01058 format_logging(0, __FILE__, __LINE__, en_log_error, "no enough memory");
01059 return false;
01060 }
01061 else
01062 it_pin->_rbuf = new (it_pin->_rbuf) fixed_size_buffer();
01063 }
01064 }
01065 else
01066 {
01067 if (it_pin->_rbuf)
01068 {
01069 _pin_allocator.deallocate(it_pin->_rbuf);
01070 it_pin->_rbuf = 0;
01071 }
01072 }
01073
01074
01075 it_pin->_in_progress_mask |= aiogate_recv_mask;
01076
01077
01078 if (_pin_port.receive(ident, expect_delivery ? (void*)it_pin->_rbuf->_ptr : &it_pin->_leader, expect_delivery ? BUFFER_CHUNK : sizeof(it_pin->_leader), it_pin->_recv_timeout, fromaddr, (void*)(size_t)aiogate_recv_mask))
01079 {
01080 format_logging(0, __FILE__, __LINE__, en_log_error, "can not initiate receive for pin %d", ident);
01081 it_pin->_in_progress_mask &= ~aiogate_recv_mask;
01082 return false;
01083 }
01084
01085 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "receive initiated for pin %d", ident);
01086 return true;
01087 }
01088
01089
01090
01091 bool
01092 aiogate::set_send_timeout(size_t ident,
01093 size_t timeout
01094 )
01095 {
01096 mutex_keeper keeper(_pin_mtx);
01097 pin_map_t::iterator it = _pin_map.find(ident);
01098 if (it == _pin_map.end())
01099 {
01100 format_logging(0, __FILE__, __LINE__, en_log_info, "pin %d not found", ident);
01101 return false;
01102 }
01103
01104 it->_send_timeout = timeout;
01105 return true;
01106 }
01107
01108
01109
01110 bool
01111 aiogate::set_recv_timeout(size_t ident,
01112 size_t timeout
01113 )
01114 {
01115 mutex_keeper keeper(_pin_mtx);
01116 pin_map_t::iterator it = _pin_map.find(ident);
01117 if (it == _pin_map.end())
01118 {
01119 format_logging(0, __FILE__, __LINE__, en_log_error, "pin %d not found", ident);
01120 return false;
01121 }
01122
01123 it->_recv_timeout = timeout;
01124 return true;
01125 }
01126
01127
01128
01129 bool
01130 aiogate::close(size_t ident)
01131 {
01132
01133 mutex_keeper keeper(_pin_mtx);
01134 pin_map_t::iterator it = _pin_map.find(ident);
01135 if (it == _pin_map.end())
01136 {
01137 format_logging(0, __FILE__, __LINE__, en_log_error, "pin %d not found", ident);
01138 return false;
01139 }
01140
01141 keeper.unlock();
01142
01143 initiate_close(ident, 0, true);
01144
01145
01146 format_logging(0, __FILE__, __LINE__, en_log_info, "closure procedure for pin %d initiated", ident);
01147 return true;
01148 }
01149
01150
01151 bool
01152 aiogate::v_has_job(size_t ident, void* user_data)
01153 {
01154
01155 mutex_keeper keeper(_pin_mtx);
01156 return !_pin_list.empty();
01157 }
01158
01159
01160 void
01161 aiogate::v_do_job(size_t ident, void* user_data)
01162 {
01163
01164 mutex_keeper keeper(_pin_mtx);
01165 if (_pin_list.empty())
01166 return;
01167
01168 pin_info_extra info = _pin_list.front();
01169 _pin_list.pop_front();
01170
01171 keeper.unlock();
01172
01173 final_close(info);
01174 }
01175
01177 void
01178 aiogate::initiate_close(size_t ident, size_t mask, bool invoke_callback)
01179 {
01180
01181 mutex_keeper keeper(_pin_mtx);
01182 pin_map_t::iterator it = _pin_map.find(ident);
01183 if (it == _pin_map.end()
01184 || it->_callback_invoking_mask)
01185
01186 {
01187
01188 it->_still_alive = false;
01189 return;
01190 }
01191
01192
01193 pin_info_extra info(*it, ident, mask, invoke_callback);
01194 _pin_list.push_back(info);
01195 _pin_map.erase(it);
01196 keeper.unlock();
01197 _pin_thread.wakeup();
01198 }
01199
01200 void
01201 aiogate::final_close(pin_info_extra& info)
01202 {
01203
01204 try
01205 {
01206 if (info._invoke_callback)
01207 info._pin->on_close((ub4_t)info._mask);
01208 }
01209 catch (...)
01210 {
01211 format_logging(0, __FILE__, __LINE__, en_log_error, "on_clode callback exception, handle %d", info._ident);
01212 assert(false);
01213 }
01214
01215
01216 _pin_port.close(info._ident);
01217
01218
01219
01220 mutex_keeper keeper(_pin_mtx);
01221
01222
01223 if (info._rbuf)
01224 _pin_allocator.deallocate(info._rbuf);
01225
01226
01227 while (info._shead)
01228 {
01229 fixed_size_buffer* buf = info._shead;
01230 info._shead = info._shead->_next;
01231 _pin_allocator.deallocate(buf);
01232 }
01233
01234 keeper.unlock();
01235
01236
01237 info._factory->destroy(info._pin);
01238 }
01239
01241
01242 void
01243 aiogate::doxray()
01244 {
01245 mutex_keeper guard(_pin_mtx);
01246
01247 size_t pins = _pin_map.size(),
01248 listeners = _listener_map.size(),
01249 closed = _pin_list.size(),
01250 memory = _pin_allocator.capacity() * _pin_allocator.count();
01251
01252 guard.unlock();
01253
01254 format_logging(0, __FILE__, __LINE__, en_log_xray, "<aiogate pins=\"%d\" listeners=\"%d\" closed=\"%d\" memory=\"%d\" />",
01255 pins, listeners, closed, memory);
01256
01257 _pin_port.doxray();
01258 }
01259
01260 #pragma pack()
01261 END_TERIMBER_NAMESPACE