00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
// Dummy global defined outside the OS_TYPE guard below, so this translation
// unit still defines something when the rest of the file is compiled out.
// NOTE(review): presumably exists to silence "empty translation unit"
// diagnostics on Win32 builds - confirm.
00029 int aiocomport_compiler_blocker = 0;
00030
00031
00032 #if OS_TYPE != OS_WIN32
00033
00035 #include "aiocomport/aiocomport.h"
00036 #include "base/map.hpp"
00037 #include "base/list.hpp"
00038 #include "base/stack.hpp"
00039 #include "base/common.hpp"
00040 #include "base/memory.hpp"
00041 #include "base/date.h"
00042
00043 BEGIN_TERIMBER_NAMESPACE
00044 #pragma pack(4)
00045
// MAX_SOCKET falls back to the platform's FD_SETSIZE unless the build
// already supplied its own value.
00047 #ifndef MAX_SOCKET
00048 #define MAX_SOCKET FD_SETSIZE
00049 #endif
00050
// NOTE(review): not referenced in the visible part of this file; the name
// suggests a timeout of one second expressed in microseconds - confirm.
00052 const size_t us_timeout = 1000000;
// NOTE(review): not referenced in the visible part of this file; presumably
// a threshold for the completion-port working thread - confirm units and use.
00054 const size_t cp_working_thread_alert = 10000;
00055
// File-local aiocomport instance; every free function below forwards to it.
00058 static aiocomport singleton;
00059
00062 class handle_desc
00063 {
00064 public:
00065 HANDLE _handle;
00066 bool _writing;
00067 bool _reading;
00068 };
00069
00072
00073 HANDLE
00074 CreateIoCompletionPort
00075 (
00076 HANDLE sock_fd,
00077 HANDLE port_fd,
00078 size_t completion_key,
00079 en_type type
00080 )
00081 {
00082 return singleton.CreateIoCompletionPort(sock_fd, port_fd, completion_key, type);
00083 }
00084
00085 bool
00086 AcceptEx
00087 (
00088 HANDLE sock_listen,
00089 LPOVERLAPPED overlapped
00090 )
00091 {
00092 return singleton.AcceptEx(sock_listen, overlapped);
00093 }
00094
00095 bool
00096 ConnectEx
00097 (
00098 HANDLE sock_fd,
00099 const struct sockaddr_in* name,
00100 LPOVERLAPPED overlapped
00101 )
00102 {
00103 return singleton.ConnectEx(sock_fd, name, overlapped);
00104 }
00105
00106 bool
00107 WSARecv
00108 (
00109 HANDLE sock_fd,
00110 void* buf,
00111 size_t len,
00112 LPOVERLAPPED overlapped
00113 )
00114 {
00115 return singleton.WSARecv(sock_fd, buf, len, overlapped);
00116 }
00117
00118 bool
00119 WSARecvFrom
00120 (
00121 HANDLE sock_fd,
00122 void* buf,
00123 size_t len,
00124 struct sockaddr_in* name,
00125 LPOVERLAPPED overlapped
00126 )
00127 {
00128 return singleton.WSARecvFrom(sock_fd, buf, len, name, overlapped);
00129 }
00130
00131 bool
00132 ReadFile
00133 (
00134 HANDLE sock_fd,
00135 void* buf,
00136 size_t len,
00137 LPOVERLAPPED overlapped
00138 )
00139 {
00140 return singleton.ReadFile(sock_fd, buf, len, overlapped);
00141 }
00142
00143 bool
00144 WSASend
00145 (
00146 HANDLE sock_fd,
00147 const void* buf,
00148 size_t len,
00149 LPOVERLAPPED overlapped
00150 )
00151 {
00152 return singleton.WSASend(sock_fd, buf, len, overlapped);
00153 }
00154
00155 bool
00156 WSASendTo
00157 (
00158 HANDLE sock_fd,
00159 const void* buf,
00160 size_t len,
00161 const struct sockaddr_in* name,
00162 LPOVERLAPPED overlapped
00163 )
00164 {
00165 return singleton.WSASendTo(sock_fd, buf, len, name, overlapped);
00166 }
00167
00168 bool
00169 WriteFile
00170 (
00171 HANDLE sock_fd,
00172 const void* buf,
00173 size_t len,
00174 LPOVERLAPPED overlapped
00175 )
00176 {
00177 return singleton.WriteFile(sock_fd, buf, len, overlapped);
00178 }
00179
00180
00181 int
00182 GetQueuedCompletionStatus
00183 (
00184 HANDLE port_fd,
00185 size_t* number_bytes,
00186 size_t* completion_key,
00187 LPOVERLAPPED* overlapped,
00188 size_t timeout_milliseconds
00189 )
00190 {
00191 return singleton.GetQueuedCompletionStatus(port_fd, number_bytes, completion_key, overlapped, timeout_milliseconds);
00192 }
00193
00194 bool
00195 PostQueuedCompletionStatus
00196 (
00197 HANDLE port_fd,
00198 size_t bytes_transferred,
00199 size_t completion_key,
00200 LPOVERLAPPED overlapped
00201 )
00202 {
00203 return singleton.PostQueuedCompletionStatus(port_fd, bytes_transferred, completion_key, overlapped);
00204 }
00205
00206 bool
00207 CloseHandle
00208 (
00209 HANDLE port_fd
00210 )
00211 {
00212 return singleton.CloseHandle(port_fd);
00213 }
00214
00215 bool
00216 CancelIo
00217 (
00218 HANDLE sock_fd,
00219 LPOVERLAPPED overlapped
00220 )
00221 {
00222 return singleton.CancelIo(sock_fd, overlapped);
00223 }
00224
00225 void
00226 SetLog
00227 (
00228 terimber_log* log
00229 )
00230 {
00231 singleton.SetLog(log);
00232 }
00233
00235 void
00236 DoXRay()
00237 {
00238
00239 singleton.DoXRay();
00240 }
00241
00243 #ifndef NO_NPTL
00244
// Real-time signal numbers used by this file when NO_NPTL is not defined:
// AIOSOCKSIGNAL is installed on descriptors with fcntl(F_SETSIG) and
// AIOFILESIGNAL is requested as the aio completion signal (sigev_signo).
00245 #define AIOSOCKSIGNAL (SIGRTMIN + 3)
00246 #define AIOFILESIGNAL (SIGRTMIN + 4)
00247
00248 #endif
00249
00251 aiocomport::aiocomport() :
00252 _event_thread_id(0)
00253 {
00254 struct sigaction sa;
00255 sa.sa_handler = SIG_DFL;
00256 sa.sa_flags = 0;
00257
00258 sigemptyset(&sa.sa_mask);
00259
00260 #ifndef NO_NPTL
00261 sigaddset(&sa.sa_mask, SIGIO);
00262 sigaddset(&sa.sa_mask, AIOSOCKSIGNAL);
00263 sigaddset(&sa.sa_mask, AIOFILESIGNAL);
00264 #else
00265
00266 #endif
00267
00268 sigprocmask(SIG_BLOCK, &sa.sa_mask, 0);
00269
00270 pthread_sigmask(SIG_BLOCK, &sa.sa_mask, 0);
00271
00272 #ifndef NO_NPTL
00273
00274 sigaction(SIGIO, &sa, 0);
00275 sigaction(AIOSOCKSIGNAL, &sa, 0);
00276 sigaction(AIOFILESIGNAL, &sa, 0);
00277 #else
00278
00279 #endif
00280 }
00281
00282 aiocomport::~aiocomport()
00283 {
00284 _event_thread_id = 0;
00285 }
00286
00287 HANDLE
00288 aiocomport::CreateIoCompletionPort
00289 (
00290 HANDLE sock_fd,
00291 HANDLE port_fd,
00292 size_t completion_key,
00293 en_type type
00294 )
00295 {
00296
00297 mutex_keeper keeper(_port_mtx);
00298
00299 HANDLE handle = port_fd;
00300
00301 port_attr_map_t::iterator it_port = _port_attr_map.end();
00302
00303
00304 if (port_fd == 0)
00305 {
00306
00307 if (_port_attr_map.empty())
00308 {
00309
00310 job_task task(this, 0, INFINITE, 0);
00311 _event_thread.start();
00312 _event_thread.assign_job(task);
00313 _ev_activate.wait();
00314 }
00315
00316
00317 handle = _generator.generate();
00318
00319
00320 port_attributes attr;
00321 it_port = _port_attr_map.insert(handle, attr).first;
00322 if (it_port == _port_attr_map.end())
00323 {
00324
00325 _generator.save((size_t)handle);
00326
00327
00328 if (_port_attr_map.empty())
00329 {
00330 _event_thread_id = 0;
00331
00332 keeper.unlock();
00333 _event_thread.cancel_job();
00334 _event_thread.stop();
00335 _ev_deactivate.wait();
00336 }
00337
00338 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00339 errno = -1;
00340 return 0;
00341 }
00342 }
00343 else
00344 {
00345
00346 it_port = _port_attr_map.find(port_fd);
00347 if (it_port == _port_attr_map.end())
00348 {
00349 format_logging(0, __FILE__, __LINE__, en_log_error, "completion port ident %d not found", port_fd);
00350 errno = -1;
00351 return 0;
00352 }
00353 }
00354
00355 if (sock_fd == INVALID_SOCKET)
00356 {
00357 if (port_fd != 0)
00358 {
00359 format_logging(0, __FILE__, __LINE__, en_log_error, "can not accosiate invalid socket handle %d with completion port %d", sock_fd, port_fd);
00360 errno = -1;
00361 return 0;
00362 }
00363 }
00364 else
00365 {
00366
00367 pid_t pid = getpid();
00368
00369 int oldflags = ::fcntl(sock_fd, F_GETFL, 0);
00370 ::fcntl(sock_fd, F_SETOWN, pid);
00371
00372 if (type != TYPE_FILE)
00373 {
00374 #ifndef NO_NPTL
00375 ::fcntl(sock_fd, F_SETFL, oldflags | O_NONBLOCK | O_ASYNC);
00376
00377 ::fcntl(sock_fd, F_SETSIG, AIOSOCKSIGNAL);
00378 #else
00379 ::fcntl(sock_fd, F_SETFL, oldflags | O_NONBLOCK);
00380 #endif
00381 format_logging(0, __FILE__, __LINE__, en_log_info, "socket %d will signal to thread %d", sock_fd, pid);
00382 }
00383 else
00384 {
00385 ::fcntl(sock_fd, F_SETFL, oldflags | O_NONBLOCK);
00386 format_logging(0, __FILE__, __LINE__, en_log_info, "file %d will signal to thread %d", sock_fd, pid);
00387 }
00388
00389
00390
00391 fd_port_map_t::iterator it_fd = _fd_port_map.find(sock_fd);
00392 if (it_fd != _fd_port_map.end())
00393 {
00394 assert(port_fd != 0);
00395 format_logging(0, __FILE__, __LINE__, en_log_error, "accosiation already exists for socket handle %d with completion port %d", sock_fd, port_fd);
00396 errno = -1;
00397 return 0;
00398 }
00399
00400
00401 fd_item item(it_port, completion_key, type);
00402
00403 it_fd = _fd_port_map.insert(sock_fd, item).first;
00404 if (it_fd == _fd_port_map.end())
00405 {
00406 if (port_fd == 0)
00407 {
00408 _port_attr_map.erase(handle);
00409 _generator.save((size_t)handle);
00410
00411 if (_port_attr_map.empty())
00412 {
00413 _event_thread_id = 0;
00414
00415 keeper.unlock();
00416 _event_thread.cancel_job();
00417 _event_thread.stop();
00418 _ev_deactivate.wait();
00419 }
00420 }
00421
00422 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00423 errno = -1;
00424 return 0;
00425 }
00426 }
00427
00428 if (sock_fd == INVALID_SOCKET)
00429 format_logging(0, __FILE__, __LINE__, en_log_info, "new completion port %d created", handle);
00430 else
00431 format_logging(0, __FILE__, __LINE__, en_log_info, "socket %d is associated with completion port %d", sock_fd, handle);
00432
00433 errno = 0;
00434 return handle;
00435 }
00436
00437 bool
00438 aiocomport::AcceptEx
00439 (
00440 HANDLE sock_listen,
00441 LPOVERLAPPED overlapped
00442 )
00443 {
00444
00445 return initiate_action(ACTION_ACCEPT, sock_listen, overlapped, 0, 0, 0);
00446 }
00447
00448 bool
00449 aiocomport::ConnectEx
00450 (
00451 HANDLE sock_fd,
00452 const struct sockaddr_in* name,
00453 LPOVERLAPPED overlapped
00454 )
00455 {
00456
00457 return initiate_action(ACTION_CONNECT, sock_fd, overlapped, 0, 0, name);
00458 }
00459
00460 bool
00461 aiocomport::WSARecv
00462 (
00463 HANDLE sock_fd,
00464 void* buf,
00465 size_t len,
00466 LPOVERLAPPED overlapped
00467 )
00468 {
00469
00470 return initiate_action(ACTION_RECV, sock_fd, overlapped, buf, len, 0);
00471 }
00472
00473 bool
00474 aiocomport::WSARecvFrom
00475 (
00476 HANDLE sock_fd,
00477 void* buf,
00478 size_t len,
00479 struct sockaddr_in* name,
00480 LPOVERLAPPED overlapped
00481 )
00482 {
00483
00484 return initiate_action(ACTION_RECV, sock_fd, overlapped, buf, len, name);
00485 }
00486
00487 bool
00488 aiocomport::ReadFile
00489 (
00490 HANDLE sock_fd,
00491 void* buf,
00492 size_t len,
00493 LPOVERLAPPED overlapped
00494 )
00495 {
00496
00497 return initiate_action(ACTION_READ, sock_fd, overlapped, buf, len, 0);
00498 }
00499
00500 bool
00501 aiocomport::WSASend
00502 (
00503 HANDLE sock_fd,
00504 const void* buf,
00505 size_t len,
00506 LPOVERLAPPED overlapped
00507 )
00508 {
00509
00510 return initiate_action(ACTION_SEND, sock_fd, overlapped, buf, len, 0);
00511 }
00512
00513 bool
00514 aiocomport::WSASendTo
00515 (
00516 HANDLE sock_fd,
00517 const void* buf,
00518 size_t len,
00519 const struct sockaddr_in* name,
00520 LPOVERLAPPED overlapped
00521 )
00522 {
00523
00524 return initiate_action(ACTION_SEND, sock_fd, overlapped, buf, len, name);
00525 }
00526
00527 bool
00528 aiocomport::WriteFile
00529 (
00530 HANDLE sock_fd,
00531 const void* buf,
00532 size_t len,
00533 LPOVERLAPPED overlapped
00534 )
00535 {
00536
00537 return initiate_action(ACTION_WRITE, sock_fd, overlapped, buf, len, 0);
00538 }
00539
00540 bool
00541 aiocomport::initiate_action(en_action action, HANDLE sock_id, LPOVERLAPPED overlapped, const void* buf, size_t len, const sockaddr_in* name)
00542 {
00543
00544 if (!overlapped)
00545 {
00546 format_logging(0, __FILE__, __LINE__, en_log_error, "overlapped structure is not specified");
00547 errno = -1;
00548 return false;
00549 }
00550
00551
00552 mutex_keeper keeper(_port_mtx);
00553
00554
00555 fd_port_map_t::iterator it_fd = _fd_port_map.find(sock_id);
00556 if (it_fd == _fd_port_map.end())
00557 {
00558 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", sock_id);
00559 errno = -1;
00560 return false;
00561 }
00562
00563
00564 queue_item item;
00565 item._action = action;
00566 item.aio_fildes = sock_id;
00567 item._key = it_fd->_completion_key;
00568 item._overlapped = overlapped;
00569 item.aio_buf = (void*)buf;
00570 item.aio_nbytes = len;
00571 item.aio_offset = overlapped->offset;
00572 overlapped->hAccept = (SOCKET)INVALID_SOCKET;
00573 if (name)
00574 overlapped->remoteAddress = *name;
00575
00576 int ret = 0;
00577 SOCKET hAccept = (SOCKET)INVALID_SOCKET;
00578 socklen_t alen = sizeof(sockaddr_in);
00579
00580
00581 queue_container_iterator_t iter = it_fd->_initial_container.push_back(_queue_allocator, item);
00582
00583 if (iter == it_fd->_initial_container.end())
00584 {
00585 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory for socket %d", sock_id);
00586 errno = -1;
00587 return false;
00588 }
00589
00590
00591 switch (action)
00592 {
00593 case ACTION_CONNECT:
00594
00595 if (!name)
00596 {
00597 it_fd->_initial_container.erase(_queue_allocator, iter);
00598 format_logging(0, __FILE__, __LINE__, en_log_error, "connection address for socket %d is not specified", sock_id);
00599 errno = -1;
00600 return false;
00601 }
00602
00603 ret = ::connect(item.aio_fildes, (const sockaddr*)name, alen);
00604 break;
00605 case ACTION_ACCEPT:
00606
00607 hAccept = ::accept(item.aio_fildes, (sockaddr*)&overlapped->remoteAddress, &alen);
00608 ret = (hAccept == INVALID_SOCKET) ? -1 : 0;
00609 break;
00610 case ACTION_RECV:
00611
00612 if (it_fd->_type == TYPE_UDP && !name)
00613 {
00614 it_fd->_initial_container.erase(_queue_allocator, iter);
00615 format_logging(0, __FILE__, __LINE__, en_log_error, "peer address for UDP socket %d is required", sock_id);
00616 errno = -1;
00617 return false;
00618 }
00619
00620
00621 if (!buf || !len)
00622 {
00623 it_fd->_initial_container.erase(_queue_allocator, iter);
00624 format_logging(0, __FILE__, __LINE__, en_log_error, "null or empty buffer for socket %d found", sock_id);
00625 errno = -1;
00626 return false;
00627 }
00628
00629
00630 ret = (it_fd->_type == TYPE_TCP) ? ::recv(item.aio_fildes, (char*)item.aio_buf, item.aio_nbytes, 0) :
00631 ::recvfrom(item.aio_fildes, (char*)item.aio_buf, item.aio_nbytes, 0, (sockaddr*)&overlapped->remoteAddress, &alen);
00632 break;
00633 case ACTION_SEND:
00634
00635 if (it_fd->_type == TYPE_UDP && !name)
00636 {
00637 it_fd->_initial_container.erase(_queue_allocator, iter);
00638 format_logging(0, __FILE__, __LINE__, en_log_error, "peer address for UDP socket %d is required", sock_id);
00639 errno = -1;
00640 return false;
00641 }
00642
00643
00644 if (!buf || !len)
00645 {
00646 it_fd->_initial_container.erase(_queue_allocator, iter);
00647 format_logging(0, __FILE__, __LINE__, en_log_error, "null or empty buffer for socket %d found", sock_id);
00648 errno = -1;
00649 return false;
00650 }
00651
00652
00653 ret = (it_fd->_type == TYPE_TCP) ? ::send(item.aio_fildes, (const char*)item.aio_buf, (int)item.aio_nbytes, MSG_NOSIGNAL) :
00654 ::sendto(item.aio_fildes, (const char*)item.aio_buf, (int)item.aio_nbytes, 0, (const sockaddr*)&overlapped->remoteAddress, alen);
00655 break;
00656 case ACTION_READ:
00657 {
00658 #ifndef NO_NPTL
00659 iter->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
00660 iter->aio_sigevent.sigev_signo = AIOFILESIGNAL;
00661 #else
00662 iter->aio_sigevent.sigev_notify = SIGEV_NONE;
00663 iter->aio_sigevent.sigev_signo = 0;
00664 #endif
00665 iter->aio_sigevent.sigev_value.sival_ptr = &*iter;
00666 ret = aio_read(&*iter);
00667 }
00668 break;
00669 case ACTION_WRITE:
00670 {
00671 #ifndef NO_NPTL
00672 iter->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
00673 iter->aio_sigevent.sigev_signo = AIOFILESIGNAL;
00674 #else
00675 iter->aio_sigevent.sigev_notify = SIGEV_NONE;
00676 iter->aio_sigevent.sigev_signo = 0;
00677 #endif
00678 iter->aio_sigevent.sigev_value.sival_ptr = &*iter;
00679 ret = aio_write(&*iter);
00680 }
00681 break;
00682 default:
00683 assert(false);
00684 return false;
00685 }
00686
00687 if (ret >= 0)
00688 {
00689 if (it_fd->_type != TYPE_FILE)
00690 {
00691 switch (action)
00692 {
00693 case ACTION_CONNECT:
00694 break;
00695 case ACTION_ACCEPT:
00696
00697 iter->_overlapped->hAccept = hAccept;
00698 break;
00699 case ACTION_RECV:
00700
00701 iter->_processed = ret;
00702 break;
00703 case ACTION_SEND:
00704
00705 iter->_processed = ret;
00706 break;
00707 default:
00708 assert(false);
00709 return false;
00710 }
00711
00712
00713 it_fd->_port_iter->_completion_container.push_back(_queue_allocator, *iter);
00714
00715 it_fd->_initial_container.erase(_queue_allocator, iter);
00716
00717 it_fd->_port_iter->_queue_event.signal();
00718 return true;
00719 }
00720 }
00721 else if (
00722 errno != EWOULDBLOCK && errno != EINPROGRESS
00723 )
00724 {
00725 it_fd->_initial_container.erase(_queue_allocator, iter);
00726 format_logging(0, __FILE__, __LINE__, en_log_error, "can not initiate action for socket %d, error %d", sock_id, errno);
00727 return false;
00728 }
00729
00730 format_logging(0, __FILE__, __LINE__, en_log_info, "asynchronous action is initiated - atcion %d", action);
00731
00732 #ifdef NO_NPTL
00733 _ev_wakeup.signal();
00734 #endif
00735 return true;
00736 }
00737
00738
00739 int
00740 aiocomport::GetQueuedCompletionStatus
00741 (
00742 HANDLE port_fd,
00743 size_t* number_bytes,
00744 size_t* completion_key,
00745 LPOVERLAPPED* overlapped,
00746 size_t timeout_milliseconds
00747 )
00748 {
00749 int ret = -1;
00750
00751
00752 mutex_keeper keeper(_port_mtx);
00753
00754
00755 port_attr_map_t::iterator it_port = _port_attr_map.find(port_fd);
00756 if (it_port == _port_attr_map.end()
00757 || it_port->_go_to_exit
00758 )
00759 {
00760 format_logging(0, __FILE__, __LINE__, en_log_error, "completion port %d does not exist", port_fd);
00761 return ret;
00762 }
00763
00764
00765 ++it_port->_loop_lock;
00766
00767
00768
00769
00770
00771 keeper.unlock();
00772
00773
00774 int timeout = (timeout_milliseconds == INFINITE) ? INFINITE : timeout_milliseconds;
00775
00776 date now;
00777 event* queue_event = 0;
00778
00779 do
00780 {
00781
00782 if (timeout != INFINITE)
00783 {
00784 timeout -= (int)date::get_difference(now);
00785
00786 if (timeout < 0)
00787 {
00788 format_logging(0, __FILE__, __LINE__, en_log_error, "timeout occured for port queue %d", port_fd);
00789 errno = -1;
00790 break;
00791 }
00792 }
00793
00794
00795 keeper.lock();
00796
00797
00798 it_port = _port_attr_map.find(port_fd);
00799 if (it_port == _port_attr_map.end()
00800 || it_port->_go_to_exit
00801 )
00802 {
00803 format_logging(0, __FILE__, __LINE__, en_log_error, "completion port %d does not exist", port_fd);
00804 return ret;
00805 }
00806
00807
00808 if (!it_port->_completion_container.empty())
00809 {
00810
00811 const queue_item& item = it_port->_completion_container.front();
00812
00813 if (number_bytes)
00814 *number_bytes = item._processed;
00815
00816 if (completion_key)
00817 *completion_key = item._key;
00818
00819 if (overlapped)
00820 *overlapped = item._overlapped;
00821
00822
00823 ret = item._error;
00824
00825
00826 it_port->_completion_container.pop_front(_queue_allocator);
00827
00828
00829 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "queue item processed for port %d: socket %d, key %d, bytes %d, error %d", port_fd, item.aio_fildes, item._key, item._processed, item._error);
00830
00831
00832 keeper.unlock();
00833
00834 break;
00835 }
00836
00837 queue_event = &it_port->_queue_event;
00838
00839 keeper.unlock();
00840
00841 }
00842 while (WAIT_OBJECT_0 == queue_event->wait(timeout));
00843
00844
00845 keeper.lock();
00846
00847
00848 it_port = _port_attr_map.find(port_fd);
00849 if (it_port == _port_attr_map.end())
00850 {
00851 return ret;
00852 }
00853
00854
00855 --it_port->_loop_lock;
00856 if (it_port->_loop_lock == 0
00857 && it_port->_go_to_exit
00858 )
00859 {
00860 it_port->_exit_event.signal();
00861 }
00862
00863 return ret;
00864 }
00865
00866
00867 bool
00868 aiocomport::PostQueuedCompletionStatus
00869 (
00870 HANDLE port_fd,
00871 size_t bytes_transferred,
00872 size_t completion_key,
00873 LPOVERLAPPED overlapped
00874 )
00875 {
00876
00877 mutex_keeper keeper(_port_mtx);
00878
00879
00880 port_attr_map_t::iterator it_port = _port_attr_map.find(port_fd);
00881 if (it_port == _port_attr_map.end())
00882 {
00883 format_logging(0, __FILE__, __LINE__, en_log_error, "completion port %d not found", port_fd);
00884 return false;
00885 }
00886
00887
00888 queue_item item;
00889 item._action = ACTION_USERDATA;
00890 item.aio_fildes = (int)INVALID_SOCKET;
00891 item._key = completion_key;
00892 item._overlapped = overlapped;
00893 item._processed = bytes_transferred;
00894
00895
00896 it_port->_completion_container.push_back(_queue_allocator, item);
00897
00898 it_port->_queue_event.signal();
00899
00900 format_logging(0, __FILE__, __LINE__, en_log_paranoid, "user data action initiated for port %d, key %d, bytes %d", port_fd, completion_key, bytes_transferred);
00901 return true;
00902 }
00903
00904 bool
00905 aiocomport::CloseHandle
00906 (
00907 HANDLE hObject
00908 )
00909 {
00910
00911 mutex_keeper keeper(_port_mtx);
00912
00913
00914 port_attr_map_t::iterator it_port = _port_attr_map.find(hObject);
00915 if (it_port == _port_attr_map.end())
00916 {
00917 format_logging(0, __FILE__, __LINE__, en_log_error, "completion port %d not found", hObject);
00918 return false;
00919 }
00920
00921
00922 if (it_port->_go_to_exit)
00923 {
00924 format_logging(0, __FILE__, __LINE__, en_log_error, "another thread is closing completion port %d", hObject);
00925 return false;
00926 }
00927 else
00928 it_port->_go_to_exit = true;
00929
00930
00931 if (it_port->_loop_lock)
00932 {
00933
00934 it_port->_queue_event.signal();
00935
00936
00937 event& r_event = it_port->_exit_event;
00938
00939 keeper.unlock();
00940
00941 r_event.wait(3000);
00942
00943 keeper.lock();
00944 }
00945
00946
00947 it_port = _port_attr_map.find(hObject);
00948 if (it_port != _port_attr_map.end())
00949 {
00950
00951 for (queue_container_t::iterator it_comp = it_port->_completion_container.begin(); it_comp != it_port->_completion_container.end();)
00952 it_comp = it_port->_completion_container.erase(_queue_allocator, it_comp);
00953
00954
00955 for (fd_port_map_t::iterator it_fd = _fd_port_map.begin(); it_fd != _fd_port_map.end();)
00956 {
00957 if (it_fd->_port_iter.key() == hObject)
00958 {
00959
00960 for (queue_container_t::iterator it_item = it_fd->_initial_container.begin(); it_item != it_fd->_initial_container.end();)
00961 {
00962 if (it_fd->_type == TYPE_FILE)
00963 aio_cancel(hObject, &*it_item);
00964
00965 it_item = it_fd->_initial_container.erase(_queue_allocator, it_item);
00966 }
00967
00968
00969 it_fd = _fd_port_map.erase(it_fd);
00970 }
00971 else
00972 ++it_fd;
00973 }
00974
00975
00976 _port_attr_map.erase(it_port);
00977
00978
00979 if (_port_attr_map.empty())
00980 {
00981
00982 _queue_allocator.reset();
00983
00984 _event_thread_id = 0;
00985
00986 keeper.unlock();
00987 _event_thread.cancel_job();
00988 _event_thread.stop();
00989 _ev_deactivate.wait();
00990 }
00991 }
00992
00993 format_logging(0, __FILE__, __LINE__, en_log_info, "completion port %d is closed", hObject);
00994 return true;
00995 }
00996
00997 bool
00998 aiocomport::CancelIo
00999 (
01000 HANDLE sock_fd,
01001 LPOVERLAPPED overlapped
01002 )
01003 {
01004
01005 mutex_keeper keeper(_port_mtx);
01006
01007
01008 fd_port_map_t::iterator it_fd = _fd_port_map.find(sock_fd);
01009 if (it_fd == _fd_port_map.end())
01010 {
01011 format_logging(0, __FILE__, __LINE__, en_log_error, "socket %d not found", sock_fd);
01012 return false;
01013 }
01014
01015
01016 for (queue_container_t::iterator it_item = it_fd->_initial_container.begin(); it_item != it_fd->_initial_container.end();)
01017 {
01018
01019 if (overlapped != 0 && overlapped != it_item->_overlapped)
01020 {
01021 ++it_item;
01022 continue;
01023 }
01024
01025 if (it_fd->_type == TYPE_FILE)
01026 aio_cancel(sock_fd, &*it_item);
01027
01028
01029 it_item = it_fd->_initial_container.erase(_queue_allocator, it_item);
01030 }
01031
01032
01033
01034 for (queue_container_t::iterator it_queue = it_fd->_port_iter->_completion_container.begin(); it_queue != it_fd->_port_iter->_completion_container.begin();)
01035 {
01036 if (it_queue->aio_fildes != sock_fd || overlapped != 0 && overlapped != it_queue->_overlapped)
01037 {
01038 ++it_queue;
01039 continue;
01040 }
01041
01042 it_queue = it_fd->_port_iter->_completion_container.erase(_queue_allocator, it_queue);
01043 }
01044
01045
01046
01047 if (!overlapped)
01048 {
01049 _fd_port_map.erase(it_fd);
01050 format_logging(0, __FILE__, __LINE__, en_log_info, "socket %d is removed from completion port", sock_fd);
01051 }
01052
01053 return true;
01054 }
01055
01056 void
01057 aiocomport::SetLog
01058 (
01059 terimber_log* log
01060 )
01061 {
01062
01063 log_on(log);
01064 }
01065
01066 void
01067 aiocomport::DoXRay()
01068 {
01069
01070 mutex_keeper keeper(_port_mtx);
01071
01072 size_t connect_initiated = 0, accept_initiated = 0, send_initiated = 0, recv_initiated = 0;
01073 size_t connect_completed = 0, accept_completed = 0, send_completed = 0, recv_completed = 0;
01074
01075
01076 for (fd_port_map_t::iterator it_fd = _fd_port_map.begin(); it_fd != _fd_port_map.end(); ++it_fd)
01077 {
01078 queue_container_t::iterator it_item = it_fd->_initial_container.begin();
01079 for (; it_item != it_fd->_initial_container.end(); ++it_item)
01080 {
01081 switch (it_item->_action)
01082 {
01083 case ACTION_CONNECT:
01084 ++connect_initiated;
01085 break;
01086 case ACTION_SEND:
01087 case ACTION_WRITE:
01088 ++send_initiated;
01089 break;
01090 case ACTION_ACCEPT:
01091 ++accept_initiated;
01092 break;
01093 case ACTION_RECV:
01094 case ACTION_READ:
01095 ++recv_initiated;
01096 break;
01097 default:
01098 break;
01099 }
01100 }
01101
01102 it_item = it_fd->_port_iter->_completion_container.begin();
01103 for (; it_item != it_fd->_port_iter->_completion_container.end(); ++it_item)
01104 {
01105 switch (it_item->_action)
01106 {
01107 case ACTION_CONNECT:
01108 ++connect_completed;
01109 break;
01110 case ACTION_SEND:
01111 case ACTION_WRITE:
01112 ++send_completed;
01113 break;
01114 case ACTION_ACCEPT:
01115 ++accept_completed;
01116 break;
01117 case ACTION_RECV:
01118 case ACTION_READ:
01119 ++recv_completed;
01120 break;
01121 default:
01122 break;
01123 }
01124 }
01125 }
01126
01127 keeper.unlock();
01128
01129 format_logging(0, __FILE__, __LINE__, en_log_xray, "<aiocomport><initial connect=\"%d\" accept=\"%d\" send=\"%d\" recv=\"%d\" /><completed connect=\"%d\" accept=\"%d\" send=\"%d\" recv=\"%d\" /></aiocomport>",
01130 connect_initiated, accept_initiated, send_initiated, recv_initiated,
01131 connect_completed, accept_completed, send_completed, recv_completed);
01132 }
01133
01134
01136 void
01137 aiocomport::func_sock_signal(int fd, int si_code)
01138 {
01139
01140 mutex_keeper keeper(_port_mtx);
01141
01142
01143 fd_port_map_t::iterator it_fd = _fd_port_map.find(fd);
01144 if (it_fd == _fd_port_map.end())
01145 {
01146 format_logging(0, __FILE__, __LINE__, en_log_error, "func_signal: can not find socket %d", fd);
01147 return;
01148 }
01149
01150
01151 switch (si_code)
01152 {
01153 case POLL_IN:
01154 {
01155 size_t accept_waited = 0, recv_waited = 0;
01156 for (queue_container_t::iterator it_item = it_fd->_initial_container.begin(); it_item != it_fd->_initial_container.end();)
01157 {
01158 switch (it_item->_action)
01159 {
01160 case ACTION_ACCEPT:
01161 {
01162 it_item->_processed = 0;
01163 socklen_t len = sizeof(sockaddr_in);
01164
01165
01166 it_item->_overlapped->hAccept = ::accept(it_item->aio_fildes, (sockaddr*)&it_item->_overlapped->remoteAddress, &len);
01167 if (INVALID_SOCKET != it_item->_overlapped->hAccept)
01168 {
01169
01170 it_item->_error = 0;
01171 }
01172 else if (
01173 (it_item->_error = errno) == EWOULDBLOCK
01174 )
01175 {
01176 break;
01177 }
01178
01179
01180 it_fd->_port_iter->_completion_container.push_back(_queue_allocator, *it_item);
01181
01182
01183 it_item = it_fd->_initial_container.erase(_queue_allocator, it_item);
01184
01185 ++accept_waited;
01186
01187
01188 continue;
01189 }
01190 break;
01191 case ACTION_RECV:
01192 {
01193 socklen_t alen = sizeof(sockaddr_in), len = it_item->aio_nbytes;
01194
01195
01196 int ret = (it_fd->_type == TYPE_TCP) ? ::recv(it_item->aio_fildes, (char*)it_item->aio_buf, len, 0) :
01197 ::recvfrom(it_item->aio_fildes, (char*)it_item->aio_buf, len, 0, (sockaddr*)&it_item->_overlapped->remoteAddress, &alen);
01198
01199 if (ret < 0)
01200 {
01201 it_item->_processed = 0;
01202 if (
01203 (it_item->_error = errno) == EWOULDBLOCK || errno == EAGAIN
01204 )
01205 {
01206 it_item->_error = 0;
01207 break;
01208 }
01209 }
01210 else
01211 {
01212 it_item->_processed = ret;
01213 it_item->_error = 0;
01214 }
01215
01216
01217 it_fd->_port_iter->_completion_container.push_back(_queue_allocator, *it_item);
01218
01219
01220 it_item = it_fd->_initial_container.erase(_queue_allocator, it_item);
01221
01222 ++recv_waited;
01223
01224 continue;
01225 }
01226 break;
01227 default:
01228 ++it_item;
01229 continue;
01230 }
01231
01232 break;
01233 }
01234
01235 if (accept_waited + recv_waited)
01236 {
01237 format_logging(0, __FILE__, __LINE__, en_log_info, "socket select command detected %d accept and %d recv", accept_waited, recv_waited);
01238
01239 it_fd->_port_iter->_queue_event.signal();
01240 }
01241
01242
01243 }
01244 break;
01245 case POLL_OUT:
01246 {
01247 size_t connect_waited = 0, send_waited = 0;
01248
01249
01250 for (queue_container_t::iterator it_item = it_fd->_initial_container.begin(); it_item != it_fd->_initial_container.end();)
01251 {
01252 switch (it_item->_action)
01253 {
01254 case ACTION_CONNECT:
01255 {
01256
01257 it_item->_processed = 0;
01258 it_item->_error = 0;
01259
01260
01261 it_fd->_port_iter->_completion_container.push_back(_queue_allocator, *it_item);
01262
01263 it_item = it_fd->_initial_container.erase(_queue_allocator, it_item);
01264
01265 ++connect_waited;
01266 }
01267 break;
01268 case ACTION_SEND:
01269 {
01270
01271 int ret = (it_fd->_type == TYPE_TCP) ? ::send(it_item->aio_fildes, (const char*)it_item->aio_buf, (int)it_item->aio_nbytes, 0) :
01272 ::sendto(it_item->aio_fildes, (const char*)it_item->aio_buf, (int)it_item->aio_nbytes, 0, (const sockaddr*)&it_item->_overlapped->remoteAddress, sizeof(sockaddr_in));
01273
01274 if (ret < 0)
01275 {
01276 it_item->_processed = 0;
01277 if (
01278 (it_item->_error = errno) == EWOULDBLOCK || errno == EAGAIN
01279 )
01280 {
01281 it_item->_error = 0;
01282 break;
01283 }
01284 }
01285 else
01286 {
01287 it_item->_processed = ret;
01288 it_item->_error = 0;
01289 }
01290
01291
01292 it_fd->_port_iter->_completion_container.push_back(_queue_allocator, *it_item);
01293
01294
01295 it_item = it_fd->_initial_container.erase(_queue_allocator, it_item);
01296
01297 ++send_waited;
01298
01299 continue;
01300 }
01301 break;
01302 default:
01303 ++it_item;
01304 continue;
01305 }
01306
01307 break;
01308 }
01309
01310 if (connect_waited + send_waited)
01311 {
01312 format_logging(0, __FILE__, __LINE__, en_log_info, "socket select command detected %d connect and %d send", connect_waited, send_waited);
01313
01314 it_fd->_port_iter->_queue_event.signal();
01315 }
01316 }
01317 break;
01318 case POLL_ERR:
01319 case POLL_HUP:
01320 {
01321 size_t error_waited= 0;
01322
01323 int err;
01324
01325 socklen_t errlen = sizeof(err);
01326 getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&err, &errlen);
01327
01328
01329 for (queue_container_t::iterator it_item = it_fd->_initial_container.begin(); it_item != it_fd->_initial_container.end();)
01330 {
01331 it_item->_processed = 0;
01332 it_item->_error = err;
01333
01334 it_fd->_port_iter->_completion_container.push_back(_queue_allocator, *it_item);
01335
01336 it_item = it_fd->_initial_container.erase(_queue_allocator, it_item);
01337
01338
01339 ++error_waited;
01340 }
01341
01342 if (error_waited)
01343 {
01344 format_logging(0, __FILE__, __LINE__, en_log_info, "socket select command detected %d errors", error_waited);
01345
01346 it_fd->_port_iter->_queue_event.signal();
01347 }
01348 }
01349
01350 break;
01351 default:
01352 format_logging(0, __FILE__, __LINE__, en_log_error, "unexpected si_code %d detected", si_code);
01353 return;
01354 }
01355 }
01356
01358 void
01359 aiocomport::func_file_signal(int fd, void* ptr)
01360 {
01361
01362 mutex_keeper keeper(_port_mtx);
01363
01364
01365 fd_port_map_t::iterator it_fd = _fd_port_map.find(fd);
01366 if (it_fd == _fd_port_map.end())
01367 {
01368 format_logging(0, __FILE__, __LINE__, en_log_error, "func_signal: can not find socket %d", fd);
01369 return;
01370 }
01371
01372
01373 queue_item* qitem = static_cast< queue_item* >(ptr);
01374
01375 for (queue_container_t::iterator it_item = it_fd->_initial_container.begin(); it_item != it_fd->_initial_container.end(); ++it_item)
01376 {
01377 if (qitem != &*it_item)
01378 continue;
01379
01380 switch (it_item->_action)
01381 {
01382 case ACTION_READ:
01383 {
01384 int ret = aio_error (&*it_item);
01385 if (ret)
01386 {
01387 it_item->_processed = 0;
01388 if ((it_item->_error = ret) == EINPROGRESS)
01389 {
01390 it_item->_error = 0;
01391 break;
01392 }
01393 }
01394 else
01395 {
01396 it_item->_processed = aio_return(&*it_item);
01397 it_item->_error = 0;
01398 }
01399
01400
01401 it_fd->_port_iter->_completion_container.push_back(_queue_allocator, *it_item);
01402
01403
01404 it_item = it_fd->_initial_container.erase(_queue_allocator, it_item);
01405
01406 format_logging(0, __FILE__, __LINE__, en_log_info, "file detected read command");
01407 it_fd->_port_iter->_queue_event.signal();
01408 }
01409 break;
01410 case ACTION_WRITE:
01411 {
01412 int ret = aio_error (&*it_item);
01413 if (ret)
01414 {
01415 it_item->_processed = 0;
01416 if ((it_item->_error = ret) == EINPROGRESS)
01417 {
01418 it_item->_error = 0;
01419 break;
01420 }
01421 }
01422 else
01423 {
01424 it_item->_processed = aio_return(&*it_item);
01425 it_item->_error = 0;
01426 }
01427
01428
01429 it_fd->_port_iter->_completion_container.push_back(_queue_allocator, *it_item);
01430
01431
01432 it_item = it_fd->_initial_container.erase(_queue_allocator, it_item);
01433
01434 format_logging(0, __FILE__, __LINE__, en_log_info, "file detected write command");
01435 it_fd->_port_iter->_queue_event.signal();
01436 }
01437 break;
01438 default:
01439 break;
01440 }
01441
01442 break;
01443 }
01444 }
01445
01447
01448
01449
01450 bool
01451 aiocomport::v_has_job(size_t ident, void* data)
01452 {
01453
01454 format_logging(0, __FILE__, __LINE__, en_log_info, "v_has_job: event_thread_id = %d", _event_thread_id);
01455 return _event_thread_id == 0;
01456 }
01457
01458 #define quant 16
01459 class desc_bag
01460 {
01461 public:
01462 desc_bag() :
01463 _desc(0), _rptr(0), _wptr(0), _mask(0)
01464 {
01465 }
01466
01467 HANDLE _desc;
01468 aiocb* _rptr;
01469 aiocb* _wptr;
01470 int _mask;
01471 };
01472
01473
01474
01475 void
01476 aiocomport::v_do_job(size_t ident, void* data)
01477 {
01478
01479 _event_thread_id = pthread_self();
01480
01481 _ev_activate.signal();
01482
01483 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: before while event_thread_id = %d", _event_thread_id);
01484
01485 #ifndef NO_NPTL
01486 int signo;
01487 siginfo_t info;
01488
01489
01490 timespec timeouts;
01491 timeouts.tv_sec=1;
01492 timeouts.tv_nsec=0;
01493
01494
01495 sigset_t ss;
01496 sigemptyset(&ss);
01497 sigaddset(&ss, AIOFILESIGNAL);
01498 sigaddset(&ss, SIGIO);
01499 sigaddset(&ss, AIOSOCKSIGNAL);
01500
01501
01502 while (_event_thread_id != 0)
01503 {
01504 if ((signo = sigtimedwait(&ss, &info, &timeouts)) > 0)
01505 {
01506 if (signo == AIOSOCKSIGNAL)
01507 func_sock_signal(info.si_fd, info.si_code);
01508 else if (signo == AIOFILESIGNAL)
01509 func_file_signal(info.si_fd, info.si_value.sival_ptr);
01510 else if (signo == SIGIO)
01511 {
01512
01513 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: RT queue overflow event_thread_id = %d", _event_thread_id);
01514
01515 if (info.si_fd)
01516 {
01517 func_sock_signal(info.si_fd, info.si_code);
01518 }
01519 }
01520 }
01521 }
01522 #else
01523 fd_set s_read, s_write, s_err;
01524
01525 timeval timeoutv;
01526
01527 TERIMBER::byte_allocator active_desc_allocator(1024*64);
01528
01529 while (_event_thread_id != 0)
01530 {
01531 active_desc_allocator.clear_extra();
01532
01533 TERIMBER::_list< desc_bag > active_desc_container;
01534
01535 FD_ZERO(&s_read);
01536 FD_ZERO(&s_write);
01537 FD_ZERO(&s_err);
01538
01539
01540 mutex_keeper keeper(_port_mtx);
01541
01542 int fdcount = _fd_port_map.size();
01543 SOCKET maxdesc = 0;
01544
01545
01546 for (fd_port_map_t::iterator it_fd = _fd_port_map.begin(); it_fd != _fd_port_map.end(); ++it_fd)
01547 {
01548 desc_bag bag;
01549 bag._desc = it_fd.key();
01550
01551 for (queue_container_t::iterator it_item = it_fd->_initial_container.begin(); it_item != it_fd->_initial_container.end(); ++it_item)
01552 {
01553 int m = (1 << it_item->_action);
01554 if (bag._mask & m)
01555 continue;
01556
01557 bag._mask |= m;
01558
01559 switch (it_item->_action)
01560 {
01561 case ACTION_ACCEPT:
01562 case ACTION_RECV:
01563 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: socket %d need to accept or read", bag._desc);
01564 FD_SET(bag._desc, &s_read);
01565 break;
01566 case ACTION_CONNECT:
01567 case ACTION_SEND:
01568 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: socket %d need to connect or send", bag._desc);
01569 FD_SET(bag._desc, &s_write);
01570 break;
01571 case ACTION_READ:
01572 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: file %d need to read", bag._desc);
01573 FD_SET(bag._desc, &s_read);
01574 bag._rptr = &*it_item;
01575 break;
01576 case ACTION_WRITE:
01577 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: file %d need to write", bag._desc);
01578 FD_SET(bag._desc, &s_write);
01579 bag._wptr = &*it_item;
01580 break;
01581 }
01582 }
01583
01584 if (!it_fd->_initial_container.empty())
01585 {
01586 FD_SET(maxdesc = bag._desc, &s_err);
01587 active_desc_container.push_back(active_desc_allocator, bag);
01588 }
01589 }
01590
01591 keeper.unlock();
01592
01593 if (maxdesc)
01594 {
01595
01596 timeoutv.tv_sec=0;
01597 timeoutv.tv_usec=quant*1000;
01598
01599 if (select(maxdesc + 1, &s_read, &s_write, &s_err, &timeoutv) > 0)
01600 {
01601 for (TERIMBER::_list< desc_bag >::const_iterator it = active_desc_container.begin(); it != active_desc_container.end(); ++it)
01602 {
01603
01604 HANDLE fd = it->_desc;
01605
01606
01607
01608 if (FD_ISSET(fd, &s_read))
01609 {
01610 if (it->_mask & ((1 << ACTION_ACCEPT) | (1 << ACTION_RECV)))
01611 {
01612 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: socket %d signal for recv or accept", fd);
01613 func_sock_signal(fd, POLL_IN);
01614 }
01615 else if (it->_mask & (1 << ACTION_READ))
01616 {
01617 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: file %d signal for read", fd);
01618 func_file_signal(fd, it->_rptr);
01619
01620 continue;
01621 }
01622 }
01623
01624 if (FD_ISSET(fd, &s_write))
01625 {
01626 if (it->_mask & ((1 << ACTION_CONNECT) | (1 << ACTION_SEND)))
01627 {
01628 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: socket %d signal for send or connect", fd);
01629 func_sock_signal(fd, POLL_OUT);
01630 }
01631 else if (it->_mask & (1 << ACTION_WRITE))
01632 {
01633 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: file %d signal for write", fd);
01634 func_file_signal(fd, it->_wptr);
01635
01636 continue;
01637 }
01638 }
01639
01640 if (FD_ISSET(fd, &s_err))
01641 {
01642 if (it->_mask & ((1 << ACTION_ACCEPT) | (1 << ACTION_CONNECT) | (1 << ACTION_SEND) | (1 << ACTION_RECV)))
01643 {
01644 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: socket %d signal for error", fd);
01645 func_sock_signal(fd, POLL_ERR);
01646 }
01647 else if (it->_mask & (1 << ACTION_READ))
01648 {
01649 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: file %d signal for error on read", fd);
01650 func_file_signal(fd, it->_rptr);
01651 }
01652 else if (it->_mask & (1 << ACTION_WRITE))
01653 {
01654 format_logging(0, __FILE__, __LINE__, en_log_info, "v_do_job: file %d signal for error on write", fd);
01655 func_file_signal(fd, it->_wptr);
01656 }
01657 }
01658 }
01659 }
01660 }
01661 else
01662 {
01663
01664 _ev_wakeup.wait(1000);
01665
01666 }
01667 }
01668 #endif
01669 _ev_deactivate.signal();
01670 }
01671
01672
01673 #pragma pack()
01674 END_TERIMBER_NAMESPACE
01675
01676 #endif