Thread portable class


Gate portable class


Timer class


Pool class


Terimber 2.0


About C++


Downloads Products & Services Support Clients Open Source About



Home / Open source / Thread

Thread and supported portable classes

We are publishing the simple and at the same time very poweful multiplatform implementation of thread class.
To have deal with multithreaded environment we have to create synchronization objects first of all.
For correspondent OS define OS_TYPE like:

#define OS_WIN32	0
#define OS_UNIX	1
#define OS_LINUX	2


For Linux OS the follow defines will be useful:

typedef unsigned int size_t;
const size_t os_def_size = 4096;
const size_t os_minus_one = ~0;
#define INFINITE 		os_minus_one
#define INVALID_SOCKET 	os_minus_one
#define ERROR_INVALID_HANDLE os_minus_one
#define WAIT_OBJECT_0 	0

#define WSABASEERR	10000
#define WSAEFAULT		(WSABASEERR+14)

#define WSAENOTSOCK	(WSABASEERR+38)
#define WSAETIMEDOUT	(WSABASEERR+60)
#define WAIT_FAILED     os_minus_one
#define WAIT_TIMEOUT	0x00000102L
#define SOCKET_ERROR    os_minus_one
#define WSAECONNRESET	(WSABASEERR+54)
#define WSAEMSGSIZE	(WSABASEERR+40)
#define _MAX_PATH       260
#define _O_RDONLY       O_RDONLY
#define _O_BINARY       0x0L
#define _S_IREAD        S_IREAD
#define _O_WRONLY       O_WRONLY
#define _O_CREAT        O_CREAT
#define _O_TRUNC        O_TRUNC
#define _S_IWRITE       S_IWRITE         


#define OS_TYPE OS_WIN32 // for Windows
#define OS_TYPE OS_LINUX // for Linux
#define OS_TYPE OS_UNIX // for Unix

Header file starts here.

/*
 * The Software License
 * ====================================================================
 * Copyright (c) 2003-.The Terimber Corporation. All rights
 * reserved.
 * ====================================================================
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE TERIMBER CORPORATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
*/

// thread.h

#ifndef _terimber_thread_h_
#define _terimber_thread_h_

//////////////////////////////////////////////////////////////////////
// mutex
// class supports mutex
class mutex
{
	// prevent copy objects
	mutex(const mutex& src);
	mutex& operator=(const mutex& src);

public:
#if OS_TYPE == OS_WIN32
	typedef CRITICAL_SECTION _HANDLE_;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	typedef pthread_mutex_t _HANDLE_;
#else
#error code is not implemented
#endif
	
	//
	// constructor
	//
	mutex();
	//
	// destructor
	//
	~mutex();
	//
	// lock mutex
	// NB!!! can be used in const member function
	//
	bool lock() const;
	//
	// unlock mutex
	// NB!!! can be used in const member function
	//
	void unlock() const;
	//
	// try to lock mutex
	// NB!!! can be used in const member function
	//
	bool trylock() const;
	//
	// return handle
	//

	operator _HANDLE_*() const 
	{ 
		return &_handle;
	}

private:
	mutable _HANDLE_ _handle; // handle of mutex resource
};

// automatic mutex locker
// any class that supports two const functions: lock() & unlock()
// can be used as template argument
class mutex_keeper
{
public:
	//
	// constructor
	//
	mutex_keeper(const mutex& mtx, bool use_try = false) : _mtx(mtx) 
	{ 
		_locked = use_try ? _mtx.trylock() : _mtx.lock();
	}
	//
	// destructor
	//
	~mutex_keeper() { if (_locked) _mtx.unlock(); }
	//
	// unlock explicitly
	//
	inline void unlock() const { if (_locked) { _mtx.unlock(); _locked = false; } }
	//
	// lock explicitly
	//
	inline void lock() const { if (!_locked) { _mtx.lock(); _locked = true; } }
	//
	// check the lock state
	//
	inline operator bool() const { return _locked; }
	inline bool operator!() const { return !_locked; }
private:
	const mutex&	_mtx; // mutex
	mutable bool	_locked; // lock flag for explicit unlock			
};

/////////////////////////////////////////////////////////////////
// event
// class supports event
class event
{
	// prevent copy objects
	event(const event& src);
	event& operator=(const event& src);
public:

#if OS_TYPE == OS_WIN32
	typedef HANDLE _HANDLE_;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	typedef struct HANDLE {
		mutex				_mtx;
		pthread_cond_t		_cond;
		bool				_manual_reset;
		bool				_signaled; 
	} *_HANDLE_;


#else
#error code is not implemented
#endif

	//
	// constructor
	//
	event(bool manual_reset = false, bool init_state = false);
	//
	// destructor delete event
	//
	~event();
	//
	// signal event
	// NB!!! can be used in const member function
	//
	void signal() const;
	//
	// nonsignal event
	// NB!!! can be used in const member function
	//
	void nonsignal() const;
	//
	// return handle
	//

	operator _HANDLE_() const 
	{ 
		return
#if OS_TYPE == OS_WIN32
		_handle;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
		&_handle; 
#else
#error code is not implemented
#endif
	}

	//
	// wait for signal state
	// NB!!! can be used in const member function
	//
	size_t wait(size_t timeout = INFINITE) const;
private:
	mutable HANDLE _handle; // keep event resource
};

// class supports semaphores
class semaphore
{
	// prevent copy objects
	semaphore(const semaphore& src);
	semaphore& operator=(const semaphore& src);

public:
#if OS_TYPE == OS_WIN32
	typedef HANDLE _HANDLE_;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	typedef struct HANDLE {
		mutex				_mtx;
		pthread_cond_t		_cond;
		size_t			_max_count;
		size_t			_init_count;
	} *_HANDLE_;


#else
#error code is not implemented
#endif

	//
	// constructor
	//
	semaphore(size_t initial_count = 1, size_t max_count = 1);
	//
	// destructor
	//
	~semaphore();
	//
	// release semaphore
	// NB!!! can be used in const member function
	//
	bool release(size_t count = 1) const;
	//
	// wait for signal state
	// NB!!! can be used in const member function
	//
	size_t wait(size_t timeout = INFINITE) const;	
private:
	mutable HANDLE _handle; // keep semaphore resource
};

// automatic semaphore accessor
// any class that supports two const functions: wait(size_t) & release()
// can be used as template argument
class semaphore_keeper
{
public:
	//
	// constructor
	//
	semaphore_keeper(const semaphore& sema, size_t timeout = INFINITE) : _sema(sema) { _approved = WAIT_OBJECT_0 == _sema.wait(timeout); }
	//
	// destructor
	//
	~semaphore_keeper() { if (_approved) _sema.release(); }
	//
	// check the access state
	//
	inline operator bool() const { return _approved; }
	inline bool operator!() const { return !_approved; }
	
private:
	const semaphore&	_sema; // semaphore
	bool			_approved; // result of accessing		
};

// class supports free-thread counter with a signal
// when count reachs zero
class counter
{
	// prevent copy objects
	counter(const counter& src);
	counter& operator=(const counter& src);
public:
	//
	// constructor
	//
	counter(size_t init_count = 0);
	//
	// increment counter
	//
	size_t increment() const;
	//
	// decrement counter
	//
	size_t decrement() const;
	//
	// event will be fired when counter will equal zero
	//
	size_t wait(size_t timeout = INFINITE) const;
private:
	mutable size_t	_count; // keep counter
	event			_event; // event
	mutex			_mutex; // mutex
};

// class supports free-thread restrict source access
// there are two sides: server (1) and clients (N)
// 
class gate
{	
	// prevent copy objects
	gate(const gate& src);
	gate& operator=(const gate& src);
public:
	//
	// constructor
	//
	gate(size_t max_count);
	//
	// client calls when start using resource
	//
	bool entry(size_t timeout = INFINITE) const;
	//
	// client calls when finish using resource
	//
	void leave() const;
	//
	// server call to prevent entries of new clients 
	// and wait untill all clients will leave resource
	//
	bool lock(size_t timeout = INFINITE) const;
	//
	// allows new client to come
	//
	void unlock() const;
private:
	semaphore	_sema;		// semaphore
	size_t	_max_count; // max visitors
};

// automatic gate keeper
class gate_keeper
{
public:
	//
	// constructor
	//
	gate_keeper(const gate& gate_, size_t timeout = INFINITE) : _gate(gate_) { _locked = _gate.lock(timeout); }
	//
	// destructor
	//
	~gate_keeper() { if (_locked) _gate.unlock(); }
	//
	// check the locked state
	//
	inline operator bool() const { return _locked; }
	inline bool operator!() const { return !_locked; }
private:
	bool		_locked; // flag that gate was locked
	const gate&	_gate; // gate
};

// class supports automatic entry/leave of gate
class gate_client
{
public:
	//
	// constructor
	//
	gate_client(const gate& gate_, size_t timeout = INFINITE) : _gate(gate_) { _inside = _gate.entry(timeout); }
	//
	// destructor
	//
	~gate_client() { if (_inside) _gate.leave(); }
	// check access flag
	inline operator bool() const { return _inside; }
	inline bool operator!() const { return !_inside; }
private:
	bool		_inside; // flag that client was allowed to enter to the gate
	const gate&	_gate; // gate
};


// base job employer class
// employer is responsible for synchronization
// while job request and job processing
class job_employer
{
public:
	//
	// destructor
	//
	virtual ~job_employer() {}
	//
	// the real implementation depends on what to do
	// leave for future implementation
	//
	virtual bool v_has_job(size_t ident, void* user_data) = 0;
	virtual void v_do_job(size_t ident, void* user_data) = 0;
};

// class to send task to client thread
class job_task
{
public:
	//
	// constructor
	//
	job_task(job_employer* employer, size_t ident, size_t timeout, void* user_data) :
		_employer(employer), _ident(ident), _timeout(timeout), _user_data(user_data) {}
	//
	// copy constructor
	//
	job_task(const job_task& src) { *this = src; }
	//
	// assign operator
	//
	job_task& operator=(const job_task& src)
	{
		if (this != &src)
		{
			_employer = src._employer;
			_ident = src._ident; 
			_timeout = src._timeout;
			_user_data = src._user_data;
		}
		return *this;
	}
	//
	// clear job
	//
	void clear()
	{
		_employer = 0;
		_ident = 0; 
		_timeout = INFINITE;
		_user_data = 0;
	}

	job_employer*	_employer;	// pointer to the class which will be doing the real job
	size_t		_ident;		// ident, employer might to differ the different task inside do_job function 
	size_t		_timeout;	// timeout
	void*			_user_data; // additional data for employer class
};

// thread states
enum thread_state
{
	THREAD_CLOSE,		// thread is not running at all
	THREAD_STARTING,	// thread is starting
	THREAD_RUNNING,		// thread is running and doing job
	THREAD_SLEEPING,	// thread is running and sleeping for waikup 
	THREAD_WAITING,		// thread is running and waiting for job 
	THREAD_STOPPING		// thread is stopping
};

// safe thread class
class thread
{	
	// thread run function
#if OS_TYPE == OS_WIN32
	static unsigned int __stdcall start_thread(void* data);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	static void* start_thread(void* data);
#else
#error code is not implemented
#endif

	// prevent copy objects
	thread(const thread& src);
	thread& operator=(const thread& src);
public:
	//
	// constructor
	//
	thread();
	//
	// destructor
	//
	~thread();
	//
	// returns current state - thread safe
	//
	thread_state get_state() const; 
	//
	// start thread, return false if thread already started
	//
	bool start();
	//
	// assign new employer - thread safe, returns false if pointer is null 
	// or thread is not running
	// job_wait_timeout is parameter for new job waiting, if INFINITE, 
	// user has to call wakeup function to activate thread
	//
	bool assign_job(const job_task& job_task, size_t job_wait_timeout = INFINITE);
	//
	// cancel current job - thread safe, returns false if thread is not running
	//
	bool cancel_job(size_t job_wait_timeout = INFINITE);
	//
	// stop thread, returns false if thread already stopped
	//
	bool stop();
	//
	// signals thread to wakeup, if thread is in waiting state
	//
	void wakeup() const;
private:
	//
	// returns current state
	//
	void set_state(thread_state new_state);
	//
	// check current state, should be THREAD_RUNNING
	// then check does employer have a job
	//
	bool get_job();
	//
	// lock semaphore while calling employer function
	//
	void job_doing();
	//
	// wait for either new job (return true) or finish work (return false)
	//
	bool sleep();
private:
	job_task	_job_task;	// keep pointer to employer object
	mutex		_mtx;		// mutex for thread safe access to thread state
	event		_ev_start;	// event is activated inside thread and deactivated automatically
	event		_ev_wakeup;	// event is activated by caller and deactivated automatically
	event		_ev_end;	// event is activated inside thread just before exit and deactivated automatically
	mutex		_mtx_job;	// mutex for job tasj access - to share access to emplyer object
	thread_state	_state;		// keeps thread state
#if OS_TYPE == OS_WIN32
	HANDLE
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	pthread_t
#else
#error code is not implemented
#endif
	_handle; // keep thread resources
};

#endif // _terimber_thread_h_

Here the source file begins


// thread.cpp

#include "thread.h"


///////////////////////////////////////////////////////////
// constructor
mutex::mutex()
{ 
#if OS_TYPE == OS_WIN32
	InitializeCriticalSection(&_handle); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	pthread_mutex_init(&_handle, 0);
#else
#error code is not implemented
#endif
}

// destructor
mutex::~mutex()
{ 
#if OS_TYPE == OS_WIN32
	DeleteCriticalSection(&_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	pthread_mutex_destroy(&_handle);
#else
#error code is not implemented
#endif
}

// NB!!! can be used in const member function
bool 
mutex::lock() const
{ 
#if OS_TYPE == OS_WIN32
	EnterCriticalSection(&_handle);
	return true;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	return !pthread_mutex_lock(&_handle);
#else
#error code is not implemented
#endif
}

// unlock mutex
// NB!!! can be used in const member function
void 
mutex::unlock() const
{ 
#if OS_TYPE == OS_WIN32
	LeaveCriticalSection(&_handle); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	pthread_mutex_unlock(&_handle);
#else
#error code is not implemented
#endif
}

// NB!!! can be used in const member function
bool 
mutex::trylock() const
{ 
#if OS_TYPE == OS_WIN32
	return TRUE == TryEnterCriticalSection(&_handle);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	return !pthread_mutex_trylock(&_handle);
#else
#error code is not implemented
#endif
}

///////////////////////////////////////////////////////////
// constructor create event
event::event(bool manual_reset, bool init_state)
{ 
#if OS_TYPE == OS_WIN32
	// create event
	_handle = CreateEvent(0, manual_reset, init_state, 0); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	// init condition thread
	pthread_cond_init(&_handle._cond, 0);
	// set initial setting
	_handle._manual_reset = manual_reset;
	_handle._signaled = init_state;
#else
#error code is not implemented
#endif
}

// destructor
event::~event()
{ 
#if OS_TYPE == OS_WIN32
	// close event handle
	CloseHandle(_handle); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	// destroy condition thread
	pthread_cond_destroy(&_handle._cond);
#else
#error code is not implemented
#endif
}
	
// signal event
// NB!!! can be used in const member function
void 
event::signal() const
{ 
#if OS_TYPE == OS_WIN32
	// set event to the signal state
	SetEvent(_handle); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	// lock mutex	
	mutex_keeper keeper(_handle._mtx);

	_handle._manual_reset ?  // manual reset
		pthread_cond_broadcast(&_handle._cond) : // Unblock everyone waiting
		pthread_cond_signal(&_handle._cond);	// Unblock just one waiter

	// set state if any
	_handle._signaled = true;
#else
#error code is not implemented
#endif
}

// nonsignal event
// NB!!! can be used in const member function
void 
event::nonsignal() const
{ 
#if OS_TYPE == OS_WIN32
	// set event to the non signal state
	ResetEvent(_handle); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	mutex_keeper keeper(_handle._mtx);
	_handle._signaled = false;
#else
#error code is not implemented
#endif
}

// wait for signal state
// NB!!! can be used in const member function
size_t 
event::wait(size_t timeout) const
{ 
#if OS_TYPE == OS_WIN32
	// wait signal
	return WaitForSingleObject(_handle, timeout); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX

	// If the event is not in the signaled state, we will have to wait.
	mutex_keeper keeper(_handle._mtx);

	int ret = 0;  // If we don't do anything it's because it's free already

	if (!_handle._signaled) // if already in a signal state just return
	{
		if (!timeout) // no time for waiting
			return WAIT_TIMEOUT;
		else
		{
			timespec timeout_;
			if (INFINITE != timeout)
			{
				// set timeout
				timeval now_;
				gettimeofday(&now_, 0);
				timeout_.tv_sec = now_.tv_sec + timeout / 1000;
				timeout_.tv_nsec = (((timeout % 1000) * 1000 + now_.tv_usec) % 1000000) * 1000;
			}

			// wait until condition thread returns control
			do
			{
				ret = (INFINITE == timeout ? pthread_cond_wait(&_handle._cond, _handle._mtx) : 
								pthread_cond_timedwait(&_handle._cond, _handle._mtx, &timeout_));
			} 
			while (!ret && !_handle._signaled);
		}
	}

	// adjust signaled member
	switch (ret)
	{
		case 0: // success
			if (!_handle._manual_reset)
				_handle._signaled = false;

			return WAIT_OBJECT_0;
			
		case ETIMEDOUT:
		default:
			return WAIT_TIMEOUT;
	}
#else
#error code is not implemented
#endif
}
	
///////////////////////////////////////////////////////////
// constructor create semaphore
semaphore::semaphore(size_t initial_count, size_t max_count)
{ 
	// check reasonable params
	assert(max_count != 0);
	assert(initial_count <= max_count);

#if OS_TYPE == OS_WIN32
	// create semaphore
	_handle = CreateSemaphore(0, initial_count, max_count, 0);
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	// create condition thread
	pthread_cond_init(&_handle._cond, 0);
	_handle._max_count = max_count;
	_handle._init_count = initial_count;
#else
#error code is not implemented
#endif
}

// destructor
semaphore::~semaphore()
{ 
#if OS_TYPE == OS_WIN32
	// close semaphore handle
	CloseHandle(_handle); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	// destroy condition thread
	pthread_cond_destroy(&_handle._cond);
#else
#error code is not implemented
#endif
}

// release semaphore
// NB!!! can be used in const member function
bool 
semaphore::release(size_t count) const
{ 
	if (!count) // nothing to do
		return false;

#if OS_TYPE == OS_WIN32
	// release semaphore
	return TRUE == ReleaseSemaphore(_handle, count, 0); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX

	// lock mutex
	mutex_keeper keeper(_handle._mtx);

	if (_handle._init_count >= _handle._max_count)
		return false;
	// increment semaphore counter
	if (_handle._init_count + count > _handle._max_count)
		_handle._init_count = _handle._max_count;
	else
		_handle._init_count += count;

	// set condition thread
	count == 1 ?
		pthread_cond_signal(&_handle._cond) :
		pthread_cond_broadcast(&_handle._cond);

	return true;
#else
#error code is not implemented
#endif
}

// wait for signal state
// NB!!! can be used in const member function
size_t 
semaphore::wait(size_t timeout) const
{ 
#if OS_TYPE == OS_WIN32
	// wait for semaphore event
	return WaitForSingleObject(_handle, timeout); 
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	
	mutex_keeper keeper(_handle._mtx);

	int ret = 0;; // If we don't do anything it's because it's available already

	if (!_handle._init_count) // if we can acquire resource
	{
		if (!timeout)
			return WAIT_TIMEOUT;
		else
		{
			timespec timeout_;
			if (INFINITE != timeout)
			{
				// set timeout
				timeval now_;
				gettimeofday(&now_, 0);
				timeout_.tv_sec = now_.tv_sec + timeout / 1000;
				timeout_.tv_nsec = (((timeout % 1000) * 1000 + now_.tv_usec) % 1000000) * 1000;
			}

			// wait until condition thread returns control
			do
			{
				ret = (INFINITE == timeout ? pthread_cond_wait(&_handle._cond, _handle._mtx) : 
								pthread_cond_timedwait(&_handle._cond, _handle._mtx, &timeout_));
			} 
			while (!ret && !_handle._init_count);
		}
	}

	// adjust count member
	switch (ret)
	{
		case 0: // success
			--_handle._init_count;
			return WAIT_OBJECT_0;
			
		case ETIMEDOUT:
		default:
			return WAIT_TIMEOUT;
	}
#else
#error code is not implemented
#endif
}

///////////////////////////////////////////////////////////
counter::counter(size_t init_count) : _count(init_count),
	_event(true, init_count == 0)
{}

size_t 
counter::increment() const
{ 
	mutex_keeper keeper(_mutex);
	_event.nonsignal();
	return ++_count;
}

size_t 
counter::decrement() const
{ 
	mutex_keeper keeper(_mutex);
	if (_count == 1) 
			_event.signal();
	_count != 0 ? --_count : 0;
	return _count;
}

size_t 
counter::wait(size_t timeout) const
{ 
	return _event.wait(timeout); 
}

///////////////////////////////////////////////////////////
gate::gate(size_t max_count) :
	_sema(max_count, max_count),
	_max_count(max_count)
{ 
}

// client calls when start using resource
bool 
gate::entry(size_t timeout) const
{ 
	return WAIT_OBJECT_0 == _sema.wait(timeout);
}

// client calls when finish using resource
void 
gate::leave() const
{ 
	_sema.release(); 
}

// server call to prevent entries of new clients 
// and wait untill all clients will leave resource
bool 
gate::lock(size_t timeout) const
{ 
	size_t count = 0;
	for (; count < _max_count && WAIT_OBJECT_0 == _sema.wait(timeout); ++count); 
	return count == _max_count;
}

// free resource for client entries
void 
gate::unlock() const
{ 
	_sema.release(_max_count); 
}


///////////////////////////////////////////////////////////
// static 
#if OS_TYPE == OS_WIN32
unsigned int __stdcall 
thread::start_thread(void* data)
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
void* 
thread::start_thread(void* data)
#else
#error code is not implemented
#endif
{
	// copy this pointer
	thread* _this = (thread*)data;
	// signal about start thread;
	_this->_ev_start.signal();
	// do working
	while (_this->sleep())
	{
		// set state
		_this->set_state(THREAD_RUNNING);
		// go do job
		// use try block
		try
		{
			while (_this->get_job() )			
				_this->job_doing();	
		}
		catch (...)
		{
			// well, well ...
			// employer has thrown exception - bad guy
			// but we are still alive and will continue to work
			assert(false);
		}
	}

	// need set close thread event to signal state
	// to notify parent thread about exiting from thread
	_this->_ev_end.signal();
	// leave thread
	return 0;
}

thread::thread() :
	_handle(0),
	_job_task(0, 0, INFINITE, 0),
	_state(THREAD_CLOSE)
{
}
	
thread::~thread() 
{
	if (_handle)
		stop();
}

thread_state 
thread::get_state() const
{
	// lock mutex
	mutex_keeper keeper(_mtx);
	return _state;
}

void 
thread::set_state(thread_state new_state)
{
	// lock mutex
	mutex_keeper keeper(_mtx);
	// set state
	if (_state != THREAD_STOPPING)
		_state = new_state;
}

bool 
thread::start()
{
	// analyze thread handle
#if OS_TYPE == OS_WIN32
	if (::GetCurrentThread() == _handle)
		return false;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	if (pthread_self() == _handle)
		return false;
#else
#error code is not implemented
#endif
	// check state
	// start only stop thread
	if (_state != THREAD_CLOSE)
		return false;

	// set state
	set_state(THREAD_STARTING);
	// nonsignal event
	_ev_start.nonsignal();
	_ev_end.nonsignal();
	// create thread
#if OS_TYPE == OS_WIN32
	// dummy thread ID
	unsigned int threadID = 0;
	if (!(_handle = (void*)_beginthreadex(0, 0, start_thread, this, 0, &threadID)))
		return false;

#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	if (pthread_create(&_handle, 0, start_thread, this))
		return false;
#else
#error code is not implemented
#endif

	// wait for start
	_ev_start.wait();
	// sleep, waiting for job
	set_state(THREAD_WAITING);
	return true;
}

bool 
thread::assign_job(const job_task& job_task, size_t job_wait_timeout)
{
	// analyze thread handle
#if OS_TYPE == OS_WIN32
	if (::GetCurrentThread() == _handle)
		return false;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	if (pthread_self() == _handle)
		return false;
#else
#error code is not implemented
#endif

	// check state
	switch (_state)
	{
		case THREAD_WAITING:
		case THREAD_SLEEPING:
		case THREAD_RUNNING:
			{
				mutex_keeper keeper(_mtx_job);
				_job_task = job_task;
			}
			break;
		default:
			return false;
	}

	// wakeup thread
	_ev_wakeup.signal();
	return true;
}

bool 
thread::cancel_job(size_t job_wait_timeout)
{	
	// analyze thread handle
#if OS_TYPE == OS_WIN32
	if (::GetCurrentThread() == _handle)
		return false;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	if (pthread_self() == _handle)
		return false;
#else
#error code is not implemented
#endif

	// wait for semaphore
	//semaphore_keeper keeper(_sema, job_wait_timeout);

	// check state
	//switch (get_state())
	switch (_state)
	{
		case THREAD_SLEEPING:
		case THREAD_RUNNING:
			{
				mutex_keeper keeper(_mtx_job);
				_job_task.clear();
			}
			break;
		default:
			return true;
	}


	// set state to nothing
	set_state(THREAD_WAITING);
	return _state == THREAD_WAITING;
}

bool 
thread::stop()
{
	// analyze thread handle
#if OS_TYPE == OS_WIN32
	if (::GetCurrentThread() == _handle)
		return false;
#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	if (pthread_self() == _handle)
		return false;
#else
#error code is not implemented
#endif

	// check state
	if (_state == THREAD_CLOSE)
		return false;

	// set state to stop
	set_state(THREAD_STOPPING);

	// wake up thread
	_ev_wakeup.signal();
	// wait end thread
	_ev_end.wait();
#if OS_TYPE == OS_WIN32
	// wait windows thread ending
	if (WAIT_TIMEOUT != WaitForSingleObject(_handle, 3000))
		// close handle couse we started with beginthreadex function
		CloseHandle(_handle);
	else // troubles while close thread gracefully
		TerminateThread(_handle, 0);

#elif OS_TYPE == OS_UNIX || OS_TYPE == OS_LINUX
	if (pthread_detach(_handle) != 0)
		pthread_cancel(_handle);
#else
#error code is not implemented
#endif

	_handle = 0;

	// set state to close
	{
		// lock mutex
		mutex_keeper keeper(_mtx);
		_state = THREAD_CLOSE;
	}

	return true;
}

void 
thread::wakeup() const
{
	_ev_wakeup.signal();
}

bool 
thread::get_job()
{
	if (_state != THREAD_RUNNING) // check thread state
		return false;

	mutex_keeper keeper(_mtx_job);
	job_task task = _job_task;

	if (!task._employer) // check pointer
		return false;

	keeper.unlock();
	return task._employer->v_has_job(task._ident, task._user_data); // check job
}

void 
thread::job_doing()
{
	mutex_keeper keeper(_mtx_job);
	job_task task = _job_task;
	
	if (!task._employer) // check pointer
		return;

	keeper.unlock();
	task._employer->v_do_job(task._ident, task._user_data); // check job
}

bool 
thread::sleep()
{
	// check stop 
	if (_state == THREAD_STOPPING)
		return false;
	// set state for waiting
	set_state(THREAD_SLEEPING);

	// lock job resources
	size_t wait_timeout = INFINITE;
	{
		mutex_keeper keeper(_mtx_job);
		wait_timeout = _job_task._timeout;
	}

	// wait wake up signal
	_ev_wakeup.wait(wait_timeout);
	// check state
	return _state == THREAD_STOPPING ? false : true;
}



© Copyright Terimber 2003-.