00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef _terimber_msg_que_h_
00029 #define _terimber_msg_que_h_
00030
00031 #include "base/except.h"
00032 #include "base/primitives.h"
00033 #include "threadpool/threadpoolfactory.h"
00034 #include "aiomsg/msg_cpp.h"
00035 #include "aiomsg/msg_base.h"
00036
00037 BEGIN_TERIMBER_NAMESPACE
00038 #pragma pack(4)
00039
00042 template < size_t P = 3, size_t C = 1024 >
00043 class msg_queue
00044 {
00046 enum en_msg_queue
00047 {
00048 PRIORITY = P,
00049 CAPACITY = C
00050 };
00051 public:
00053 inline
00054 bool
00055 block();
00057 inline
00058 bool
00059 is_block();
00060 protected:
00062 msg_queue();
00064 inline
00065 bool
00066 pop( msg_cpp*& item
00067 );
00069 inline
00070 bool
00071 peek();
00073 inline
00074 bool
00075 touch( size_t& top_priority
00076 );
00078 virtual
00079 void
00080 wakeup() = 0;
00082 inline
00083 void
00084 push( msg_cpp* item
00085 );
00087 inline
00088 bool
00089 unblock();
00090
00091 private:
00093 static
00094 inline
00095 ub1_t
00096 _check( msg_cpp* item
00097 );
00098 private:
00099 bool _blocked;
00100 mutex _mtx_queue;
00101 list< msg_cpp* > _queue[PRIORITY];
00102 };
00103
00104 template < size_t P, size_t C >
00105 msg_queue< P, C >::msg_queue() :
00106 _blocked(false)
00107 {
00108 }
00109
00110 template < size_t P, size_t C >
00111 inline
00112 ub1_t
00113 msg_queue< P, C >::_check(msg_cpp* item)
00114 {
00115 return item->priority >= PRIORITY ? PRIORITY - 1 : (ub1_t)item->priority;
00116 }
00117
00118 template < size_t P, size_t C >
00119 inline
00120 void
00121 msg_queue< P, C >::push(msg_cpp* item)
00122 {
00123
00124 mutex_keeper keeper(_mtx_queue);
00125 if (_blocked)
00126 exception::_throw("Queue has been blocked");
00127
00128 list< msg_cpp* >& q = _queue[_check(item)];
00129 if (q.size() == CAPACITY)
00130 exception::_throw("Queue max capacity has been reached");
00131
00132 q.push_back(item);
00133
00134 wakeup();
00135 }
00136
00137 template < size_t P, size_t C >
00138 inline
00139 bool
00140 msg_queue< P, C >::pop(msg_cpp*& item)
00141 {
00142
00143 mutex_keeper keeper(_mtx_queue);
00144
00145 for (ub1_t index = 0; index < PRIORITY; ++index)
00146 {
00147 if (!_queue[index].empty())
00148 {
00149
00150 item = _queue[index].front();
00151
00152 _queue[index].pop_front();
00153 return true;
00154 }
00155 }
00156 return false;
00157 }
00158
00159 template < size_t P, size_t C >
00160 inline
00161 bool
00162 msg_queue< P, C >::peek()
00163 {
00164
00165 mutex_keeper keeper(_mtx_queue);
00166
00167 for (ub1_t index = 0; index < PRIORITY; ++index)
00168 {
00169 if (!_queue[index].empty())
00170 return true;
00171 }
00172 return false;
00173 }
00174
00175 template < size_t P, size_t C >
00176 inline
00177 bool
00178 msg_queue< P, C >::touch(size_t& top_priority)
00179 {
00180
00181 mutex_keeper keeper(_mtx_queue);
00182
00183 for (ub1_t index = 0; index < PRIORITY; ++index)
00184 {
00185 if (!_queue[index].empty())
00186 {
00187 top_priority = index;
00188 return true;
00189 }
00190 }
00191 return false;
00192 }
00193
00194 template < size_t P, size_t C >
00195 inline
00196 bool
00197 msg_queue< P, C >::block()
00198 {
00199
00200 mutex_keeper keeper(_mtx_queue);
00201 return !_blocked ? (_blocked = true) : false;
00202 }
00203
00204 template < size_t P, size_t C >
00205 inline
00206 bool
00207 msg_queue< P, C >::unblock()
00208 {
00209
00210 mutex_keeper keeper(_mtx_queue);
00211 return _blocked ? !(_blocked = false) : false;
00212 }
00213
00214 template < size_t P, size_t C >
00215 inline
00216 bool
00217 msg_queue< P, C >::is_block()
00218 {
00219
00220 mutex_keeper keeper(_mtx_queue);
00221 return _blocked;
00222 }
00223
00224
00225 class msg_communicator;
00228
00229 class msg_queue_processor : public msg_base,
00230 public msg_queue< 3 >,
00231 public terimber_thread_employer
00232 {
00233 public:
00235 msg_queue_processor(msg_communicator* communicator
00236 );
00238 ~msg_queue_processor();
00239 protected:
00241 virtual
00242 bool
00243 v_has_job( size_t ident,
00244 void* user_data
00245 );
00246
00247 protected:
00249 virtual
00250 void
00251 v_on();
00253 virtual
00254 void
00255 v_off();
00256 };
00257
00258 #pragma pack()
00259 END_TERIMBER_NAMESPACE
00260
00261 #endif // _terimber_msg_que_h_
00262