00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef _terimber_aiofile_h_
00029 #define _terimber_aiofile_h_
00030
00031 #include "base/primitives.h"
00032 #include "base/stack.h"
00033 #include "base/list.h"
00034 #include "base/map.h"
00035 #include "threadpool/threadpool.h"
00036 #include "threadpool/thread.h"
00037 #include "aiofile/aiofilefactory.h"
00038 #include "aiocomport/aiocomport.h"
00039
00040 BEGIN_TERIMBER_NAMESPACE
00041
00042 #pragma pack(4)
00043
00046 class aiofile_block : public OVERLAPPED
00047 {
00048 public:
00050 aiofile_block();
00052 aiofile_block(const aiofile_block& x);
00054 aiofile_block& operator=(const aiofile_block& x);
00055
00057 void
00058 clear();
00060 void
00061 set_timeout( size_t timeout
00062 );
00063
00064 public:
00065 aiofile_type _type;
00066 char* _buf;
00067 size_t _len;
00068 size_t _file_ident;
00069 size_t _processed;
00070 int _err;
00071 void* _userdata;
00072 #if OS_TYPE == OS_WIN32
00073 size_t
00074 #else
00075 timeval
00076 #endif
00077 _timeout;
00078 sb8_t _expired;
00079 };
00080
00083 typedef HANDLE aio_file_handle;
00084
00087 class aiofile : public terimber_thread_employer,
00088 public terimber_aiofile
00089 {
00092 typedef node_allocator< base_list< aiofile_block* >::_node > aiofile_pblock_allocator_t;
00095 typedef _list< aiofile_block*, aiofile_pblock_allocator_t > aiofile_pblock_alloc_list_t;
00098 typedef list< aiofile_block* > aiofile_pblock_list_t;
00101 typedef node_allocator< aiofile_block > aiofile_block_allocator_t;
00102
00105 class listener_info
00106 {
00107 public:
00109 listener_info( size_t curr_count,
00110 size_t max_count,
00111 void* userdata
00112 ) :
00113 _curr_count(curr_count),
00114 _max_count(max_count),
00115 _userdata(userdata)
00116 {
00117 }
00118 size_t _curr_count;
00119 size_t _max_count;
00120 void* _userdata;
00121 };
00122
00125 typedef map< size_t, listener_info > aiofile_listener_map_t;
00126
00129 class aiofile_file
00130 {
00131 public:
00133 aiofile_file( aio_file_handle handle,
00134 terimber_aiofile_callback* callback
00135
00136 ) : _handle(handle), _client_obj(callback), _callback_invoking(0)
00137 {
00138 }
00140 aiofile_file(const aiofile_file& x) : _handle(x._handle), _client_obj(x._client_obj), _callback_invoking(x._callback_invoking) {}
00142 ~aiofile_file()
00143 {
00144 assert(_incoming_list.empty());
00145 }
00146
00147
00148
00149 aio_file_handle _handle;
00150 terimber_aiofile_callback* _client_obj;
00151 aiofile_pblock_alloc_list_t _incoming_list;
00152 size_t _callback_invoking;
00153 };
00154
00157 typedef map< size_t, aiofile_file > aiofile_file_map_t;
00160 typedef map< size_t, aiofile_file >::iterator aiofile_file_map_iterator_t;
00163 typedef map< aio_file_handle, aiofile_file_map_iterator_t > aiofile_reverse_map_t;
00166 typedef map< aio_file_handle, aiofile_file_map_iterator_t >::iterator aiofile_reverse_map_iterator_t;
00169 typedef map< size_t, size_t > aiofile_delay_key_t;
00170
00172 void
00173 _clear_block_lists()
00174 {
00175 if (!_outgoing_list.empty())
00176 {
00177 for (aiofile_pblock_list_t::iterator out_iter = _outgoing_list.begin(); out_iter != _outgoing_list.end(); ++out_iter)
00178 {
00179 aiofile_block* block = *out_iter;
00180 _put_block(block);
00181 }
00182
00183 _outgoing_list.clear();
00184 }
00185
00186 #if OS_TYPE == OS_WIN32
00187 if (!_abounded_list.empty())
00188 {
00189 for (aiofile_pblock_list_t::iterator tm_iter = _abounded_list.begin(); tm_iter != _abounded_list.end(); ++tm_iter)
00190 {
00191 aiofile_block* block = *tm_iter;
00192 _put_block(block);
00193 }
00194
00195 _abounded_list.clear();
00196 }
00197 #endif
00198 if (!_initial_list.empty())
00199 {
00200 for (aiofile_pblock_list_t::iterator in_iter = _initial_list.begin(); in_iter != _initial_list.end(); ++in_iter)
00201 {
00202 aiofile_block* block = *in_iter;
00203 _put_block(block);
00204 }
00205
00206 _initial_list.clear();
00207 }
00208 }
00209
00211 inline
00212 aiofile_block*
00213 _get_block()
00214 {
00215 aiofile_block* ptr = _block_allocator.allocate();
00216 if (ptr)
00217 {
00218 new(ptr) aiofile_block();
00219 }
00220
00221 return ptr;
00222 }
00223
00225 inline
00226 void
00227 _put_block(aiofile_block* block)
00228 {
00229 block->~aiofile_block();
00230 _block_allocator.deallocate(block);
00231 }
00232
00233
00234 public:
00236 aiofile(size_t capacity,
00237 size_t deactivate_time_msec
00238 );
00240 ~aiofile();
00241
00243 bool
00244 on();
00246 void
00247 off();
00248
00250 virtual
00251 size_t
00252 open( const char* file_name,
00253 bool read_write,
00254 terimber_aiofile_callback* callback
00255 );
00257 virtual
00258 void
00259 close( size_t handle
00260 );
00262 virtual
00263 int
00264 write( size_t handle,
00265 size_t offset,
00266 const void* buf,
00267 size_t len,
00268 size_t timeout,
00269 void* userdata
00270 );
00272 virtual
00273 int
00274 read( size_t handle,
00275 size_t offset,
00276 void* buf,
00277 size_t len,
00278 size_t timeout,
00279 void* userdata
00280 );
00281
00283 virtual
00284 void
00285 doxray();
00286
00287 protected:
00289 virtual
00290 bool
00291 v_has_job( size_t ident,
00292 void* data
00293 );
00295 virtual
00296 void
00297 v_do_job( size_t ident,
00298 void* data
00299 );
00300
00301 private:
00303 bool
00304 resolve_socket_address(const char* address,
00305 unsigned short port,
00306 sockaddr_in& addr
00307 );
00308
00310 aio_file_handle
00311 find_socket_handle( size_t ident
00312 );
00313
00315 void
00316 process_timeouted_blocks();
00317
00319 void
00320 complete_block( size_t sock_key,
00321 aiofile_block* ov,
00322 int err,
00323 size_t processed
00324 );
00325
00327 int
00328 _activate_block( size_t ident,
00329 aiofile_block* block
00330 );
00331
00333 size_t
00334 _assign_file( aio_file_handle handle,
00335 terimber_aiofile_callback* callback
00336 );
00337
00339 void
00340 _cancel_file( aio_file_handle handle
00341 );
00342
00343 #if OS_TYPE != OS_WIN32
00344
00346 void
00347 _cancel_aio( aio_file_handle handle,
00348 OVERLAPPED* overlapped
00349 );
00350
00351 #endif
00352
00354 void
00355 _close_file( aio_file_handle handle
00356 );
00357
00358
00360 int
00361 _process_block( aiofile_block* block
00362 );
00363
00365 int
00366 _process_write( aio_file_handle handle,
00367 aiofile_block* block
00368 );
00370 int
00371 _process_read( aio_file_handle handle,
00372 aiofile_block* block
00373 );
00374
00376 void
00377 wait_for_io_completion();
00378
00379 public:
00380
00381 mutex _mtx;
00382 aiofile_file_map_t _file_map;
00383 aiofile_reverse_map_t _reverse_map;
00384 aiofile_delay_key_t _delay_key_map;
00385 unique_key_generator _file_generator;
00386 aiofile_pblock_allocator_t _incoming_list_allocator;
00387 aiofile_block_allocator_t _block_allocator;
00388 aiofile_pblock_list_t _initial_list;
00389 aiofile_pblock_list_t _outgoing_list;
00390
00391 #if OS_TYPE == OS_WIN32
00392 aiofile_pblock_list_t _abounded_list;
00393 #endif
00394
00395 private:
00396 HANDLE _aiofile_io_handle;
00397 threadpool _thread_pool;
00398 size_t _capacity;
00399 thread _in_thread;
00400 static bool _port_init;
00401 bool _on;
00402 bool _flag_io_port;
00403 event _start_io_port;
00404 event _stop_io_port;
00405 };
00406
00407
00408
00409 #pragma pack()
00410 END_TERIMBER_NAMESPACE
00411
00412 #endif // _terimber_aioport_h_