00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 #include "osdetect.h"
00029 
00031 #if OS_TYPE == OS_WIN32
00032 #if defined(_MSC_VER) && (_MSC_VER >= 1200) 
00033 #include <winsock2.h>
00034 #endif
00035 #endif
00036 
00038 #include "base/list.hpp"
00039 #include "aiofile/aiofile.h"
00040 #include "base/map.hpp"
00041 #include "base/stack.hpp"
00042 #include "base/memory.hpp"
00043 #include "base/common.hpp"
00044 
00045 
// Constructs the factory; no initialization work is done here.
terimber_aiofile_factory::terimber_aiofile_factory()
{
}
00050 
// Destroys the factory; no cleanup work is done here.
terimber_aiofile_factory::~terimber_aiofile_factory()
{
}
00055 
00057 terimber_aiofile*
00058 terimber_aiofile_factory::get_aiofile(terimber_log* log, size_t capacity, size_t deactivate_time_msec)
00059 {
00060         
00061         terimber::aiofile* obj = new terimber::aiofile(capacity, deactivate_time_msec);
00062         if (obj)
00063         {
00064                 
00065                 obj->log_on(log);
00066                 
00067                 obj->on();
00068         }
00069 
00070         return obj;
00071 }
00072 
00073 
00074 BEGIN_TERIMBER_NAMESPACE
00075 #pragma pack(4)
00076 
00077 
// Job ident of the I/O initiation job (see v_has_job / v_do_job).
const size_t aiofile_io_initiation_ident = 1;
// Alert value passed with the initiation job - presumably milliseconds; TODO confirm.
const size_t aiofile_io_initiation_thread_alert = 1000; 
// Job ident of the thread that waits on the completion port.
const size_t aiofile_completion_io_port_ident = 2;
// Alert value passed when borrowing the completion port thread.
const size_t aiofile_completion_io_port_thread_alert = INFINITE;
// Job ident of the working threads that deliver completed blocks to callbacks.
const size_t aiofile_working_ident = 3;
// Alert value passed when borrowing working threads - presumably milliseconds; TODO confirm.
const size_t aiofile_working_thread_alert = 60000; 
00090  
00091 
// Default constructor: starts from a zero-filled block (see clear()).
aiofile_block::aiofile_block()
{ 
        clear();
}
00097 
00098 aiofile_block::aiofile_block(const aiofile_block& x)
00099 {
00100         *this = x;
00101 }
00102 
00103 aiofile_block& 
00104 aiofile_block::operator=(const aiofile_block& x)
00105 {
00106         if (this != &x)
00107         {
00108                 memcpy(this, &x, sizeof(aiofile_block));
00109         }
00110 
00111         return *this;
00112 }
00113 
00114 void 
00115 aiofile_block::clear()
00116 {
00117         memset(this, 0, sizeof(aiofile_block));
00118 }
00119 
00120 void 
00121 aiofile_block::set_timeout(size_t timeout)
00122 {
00123         if (timeout == INFINITE)
00124         {
00125 #if OS_TYPE == OS_WIN32 
00126                 _timeout = 0;
00127 #else
00128                 _timeout.tv_sec = _timeout.tv_usec = 0;
00129 #endif
00130                 _expired = 0;
00131         }
00132         else
00133         {
00134 #if OS_TYPE == OS_WIN32 
00135                 _timeout = timeout;
00136 #else
00137                 _timeout.tv_sec = timeout / 1000;
00138                 _timeout.tv_usec = (timeout % 1000) * 1000 + 1;
00139 #endif
00140                 date now;
00141                 _expired = (sb8_t)now + timeout;
00142         }
00143 }
00144 
// Constructs a stopped port; nothing is started until on() is called.
//   capacity             - stored in _capacity and used to size the thread pool
//   deactivate_time_msec - forwarded to the thread pool constructor
// NOTE(review): the literal 64 passed to the containers is presumably an
// allocator page/initial size - confirm in base/map.hpp and base/list.hpp.
aiofile::aiofile(size_t capacity, size_t deactivate_time_msec) : 
_file_map(less< size_t >(), 64)
,_reverse_map(less< aio_file_handle >(), 64)
,_file_generator(64)
,_outgoing_list(64)
,_aiofile_io_handle(0)
// NOTE(review): the 3 extra threads presumably cover the service threads - confirm
,_thread_pool(capacity + 3, deactivate_time_msec) 
,_capacity(capacity)
,_on(false)
,_flag_io_port(false)
{
}
00157 
// Destructor: stops the port. off() returns early if the port is not running.
aiofile::~aiofile()
{
        
        off();
}
00163 
00164 bool
00165 aiofile::on()
00166 {
00167         if (_on)
00168         {
00169                 format_logging(0, __FILE__, __LINE__, en_log_error, "aiofile already started");
00170                 return false;
00171         }
00172 
00173         format_logging(0, __FILE__, __LINE__, en_log_info, "starting aiofile...");
00174 
00175 #if OS_TYPE != OS_WIN32
00176         TERIMBER::SetLog(this);
00177 #endif
00178         _thread_pool.log_on(this);
00179 
00180         if (!_thread_pool.on())
00181         {
00182                 format_logging(0, __FILE__, __LINE__, en_log_error, "can not start thread pool");
00183                 return false;
00184         }
00185 
00186                 format_logging(0, __FILE__, __LINE__, en_log_info, "init completion port");
00187 #if OS_TYPE == OS_WIN32
00188                 _aiofile_io_handle = ::CreateIoCompletionPort((HANDLE)INVALID_SOCKET, 0, 0, 0);
00189 #else
00190                 _aiofile_io_handle = TERIMBER::CreateIoCompletionPort((HANDLE)INVALID_SOCKET, 0, 0, TYPE_UNKNOWN);
00191 #endif
00192 
00193 
00194         if (!_aiofile_io_handle)
00195         {
00196 #if OS_TYPE == OS_WIN32
00197                 ::CloseHandle((HANDLE)_aiofile_io_handle);
00198 #else
00199                 TERIMBER::CloseHandle(_aiofile_io_handle);
00200 #endif
00201                 format_logging(0, __FILE__, __LINE__, en_log_info, "can not initiate completion port");
00202                 return false;
00203         }
00204 
00205         _flag_io_port = true;
00206         _thread_pool.borrow_thread(aiofile_completion_io_port_ident, 0, this, aiofile_completion_io_port_thread_alert);
00207         _start_io_port.wait();
00208 
00209         
00210         _thread_pool.borrow_thread(aiofile_working_ident, 0, this, aiofile_working_thread_alert);
00211 
00212         
00213         _in_thread.start();
00214         job_task task(this, aiofile_io_initiation_ident, aiofile_io_initiation_thread_alert, 0);
00215         _in_thread.assign_job(task);
00216 
00217         _on = true;
00218 
00219         format_logging(0, __FILE__, __LINE__, en_log_info, "aio file port is initialized");
00220 
00221         return _aiofile_io_handle != 0;
00222 }
00223 
00224 
00225 
// Stops the port and releases everything on() and the clients created.
// The order matters: the initiation thread is stopped first, then the
// completion port thread is asked to exit and waited for, then the port
// handle is closed, the thread pool is stopped, and finally all files and
// bookkeeping containers are cleaned up under the mutex.
// Does nothing (apart from logging) if the port is not running.
void
aiofile::off()
{
        if (!_on)
        {
                format_logging(0, __FILE__, __LINE__, en_log_error, "aiofile already stopped");
                return;
        }

        format_logging(0, __FILE__, __LINE__, en_log_info, "stoping aiofile...");
        // stop the thread that initiates new I/O requests
        _in_thread.cancel_job();
        _in_thread.stop();


        format_logging(0, __FILE__, __LINE__, en_log_info, "send stop message to completion port");
        // a completion with key 0 is the stop message recognised by wait_for_io_completion()
#if OS_TYPE == OS_WIN32
        ::PostQueuedCompletionStatus((HANDLE)_aiofile_io_handle, 0, 0, 0);
#else
        TERIMBER::PostQueuedCompletionStatus(_aiofile_io_handle, 0, 0, 0);
#endif

        // wait until the completion port thread confirms it has left its loop
        _stop_io_port.wait();
        format_logging(0, __FILE__, __LINE__, en_log_info, "completion port stopped");


        // close the completion port handle
#if OS_TYPE == OS_WIN32
        
        ::CloseHandle((HANDLE)_aiofile_io_handle);
#else
        TERIMBER::CloseHandle(_aiofile_io_handle);
#endif


        // a zero handle makes v_has_job() and the public entry points bail out
        _aiofile_io_handle = 0;

        format_logging(0, __FILE__, __LINE__, en_log_info, "Stoping thread pool");
        _thread_pool.revoke_client(this);
        _thread_pool.off();
        _thread_pool.log_on(0);

        format_logging(0, __FILE__, __LINE__, en_log_info, "Close all files");

        // from here on the shared containers are modified, so hold the mutex
        mutex_keeper guard(_mtx);
        for (aiofile_file_map_iterator_t iter = _file_map.begin(); iter != _file_map.end(); ++iter)
        {
                // drop the pending block entries of this file, then cancel its I/O and close it
                iter->_incoming_list.erase(_incoming_list_allocator, iter->_incoming_list.begin(), iter->_incoming_list.end());
                _cancel_file(iter->_handle);
                _close_file(iter->_handle);
        }


        format_logging(0, __FILE__, __LINE__, en_log_info, "cleans up resources");

        // forget all file identifiers and lookup tables
        _file_generator.clear();
        _file_map.clear();
        _reverse_map.clear();
        _delay_key_map.clear();

        // release the blocks still held in the block lists
        _clear_block_lists();

        // let the allocators release their spare memory
        _incoming_list_allocator.clear_extra();
        _block_allocator.clear_extra();

        // detach the log installed by on()
#if OS_TYPE != OS_WIN32
        TERIMBER::SetLog(0);
#endif

        // mark the port as stopped
        _on = false;

        format_logging(0, __FILE__, __LINE__, en_log_info, "aio file port is uninitialized");
}
00302 
00303 
00304 bool 
00305 aiofile::v_has_job(size_t ident, void* data)
00306 {
00307         
00308         if (_aiofile_io_handle == 0)
00309                 return false;
00310 
00311         switch (ident)
00312         {
00313                 case aiofile_completion_io_port_ident: 
00314                         return _flag_io_port;
00315                 case aiofile_io_initiation_ident:
00316                         {
00317                                 
00318                                 mutex_keeper guard(_mtx);
00319 
00320                                 
00321                                 if (!_initial_list.empty())
00322                                         return true;
00323 
00324                                 
00325                                 date now;
00326                                 sb8_t unow = (sb8_t)now;
00327                                 
00328                                 for (aiofile_file_map_iterator_t iter_file = _file_map.begin(); iter_file != _file_map.end(); ++iter_file)
00329                                 {
00330                                         for (aiofile_pblock_alloc_list_t::iterator iter_block = iter_file->_incoming_list.begin(); 
00331                                                                                                                                         iter_block != iter_file->_incoming_list.end(); ++iter_block)
00332                                         {
00333                                                 
00334                                                 aiofile_block* block = *iter_block;
00335 
00336                                                 if (block->_expired != 0 
00337                                                         && unow >= block->_expired) 
00338                                                 {
00339                                                         return true;
00340                                                 }
00341                                         }
00342                                 }
00343 
00344                                 
00345                                 return false;
00346                         }
00347                 case aiofile_working_ident:
00348                 default:
00349                         {
00350                                 
00351                                 mutex_keeper guard(_mtx);
00352                                 
00353                                 return !_outgoing_list.empty();
00354                         }
00355         }
00356 
00357         return false;
00358 }
00359 
// Body of the completion port thread.
// Signals _start_io_port (on() waits for it), then dequeues completions in
// a loop and hands each one to complete_block(). A completion whose key is
// zero is the stop message posted by off(): the thread clears
// _flag_io_port, signals _stop_io_port and leaves the loop.
void 
aiofile::wait_for_io_completion()
{
        // tell on() that this thread is up and running
        _start_io_port.signal();

        while (true) 
        {
                aiofile_block* ov = 0;
                // stays os_minus_one if the wait does not write a key
                size_t file_key = os_minus_one;
                size_t num_bytes = 0;
#if OS_TYPE == OS_WIN32
                // NOTE(review): num_bytes / file_key are size_t but are passed through
                // DWORD* / ULONG_PTR* casts; verify the sizes match on every target.
                bool bRes = (TRUE == ::GetQueuedCompletionStatus((HANDLE)_aiofile_io_handle, 
                                                                                                                (DWORD*)&num_bytes, 
#if defined(_MSC_VER) && (_MSC_VER > 1200) 
                                                                                                                (ULONG_PTR*)
#else
                                                                                                                (DWORD*)
#endif
                                                                                                                &file_key, 
                                                                                                                (LPOVERLAPPED*)&ov, 
                                                                                                                INFINITE));

                // zero on success, otherwise the system error code
                int cRes = bRes ? 0 : ::GetLastError(); 
#else
                int cRes = TERIMBER::GetQueuedCompletionStatus(_aiofile_io_handle, 
                                                                                                                &num_bytes, 
                                                                                                                &file_key, 
                                                                                                                (LPOVERLAPPED*)&ov, 
                                                                                                                INFINITE);

#endif
                // key 0 is the stop message posted by off()
                if (!file_key)
                {
                        // v_has_job() will now report no more work for this thread
                        _flag_io_port = false;
                        // let off() continue with the shutdown
                        _stop_io_port.signal();
                        
                        break;
                }

                // regular completion: move the block to the outgoing list
                complete_block(file_key, ov, cRes, num_bytes);
        } 
}
00406 
00407 void 
00408 aiofile::complete_block(size_t file_key, aiofile_block* ov, int err, size_t processed)
00409 {
00410         
00411         mutex_keeper guard(_mtx);
00412         
00413         aiofile_file_map_iterator_t iter_file = _file_map.find(file_key);
00414 
00415         if (iter_file == _file_map.end())
00416         {
00417                 format_logging(0, __FILE__, __LINE__, en_log_error, "file key %d not found", file_key);
00418                 return;
00419         }
00420 
00421         
00422         for (aiofile_pblock_alloc_list_t::iterator iter_block = iter_file->_incoming_list.begin(); iter_block != iter_file->_incoming_list.end(); ++iter_block)
00423         {
00424                 
00425                 if (*iter_block != ov)
00426                         continue;
00427 
00428                 
00429                 aiofile_block* block = *iter_block;
00430                 
00431                 iter_file->_incoming_list.erase(_incoming_list_allocator, iter_block);
00432                 
00433                 block->_err = err;
00434                 
00435                 block->_processed = processed;
00436                 
00437                 _outgoing_list.push_back(block);
00438                 
00439                 guard.unlock();
00440                 
00441                 if (!_capacity || !_thread_pool.borrow_from_range(aiofile_working_ident, aiofile_working_ident + _capacity, 0, this, aiofile_working_thread_alert))
00442                         _thread_pool.borrow_thread(aiofile_working_ident, 0, this, aiofile_working_thread_alert);
00443 
00444                 return;
00445         } 
00446 
00447         format_logging(0, __FILE__, __LINE__, en_log_info, "completed block not found for file %d, looking in abounded list", file_key); 
00448 
00449 #if OS_TYPE == OS_WIN32
00450 
00451         
00452         
00453         for (aiofile_pblock_alloc_list_t::iterator iter_abounded = _abounded_list.begin(); iter_abounded != _abounded_list.end(); ++iter_abounded)
00454         {
00455                 if (*iter_abounded != ov)
00456                         continue;
00457 
00458                 
00459                 aiofile_block* block = *iter_abounded;
00460                 
00461                 _abounded_list.erase(iter_abounded);
00462                 
00463                 _put_block(block);
00464                 return;
00465         } 
00466 
00467         format_logging(0, __FILE__, __LINE__, en_log_info, "completed block not found for file %d anywere", file_key); 
00468 #endif
00469 
00470 }
00471 
00472 
00473 void 
00474 aiofile::v_do_job(size_t ident, void* data)
00475 {
00476         switch (ident)
00477         {
00478                 case aiofile_completion_io_port_ident: 
00479                         wait_for_io_completion();
00480                         break;
00481                 case aiofile_io_initiation_ident:
00482                         {
00483                                 
00484                                 mutex_keeper guard(_mtx);
00485 
00486                                 if (_initial_list.empty())
00487                                         return;
00488 
00489                                 
00490                                 aiofile_block* block = _initial_list.front();
00491                                 
00492                                 _initial_list.pop_front();
00493 
00494                                 
00495                                 if (int err = _process_block(block))
00496                                 {
00497                                         
00498                                         block->_err = err;
00499                                         
00500                                         _outgoing_list.push_back(block);
00501                                         
00502                                         guard.unlock();
00503                                         
00504                                         if (!_capacity || !_thread_pool.borrow_from_range(aiofile_working_ident, aiofile_working_ident + _capacity, 0, this, aiofile_working_thread_alert))
00505                                                 _thread_pool.borrow_thread(aiofile_working_ident, 0, this, aiofile_working_thread_alert);
00506                                 }
00507                         }
00508                         break;
00509                 case aiofile_working_ident:
00510                 default:
00511                         {
00512                                 aiofile_block* block = 0;
00513                                 aio_file_handle handle = 0;
00514                                 aio_file_handle accept_handle = 0;
00515                                 terimber_aiofile_callback* client_obj = 0;
00516         
00517                                 
00518                                 mutex_keeper guard(_mtx);
00519 
00520                                 if (_outgoing_list.empty())
00521                                         return; 
00522 
00523                                 
00524                                 block = _outgoing_list.front();
00525                                 
00526                                 _outgoing_list.pop_front();
00527 
00528                                 
00529                                 aiofile_file_map_t::iterator iter_file = _file_map.find(block->_file_ident);
00530                                 
00531                                 if (iter_file == _file_map.end())
00532                                 {
00533                                         format_logging(0, __FILE__, __LINE__, en_log_error, "file key %d not found", block->_file_ident);
00534                                         
00535                                         _put_block(block);
00536                                         return;
00537                                 }
00538                                 else
00539                                 {
00540                                         
00541                                         client_obj = iter_file->_client_obj;
00542                                         
00543                                         handle = iter_file->_handle;
00544                                         
00545                                         ++iter_file->_callback_invoking;
00546                                 }
00547 
00548                                 
00549                                 if (block->_err)
00550                                 {
00551                                         
00552                                         
00553                                         guard.unlock();
00554 
00555                                         try
00556                                         {
00557                                                 
00558                                                 client_obj->v_on_error(block->_file_ident, block->_err, block->_type, block->_userdata);
00559                                         }
00560                                         catch (...)
00561                                         {
00562                                                 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_error exception for file %d", block->_file_ident);
00563                                                 assert(false);
00564                                         }
00565                                 }
00566                                 else
00567                                 {
00568                                         
00569                                         
00570                                         switch (block->_type)
00571                                         {
00572                                                 case AIOFILE_WRITE:
00573                                                         
00574                                                         guard.unlock();
00575 
00576                                                         try
00577                                                         {
00578                                                                 
00579                                                                 client_obj->v_on_write(block->_file_ident, (void*)block->_buf, block->_len, block->_processed, block->_userdata);
00580                                                         }
00581                                                         catch (...)
00582                                                         {
00583                                                                 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_write exception for file %d", block->_file_ident);
00584                                                                 assert(false);
00585                                                         }
00586                                                         break;
00587                                                 case AIOFILE_READ:
00588                                                         
00589                                                         guard.unlock();
00590 
00591                                                         try
00592                                                         {
00593                                                                 
00594                                                                 client_obj->v_on_read(block->_file_ident, (void*)block->_buf, block->_len, block->_processed, block->_userdata);
00595                                                         }
00596                                                         catch (...)
00597                                                         {
00598                                                                 format_logging(0, __FILE__, __LINE__, en_log_error, "v_on_read exception for file %d", block->_file_ident);
00599                                                                 assert(false);
00600                                                         }
00601                                                         break;
00602                                                 default:
00603                                                         assert(false);
00604 
00605                                         } 
00606                                 } 
00607 
00608                                 
00609                                 guard.lock();
00610                                 
00611                                 
00612                                 
00613                                 iter_file = _file_map.find(block->_file_ident);
00614                                 
00615                                 
00616                                 if (iter_file == _file_map.end())
00617                                 {
00618                                         
00619                                         aiofile_delay_key_t::iterator iter_delay = _delay_key_map.find(block->_file_ident);
00620 
00621                                         if (iter_delay != _delay_key_map.end())
00622                                         {
00623                                                 if (--*iter_delay <= 0) 
00624                                                 {
00625                                                         
00626                                                         _delay_key_map.erase(iter_delay);
00627                                                         
00628                                                         _file_generator.save(block->_file_ident);
00629                                                 }
00630                                         }
00631                                 }
00632                                 else
00633                                 {
00634                                         
00635                                         --iter_file->_callback_invoking;
00636                                 }
00637 
00638                                 
00639                                 _put_block(block);
00640                         } 
00641                         break;
00642         } 
00643 }
00644 
00645 
00646 size_t 
00647 aiofile::open(const char* file_name, bool read_write, terimber_aiofile_callback* callback)
00648 {
00649         if (!_aiofile_io_handle)
00650         {
00651                 format_logging(0, __FILE__, __LINE__, en_log_error, "aio file port is not initialized");
00652                 return 0;
00653         }
00654 
00655 
00656 #if OS_TYPE == OS_WIN32
00657         DWORD desiredAccess = (read_write ? GENERIC_READ : GENERIC_WRITE);
00658         DWORD sharedMode = (read_write ? FILE_SHARE_READ : FILE_SHARE_WRITE);
00659         DWORD creationDisposition = (read_write ? OPEN_EXISTING : CREATE_ALWAYS);
00660 
00661         HANDLE handle = ::CreateFile(file_name, desiredAccess, sharedMode, 0, creationDisposition,  FILE_FLAG_SEQUENTIAL_SCAN | FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
00662         if (handle == INVALID_HANDLE_VALUE)
00663         {
00664                 format_logging(0, __FILE__, __LINE__, en_log_error, "can not open file %s, access mode %s", file_name, read_write ? "read" : "write");
00665                 return 0;
00666         }
00667         ::SetFilePointer(handle, 0, 0, FILE_BEGIN);
00668 #else
00669         int oflag = (read_write ? (O_RDONLY|O_NONBLOCK) : (O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK));
00670         int oshare = S_IRWXU | S_IRWXG | S_IRWXO;
00671         HANDLE handle = ::open(file_name, oflag, oshare);
00672         if (handle == -1)
00673         {
00674                 format_logging(0, __FILE__, __LINE__, en_log_error, "can not open file %s, access mode %s", file_name, read_write ? "read" : "write");
00675                 return 0;
00676         }
00677 #endif
00678 
00679         mutex_keeper guard(_mtx);
00680         return _assign_file(handle, callback);
00681 }
00682 
00683 
00684 void 
00685 aiofile::close(size_t handle)
00686 {
00687         if (!_aiofile_io_handle)
00688         {
00689                 format_logging(0, __FILE__, __LINE__, en_log_error, "aio file port is not initialized");
00690                 return;
00691         }
00692 
00693         mutex_keeper guard(_mtx);
00694         aiofile_file_map_iterator_t iter = _file_map.find(handle);
00695 
00696         if (iter == _file_map.end())
00697         {
00698                 format_logging(0, __FILE__, __LINE__, en_log_error, "file %d not found", handle);
00699                 return;
00700         }
00701 
00702         
00703         HANDLE hfile = iter->_handle;
00704         bool delay_key = false;
00705 
00706         
00707         iter->_incoming_list.erase(_incoming_list_allocator, iter->_incoming_list.begin(), iter->_incoming_list.end());
00708         
00709         if (iter->_callback_invoking)
00710         {
00711                 delay_key = true;
00712                 aiofile_delay_key_t::iterator iter_delay = _delay_key_map.find(handle);
00713                 if (iter_delay != _delay_key_map.end())
00714                         *iter_delay += (int)iter->_callback_invoking;
00715                 else
00716                         _delay_key_map.insert(handle, (int)iter->_callback_invoking);
00717         }
00718 
00719         _reverse_map.erase(iter->_handle);
00720         _file_map.erase(iter);
00721 
00722         for (aiofile_pblock_list_t::iterator in_iter = _initial_list.begin(); in_iter != _initial_list.end();)
00723         {
00724                 aiofile_block* block = *in_iter;
00725                 if (block->_file_ident == handle)
00726                 {
00727                         _put_block(block);
00728                         in_iter = _initial_list.erase(in_iter);
00729                 }
00730                 else
00731                         ++in_iter;
00732         }
00733 
00734         for (aiofile_pblock_list_t::iterator out_iter = _outgoing_list.begin(); out_iter != _outgoing_list.end();)
00735         {
00736                 aiofile_block* block = *out_iter;
00737                 if (block->_file_ident == handle)
00738                 {
00739                         _put_block(block);
00740                         out_iter = _outgoing_list.erase(out_iter);
00741                 }
00742                 else
00743                         ++out_iter;
00744         }
00745 
00746         if (hfile)
00747         {
00748                 _cancel_file(hfile);
00749                 _close_file(hfile);
00750         }
00751 
00752 
00753 #if OS_TYPE == OS_WIN32
00754         
00755         for (aiofile_pblock_alloc_list_t::iterator iter_list = iter->_incoming_list.begin(); iter_list != iter->_incoming_list.end();)
00756         {                       
00757                 _abounded_list.push_back(*iter_list);
00758                 iter_list = iter->_incoming_list.erase(_incoming_list_allocator, iter_list);
00759         }
00760 #else
00761         
00762         for (aiofile_pblock_alloc_list_t::iterator iter_list = iter->_incoming_list.begin(); iter_list != iter->_incoming_list.end();)
00763         {                       
00764                 aiofile_block* block = *iter_list;
00765                 _put_block(block);
00766                 iter_list = iter->_incoming_list.erase(_incoming_list_allocator, iter_list);
00767         }
00768 
00769 #endif
00770 
00771         if (!delay_key)
00772                 
00773                 _file_generator.save(handle);
00774 
00775         format_logging(0, __FILE__, __LINE__, en_log_info, "file handle %u is closed", handle);
00776 }
00777 
00778 
00779 
00780 int
00781 aiofile::write(size_t handle, size_t offset, const void* buf, size_t len, size_t timeout, void* userdata)
00782 {
00783         mutex_keeper guard(_mtx);
00784         
00785         aiofile_block* block = _get_block();
00786         block->set_timeout(timeout);
00787         
00788 #if OS_TYPE == OS_WIN32
00789         block->Offset = (DWORD)offset;
00790         block->OffsetHigh = 0;
00791 #else
00792         block->offset = offset;
00793 #endif
00794 
00795         block->_type = AIOFILE_WRITE;
00796         block->_userdata = userdata;
00797 
00798         block->_buf = (char*)buf;
00799         block->_len = len;
00800 
00801         return _activate_block(handle, block);
00802 }
00803 
00804 
00805 
00806 
00807 int 
00808 aiofile::read(size_t handle, size_t offset, void* buf, size_t len, size_t timeout, void* userdata)
00809 {
00810         mutex_keeper guard(_mtx);
00811         
00812         aiofile_block* block = _get_block();
00813         block->set_timeout(timeout);
00814 
00815 #if OS_TYPE == OS_WIN32
00816         block->Offset = (DWORD)offset;
00817         block->OffsetHigh = 0;
00818 #else
00819         block->offset = offset;
00820 #endif
00821 
00822         block->_type = AIOFILE_READ;
00823         block->_userdata = userdata;
00824 
00825         block->_buf = (char*)buf;
00826         block->_len = len;
00827 
00828         return _activate_block(handle, block);
00829 }
00830 
00831 
00832 
00833 void
00834 aiofile::doxray()
00835 {
00836         mutex_keeper guard(_mtx);
00837 
00838         size_t files = _file_map.size(), 
00839                 delay_actions = _delay_key_map.size(),   
00840                 initiated_actions = _initial_list.size(),
00841                 completed_actions = _outgoing_list.size(),
00842                 abounded_actions = 
00843 #if OS_TYPE == OS_WIN32 
00844                 _abounded_list.size();
00845 #else
00846                 0;
00847 #endif
00848 
00849         guard.unlock();
00850 
00851         format_logging(0, __FILE__, __LINE__, en_log_xray, "<aiofile files=\"%d\" delayed=\"%d\" initiated=\"%d\" completed=\"%d\" abounded=\"%d\" />",
00852                 files, delay_actions, initiated_actions, completed_actions, abounded_actions);
00853 
00854 #if OS_TYPE != OS_WIN32
00855         TERIMBER::DoXRay();
00856 #endif
00857         _thread_pool.doxray();
00858 }
00859 
00860 size_t 
00861 aiofile::_assign_file(aio_file_handle handle, terimber_aiofile_callback* callback)
00862 {
00863         
00864         aiofile_file new_file(handle, callback);
00865 
00866         
00867         size_t ident = _file_generator.generate();
00868 
00869         
00870         aiofile_file_map_iterator_t iter_file = _file_map.insert(ident, new_file).first;
00871         if (iter_file == _file_map.end())
00872         {
00873                 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00874                 _close_file(new_file._handle);
00875                 _file_generator.save(ident);
00876                 return 0;
00877         }
00878 
00879         
00880         aiofile_reverse_map_iterator_t iter_reverse = _reverse_map.insert(handle, iter_file).first;
00881         if (iter_reverse == _reverse_map.end())
00882         {
00883                 format_logging(0, __FILE__, __LINE__, en_log_error, "not enough memory");
00884                 _close_file(new_file._handle);
00885                 _file_map.erase(iter_file);
00886                 _file_generator.save(ident);
00887                 return 0;
00888         }
00889 
00890 #if OS_TYPE == OS_WIN32
00891         
00892         if (!::CreateIoCompletionPort((HANDLE)new_file._handle, 
00893                                                                 (HANDLE)_aiofile_io_handle, 
00894                                                                 (DWORD)ident, 
00895                                                                 3))
00896 #else
00897         
00898         if (!TERIMBER::CreateIoCompletionPort(new_file._handle, 
00899                                                                 _aiofile_io_handle, 
00900                                                                 ident, 
00901                                                                 TYPE_FILE))
00902 
00903 #endif
00904         {
00905                 format_logging(0, __FILE__, __LINE__, en_log_error, "can not assign file handle to completion port");
00906                 _close_file(new_file._handle);
00907                 _reverse_map.erase(iter_reverse);
00908                 _file_map.erase(iter_file);
00909                 _file_generator.save(ident);
00910                 return 0;
00911         }
00912 
00913         format_logging(0, __FILE__, __LINE__, en_log_info, "assign file handle %u, ident %u is open", new_file._handle, ident);
00914 
00915         return ident;
00916 }
00917 
00918 void 
00919 aiofile::_close_file(aio_file_handle handle)
00920 {
00921         format_logging(0, __FILE__, __LINE__, en_log_info, "file handle %d closed", handle);
00922 
00923 #if OS_TYPE == OS_WIN32
00924         ::CloseHandle(handle);
00925 #else
00926 	::close(handle);
00927 #endif
00928 }
00929 
00930 void 
00931 aiofile::_cancel_file(aio_file_handle handle)
00932 {
00933         format_logging(0, __FILE__, __LINE__, en_log_info, "actions for file handle %d aborted", handle);
00934 
00935         
00936 #if OS_TYPE == OS_WIN32
00937         ::CancelIo(handle);
00938 #else
00939         TERIMBER::CancelIo(handle, 0);
00940 #endif
00941 }
00942 
00943 #if OS_TYPE != OS_WIN32
00944 
00946 void 
00947 aiofile::_cancel_aio(           aio_file_handle handle,                         
00948                                         OVERLAPPED* overlapped                          
00949                                         )
00950 {
00951         TERIMBER::CancelIo(handle, overlapped);
00952 }
00953 
00954 #endif
00955 
00956 int 
00957 aiofile::_process_block(aiofile_block* block)
00958 {
00959         if (!_aiofile_io_handle)
00960         {
00961                 format_logging(0, __FILE__, __LINE__, en_log_error, "aiofile is not initialized");
00962                 return -1;
00963         }
00964 
00965         aiofile_file_map_iterator_t iter_file = _file_map.find(block->_file_ident);
00966         if (iter_file == _file_map.end())
00967         {
00968                 format_logging(0, __FILE__, __LINE__, en_log_error, "file %d not found", block->_file_ident);
00969                 return -1;
00970         }
00971 
00972         aio_file_handle handle = iter_file->_handle;
00973         terimber_aiofile_callback* client = iter_file->_client_obj;
00974 
00975         
00976         iter_file->_incoming_list.push_back(_incoming_list_allocator, block);
00977 
00978         
00979         
00980         int res = 0;
00981         switch (block->_type)
00982         {
00983                 case AIOFILE_WRITE:
00984                         res = _process_write(handle, block);
00985                         break;
00986                 case AIOFILE_READ:
00987                         res = _process_read(handle, block);
00988                         break;
00989                 default:
00990                         break;
00991         } 
00992 
00993         
00994         
00995         
00996         
00997 
00998         if (!res
00999                 || res 
01000 #if OS_TYPE == OS_WIN32
01001                 && (
01002                         res != ERROR_IO_PENDING
01003                 )
01004 #else   
01005                 && res != EWOULDBLOCK
01006 #endif
01007         )
01008         {
01009                 
01010                 iter_file->_incoming_list.pop_back(_incoming_list_allocator);
01011 
01012                 
01013                 block->_err = res;
01014 
01015                 
01016                 if (!res)
01017                 {
01018                         format_logging(0, __FILE__, __LINE__, en_log_info, "process block synchronously, handle %d", handle); 
01019 
01020                         
01021                         _outgoing_list.push_back(block);
01022 
01023                         
01024                         if (!_capacity || !_thread_pool.borrow_from_range(aiofile_working_ident, aiofile_working_ident + _capacity, 0, this, aiofile_working_thread_alert))
01025                                 _thread_pool.borrow_thread(aiofile_working_ident, 0, this, aiofile_working_thread_alert);
01026 
01027                         return 0;
01028                 }
01029                 else 
01030                 {
01031                         format_logging(0, __FILE__, __LINE__, en_log_info, "process block type %d failed, handle %d", block->_type, handle); 
01032                         
01033                         return res;
01034                 }
01035         }
01036         else 
01037         {
01038                 format_logging(0, __FILE__, __LINE__, en_log_info, "process block type %d succeeded, handle %d", block->_type, handle); 
01039                 block->_err = 0;
01040                 block->_processed = 0;
01041                 return 0;
01042         }
01043 }
01044 
01045 
01046 int 
01047 aiofile::_activate_block(size_t ident, aiofile_block* block)
01048 {
01049         
01050         aiofile_file_map_iterator_t iter_file = _file_map.find(ident);
01051         if (iter_file == _file_map.end())
01052         {
01053                 format_logging(0, __FILE__, __LINE__, en_log_error, "file %d not found", ident);
01054                 _put_block(block);
01055                 return -1;
01056         }
01057 
01058         
01059         aio_file_handle handle = iter_file->_handle;
01060         
01061         terimber_aiofile_callback* client = iter_file->_client_obj;
01062 
01063         
01064         block->_file_ident = ident;
01065 
01066         
01067         _initial_list.push_back(block);
01068 
01069         
01070         _in_thread.wakeup();
01071 
01072         return 0;
01073 }
01074 
01075 
01076 int
01077 aiofile::_process_write(aio_file_handle handle, aiofile_block* block)
01078 {
01079         
01080         
01081 #if OS_TYPE == OS_WIN32
01082         return ::WriteFile(handle, (LPVOID)block->_buf, (DWORD)block->_len, (LPDWORD)&block->_processed, block) ?
01083                 ERROR_IO_PENDING : 
01084                 (block->_err = ::GetLastError());
01085 #else
01086         return TERIMBER::WriteFile(handle, block->_buf, block->_len, block) ?
01087                 EWOULDBLOCK : 
01088                 (block->_err = errno);
01089 #endif
01090 }
01091 
01092 int 
01093 aiofile::_process_read(aio_file_handle handle, aiofile_block* block)
01094 {
01095         
01096         
01097 #if OS_TYPE == OS_WIN32
01098         return ::ReadFile(handle, (LPVOID)block->_buf, (DWORD)block->_len, (LPDWORD)&block->_processed, block) ?
01099                 ERROR_IO_PENDING : 
01100                 (block->_err = ::GetLastError());
01101 #else
01102         return TERIMBER::ReadFile(handle, block->_buf, block->_len, block) ?
01103                 EWOULDBLOCK : 
01104                 (block->_err = errno);
01105 #endif
01106 }
01107 
01108 void 
01109 aiofile::process_timeouted_blocks()
01110 {
01111         
01112         date now;
01113         sb8_t unow = (sb8_t)now;
01114 
01115         
01116         mutex_keeper guard(_mtx);
01117         
01118         for (aiofile_file_map_iterator_t iter_file = _file_map.begin(); iter_file != _file_map.end(); ++iter_file)
01119         {
01120                 size_t socket_key = iter_file.key();
01121                 for (aiofile_pblock_alloc_list_t::iterator iter_block = iter_file->_incoming_list.begin(); 
01122                                                                                                                 iter_block != iter_file->_incoming_list.end();)
01123                 {
01124                         
01125                         aiofile_block* block = *iter_block;
01126 
01127                         if (block->_expired == 0        
01128                                 || unow > block->_expired) 
01129                         {
01130                                 ++iter_block;
01131                                 continue;
01132                         }
01133 
01134                         
01135                         aio_file_handle handle = iter_file->_handle;
01136                         terimber_aiofile_callback* client_obj = iter_file->_client_obj;
01137 
01138                         
01139                         iter_block = iter_file->_incoming_list.erase(_incoming_list_allocator, iter_block);
01140 
01141                         
01142                         ++iter_file->_callback_invoking;
01143                         
01144                         guard.unlock();
01145 
01146                         format_logging(0, __FILE__, __LINE__, en_log_error, "timeouted %s action for file %d", 
01147                                                 (block->_type == AIOFILE_READ ? "read" : "write"),
01148                                 socket_key);
01149 
01150                         
01151                         try
01152                         {
01153                                 client_obj->v_on_error(block->_file_ident, 
01154 #if OS_TYPE == OS_WIN32
01155                                         WSAETIMEDOUT
01156 #else                                                   
01157                                         ETIMEDOUT
01158 #endif
01159                                         , block->_type
01160                                         , block->_userdata);
01161                         }
01162                         catch (...)
01163                         {
01164                                 assert(false);
01165                         }
01166 
01167                         
01168                         guard.lock();
01169                         
01170                         
01171                         aiofile_file_map_t::iterator iter_find = _file_map.find(block->_file_ident);
01172                         
01173                         if (iter_find == _file_map.end())
01174                         {
01175                                 
01176                                 aiofile_delay_key_t::iterator iter_delay = _delay_key_map.find(block->_file_ident);
01177 
01178                                 if (iter_delay != _delay_key_map.end())
01179                                 {
01180                                         assert(*iter_delay > 0);
01181 
01182                                         if (--*iter_delay <= 0)
01183                                         {
01184                                                 _delay_key_map.erase(iter_delay);
01185                                                 _file_generator.save(block->_file_ident);
01186                                         }
01187                                 }
01188                                 
01189                         }
01190                         else
01191                         {
01192                                 
01193                                 assert(iter_find->_callback_invoking > 0);
01194                                 --iter_find->_callback_invoking;
01195                         }
01196 
01197 #if OS_TYPE == OS_WIN32
01198                         
01199                         _abounded_list.push_back(block);
01200 #else
01201                         _cancel_aio(handle, block);
01202                         _put_block(block);
01203 #endif
01204                         
01205                         return;
01206                 } 
01207         } 
01208 }
01209 
01210 #pragma pack()
01211 END_TERIMBER_NAMESPACE