00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef _terimber_msg_user_h_
00029 #define _terimber_msg_user_h_
00030
00031 #include "aiomsg/msg_conn.h"
00032 #include "aiomsg/msg_lsnr.h"
00033
00034 BEGIN_TERIMBER_NAMESPACE
00035 #pragma pack(4)
00036
00039 class msg_wait_reply
00040 {
00041 public:
00043 msg_wait_reply( event* event_
00044 );
00046 ~msg_wait_reply()
00047 {
00048 }
00049
00052 inline
00053 void
00054 set_reply( msg_cpp* msg_
00055 )
00056 {
00057
00058 _reply = msg_;
00059
00060 _event->signal();
00061 }
00062
00064 inline
00065 msg_cpp*
00066 get_reply()
00067 {
00068
00069 msg_cpp* retVal = _reply;
00070
00071 _reply = 0;
00072
00073 return retVal;
00074 }
00075 private:
00076 event* _event;
00077 msg_cpp* _reply;
00078 };
00079
00082 class msg_wait_async_reply
00083 {
00084 public:
00086 msg_wait_async_reply(const guid_t& ident,
00087 ub8_t timeout_
00088 );
00090 ~msg_wait_async_reply()
00091 {
00092 }
00094 inline
00095 void
00096 set_reply( msg_cpp* msg_
00097 )
00098 {
00099 _reply = msg_;
00100 }
00102 inline
00103 msg_cpp*
00104 get_reply()
00105 {
00106
00107 msg_cpp* retVal = _reply;
00108
00109 _reply = 0;
00110
00111 return retVal;
00112 }
00114 inline
00115 msg_cpp*
00116 get_reply_data( guid_t& ident
00117 )
00118 {
00119
00120 msg_cpp* retVal = _reply;
00121
00122 _reply = 0;
00123
00124 ident = _ident;
00125
00126 return retVal;
00127 }
00129 inline
00130 bool
00131 is_expired(sb8_t now) const
00132 {
00133 return _expired < now;
00134 }
00135
00136 private:
00137 guid_t _ident;
00138 msg_cpp* _reply;
00139 sb8_t _expired;
00140 };
00141
00144 class msg_user_connection : public msg_connection
00145 {
00146
00149 typedef map< guid_t, msg_wait_reply > reply_map_t;
00152 typedef map< guid_t, msg_wait_async_reply > reply_async_map_t;
00155 typedef list< msg_wait_async_reply > reply_async_list_t;
00156 public:
00158 msg_user_connection(msg_communicator* communicator,
00159 msg_callback_notify* callback,
00160 const conf_connection& info,
00161 size_t additional_threads = 0
00162 );
00164 virtual ~msg_user_connection();
00167 virtual
00168 void
00169 push_msg( msg_cpp* msg
00170 );
00172 static
00173 msg_user_connection*
00174 connect( msg_communicator* communicator,
00175 msg_callback_notify* callback,
00176 const conf_connection& info,
00177 size_t additional_threads
00178 );
00180 bool
00181 send( bool copy,
00182 msg_cpp* msg,
00183 msg_cpp*& reply
00184 );
00186 guid_t
00187 send_async( bool copy,
00188 msg_cpp* msg
00189 );
00191 bool
00192 post( bool copy,
00193 msg_cpp* msg
00194 );
00196 inline
00197 const char*
00198 get_last_error() const
00199 {
00200 return _error;
00201 }
00202 protected:
00204 inline
00205 bool
00206 peek_async() const
00207 {
00208 mutex_keeper keeper(_mtx_async_wait);
00209 return !_async_list.empty();
00210 }
00212 bool
00213 pop_async( msg_cpp*& msg,
00214 guid_t& ident
00215 );
00217 bool
00218 peek_async();
00220 bool
00221 pop_async_timeouted(guid_t& ident
00222 );
00224 bool
00225 peek_async_timeouted();
00226
00228 virtual
00229 void
00230 ping_notify();
00231 protected:
00234 virtual
00235 bool
00236 v_has_job( size_t ident,
00237 void* user_data
00238 );
00240 virtual
00241 void
00242 v_do_job( size_t ident,
00243 void* user_data
00244 );
00246 virtual
00247 void
00248 wakeup();
00249
00251 virtual
00252 void
00253 v_off();
00254 private:
00256 void
00257 process_income_message();
00258 private:
00259 string_t _error;
00260 msg_callback_notify* _callback;
00261 mutex _mtx_wait;
00262 mutex _mtx_async_wait;
00263 reply_map_t _map;
00264 reply_async_map_t _async_map;
00265 reply_async_list_t _async_list;
00266
00267 size_t _additional_threads;
00268 };
00269
00270 #pragma pack()
00271 END_TERIMBER_NAMESPACE
00272
00273 #endif // _terimber_msg_comm_h_
00274