#pragma once
#include <thread>
#include <queue>
#include <mutex>
#include <atomic>
#include <condition_variable>
using namespace std;
#define MSG_EXIT_THREAD 1
#define MSG_POST_USER_DATA 2
#define MSG_TIMER 3
struct UserData
{
int id;
string data1;
string data2;
};
struct ThreadMsg
{
ThreadMsg(int i, shared_ptr<void> m) { id = i; msg = m; }
int id;
shared_ptr<void> msg;
};
class WorkerThread
{
public:
/// Constructor
WorkerThread(const char* threadName);
/// Destructor
~WorkerThread();
/// Called once to create the worker thread
/// @return True if thread is created. False otherwise.
bool CreateThread();
/// Called once a program exit to exit the worker thread
void ExitThread();
/// Get the ID of this thread instance
/// @return The worker thread ID
thread::id GetThreadId();
/// Get the ID of the currently executing thread
/// @return The current thread ID
static thread::id GetCurrentThreadId();
/// Add a message to the thread queue
/// @param[in] data - thread specific message information
void PostMsg(shared_ptr<UserData> msg);
void(*pfnExecuteCmd)(shared_ptr<UserData> data);
private:
WorkerThread(const WorkerThread&) = delete;
WorkerThread& operator=(const WorkerThread&) = delete;
/// Entry point for the worker thread
void Process();
/// Entry point for timer thread
void TimerThread();
unique_ptr<thread> m_thread;
queue<shared_ptr<ThreadMsg>> m_queue;
mutex m_mutex;
condition_variable m_cv;
atomic<bool> m_timerExit;
const char* THREAD_NAME;
};
#include "pch.h"
#include "WorkerThread.h"
//----------------------------------------------------------------------------
// WorkerThread
//----------------------------------------------------------------------------
WorkerThread::WorkerThread(const char* threadName) : m_thread(nullptr), m_timerExit(false), THREAD_NAME(threadName)
{
pfnExecuteCmd = nullptr;
}
//----------------------------------------------------------------------------
// ~WorkerThread
//----------------------------------------------------------------------------
WorkerThread::~WorkerThread()
{
ExitThread();
}
//----------------------------------------------------------------------------
// CreateThread
//----------------------------------------------------------------------------
bool WorkerThread::CreateThread()
{
if (!m_thread)
m_thread = unique_ptr<thread>(new thread(&WorkerThread::Process, this));
return true;
}
//----------------------------------------------------------------------------
// GetThreadId
//----------------------------------------------------------------------------
thread::id WorkerThread::GetThreadId()
{
assert(m_thread != nullptr);
return m_thread->get_id();
}
//----------------------------------------------------------------------------
// GetCurrentThreadId
//----------------------------------------------------------------------------
thread::id WorkerThread::GetCurrentThreadId()
{
return this_thread::get_id();
}
//----------------------------------------------------------------------------
// ExitThread
//----------------------------------------------------------------------------
void WorkerThread::ExitThread()
{
if (!m_thread)
return;
// Create a new ThreadMsg
shared_ptr<UserData> userData(new UserData());
shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_EXIT_THREAD, userData));
// Put exit thread message into the queue
{
lock_guard<mutex> lock(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
m_thread->join();
m_thread = nullptr;
}
//----------------------------------------------------------------------------
// PostMsg
//----------------------------------------------------------------------------
void WorkerThread::PostMsg(shared_ptr<UserData> data)
{
assert(m_thread);
// Create a new ThreadMsg
shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_POST_USER_DATA, data));
// Add user data msg to queue and notify worker thread
unique_lock<mutex> lk(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
//----------------------------------------------------------------------------
// TimerThread
//----------------------------------------------------------------------------
void WorkerThread::TimerThread()
{
while (!m_timerExit)
{
// Sleep for 250mS then put a MSG_TIMER into the message queue
this_thread::sleep_for(1s);
shared_ptr<UserData> userDataTimer(new UserData());
userDataTimer->id = 100;
userDataTimer->data1 = "MSG_TIMER";
shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_TIMER, userDataTimer));
// Add timer msg to queue and notify worker thread
unique_lock<mutex> lk(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
}
//----------------------------------------------------------------------------
// Process
//----------------------------------------------------------------------------
void WorkerThread::Process()
{
m_timerExit = false;
thread timerThread(&WorkerThread::TimerThread, this);
while (1)
{
shared_ptr<ThreadMsg> msg;
{
// Wait for a message to be added to the queue
unique_lock<mutex> lk(m_mutex);
while (m_queue.empty())
m_cv.wait(lk);
if (m_queue.empty())
continue;
msg = m_queue.front();
m_queue.pop();
}
assert(msg->msg != NULL);
//if (msg->msg == NULL) {
// //DebugBreak();
// AfxDebugBreak();
//}
auto userData = static_pointer_cast<UserData>(msg->msg);
switch (msg->id)
{
case MSG_POST_USER_DATA:
{
//cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;
if (pfnExecuteCmd != NULL) {
pfnExecuteCmd(userData);
}
break;
}
case MSG_TIMER:
cout << "Timer expired on " << THREAD_NAME << endl;
if (pfnExecuteCmd != NULL) {
pfnExecuteCmd(userData);
}
break;
case MSG_EXIT_THREAD:
{
m_timerExit = true;
timerThread.join();
return;
}
default:
assert(0);
}
}
}