00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "osdetect.h"
00029
00031 #if OS_TYPE == OS_WIN32
00032 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00033 #include <winsock2.h>
00034 #endif
00035 #endif
00036
00038 #include "base/list.hpp"
00039 #include "aiofile/aiofile.h"
00040 #include "base/map.hpp"
00041 #include "base/stack.hpp"
00042 #include "base/memory.hpp"
00043 #include "base/common.hpp"
00044
00045
00047 terimber_aiofile_factory::terimber_aiofile_factory()
00048 {
00049 }
00050
00052 terimber_aiofile_factory::~terimber_aiofile_factory()
00053 {
00054 }
00055
00057 terimber_aiofile*
00058 terimber_aiofile_factory::get_aiofile(terimber_log* log, size_t capacity, size_t deactivate_time_msec)
00059 {
00060
00061 terimber::aiofile* obj = new terimber::aiofile(capacity, deactivate_time_msec);
00062 if (obj)
00063 {
00064
00065 obj->log_on(log);
00066
00067 obj->on();
00068 }
00069
00070 return obj;
00071 }
00072
00073
00074 BEGIN_TERIMBER_NAMESPACE
00075 #pragma pack(4)
00076
00077
00079 const size_t aiofile_io_initiation_ident = 1;
00081 const size_t aiofile_io_initiation_thread_alert = 1000;
00083 const size_t aiofile_completion_io_port_ident = 2;
00085 const size_t aiofile_completion_io_port_thread_alert = INFINITE;
00087 const size_t aiofile_working_ident = 3;
00089 const size_t aiofile_working_thread_alert = 60000;
00090
00091
00093 aiofile_block::aiofile_block()
00094 {
00095 clear();
00096 }
00097
00098 aiofile_block::aiofile_block(const aiofile_block& x)
00099 {
00100 *this = x;
00101 }
00102
00103 aiofile_block&
00104 aiofile_block::operator=(const aiofile_block& x)
00105 {
00106 if (this != &x)
00107 {
00108 memcpy(this, &x, sizeof(aiofile_block));
00109 }
00110
00111 return *this;
00112 }
00113
00114 void
00115 aiofile_block::clear()
00116 {
00117 memset(this, 0, sizeof(aiofile_block));
00118 }
00119
00120 void
00121 aiofile_block::set_timeout(size_t timeout)
00122 {
00123 if (timeout == INFINITE)
00124 {
00125 #if OS_TYPE == OS_WIN32
00126 _timeout = 0;
00127 #else
00128 _timeout.tv_sec = _timeout.tv_usec = 0;
00129 #endif
00130 _expired = 0;
00131 }
00132 else
00133 {
00134 #if OS_TYPE == OS_WIN32
00135 _timeout = timeout;
00136 #else
00137 _timeout.tv_sec = timeout / 1000;
00138 _timeout.tv_usec = (timeout % 1000) * 1000 + 1;
00139 #endif
00140 date now;
00141 _expired = (sb8_t)now + timeout;
00142 }
00143 }
00144
00145 aiofile::aiofile(size_t capacity, size_t deactivate_time_msec) :
00146 _file_map(less< size_t >(), 64)
00147 ,_reverse_map(less< aio_file_handle >(), 64)
00148 ,_file_generator(64)
00149 ,_outgoing_list(64)
00150 ,_aiofile_io_handle(0)
00151 ,_thread_pool(capacity + 3, deactivate_time_msec)
00152 ,_capacity(capacity)
00153 ,_on(false)
00154 ,_flag_io_port(false)
00155 {
00156 }
00157
00158 aiofile::~aiofile()
00159 {
00160
00161 off();
00162 }
00163
00164 bool
00165 aiofile::on()
00166 {
00167 if (_on)
00168 {
00169 format_logging(0, __FILE__, __LINE__, en_log_error, "aiofile already started");
00170 return false;
00171 }
00172
00173 format_logging(0, __FILE__, __LINE__, en_log_info, "starting aiofile...");
00174
00175 #if OS_TYPE != OS_WIN32
00176 TERIMBER::SetLog(this);
00177 #endif
00178 _thread_pool.log_on(this);
00179
00180 if (!_thread_pool.on())
00181 {
00182 format_logging(0, __FILE__, __LINE__, en_log_error, "can not start thread pool");
00183 return false;
00184 }
00185
00186 format_logging(0, __FILE__, __LINE__, en_log_info, "init completion port");
00187 #if OS_TYPE == OS_WIN32
00188 _aiofile_io_handle = ::CreateIoCompletionPort((HANDLE)INVALID_SOCKET, 0, 0, 0);
00189 #else
00190 _aiofile_io_handle = TERIMBER::CreateIoCompletionPort((HANDLE)INVALID_SOCKET, 0, 0, TYPE_UNKNOWN);
00191 #endif
00192
00193
00194 if (!_aiofile_io_handle)
00195 {
00196 #if OS_TYPE == OS_WIN32
00197 ::CloseHandle((HANDLE)_aiofile_io_handle);
00198 #else
00199 TERIMBER::CloseHandle(_aiofile_io_handle);
00200 #endif
00201 format_logging(0, __FILE__, __LINE__, en_log_info, "can not initiate completion port");
00202 return false;
00203 }
00204
00205 _flag_io_port = true;
00206 _thread_pool.borrow_thread(aiofile_completion_io_port_ident, 0, this, aiofile_completion_io_port_thread_alert);
00207 _start_io_port.wait();
00208
00209
00210 _thread_pool.borrow_thread(aiofile_working_ident, 0, this, aiofile_working_thread_alert);
00211
00212
00213 _in_thread.start();
00214 job_task task(this, aiofile_io_initiation_ident, aiofile_io_initiation_thread_alert, 0);
00215 _in_thread.assign_job(task);
00216
00217 _on = true;
00218
00219 format_logging(0, __FILE__, __LINE__, en_log_info, "aio file port is initialized");
00220
00221 return _aiofile_io_handle != 0;
00222 }
00223
00224
00225
00226 void
00227 aiofile::off()
00228 {
00229 if (!_on)
00230 {
00231 format_logging(0, __FILE__, __LINE__, en_log_error, "aiofile already stopped");
00232 return;
00233 }
00234
00235 format_logging(0, __FILE__, __LINE__, en_log_info, "stoping aiofile...");
00236 _in_thread.cancel_job();
00237 _in_thread.stop();
00238
00239
00240 format_logging(0, __FILE__, __LINE__, en_log_info, "send stop message to completion port");
00241 #if OS_TYPE == OS_WIN32
00242 ::PostQueuedCompletionStatus((HANDLE)_aiofile_io_handle, 0, 0, 0);
00243 #else
00244 TERIMBER::PostQueuedCompletionStatus(_aiofile_io_handle, 0, 0, 0);
00245 #endif
00246
00247 _stop_io_port.wait();
00248 format_logging(0, __FILE__, __LINE__, en_log_info, "completion port stopped");
00249
00250
00251 #if OS_TYPE == OS_WIN32
00252
00253 ::CloseHandle((HANDLE)_aiofile_io_handle);
00254 #else
00255 TERIMBER::CloseHandle(_aiofile_io_handle);
00256 #endif
00257
00258
00259 _aiofile_io_handle = 0;
00260
00261 format_logging(0, __FILE__, __LINE__, en_log_info, "Stoping thread pool");
00262 _thread_pool.revoke_client(this);
00263 _thread_pool.off();
00264 _thread_pool.log_on(0);
00265
00266 format_logging(0, __FILE__, __LINE__, en_log_info, "Close all files");
00267
00268 mutex_keeper guard(_mtx);
00269 for (aiofile_file_map_iterator_t iter = _file_map.begin(); iter != _file_map.end(); ++iter)
00270 {
00271 iter->_incoming_list.erase(_incoming_list_allocator, iter->_incoming_list.begin(), iter->_incoming_list.end());
00272 _cancel_file(iter->_handle);
00273 _close_file(iter->_handle);
00274 }
00275
00276
00277 format_logging(0, __FILE__, __LINE__, en_log_info, "cleans up resources");
00278
00279
00280 _file_generator.clear();
00281 _file_map.clear();
00282 _reverse_map.clear();
00283 _delay_key_map.clear();
00284
00285
00286 _clear_block_lists();
00287
00288
00289 _incoming_list_allocator.clear_extra();
00290 _block_allocator.clear_extra();
00291
00292
00293 #if OS_TYPE != OS_WIN32
00294 TERIMBER::SetLog(0);
00295 #endif
00296
00297
00298 _on = false;
00299
00300 format_logging(0, __FILE__, __LINE__, en_log_info, "aio file port is uninitialized");
00301 }
00302
00303
00304 bool
00305 aiofile::v_has_job(size_t ident, void* data)
00306 {
00307
00308 if (_aiofile_io_handle == 0)
00309 return false;
00310
00311 switch (ident)
00312 {
00313 case aiofile_completion_io_port_ident:
00314 return _flag_io_port;
00315 case aiofile_io_initiation_ident:
00316 {
00317
00318 mutex_keeper guard(_mtx);
00319
00320
00321 if (!_initial_list.empty())
00322 return true;
00323
00324
00325 date now;
00326 sb8_t unow = (sb8_t)now;
00327
00328 for (aiofile_file_map_iterator_t iter_file = _file_map.begin(); iter_file != _file_map.end(); ++iter_file)
00329 {
00330 for (aiofile_pblock_alloc_list_t::iterator iter_block = iter_file->_incoming_list.begin();
00331 iter_block != iter_file->_incoming_list.end(); ++iter_block)
00332 {
00333
00334 aiofile_block* block = *iter_block;
00335
00336 if (block->_expired != 0
00337 && unow >= block->_expired)
00338 {
00339 return true;
00340 }
00341 }
00342 }
00343
00344
00345 return false;
00346 }
00347 case aiofile_working_ident:
00348 default:
00349 {
00350
00351 mutex_keeper guard(_mtx);
00352
00353 return !_outgoing_list.empty();
00354 }
00355 }
00356
00357 return false;
00358 }
00359
00360 void
00361 aiofile::wait_for_io_completion()
00362 {
00363
00364 _start_io_port.signal();
00365
00366 while (true)
00367 {
00368 aiofile_block* ov = 0;
00369 size_t file_key = os_minus_one;
00370 size_t num_bytes = 0;
00371 #if OS_TYPE == OS_WIN32
00372 bool bRes = (TRUE == ::GetQueuedCompletionStatus((HANDLE)_aiofile_io_handle,
00373 (DWORD*)&num_bytes,
00374 #if defined(_MSC_VER) && (_MSC_VER > 1200)
00375 (ULONG_PTR*)
00376 #else
00377 (DWORD*)
00378 #endif
00379 &file_key,
00380 (LPOVERLAPPED*)&ov,
00381 INFINITE));
00382
00383 int cRes = bRes ? 0 : ::GetLastError();
00384 #else
00385 int cRes = TERIMBER::GetQueuedCompletionStatus(_aiofile_io_handle,
00386 &num_bytes,
00387 &file_key,
00388 (LPOVERLAPPED*)&ov,
00389 INFINITE);
00390
00391 #endif
00392 if (!file_key)
00393 {
00394
00395 _flag_io_port = false;
00396
00397 _stop_io_port.signal();
00398
00399 break;
00400 }
00401
00402
00403 complete_block(file_key, ov, cRes, num_bytes);
00404 }
00405 }
00406
00407 void
00408 aiofile::complete_block(size_t file_key, aiofile_block* ov, int err, size_t processed)
00409 {
00410
00411 mutex_keeper guard(_mtx);
00412
00413 aiofile_file_map_iterator_t iter_file = _file_map.find(file_key);
00414
00415 if (iter_file == _file_map.end())
00416 {
00417 format_logging(0, __FILE__, __LINE__, en_log_error, "file key %d not found", file_key);
00418 return;
00419 }
00420
00421
00422 for (aiofile_pblock_alloc_list_t::iterator iter_block = iter_file->_incoming_list.begin(); iter_block != iter_file->_incoming_list.end(); ++iter_block)
00423 {
00424
00425 if (*iter_block != ov)
00426 continue;
00427
00428
00429 aiofile_block* block = *iter_block;
00430
00431 iter_file->_incoming_list.erase(_incoming_list_allocator, iter_block);
00432
00433 block->_err = err;
00434
00435 block->_processed = processed;
00436
00437 _outgoing_list.push_back(block);
00438
00439 guard.unlock();
00440
00441 if (!_capacity || !_thread_pool.borrow_from_range(aiofile_working_ident, aiofile_working_ident + _capacity, 0, this, aiofile_working_thread_alert))
00442 _thread_pool.borrow_thread(aiofile_working_ident, 0, this, aiofile_working_thread_alert);
00443
00444 return;
00445 }
00446
00447 format_logging(0, __FILE__, __LINE__, en_log_info, "completed block not found for file %d, looking in abounded list", file_key);
00448
00449 #if OS_TYPE == OS_WIN32
00450
00451
00452
00453 for (aiofile_pblock_alloc_list_t::iterator iter_abounded = _abounded_list.begin(); iter_abounded != _abounded_list.end(); ++iter_abounded)
00454 {
00455 if (*iter_abounded != ov)
00456 continue;
00457
00458
00459 aiofile_block* block = *iter_abounded;
00460
00461 _abounded_list.erase(iter_abounded);
00462
00463 _put_block(block);
00464 return;
00465 }
00466
00467 format_logging(0, __FILE__, __LINE__, en_log_info, "completed block not found for file %d anywere", file_key);
00468 #endif
00469
00470 }
00471
00472
00473 void
00474 aiofile::v_do_job(size_t ident, void* data)
00475 {
00476 switch (ident)
00477 {
00478 case aiofile_completion_io_port_ident:
00479 wait_for_io_completion();
00480 break;
00481 case aiofile_io_initiation_ident:
00482 {
00483
00484 mutex_keeper guard(_mtx);
00485
00486 if (_initial_list.empty())
00487 return;
00488
00489
00490 aiofile_block* block = _initial_list.front();
00491
00492 _initial_list.pop_front();
00493
00494
00495 if (int err = _process_block(block))
00496 {
00497
00498 block->_err = err;
00499
00500 _outgoing_list.push_back(block);
00501
00502 guard.unlock();
00503
00504 if (!_capacity || !_thread_pool.borrow_from_range(aiofile_working_ident, aiofile_working_ident + _capacity, 0, this, aiofile_working_thread_alert))
00505 _thread_pool.borrow_thread(aiofile_working_ident, 0, this, aiofile_working_thread_alert);
00506 }
00507 }
00508 break;
00509 case aiofile_working_ident:
00510 default:
00511 {
00512 aiofile_block* block = 0;
00513 aio_file_handle handle = 0;
00514 aio_file_handle accept_handle = 0;
00515 terimber_aiofile_callback* client_obj = 0;
00516
00517
00518 mutex_keeper guard(_mtx);
00519
00520 if (_outgoing_list.empty())
00521 return;
00522
00523
00524 block = _outgoing_list.front();
00525
00526 _outgoing_list.pop_front();
00527
00528
00529 aiofile_file_map_t::iterator iter_file = _file_map.find(block->_file_ident);
00530
00531 if (iter_file == _file_map.end())
00532 {
00533 format_logging(0, __FILE__, __LINE__, en_log_error, "file key %d not found", block->_file_ident);
00534
00535 _put_block(block);
00536 return;
00537 }
00538 else
00539 {
00540
00541 client_obj = iter_file->_client_obj;
00542
00543 handle = iter_file->_handle;
00544
00545 ++iter_file->_callback_invoking;
00546 }
00547
00548
00549 if (block->_err)
00550 {
00551
00552
00553 guard.unlock();
00554
00555 try
00556 {
00557
00558 client_obj->v_on_error(block->_file_ident, block->_err, block->_type, block->_userdata);
00559 }
00560 catch (...)
00561 {
00562 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_error exception for file %d", block->_file_ident);
00563 assert(false);
00564 }
00565 }
00566 else
00567 {
00568
00569
00570 switch (block->_type)
00571 {
00572 case AIOFILE_WRITE:
00573
00574 guard.unlock();
00575
00576 try
00577 {
00578
00579 client_obj->v_on_write(block->_file_ident, (void*)block->_buf, block->_len, block->_processed, block->_userdata);
00580 }
00581 catch (...)
00582 {
00583 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_write exception for file %d", block->_file_ident);
00584 assert(false);
00585 }
00586 break;
00587 case AIOFILE_READ:
00588
00589 guard.unlock();
00590
00591 try
00592 {
00593
00594 client_obj->v_on_read(block->_file_ident, (void*)block->_buf, block->_len, block->_processed, block->_userdata);
00595 }
00596 catch (...)
00597 {
00598 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_read exception for file %d", block->_file_ident);
00599 assert(false);
00600 }
00601 break;
00602 default:
00603 assert(false);
00604
00605 }
00606 }
00607
00608
00609 guard.lock();
00610
00611
00612
00613 iter_file = _file_map.find(block->_file_ident);
00614
00615
00616 if (iter_file == _file_map.end())
00617 {
00618
00619 aiofile_delay_key_t::iterator iter_delay = _delay_key_map.find(block->_file_ident);
00620
00621 if (iter_delay != _delay_key_map.end())
00622 {
00623 if (--*iter_delay <= 0)
00624 {
00625
00626 _delay_key_map.erase(iter_delay);
00627
00628 _file_generator.save(block->_file_ident);
00629 }
00630 }
00631 }
00632 else
00633 {
00634
00635 --iter_file->_callback_invoking;
00636 }
00637
00638
00639 _put_block(block);
00640 }
00641 break;
00642 }
00643 }
00644
00645
00646 size_t
00647 aiofile::open(const char* file_name, bool read_write, terimber_aiofile_callback* callback)
00648 {
00649 if (!_aiofile_io_handle)
00650 {
00651 format_logging(0, __FILE__, __LINE__, en_log_error, "aio file port is not initialized");
00652 return 0;
00653 }
00654
00655
00656 #if OS_TYPE == OS_WIN32
00657 DWORD desiredAccess = (read_write ? GENERIC_READ : GENERIC_WRITE);
00658 DWORD sharedMode = (read_write ? FILE_SHARE_READ : FILE_SHARE_WRITE);
00659 DWORD creationDisposition = (read_write ? OPEN_EXISTING : CREATE_ALWAYS);
00660
00661 HANDLE handle = ::CreateFile(file_name, desiredAccess, sharedMode, 0, creationDisposition, FILE_FLAG_SEQUENTIAL_SCAN | FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
00662 if (handle == INVALID_HANDLE_VALUE)
00663 {
00664 format_logging(0, __FILE__, __LINE__, en_log_error, "can not open file %s, access mode %s", file_name, read_write ? "read" : "write");
00665 return 0;
00666 }
00667 ::SetFilePointer(handle, 0, 0, FILE_BEGIN);
00668 #else
00669 int oflag = (read_write ? (O_RDONLY|O_NONBLOCK) : (O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK));
00670 int oshare = S_IRWXU | S_IRWXG | S_IRWXO;
00671 HANDLE handle = ::open(file_name, oflag, oshare);
00672 if (handle == -1)
00673 {
00674 format_logging(0, __FILE__, __LINE__, en_log_error, "can not open file %s, access mode %s", file_name, read_write ? "read" : "write");
00675 return 0;
00676 }
00677 #endif
00678
00679 mutex_keeper guard(_mtx);
00680 return _assign_file(handle, callback);
00681 }
00682
00683
00684 void
00685 aiofile::close(size_t handle)
00686 {
00687 if (!_aiofile_io_handle)
00688 {
00689 format_logging(0, __FILE__, __LINE__, en_log_error, "aio file port is not initialized");
00690 return;
00691 }
00692
00693 mutex_keeper guard(_mtx);
00694 aiofile_file_map_iterator_t iter = _file_map.find(handle);
00695
00696 if (iter == _file_map.end())
00697 {
00698 format_logging(0, __FILE__, __LINE__, en_log_error, "file %d not found", handle);
00699 return;
00700 }
00701
00702
00703 HANDLE hfile = iter->_handle;
00704 bool delay_key = false;
00705
00706
00707 iter->_incoming_list.erase(_incoming_list_allocator, iter->_incoming_list.begin(), iter->_incoming_list.end());
00708
00709 if (iter->_callback_invoking)
00710 {
00711 delay_key = true;
00712 aiofile_delay_key_t::iterator iter_delay = _delay_key_map.find(handle);
00713 if (iter_delay != _delay_key_map.end())
00714 *iter_delay += (int)iter->_callback_invoking;
00715 else
00716 _delay_key_map.insert(handle, (int)iter->_callback_invoking);
00717 }
00718
00719 _reverse_map.erase(iter->_handle);
00720 _file_map.erase(iter);
00721
00722 for (aiofile_pblock_list_t::iterator in_iter = _initial_list.begin(); in_iter != _initial_list.end();)
00723 {
00724 aiofile_block* block = *in_iter;
00725 if (block->_file_ident == handle)
00726 {
00727 _put_block(block);
00728 in_iter = _initial_list.erase(in_iter);
00729 }
00730 else
00731 ++in_iter;
00732 }
00733
00734 for (aiofile_pblock_list_t::iterator out_iter = _outgoing_list.begin(); out_iter != _outgoing_list.end();)
00735 {
00736 aiofile_block* block = *out_iter;
00737 if (block->_file_ident == handle)
00738 {
00739 _put_block(block);
00740 out_iter = _outgoing_list.erase(out_iter);
00741 }
00742 else
00743 ++out_iter;
00744 }
00745
00746 if (hfile)
00747 {
00748 _cancel_file(hfile);
00749 _close_file(hfile);
00750 }
00751
00752
00753 #if OS_TYPE == OS_WIN32
00754
00755 for (aiofile_pblock_alloc_list_t::iterator iter_list = iter->_incoming_list.begin(); iter_list != iter->_incoming_list.end();)
00756 {
00757 _abounded_list.push_back(*iter_list);
00758 iter_list = iter->_incoming_list.erase(_incoming_list_allocator, iter_list);
00759 }
00760 #else
00761
00762 for (aiofile_pblock_alloc_list_t::iterator iter_list = iter->_incoming_list.begin(); iter_list != iter->_incoming_list.end();)
00763 {
00764 aiofile_block* block = *iter_list;
00765 _put_block(block);
00766 iter_list = iter->_incoming_list.erase(_incoming_list_allocator, iter_list);
00767 }
00768
00769 #endif
00770
00771 if (!delay_key)
00772
00773 _file_generator.save(handle);
00774
00775 format_logging(0, __FILE__, __LINE__, en_log_info, "file handle %u is closed", handle);
00776 }
00777
00778
00779
00780 int
00781 aiofile::write(size_t handle, size_t offset, const void* buf, size_t len, size_t timeout, void* userdata)
00782 {
00783 mutex_keeper guard(_mtx);
00784
00785 aiofile_block* block = _get_block();
00786 block->set_timeout(timeout);
00787
00788 #if OS_TYPE == OS_WIN32
00789 block->Offset = (DWORD)offset;
00790 block->OffsetHigh = 0;
00791 #else
00792 block->offset = offset;
00793 #endif
00794
00795 block->_type = AIOFILE_WRITE;
00796 block->_userdata = userdata;
00797
00798 block->_buf = (char*)buf;
00799 block->_len = len;
00800
00801 return _activate_block(handle, block);
00802 }
00803
00804
00805
00806
00807 int
00808 aiofile::read(size_t handle, size_t offset, void* buf, size_t len, size_t timeout, void* userdata)
00809 {
00810 mutex_keeper guard(_mtx);
00811
00812 aiofile_block* block = _get_block();
00813 block->set_timeout(timeout);
00814
00815 #if OS_TYPE == OS_WIN32
00816 block->Offset = (DWORD)offset;
00817 block->OffsetHigh = 0;
00818 #else
00819 block->offset = offset;
00820 #endif
00821
00822 block->_type = AIOFILE_READ;
00823 block->_userdata = userdata;
00824
00825 block->_buf = (char*)buf;
00826 block->_len = len;
00827
00828 return _activate_block(handle, block);
00829 }
00830
00831
00832
00833 void
00834 aiofile::doxray()
00835 {
00836 mutex_keeper guard(_mtx);
00837
00838 size_t files = _file_map.size(),
00839 delay_actions = _delay_key_map.size(),
00840 initiated_actions = _initial_list.size(),
00841 completed_actions = _outgoing_list.size(),
00842 abounded_actions =
00843 #if OS_TYPE == OS_WIN32
00844 _abounded_list.size();
00845 #else
00846 0;
00847 #endif
00848
00849 guard.unlock();
00850
00851 format_logging(0, __FILE__, __LINE__, en_log_xray, "<aiofile files=\"%d\" delayed=\"%d\" initiated=\"%d\" completed=\"%d\" abounded=\"%d\" />",
00852 files, delay_actions, initiated_actions, completed_actions, abounded_actions);
00853
00854 #if OS_TYPE != OS_WIN32
00855 TERIMBER::DoXRay();
00856 #endif
00857 _thread_pool.doxray();
00858 }
00859
00860 size_t
00861 aiofile::_assign_file(aio_file_handle handle, terimber_aiofile_callback* callback)
00862 {
00863
00864 aiofile_file new_file(handle, callback);
00865
00866
00867 size_t ident = _file_generator.generate();
00868
00869
00870 aiofile_file_map_iterator_t iter_file = _file_map.insert(ident, new_file).first;
00871 if (iter_file == _file_map.end())
00872 {
00873 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00874 _close_file(new_file._handle);
00875 _file_generator.save(ident);
00876 return 0;
00877 }
00878
00879
00880 aiofile_reverse_map_iterator_t iter_reverse = _reverse_map.insert(handle, iter_file).first;
00881 if (iter_reverse == _reverse_map.end())
00882 {
00883 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00884 _close_file(new_file._handle);
00885 _file_map.erase(iter_file);
00886 _file_generator.save(ident);
00887 return 0;
00888 }
00889
00890 #if OS_TYPE == OS_WIN32
00891
00892 if (!::CreateIoCompletionPort((HANDLE)new_file._handle,
00893 (HANDLE)_aiofile_io_handle,
00894 (DWORD)ident,
00895 3))
00896 #else
00897
00898 if (!TERIMBER::CreateIoCompletionPort(new_file._handle,
00899 _aiofile_io_handle,
00900 ident,
00901 TYPE_FILE))
00902
00903 #endif
00904 {
00905 format_logging(0, __FILE__, __LINE__, en_log_error, "can not assign file handle to completion port");
00906 _close_file(new_file._handle);
00907 _reverse_map.erase(iter_reverse);
00908 _file_map.erase(iter_file);
00909 _file_generator.save(ident);
00910 return 0;
00911 }
00912
00913 format_logging(0, __FILE__, __LINE__, en_log_info, "assign file handle %u, ident %u is open", new_file._handle, ident);
00914
00915 return ident;
00916 }
00917
00918 void
00919 aiofile::_close_file(aio_file_handle handle)
00920 {
00921 format_logging(0, __FILE__, __LINE__, en_log_info, "file handle %d closed", handle);
00922
00923 #if OS_TYPE == OS_WIN32
00924 ::CloseHandle(handle);
00925 #else
00926 ::close(handle);
00927 #endif
00928 }
00929
00930 void
00931 aiofile::_cancel_file(aio_file_handle handle)
00932 {
00933 format_logging(0, __FILE__, __LINE__, en_log_info, "actions for file handle %d aborted", handle);
00934
00935
00936 #if OS_TYPE == OS_WIN32
00937 ::CancelIo(handle);
00938 #else
00939 TERIMBER::CancelIo(handle, 0);
00940 #endif
00941 }
00942
00943 #if OS_TYPE != OS_WIN32
00944
00946 void
00947 aiofile::_cancel_aio( aio_file_handle handle,
00948 OVERLAPPED* overlapped
00949 )
00950 {
00951 TERIMBER::CancelIo(handle, overlapped);
00952 }
00953
00954 #endif
00955
00956 int
00957 aiofile::_process_block(aiofile_block* block)
00958 {
00959 if (!_aiofile_io_handle)
00960 {
00961 format_logging(0, __FILE__, __LINE__, en_log_error, "aiofile is not initialized");
00962 return -1;
00963 }
00964
00965 aiofile_file_map_iterator_t iter_file = _file_map.find(block->_file_ident);
00966 if (iter_file == _file_map.end())
00967 {
00968 format_logging(0, __FILE__, __LINE__, en_log_error, "file %d not found", block->_file_ident);
00969 return -1;
00970 }
00971
00972 aio_file_handle handle = iter_file->_handle;
00973 terimber_aiofile_callback* client = iter_file->_client_obj;
00974
00975
00976 iter_file->_incoming_list.push_back(_incoming_list_allocator, block);
00977
00978
00979
00980 int res = 0;
00981 switch (block->_type)
00982 {
00983 case AIOFILE_WRITE:
00984 res = _process_write(handle, block);
00985 break;
00986 case AIOFILE_READ:
00987 res = _process_read(handle, block);
00988 break;
00989 default:
00990 break;
00991 }
00992
00993
00994
00995
00996
00997
00998 if (!res
00999 || res
01000 #if OS_TYPE == OS_WIN32
01001 && (
01002 res != ERROR_IO_PENDING
01003 )
01004 #else
01005 && res != EWOULDBLOCK
01006 #endif
01007 )
01008 {
01009
01010 iter_file->_incoming_list.pop_back(_incoming_list_allocator);
01011
01012
01013 block->_err = res;
01014
01015
01016 if (!res)
01017 {
01018 format_logging(0, __FILE__, __LINE__, en_log_info, "process block synchronously, handle %d", handle);
01019
01020
01021 _outgoing_list.push_back(block);
01022
01023
01024 if (!_capacity || !_thread_pool.borrow_from_range(aiofile_working_ident, aiofile_working_ident + _capacity, 0, this, aiofile_working_thread_alert))
01025 _thread_pool.borrow_thread(aiofile_working_ident, 0, this, aiofile_working_thread_alert);
01026
01027 return 0;
01028 }
01029 else
01030 {
01031 format_logging(0, __FILE__, __LINE__, en_log_info, "process block type %d failed, handle %d", block->_type, handle);
01032
01033 return res;
01034 }
01035 }
01036 else
01037 {
01038 format_logging(0, __FILE__, __LINE__, en_log_info, "process block type %d succeeded, handle %d", block->_type, handle);
01039 block->_err = 0;
01040 block->_processed = 0;
01041 return 0;
01042 }
01043 }
01044
01045
01046 int
01047 aiofile::_activate_block(size_t ident, aiofile_block* block)
01048 {
01049
01050 aiofile_file_map_iterator_t iter_file = _file_map.find(ident);
01051 if (iter_file == _file_map.end())
01052 {
01053 format_logging(0, __FILE__, __LINE__, en_log_error, "file %d not found", ident);
01054 _put_block(block);
01055 return -1;
01056 }
01057
01058
01059 aio_file_handle handle = iter_file->_handle;
01060
01061 terimber_aiofile_callback* client = iter_file->_client_obj;
01062
01063
01064 block->_file_ident = ident;
01065
01066
01067 _initial_list.push_back(block);
01068
01069
01070 _in_thread.wakeup();
01071
01072 return 0;
01073 }
01074
01075
01076 int
01077 aiofile::_process_write(aio_file_handle handle, aiofile_block* block)
01078 {
01079
01080
01081 #if OS_TYPE == OS_WIN32
01082 return ::WriteFile(handle, (LPVOID)block->_buf, (DWORD)block->_len, (LPDWORD)&block->_processed, block) ?
01083 ERROR_IO_PENDING :
01084 (block->_err = ::GetLastError());
01085 #else
01086 return TERIMBER::WriteFile(handle, block->_buf, block->_len, block) ?
01087 EWOULDBLOCK :
01088 (block->_err = errno);
01089 #endif
01090 }
01091
01092 int
01093 aiofile::_process_read(aio_file_handle handle, aiofile_block* block)
01094 {
01095
01096
01097 #if OS_TYPE == OS_WIN32
01098 return ::ReadFile(handle, (LPVOID)block->_buf, (DWORD)block->_len, (LPDWORD)&block->_processed, block) ?
01099 ERROR_IO_PENDING :
01100 (block->_err = ::GetLastError());
01101 #else
01102 return TERIMBER::ReadFile(handle, block->_buf, block->_len, block) ?
01103 EWOULDBLOCK :
01104 (block->_err = errno);
01105 #endif
01106 }
01107
01108 void
01109 aiofile::process_timeouted_blocks()
01110 {
01111
01112 date now;
01113 sb8_t unow = (sb8_t)now;
01114
01115
01116 mutex_keeper guard(_mtx);
01117
01118 for (aiofile_file_map_iterator_t iter_file = _file_map.begin(); iter_file != _file_map.end(); ++iter_file)
01119 {
01120 size_t socket_key = iter_file.key();
01121 for (aiofile_pblock_alloc_list_t::iterator iter_block = iter_file->_incoming_list.begin();
01122 iter_block != iter_file->_incoming_list.end();)
01123 {
01124
01125 aiofile_block* block = *iter_block;
01126
01127 if (block->_expired == 0
01128 || unow > block->_expired)
01129 {
01130 ++iter_block;
01131 continue;
01132 }
01133
01134
01135 aio_file_handle handle = iter_file->_handle;
01136 terimber_aiofile_callback* client_obj = iter_file->_client_obj;
01137
01138
01139 iter_block = iter_file->_incoming_list.erase(_incoming_list_allocator, iter_block);
01140
01141
01142 ++iter_file->_callback_invoking;
01143
01144 guard.unlock();
01145
01146 format_logging(0, __FILE__, __LINE__, en_log_error, "timeouted %s action for file %d",
01147 (block->_type == AIOFILE_READ ? "read" : "write"),
01148 socket_key);
01149
01150
01151 try
01152 {
01153 client_obj->v_on_error(block->_file_ident,
01154 #if OS_TYPE == OS_WIN32
01155 WSAETIMEDOUT
01156 #else
01157 ETIMEDOUT
01158 #endif
01159 , block->_type
01160 , block->_userdata);
01161 }
01162 catch (...)
01163 {
01164 assert(false);
01165 }
01166
01167
01168 guard.lock();
01169
01170
01171 aiofile_file_map_t::iterator iter_find = _file_map.find(block->_file_ident);
01172
01173 if (iter_find == _file_map.end())
01174 {
01175
01176 aiofile_delay_key_t::iterator iter_delay = _delay_key_map.find(block->_file_ident);
01177
01178 if (iter_delay != _delay_key_map.end())
01179 {
01180 assert(*iter_delay > 0);
01181
01182 if (--*iter_delay <= 0)
01183 {
01184 _delay_key_map.erase(iter_delay);
01185 _file_generator.save(block->_file_ident);
01186 }
01187 }
01188
01189 }
01190 else
01191 {
01192
01193 assert(iter_find->_callback_invoking > 0);
01194 --iter_find->_callback_invoking;
01195 }
01196
01197 #if OS_TYPE == OS_WIN32
01198
01199 _abounded_list.push_back(block);
01200 #else
01201 _cancel_aio(handle, block);
01202 _put_block(block);
01203 #endif
01204
01205 return;
01206 }
01207 }
01208 }
01209
01210 #pragma pack()
01211 END_TERIMBER_NAMESPACE