Home / Open source / Thread
Thread and supported portable classes
We are publishing the simple and at the same time very poweful multiplatform implementation of thread class.
To have deal with multithreaded environment we have to create synchronization objects first of all.
For correspondent OS define OS_TYPE like:
#define OS_WIN32 0
#define OS_UNIX 1
#define OS_LINUX 2
For Linux OS the follow defines will be useful:
typedef unsigned int size_t;
const size_t os_def_size = 4096;
const size_t os_minus_one = ~0;
#define INFINITE os_minus_one
#define INVALID_SOCKET os_minus_one
#define ERROR_INVALID_HANDLE os_minus_one
#define WAIT_OBJECT_0 0
#define WSABASEERR 10000
#define WSAEFAULT (WSABASEERR+14)
#define WSAENOTSOCK (WSABASEERR+38)
#define WSAETIMEDOUT (WSABASEERR+60)
#define WAIT_FAILED os_minus_one
#define WAIT_TIMEOUT 0x00000102L
#define SOCKET_ERROR os_minus_one
#define WSAECONNRESET (WSABASEERR+54)
#define WSAEMSGSIZE (WSABASEERR+40)
#define _MAX_PATH 260
#define _O_RDONLY O_RDONLY
#define _O_BINARY 0x0L
#define _S_IREAD S_IREAD
#define _O_WRONLY O_WRONLY
#define _O_CREAT O_CREAT
#define _O_TRUNC O_TRUNC
#define _S_IWRITE S_IWRITE
#define OS_TYPE OS_WIN32 // for Windows
#define OS_TYPE OS_LINUX // for Linux
#define OS_TYPE OS_UNIX // for Unix
Header file starts here.
/*
* The Software License
* ====================================================================
* Copyright (c) 2003-.The Terimber Corporation. All rights
* reserved.
* ====================================================================
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE TERIMBER CORPORATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*/
// thread.h
#ifndef _terimber_thread_h_
#define _terimber_thread_h_
//////////////////////////////////////////////////////////////////////
// mutex
// class supports mutex
class mutex
{
// prevent copy objects
mutex(const mutex& src);
mutex& operator=(const mutex& src);
public:
#if OS_TYPE == OS_WIN32
typedef CRITICAL_SECTION _HANDLE_;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
typedef pthread_mutex_t _HANDLE_;
#else
#error code is not implemented
#endif
//
// constructor
//
mutex();
//
// destructor
//
~mutex();
//
// lock mutex
// NB!!! can be used in const member function
//
bool lock() const;
//
// unlock mutex
// NB!!! can be used in const member function
//
void unlock() const;
//
// try to lock mutex
// NB!!! can be used in const member function
//
bool trylock() const;
//
// return handle
//
operator _HANDLE_*() const
{
return &_handle;
}
private:
mutable _HANDLE_ _handle; // handle of mutex resource
};
// automatic mutex locker
// any class that supports two const functions: lock() & unlock()
// can be used as template argument
class mutex_keeper
{
public:
//
// constructor
//
mutex_keeper(const mutex& mtx, bool use_try = false) : _mtx(mtx)
{
_locked = use_try ? _mtx.trylock() : _mtx.lock();
}
//
// destructor
//
~mutex_keeper() { if (_locked) _mtx.unlock(); }
//
// unlock explicitly
//
inline void unlock() const { if (_locked) { _mtx.unlock(); _locked = false; } }
//
// lock explicitly
//
inline void lock() const { if (!_locked) { _mtx.lock(); _locked = true; } }
//
// check the lock state
//
inline operator bool() const { return _locked; }
inline bool operator!() const { return !_locked; }
private:
const mutex& _mtx; // mutex
mutable bool _locked; // lock flag for explicit unlock
};
/////////////////////////////////////////////////////////////////
// event
// class supports event
class event
{
// prevent copy objects
event(const event& src);
event& operator=(const event& src);
public:
#if OS_TYPE == OS_WIN32
typedef HANDLE _HANDLE_;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
typedef struct HANDLE {
mutex _mtx;
pthread_cond_t _cond;
bool _manual_reset;
bool _signaled;
} *_HANDLE_;
#else
#error code is not implemented
#endif
//
// constructor
//
event(bool manual_reset = false, bool init_state = false);
//
// destructor delete event
//
~event();
//
// signal event
// NB!!! can be used in const member function
//
void signal() const;
//
// nonsignal event
// NB!!! can be used in const member function
//
void nonsignal() const;
//
// return handle
//
operator _HANDLE_() const
{
return
#if OS_TYPE == OS_WIN32
_handle;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
&_handle;
#else
#error code is not implemented
#endif
}
//
// wait for signal state
// NB!!! can be used in const member function
//
size_t wait(size_t timeout = INFINITE) const;
private:
mutable HANDLE _handle; // keep event resource
};
// class supports semaphores
class semaphore
{
// prevent copy objects
semaphore(const semaphore& src);
semaphore& operator=(const semaphore& src);
public:
#if OS_TYPE == OS_WIN32
typedef HANDLE _HANDLE_;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
typedef struct HANDLE {
mutex _mtx;
pthread_cond_t _cond;
size_t _max_count;
size_t _init_count;
} *_HANDLE_;
#else
#error code is not implemented
#endif
//
// constructor
//
semaphore(size_t initial_count = 1, size_t max_count = 1);
//
// destructor
//
~semaphore();
//
// release semaphore
// NB!!! can be used in const member function
//
bool release(size_t count = 1) const;
//
// wait for signal state
// NB!!! can be used in const member function
//
size_t wait(size_t timeout = INFINITE) const;
private:
mutable HANDLE _handle; // keep semaphore resource
};
// automatic semaphore accessor
// any class that supports two const functions: wait(size_t) & release()
// can be used as template argument
class semaphore_keeper
{
public:
//
// constructor
//
semaphore_keeper(const semaphore& sema, size_t timeout = INFINITE) : _sema(sema) { _approved = WAIT_OBJECT_0 == _sema.wait(timeout); }
//
// destructor
//
~semaphore_keeper() { if (_approved) _sema.release(); }
//
// check the access state
//
inline operator bool() const { return _approved; }
inline bool operator!() const { return !_approved; }
private:
const semaphore& _sema; // semaphore
bool _approved; // result of accessing
};
// class supports free-thread counter with a signal
// when count reachs zero
class counter
{
// prevent copy objects
counter(const counter& src);
counter& operator=(const counter& src);
public:
//
// constructor
//
counter(size_t init_count = 0);
//
// increment counter
//
size_t increment() const;
//
// decrement counter
//
size_t decrement() const;
//
// event will be fired when counter will equal zero
//
size_t wait(size_t timeout = INFINITE) const;
private:
mutable size_t _count; // keep counter
event _event; // event
mutex _mutex; // mutex
};
// class supports free-thread restrict source access
// there are two sides: server (1) and clients (N)
//
class gate
{
// prevent copy objects
gate(const gate& src);
gate& operator=(const gate& src);
public:
//
// constructor
//
gate(size_t max_count);
//
// client calls when start using resource
//
bool entry(size_t timeout = INFINITE) const;
//
// client calls when finish using resource
//
void leave() const;
//
// server call to prevent entries of new clients
// and wait untill all clients will leave resource
//
bool lock(size_t timeout = INFINITE) const;
//
// allows new client to come
//
void unlock() const;
private:
semaphore _sema; // semaphore
size_t _max_count; // max visitors
};
// automatic gate keeper
class gate_keeper
{
public:
//
// constructor
//
gate_keeper(const gate& gate_, size_t timeout = INFINITE) : _gate(gate_) { _locked = _gate.lock(timeout); }
//
// destructor
//
~gate_keeper() { if (_locked) _gate.unlock(); }
//
// check the locked state
//
inline operator bool() const { return _locked; }
inline bool operator!() const { return !_locked; }
private:
bool _locked; // flag that gate was locked
const gate& _gate; // gate
};
// class supports automatic entry/leave of gate
class gate_client
{
public:
//
// constructor
//
gate_client(const gate& gate_, size_t timeout = INFINITE) : _gate(gate_) { _inside = _gate.entry(timeout); }
//
// destructor
//
~gate_client() { if (_inside) _gate.leave(); }
// check access flag
inline operator bool() const { return _inside; }
inline bool operator!() const { return !_inside; }
private:
bool _inside; // flag that client was allowed to enter to the gate
const gate& _gate; // gate
};
// base job employer class
// employer is responsible for synchronization
// while job request and job processing
class job_employer
{
public:
//
// destructor
//
virtual ~job_employer() {}
//
// the real implementation depends on what to do
// leave for future implementation
//
virtual bool v_has_job(size_t ident, void* user_data) = 0;
virtual void v_do_job(size_t ident, void* user_data) = 0;
};
// class to send task to client thread
class job_task
{
public:
//
// constructor
//
job_task(job_employer* employer, size_t ident, size_t timeout, void* user_data) :
_employer(employer), _ident(ident), _timeout(timeout), _user_data(user_data) {}
//
// copy constructor
//
job_task(const job_task& src) { *this = src; }
//
// assign operator
//
job_task& operator=(const job_task& src)
{
if (this != &src)
{
_employer = src._employer;
_ident = src._ident;
_timeout = src._timeout;
_user_data = src._user_data;
}
return *this;
}
//
// clear job
//
void clear()
{
_employer = 0;
_ident = 0;
_timeout = INFINITE;
_user_data = 0;
}
job_employer* _employer; // pointer to the class which will be doing the real job
size_t _ident; // ident, employer might to differ the different task inside do_job function
size_t _timeout; // timeout
void* _user_data; // additional data for employer class
};
// thread states
enum thread_state
{
THREAD_CLOSE, // thread is not running at all
THREAD_STARTING, // thread is starting
THREAD_RUNNING, // thread is running and doing job
THREAD_SLEEPING, // thread is running and sleeping for waikup
THREAD_WAITING, // thread is running and waiting for job
THREAD_STOPPING // thread is stopping
};
// safe thread class
class thread
{
// thread run function
#if OS_TYPE == OS_WIN32
static unsigned int __stdcall start_thread(void* data);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
static void* start_thread(void* data);
#else
#error code is not implemented
#endif
// prevent copy objects
thread(const thread& src);
thread& operator=(const thread& src);
public:
//
// constructor
//
thread();
//
// destructor
//
~thread();
//
// returns current state - thread safe
//
thread_state get_state() const;
//
// start thread, return false if thread already started
//
bool start();
//
// assign new employer - thread safe, returns false if pointer is null
// or thread is not running
// job_wait_timeout is parameter for new job waiting, if INFINITE,
// user has to call wakeup function to activate thread
//
bool assign_job(const job_task& job_task, size_t job_wait_timeout = INFINITE);
//
// cancel current job - thread safe, returns false if thread is not running
//
bool cancel_job(size_t job_wait_timeout = INFINITE);
//
// stop thread, returns false if thread already stopped
//
bool stop();
//
// signals thread to wakeup, if thread is in waiting state
//
void wakeup() const;
private:
//
// returns current state
//
void set_state(thread_state new_state);
//
// check current state, should be THREAD_RUNNING
// then check does employer have a job
//
bool get_job();
//
// lock semaphore while calling employer function
//
void job_doing();
//
// wait for either new job (return true) or finish work (return false)
//
bool sleep();
private:
job_task _job_task; // keep pointer to employer object
mutex _mtx; // mutex for thread safe access to thread state
event _ev_start; // event is activated inside thread and deactivated automatically
event _ev_wakeup; // event is activated by caller and deactivated automatically
event _ev_end; // event is activated inside thread just before exit and deactivated automatically
mutex _mtx_job; // mutex for job tasj access - to share access to emplyer object
thread_state _state; // keeps thread state
#if OS_TYPE == OS_WIN32
HANDLE
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
pthread_t
#else
#error code is not implemented
#endif
_handle; // keep thread resources
};
#endif // _terimber_thread_h_
Here the source file begins
// thread.cpp
#include "thread.h"
///////////////////////////////////////////////////////////
// constructor
mutex::mutex()
{
#if OS_TYPE == OS_WIN32
InitializeCriticalSection(&_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
pthread_mutex_init(&_handle, 0);
#else
#error code is not implemented
#endif
}
// destructor
mutex::~mutex()
{
#if OS_TYPE == OS_WIN32
DeleteCriticalSection(&_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
pthread_mutex_destroy(&_handle);
#else
#error code is not implemented
#endif
}
// NB!!! can be used in const member function
bool
mutex::lock() const
{
#if OS_TYPE == OS_WIN32
EnterCriticalSection(&_handle);
return true;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
return !pthread_mutex_lock(&_handle);
#else
#error code is not implemented
#endif
}
// unlock mutex
// NB!!! can be used in const member function
void
mutex::unlock() const
{
#if OS_TYPE == OS_WIN32
LeaveCriticalSection(&_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
pthread_mutex_unlock(&_handle);
#else
#error code is not implemented
#endif
}
// NB!!! can be used in const member function
bool
mutex::trylock() const
{
#if OS_TYPE == OS_WIN32
return TRUE == TryEnterCriticalSection(&_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
return !pthread_mutex_trylock(&_handle);
#else
#error code is not implemented
#endif
}
///////////////////////////////////////////////////////////
// constructor create event
event::event(bool manual_reset, bool init_state)
{
#if OS_TYPE == OS_WIN32
// create event
_handle = CreateEvent(0, manual_reset, init_state, 0);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
// init condition thread
pthread_cond_init(&_handle._cond, 0);
// set initial setting
_handle._manual_reset = manual_reset;
_handle._signaled = init_state;
#else
#error code is not implemented
#endif
}
// destructor
event::~event()
{
#if OS_TYPE == OS_WIN32
// close event handle
CloseHandle(_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
// destroy condition thread
pthread_cond_destroy(&_handle._cond);
#else
#error code is not implemented
#endif
}
// signal event
// NB!!! can be used in const member function
void
event::signal() const
{
#if OS_TYPE == OS_WIN32
// set event to the signal state
SetEvent(_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
// lock mutex
mutex_keeper keeper(_handle._mtx);
_handle._manual_reset ? // manual reset
pthread_cond_broadcast(&_handle._cond) : // Unblock everyone waiting
pthread_cond_signal(&_handle._cond); // Unblock just one waiter
// set state if any
_handle._signaled = true;
#else
#error code is not implemented
#endif
}
// nonsignal event
// NB!!! can be used in const member function
void
event::nonsignal() const
{
#if OS_TYPE == OS_WIN32
// set event to the non signal state
ResetEvent(_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
mutex_keeper keeper(_handle._mtx);
_handle._signaled = false;
#else
#error code is not implemented
#endif
}
// wait for signal state
// NB!!! can be used in const member function
size_t
event::wait(size_t timeout) const
{
#if OS_TYPE == OS_WIN32
// wait signal
return WaitForSingleObject(_handle, timeout);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
// If the event is not in the signaled state, we will have to wait.
mutex_keeper keeper(_handle._mtx);
int ret = 0; // If we don't do anything it's because it's free already
if (!_handle._signaled) // if already in a signal state just return
{
if (!timeout) // no time for waiting
return WAIT_TIMEOUT;
else
{
timespec timeout_;
if (INFINITE != timeout)
{
// set timeout
timeval now_;
gettimeofday(&now_, 0);
timeout_.tv_sec = now_.tv_sec + timeout / 1000;
timeout_.tv_nsec = (((timeout % 1000) * 1000 + now_.tv_usec) % 1000000) * 1000;
}
// wait until condition thread returns control
do
{
ret = (INFINITE == timeout ? pthread_cond_wait(&_handle._cond, _handle._mtx) :
pthread_cond_timedwait(&_handle._cond, _handle._mtx, &timeout_));
}
while (!ret && !_handle._signaled);
}
}
// adjust signaled member
switch (ret)
{
case 0: // success
if (!_handle._manual_reset)
_handle._signaled = false;
return WAIT_OBJECT_0;
case ETIMEDOUT:
default:
return WAIT_TIMEOUT;
}
#else
#error code is not implemented
#endif
}
///////////////////////////////////////////////////////////
// constructor create semaphore
semaphore::semaphore(size_t initial_count, size_t max_count)
{
// check reasonable params
assert(max_count != 0);
assert(initial_count <= max_count);
#if OS_TYPE == OS_WIN32
// create semaphore
_handle = CreateSemaphore(0, initial_count, max_count, 0);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
// create condition thread
pthread_cond_init(&_handle._cond, 0);
_handle._max_count = max_count;
_handle._init_count = initial_count;
#else
#error code is not implemented
#endif
}
// destructor
semaphore::~semaphore()
{
#if OS_TYPE == OS_WIN32
// close semaphore handle
CloseHandle(_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
// destroy condition thread
pthread_cond_destroy(&_handle._cond);
#else
#error code is not implemented
#endif
}
// release semaphore
// NB!!! can be used in const member function
bool
semaphore::release(size_t count) const
{
if (!count) // nothing to do
return false;
#if OS_TYPE == OS_WIN32
// release semaphore
return TRUE == ReleaseSemaphore(_handle, count, 0);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
// lock mutex
mutex_keeper keeper(_handle._mtx);
if (_handle._init_count >= _handle._max_count)
return false;
// increment semaphore counter
if (_handle._init_count + count > _handle._max_count)
_handle._init_count = _handle._max_count;
else
_handle._init_count += count;
// set condition thread
count == 1 ?
pthread_cond_signal(&_handle._cond) :
pthread_cond_broadcast(&_handle._cond);
return true;
#else
#error code is not implemented
#endif
}
// wait for signal state
// NB!!! can be used in const member function
size_t
semaphore::wait(size_t timeout) const
{
#if OS_TYPE == OS_WIN32
// wait for semaphore event
return WaitForSingleObject(_handle, timeout);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
mutex_keeper keeper(_handle._mtx);
int ret = 0;; // If we don't do anything it's because it's available already
if (!_handle._init_count) // if we can acquire resource
{
if (!timeout)
return WAIT_TIMEOUT;
else
{
timespec timeout_;
if (INFINITE != timeout)
{
// set timeout
timeval now_;
gettimeofday(&now_, 0);
timeout_.tv_sec = now_.tv_sec + timeout / 1000;
timeout_.tv_nsec = (((timeout % 1000) * 1000 + now_.tv_usec) % 1000000) * 1000;
}
// wait until condition thread returns control
do
{
ret = (INFINITE == timeout ? pthread_cond_wait(&_handle._cond, _handle._mtx) :
pthread_cond_timedwait(&_handle._cond, _handle._mtx, &timeout_));
}
while (!ret && !_handle._init_count);
}
}
// adjust count member
switch (ret)
{
case 0: // success
--_handle._init_count;
return WAIT_OBJECT_0;
case ETIMEDOUT:
default:
return WAIT_TIMEOUT;
}
#else
#error code is not implemented
#endif
}
///////////////////////////////////////////////////////////
counter::counter(size_t init_count) : _count(init_count),
_event(true, init_count == 0)
{}
size_t
counter::increment() const
{
mutex_keeper keeper(_mutex);
_event.nonsignal();
return ++_count;
}
size_t
counter::decrement() const
{
mutex_keeper keeper(_mutex);
if (_count == 1)
_event.signal();
_count != 0 ? --_count : 0;
return _count;
}
size_t
counter::wait(size_t timeout) const
{
return _event.wait(timeout);
}
///////////////////////////////////////////////////////////
gate::gate(size_t max_count) :
_sema(max_count, max_count),
_max_count(max_count)
{
}
// client calls when start using resource
bool
gate::entry(size_t timeout) const
{
return WAIT_OBJECT_0 == _sema.wait(timeout);
}
// client calls when finish using resource
void
gate::leave() const
{
_sema.release();
}
// server call to prevent entries of new clients
// and wait untill all clients will leave resource
bool
gate::lock(size_t timeout) const
{
size_t count = 0;
for (; count < _max_count && WAIT_OBJECT_0 == _sema.wait(timeout); ++count);
return count == _max_count;
}
// free resource for client entries
void
gate::unlock() const
{
_sema.release(_max_count);
}
///////////////////////////////////////////////////////////
// static
#if OS_TYPE == OS_WIN32
unsigned int __stdcall
thread::start_thread(void* data)
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
void*
thread::start_thread(void* data)
#else
#error code is not implemented
#endif
{
// copy this pointer
thread* _this = (thread*)data;
// signal about start thread;
_this->_ev_start.signal();
// do working
while (_this->sleep())
{
// set state
_this->set_state(THREAD_RUNNING);
// go do job
// use try block
try
{
while (_this->get_job() )
_this->job_doing();
}
catch (...)
{
// well, well ...
// employer has thrown exception - bad guy
// but we are still alive and will continue to work
assert(false);
}
}
// need set close thread event to signal state
// to notify parent thread about exiting from thread
_this->_ev_end.signal();
// leave thread
return 0;
}
thread::thread() :
_handle(0),
_job_task(0, 0, INFINITE, 0),
_state(THREAD_CLOSE)
{
}
thread::~thread()
{
if (_handle)
stop();
}
thread_state
thread::get_state() const
{
// lock mutex
mutex_keeper keeper(_mtx);
return _state;
}
void
thread::set_state(thread_state new_state)
{
// lock mutex
mutex_keeper keeper(_mtx);
// set state
if (_state != THREAD_STOPPING)
_state = new_state;
}
bool
thread::start()
{
// analyze thread handle
#if OS_TYPE == OS_WIN32
if (::GetCurrentThread() == _handle)
return false;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
if (pthread_self() == _handle)
return false;
#else
#error code is not implemented
#endif
// check state
// start only stop thread
if (_state != THREAD_CLOSE)
return false;
// set state
set_state(THREAD_STARTING);
// nonsignal event
_ev_start.nonsignal();
_ev_end.nonsignal();
// create thread
#if OS_TYPE == OS_WIN32
// dummy thread ID
unsigned int threadID = 0;
if (!(_handle = (void*)_beginthreadex(0, 0, start_thread, this, 0, &threadID)))
return false;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
if (pthread_create(&_handle, 0, start_thread, this))
return false;
#else
#error code is not implemented
#endif
// wait for start
_ev_start.wait();
// sleep, waiting for job
set_state(THREAD_WAITING);
return true;
}
bool
thread::assign_job(const job_task& job_task, size_t job_wait_timeout)
{
// analyze thread handle
#if OS_TYPE == OS_WIN32
if (::GetCurrentThread() == _handle)
return false;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
if (pthread_self() == _handle)
return false;
#else
#error code is not implemented
#endif
// check state
switch (_state)
{
case THREAD_WAITING:
case THREAD_SLEEPING:
case THREAD_RUNNING:
{
mutex_keeper keeper(_mtx_job);
_job_task = job_task;
}
break;
default:
return false;
}
// wakeup thread
_ev_wakeup.signal();
return true;
}
bool
thread::cancel_job(size_t job_wait_timeout)
{
// analyze thread handle
#if OS_TYPE == OS_WIN32
if (::GetCurrentThread() == _handle)
return false;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
if (pthread_self() == _handle)
return false;
#else
#error code is not implemented
#endif
// wait for semaphore
//semaphore_keeper keeper(_sema, job_wait_timeout);
// check state
//switch (get_state())
switch (_state)
{
case THREAD_SLEEPING:
case THREAD_RUNNING:
{
mutex_keeper keeper(_mtx_job);
_job_task.clear();
}
break;
default:
return true;
}
// set state to nothing
set_state(THREAD_WAITING);
return _state == THREAD_WAITING;
}
bool
thread::stop()
{
// analyze thread handle
#if OS_TYPE == OS_WIN32
if (::GetCurrentThread() == _handle)
return false;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
if (pthread_self() == _handle)
return false;
#else
#error code is not implemented
#endif
// check state
if (_state == THREAD_CLOSE)
return false;
// set state to stop
set_state(THREAD_STOPPING);
// wake up thread
_ev_wakeup.signal();
// wait end thread
_ev_end.wait();
#if OS_TYPE == OS_WIN32
// wait windows thread ending
if (WAIT_TIMEOUT != WaitForSingleObject(_handle, 3000))
// close handle couse we started with beginthreadex function
CloseHandle(_handle);
else // troubles while close thread gracefully
TerminateThread(_handle, 0);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
if (pthread_detach(_handle) != 0)
pthread_cancel(_handle);
#else
#error code is not implemented
#endif
_handle = 0;
// set state to close
{
// lock mutex
mutex_keeper keeper(_mtx);
_state = THREAD_CLOSE;
}
return true;
}
void
thread::wakeup() const
{
_ev_wakeup.signal();
}
bool
thread::get_job()
{
if (_state != THREAD_RUNNING) // check thread state
return false;
mutex_keeper keeper(_mtx_job);
job_task task = _job_task;
if (!task._employer) // check pointer
return false;
keeper.unlock();
return task._employer->v_has_job(task._ident, task._user_data); // check job
}
void
thread::job_doing()
{
mutex_keeper keeper(_mtx_job);
job_task task = _job_task;
if (!task._employer) // check pointer
return;
keeper.unlock();
task._employer->v_do_job(task._ident, task._user_data); // check job
}
bool
thread::sleep()
{
// check stop
if (_state == THREAD_STOPPING)
return false;
// set state for waiting
set_state(THREAD_SLEEPING);
// lock job resources
size_t wait_timeout = INFINITE;
{
mutex_keeper keeper(_mtx_job);
wait_timeout = _job_task._timeout;
}
// wait wake up signal
_ev_wakeup.wait(wait_timeout);
// check state
return _state == THREAD_STOPPING ? false : true;
}
|
|