ref: 316f740630f6a9cff3ce0c32e66cc419ae4a5507
parent: ac6cf877d655add210284c3dff6a621ada1e8a7d
parent: d4f09d9048f5d7388358d3838fe02c5377cb79b9
author: sijchen <sijchen@cisco.com>
date: Thu Mar 3 04:47:20 EST 2016
Merge pull request #2390 from sijchen/th012 [Common] put CWelsThreadPool to singleton for future usage
--- a/codec/common/inc/WelsTask.h
+++ b/codec/common/inc/WelsTask.h
@@ -59,7 +59,12 @@
virtual ~IWelsTask() { }
virtual int Execute() = 0;
- private:
+
+ IWelsTaskSink* GetSink() {
+ return m_pSink;
+ };
+
+ protected:
IWelsTaskSink* m_pSink;
};
--- a/codec/common/inc/WelsThreadPool.h
+++ b/codec/common/inc/WelsThreadPool.h
@@ -56,6 +56,7 @@
virtual WELS_THREAD_ERROR_CODE OnTaskCancelled (IWelsTask* pTask) = 0;
};
+
class CWelsThreadPool : public CWelsThread, public IWelsTaskThreadSink {
public:
enum {
@@ -62,9 +63,15 @@
DEFAULT_THREAD_NUM = 4,
};
- CWelsThreadPool (IWelsThreadPoolSink* pSink = NULL, int32_t iMaxThreadNum = DEFAULT_THREAD_NUM);
+ CWelsThreadPool (IWelsThreadPoolSink* pSink = NULL);
virtual ~CWelsThreadPool();
+ static WELS_THREAD_ERROR_CODE SetThreadNum (int32_t iMaxThreadNum);
+
+ static CWelsThreadPool& AddReference (IWelsThreadPoolSink* pSink = NULL);
+ void RemoveInstance();
+ static bool IsReferenced();
+
//IWelsTaskThreadSink
virtual WELS_THREAD_ERROR_CODE OnTaskStart (CWelsTaskThread* pThread, IWelsTask* pTask);
virtual WELS_THREAD_ERROR_CODE OnTaskStop (CWelsTaskThread* pThread, IWelsTask* pTask);
@@ -77,8 +84,9 @@
return m_iMaxThreadNum;
}
+
protected:
- WELS_THREAD_ERROR_CODE Init (int32_t iMaxThreadNum = DEFAULT_THREAD_NUM);
+ WELS_THREAD_ERROR_CODE Init (IWelsThreadPoolSink* pSink);
WELS_THREAD_ERROR_CODE Uninit();
WELS_THREAD_ERROR_CODE CreateIdleThread();
@@ -95,7 +103,13 @@
void ClearWaitedTasks();
private:
- int32_t m_iMaxThreadNum;
+ WELS_THREAD_ERROR_CODE StopAllRunning();
+ void UpdateSink (IWelsThreadPoolSink* pSink);
+
+ static int32_t m_iRefCount;
+ static CWelsLock m_cInitLock;
+ static int32_t m_iMaxThreadNum;
+
CWelsCircleQueue<IWelsTask>* m_cWaitedTasks;
CWelsCircleQueue<CWelsTaskThread>* m_cIdleThreads;
CWelsList<CWelsTaskThread>* m_cBusyThreads;
--- a/codec/common/src/WelsTaskThread.cpp
+++ b/codec/common/src/WelsTaskThread.cpp
@@ -42,6 +42,8 @@
namespace WelsCommon {
CWelsTaskThread::CWelsTaskThread (IWelsTaskThreadSink* pSink) : m_pSink (pSink) {
+ WelsThreadSetName ("CWelsTaskThread");
+
m_uiID = (uintptr_t) (this);
m_pTask = NULL;
}
--- a/codec/common/src/WelsThreadPool.cpp
+++ b/codec/common/src/WelsThreadPool.cpp
@@ -43,70 +43,119 @@
namespace WelsCommon {
+int32_t CWelsThreadPool::m_iRefCount = 0;
+CWelsLock CWelsThreadPool::m_cInitLock;
+int32_t CWelsThreadPool::m_iMaxThreadNum = DEFAULT_THREAD_NUM;
-CWelsThreadPool::CWelsThreadPool (IWelsThreadPoolSink* pSink, int32_t iMaxThreadNum) :
- m_pSink (pSink) {
- m_cWaitedTasks = new CWelsCircleQueue<IWelsTask>();
- m_cIdleThreads = new CWelsCircleQueue<CWelsTaskThread>();
- m_cBusyThreads = new CWelsList<CWelsTaskThread>();
- m_iMaxThreadNum = 0;
+CWelsThreadPool::CWelsThreadPool (IWelsThreadPoolSink* pSink) :
+ m_cWaitedTasks (NULL), m_cIdleThreads (NULL), m_cBusyThreads (NULL), m_pSink (pSink) {
+}
- if (NULL == m_cWaitedTasks || NULL == m_cIdleThreads || NULL == m_cBusyThreads) {
- WELS_DELETE_OP(m_cWaitedTasks);
- WELS_DELETE_OP(m_cIdleThreads);
- WELS_DELETE_OP(m_cBusyThreads);
- return;
+CWelsThreadPool::~CWelsThreadPool() {
+ //fprintf(stdout, "CWelsThreadPool::~CWelsThreadPool: delete %x, %x, %x\n", m_cWaitedTasks, m_cIdleThreads, m_cBusyThreads);
+ if (0 != m_iRefCount) {
+ m_iRefCount = 0;
+ Uninit();
}
+}
- if (WELS_THREAD_ERROR_OK != Init (iMaxThreadNum)) {
- Uninit();
+WELS_THREAD_ERROR_CODE CWelsThreadPool::SetThreadNum (int32_t iMaxThreadNum) {
+ CWelsAutoLock cLock (m_cInitLock);
- WELS_DELETE_OP(m_cWaitedTasks);
- WELS_DELETE_OP(m_cIdleThreads);
- WELS_DELETE_OP(m_cBusyThreads);
+ if (m_iRefCount != 0) {
+ return WELS_THREAD_ERROR_GENERAL;
}
+
+ if (iMaxThreadNum <= 0) {
+ iMaxThreadNum = 1;
+ }
+ m_iMaxThreadNum = iMaxThreadNum;
+ return WELS_THREAD_ERROR_OK;
}
+CWelsThreadPool& CWelsThreadPool::AddReference (IWelsThreadPoolSink* pSink) {
+ CWelsAutoLock cLock (m_cInitLock);
+ static CWelsThreadPool m_cThreadPoolSelf (pSink);
+ if (m_iRefCount == 0) {
+ //TODO: will remove this afterwards
+ if (WELS_THREAD_ERROR_OK != m_cThreadPoolSelf.Init(pSink)) {
+ m_cThreadPoolSelf.Uninit();
+ }
+ m_cThreadPoolSelf.UpdateSink (pSink);
+ }
-CWelsThreadPool::~CWelsThreadPool() {
- Uninit();
+ //fprintf(stdout, "m_iRefCount=%d, pSink=%x, iMaxThreadNum=%d\n", m_iRefCount, pSink, iMaxThreadNum);
- WELS_DELETE_OP(m_cWaitedTasks);
- WELS_DELETE_OP(m_cIdleThreads);
- WELS_DELETE_OP(m_cBusyThreads);
+ ++ m_iRefCount;
+ //fprintf(stdout, "m_iRefCount2=%d\n", m_iRefCount);
+ return m_cThreadPoolSelf;
}
+void CWelsThreadPool::RemoveInstance() {
+ CWelsAutoLock cLock (m_cInitLock);
+ //fprintf(stdout, "m_iRefCount=%d\n", m_iRefCount);
+ -- m_iRefCount;
+ if (0 == m_iRefCount) {
+ StopAllRunning();
+ m_pSink = NULL;
+ Uninit();
+ //fprintf(stdout, "m_iRefCount=%d, IdleThreadNum=%d, BusyThreadNum=%d, WaitedTask=%d\n", m_iRefCount, GetIdleThreadNum(), GetBusyThreadNum(), GetWaitedTaskNum());
+ }
+}
+
+bool CWelsThreadPool::IsReferenced() {
+ CWelsAutoLock cLock (m_cInitLock);
+ return (m_iRefCount>0);
+}
+
+void CWelsThreadPool::UpdateSink (IWelsThreadPoolSink* pSink) {
+ m_pSink = pSink;
+ //fprintf(stdout, "UpdateSink: m_pSink=%x\n", m_pSink);
+ //fprintf(stdout, "m_iRefCount=%d, IdleThreadNum=%d, BusyThreadNum=%d, WaitedTask=%d\n", m_iRefCount, GetIdleThreadNum(), GetBusyThreadNum(), GetWaitedTaskNum());
+}
+
+
WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStart (CWelsTaskThread* pThread, IWelsTask* pTask) {
AddThreadToBusyList (pThread);
-
+ //fprintf(stdout, "CWelsThreadPool::AddThreadToBusyList: Task %x at Thread %x\n", pTask, pThread);
return WELS_THREAD_ERROR_OK;
}
WELS_THREAD_ERROR_CODE CWelsThreadPool::OnTaskStop (CWelsTaskThread* pThread, IWelsTask* pTask) {
+ //fprintf(stdout, "CWelsThreadPool::OnTaskStop 0: Task %x at Thread %x Finished\n", pTask, pThread);
+
RemoveThreadFromBusyList (pThread);
AddThreadToIdleQueue (pThread);
+ //fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, m_pSink);
- if (m_pSink) {
- m_pSink->OnTaskExecuted (pTask);
+ if (pTask->GetSink()) {
+ pTask->GetSink()->OnTaskExecuted();
}
+ //if (m_pSink) {
+ // m_pSink->OnTaskExecuted (pTask);
+ //}
+ //fprintf(stdout, "CWelsThreadPool::OnTaskStop 2: Task %x at Thread %x Finished\n", pTask, pThread);
- //WELS_INFO_TRACE("ThreadPool: Task "<<(uint32_t)pTask<<" Finished, Thread "<<(uint32_t)pThread<<" put to idle list");
-
SignalThread();
+
+ //fprintf(stdout, "ThreadPool: Task %x at Thread %x Finished\n", pTask, pThread);
return WELS_THREAD_ERROR_OK;
}
-WELS_THREAD_ERROR_CODE CWelsThreadPool::Init (int32_t iMaxThreadNum) {
+WELS_THREAD_ERROR_CODE CWelsThreadPool::Init (IWelsThreadPoolSink* pSink) {
+ //fprintf(stdout, "Enter WelsThreadPool Init\n");
+
CWelsAutoLock cLock (m_cLockPool);
- //WELS_INFO_TRACE("Enter WelsThreadPool Init");
- int32_t i;
+ m_cWaitedTasks = new CWelsCircleQueue<IWelsTask>();
+ m_cIdleThreads = new CWelsCircleQueue<CWelsTaskThread>();
+ m_cBusyThreads = new CWelsList<CWelsTaskThread>();
+ if (NULL == m_cWaitedTasks || NULL == m_cIdleThreads || NULL == m_cBusyThreads) {
+ return WELS_THREAD_ERROR_GENERAL;
+ }
- if (iMaxThreadNum <= 0) iMaxThreadNum = 1;
- m_iMaxThreadNum = iMaxThreadNum;
-
- for (i = 0; i < m_iMaxThreadNum; i++) {
+ for (int32_t i = 0; i < m_iMaxThreadNum; i++) {
if (WELS_THREAD_ERROR_OK != CreateIdleThread()) {
return WELS_THREAD_ERROR_GENERAL;
}
@@ -119,9 +168,8 @@
return WELS_THREAD_ERROR_OK;
}
-WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
+WELS_THREAD_ERROR_CODE CWelsThreadPool::StopAllRunning() {
WELS_THREAD_ERROR_CODE iReturn = WELS_THREAD_ERROR_OK;
- CWelsAutoLock cLock (m_cLockPool);
ClearWaitedTasks();
@@ -134,6 +182,18 @@
iReturn = WELS_THREAD_ERROR_GENERAL;
}
+ return iReturn;
+}
+
+WELS_THREAD_ERROR_CODE CWelsThreadPool::Uninit() {
+ WELS_THREAD_ERROR_CODE iReturn = WELS_THREAD_ERROR_OK;
+ CWelsAutoLock cLock (m_cLockPool);
+
+ iReturn = StopAllRunning();
+ if (WELS_THREAD_ERROR_OK != iReturn) {
+ return iReturn;
+ }
+
m_cLockIdleTasks.Lock();
while (m_cIdleThreads->size() > 0) {
DestroyThread (m_cIdleThreads->begin());
@@ -141,14 +201,17 @@
}
m_cLockIdleTasks.Unlock();
- m_iMaxThreadNum = 0;
Kill();
+ WELS_DELETE_OP(m_cWaitedTasks);
+ WELS_DELETE_OP(m_cIdleThreads);
+ WELS_DELETE_OP(m_cBusyThreads);
+
return iReturn;
}
void CWelsThreadPool::ExecuteTask() {
- //fprintf(stdout, "ThreadPool: schedule tasks\n");
+ //fprintf(stdout, "ThreadPool: scheduled tasks: ExecuteTask\n");
CWelsTaskThread* pThread = NULL;
IWelsTask* pTask = NULL;
while (GetWaitedTaskNum() > 0) {
@@ -165,20 +228,21 @@
WELS_THREAD_ERROR_CODE CWelsThreadPool::QueueTask (IWelsTask* pTask) {
CWelsAutoLock cLock (m_cLockPool);
- //fprintf(stdout, "ThreadPool: QueueTask = %x\n", pTask);
+ //fprintf(stdout, "CWelsThreadPool::QueueTask: %d, pTask=%x\n", m_iRefCount, pTask);
if (GetWaitedTaskNum() == 0) {
CWelsTaskThread* pThread = GetIdleThread();
if (pThread != NULL) {
- //fprintf(stdout, "ThreadPool: ExecuteTask = %x\n", pTask);
+ //fprintf(stdout, "ThreadPool: ExecuteTask = %x at thread %x\n", pTask, pThread);
pThread->SetTask (pTask);
return WELS_THREAD_ERROR_OK;
}
}
-
- AddTaskToWaitedList (pTask);
//fprintf(stdout, "ThreadPool: AddTaskToWaitedList: %x\n", pTask);
+ AddTaskToWaitedList (pTask);
+
+ //fprintf(stdout, "ThreadPool: SignalThread: %x\n", pTask);
SignalThread();
return WELS_THREAD_ERROR_OK;
}
@@ -193,6 +257,7 @@
if (WELS_THREAD_ERROR_OK != pThread->Start()) {
return WELS_THREAD_ERROR_GENERAL;
}
+ //fprintf(stdout, "ThreadPool: AddThreadToIdleQueue: %x\n", pThread);
AddThreadToIdleQueue (pThread);
return WELS_THREAD_ERROR_OK;
@@ -219,10 +284,10 @@
WELS_THREAD_ERROR_CODE CWelsThreadPool::RemoveThreadFromBusyList (CWelsTaskThread* pThread) {
CWelsAutoLock cLock (m_cLockBusyTasks);
- if (m_cBusyThreads->erase(pThread)) {
- return WELS_THREAD_ERROR_OK;
+ if (m_cBusyThreads->erase (pThread)) {
+ return WELS_THREAD_ERROR_OK;
} else {
- return WELS_THREAD_ERROR_GENERAL;
+ return WELS_THREAD_ERROR_GENERAL;
}
}
@@ -230,6 +295,7 @@
CWelsAutoLock cLock (m_cLockWaitedTasks);
m_cWaitedTasks->push_back (pTask);
+ //fprintf(stdout, "CWelsThreadPool::AddTaskToWaitedList=%d, pTask=%x\n", m_cWaitedTasks->size(), pTask);
return;
}
@@ -236,6 +302,7 @@
CWelsTaskThread* CWelsThreadPool::GetIdleThread() {
CWelsAutoLock cLock (m_cLockIdleTasks);
+ //fprintf(stdout, "CWelsThreadPool::GetIdleThread=%d\n", m_cIdleThreads->size());
if (m_cIdleThreads->size() == 0) {
return NULL;
}
@@ -254,6 +321,7 @@
}
int32_t CWelsThreadPool::GetWaitedTaskNum() {
+ //fprintf(stdout, "CWelsThreadPool::m_cWaitedTasks=%d\n", m_cWaitedTasks->size());
return m_cWaitedTasks->size();
}
--- a/codec/encoder/core/inc/wels_task_management.h
+++ b/codec/encoder/core/inc/wels_task_management.h
@@ -60,6 +60,8 @@
= 0;
static IWelsTaskManage* CreateTaskManage (sWelsEncCtx* pCtx, const int32_t iSpatialLayer, const bool bNeedLock);
+
+ virtual int32_t GetThreadPoolThreadNum() = 0;
};
@@ -83,13 +85,11 @@
virtual WelsErrorType OnTaskCancelled (WelsCommon::IWelsTask* pTask);
//IWelsTaskSink
- virtual int OnTaskExecuted() {
- return 0;
- };
- virtual int OnTaskCancelled() {
- return 0;
- };
+ virtual WelsErrorType OnTaskExecuted();
+ virtual WelsErrorType OnTaskCancelled();
+ int32_t GetThreadPoolThreadNum();
+
protected:
virtual WelsErrorType CreateTasks (sWelsEncCtx* pEncCtx, const int32_t kiTaskCount);
@@ -131,6 +131,8 @@
WelsErrorType Init (sWelsEncCtx* pEncCtx);
virtual WelsErrorType ExecuteTasks(const CWelsBaseTask::ETaskType iTaskType = CWelsBaseTask::WELS_ENC_TASK_ENCODING);
+
+ int32_t GetThreadPoolThreadNum() {return 1;};
};
} //namespace
--- a/codec/encoder/core/src/slice_multi_threading.cpp
+++ b/codec/encoder/core/src/slice_multi_threading.cpp
@@ -333,9 +333,6 @@
MT_TRACE_LOG (pLogCtx, WELS_LOG_INFO, "[MT] Open pReadySliceCodingEvent%d = 0x%p named(%s) ret%d err%d", iIdx,
(void*)pSmt->pReadySliceCodingEvent[iIdx], name, err, errno);
- pSmt->pThreadBsBuffer[iIdx] = (uint8_t*)pMa->WelsMalloc (iCountBsLen, "pSmt->pThreadBsBuffer");
- WELS_VERIFY_RETURN_PROC_IF (1, (NULL == pSmt->pThreadBsBuffer[iIdx]), FreeMemorySvc (ppCtx))
-
pSmt->pSliceInThread[iIdx] = (SSlice*)pMa->WelsMalloc (sizeof (SSlice)*iMaxSliceNumInThread, "pSmt->pSliceInThread");
WELS_VERIFY_RETURN_PROC_IF (1, (NULL == pSmt->pSliceInThread[iIdx]), FreeMemorySvc (ppCtx))
@@ -350,6 +347,7 @@
pSmt->piSliceIndexInThread[iIdx] = NULL;
}
+
WelsSnprintf (name, SEM_NAME_MAX, "scm%s", pSmt->eventNamespace);
err = WelsEventOpen (&pSmt->pSliceCodedMasterEvent, name);
MT_TRACE_LOG (pLogCtx, WELS_LOG_INFO, "[MT] Open pSliceCodedMasterEvent named(%s) ret%d err%d", name, err, errno);
@@ -359,6 +357,17 @@
(*ppCtx)->pTaskManage = IWelsTaskManage::CreateTaskManage (*ppCtx, iNumSpatialLayers, bDynamicSlice);
WELS_VERIFY_RETURN_PROC_IF (iReturn, (NULL == (*ppCtx)->pTaskManage), FreeMemorySvc (ppCtx))
+
+ int32_t iThreadBufferNum = WELS_MIN((*ppCtx)->pTaskManage->GetThreadPoolThreadNum(), MAX_THREADS_NUM);
+ for (iIdx = 0;iIdx < iThreadBufferNum; iIdx++) {
+ pSmt->pThreadBsBuffer[iIdx] = (uint8_t*)pMa->WelsMalloc (iCountBsLen, "pSmt->pThreadBsBuffer");
+ WELS_VERIFY_RETURN_PROC_IF (1, (NULL == pSmt->pThreadBsBuffer[iIdx]), FreeMemorySvc (ppCtx))
+ }
+ if (iThreadBufferNum < MAX_THREADS_NUM) {
+ for (iIdx = iThreadBufferNum; iIdx < MAX_THREADS_NUM; iIdx++) {
+ pSmt->pThreadBsBuffer[iIdx] = NULL;
+ }
+ }
memset (&pSmt->bThreadBsBufferUsage, 0, MAX_THREADS_NUM * sizeof (bool));
iReturn = WelsMutexInit (&pSmt->mutexThreadBsBufferUsage);
--- a/codec/encoder/core/src/wels_task_encoder.cpp
+++ b/codec/encoder/core/src/wels_task_encoder.cpp
@@ -65,7 +65,7 @@
}
WelsErrorType CWelsSliceEncodingTask::Execute() {
- WelsThreadSetName ("OpenH264Enc_CWelsSliceEncodingTask_Execute");
+ //fprintf(stdout, "OpenH264Enc_CWelsSliceEncodingTask_Execute, %x, sink=%x\n", this, m_pSink);
m_eTaskResult = InitTask();
WELS_VERIFY_RETURN_IFNEQ (m_eTaskResult, ENC_RETURN_SUCCESS)
@@ -73,6 +73,8 @@
m_eTaskResult = ExecuteTask();
FinishTask();
+
+ //fprintf(stdout, "OpenH264Enc_CWelsSliceEncodingTask_Execute Ends\n");
return m_eTaskResult;
}
--- a/codec/encoder/core/src/wels_task_management.cpp
+++ b/codec/encoder/core/src/wels_task_management.cpp
@@ -88,19 +88,26 @@
}
CWelsTaskManageBase::~CWelsTaskManageBase() {
- //printf ("~CWelsTaskManageBase\n");
+ //fprintf(stdout, "~CWelsTaskManageBase\n");
Uninit();
}
WelsErrorType CWelsTaskManageBase::Init (sWelsEncCtx* pEncCtx) {
m_pEncCtx = pEncCtx;
-
m_iThreadNum = m_pEncCtx->pSvcParam->iMultipleThreadIdc;
- m_pThreadPool = WELS_NEW_OP (WelsCommon::CWelsThreadPool (this, m_iThreadNum),
- WelsCommon::CWelsThreadPool);
- WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == m_pThreadPool)
int32_t iReturn = ENC_RETURN_SUCCESS;
+ //fprintf(stdout, "m_pThreadPool = &(CWelsThreadPool::GetInstance, this=%x\n", this);
+ iReturn = CWelsThreadPool::SetThreadNum (m_iThreadNum);
+ m_pThreadPool = & (CWelsThreadPool::AddReference (this));
+ if ( (iReturn != ENC_RETURN_SUCCESS) && pEncCtx ) {
+ WelsLog (& (pEncCtx->sLogCtx), WELS_LOG_WARNING, "Set Thread Num to %d did not succeed, current thread num in use: %d",
+ m_iThreadNum, m_pThreadPool->GetThreadNum());
+ }
+ WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == m_pThreadPool)
+ //fprintf(stdout, "m_pThreadPool = &(CWelsThreadPool::GetInstance3\n");
+
+ iReturn = ENC_RETURN_SUCCESS;
for (int32_t iDid = 0; iDid < MAX_DEPENDENCY_LAYER; iDid++) {
m_pcAllTaskList[CWelsBaseTask::WELS_ENC_TASK_ENCODING][iDid] = m_cEncodingTaskList[iDid];
m_pcAllTaskList[CWelsBaseTask::WELS_ENC_TASK_UPDATEMBMAP][iDid] = m_cPreEncodingTaskList[iDid];
@@ -107,14 +114,18 @@
iReturn |= CreateTasks (pEncCtx, iDid);
}
- //printf ("CWelsTaskManageBase Init m_iThreadNum %d m_iCurrentTaskNum %d pEncCtx->iMaxSliceCount %d\n", m_iThreadNum, m_iCurrentTaskNum, pEncCtx->iMaxSliceCount);
+ //fprintf(stdout, "CWelsTaskManageBase Init m_iThreadNum %d m_iCurrentTaskNum %d pEncCtx->iMaxSliceCount %d\n", m_iThreadNum, m_iCurrentTaskNum, pEncCtx->iMaxSliceCount);
return iReturn;
}
void CWelsTaskManageBase::Uninit() {
DestroyTasks();
- WELS_DELETE_OP (m_pThreadPool);
+ //fprintf(stdout, "m_pThreadPool = m_pThreadPool->RemoveInstance\n");
+ m_pThreadPool->RemoveInstance();
+ //WELS_DELETE_OP (m_pThreadPool);
+ //fprintf(stdout, "m_pThreadPool = m_pThreadPool->RemoveInstance2\n");
+
for (int32_t iDid = 0; iDid < MAX_DEPENDENCY_LAYER; iDid++) {
WELS_DELETE_OP(m_cEncodingTaskList[iDid]);
WELS_DELETE_OP(m_cPreEncodingTaskList[iDid]);
@@ -140,25 +151,26 @@
}
for (int idx = 0; idx < kiTaskCount; idx++) {
- if (uiSliceMode==SM_SIZELIMITED_SLICE) {
- pTask = WELS_NEW_OP (CWelsConstrainedSizeSlicingEncodingTask (this, pEncCtx, idx), CWelsConstrainedSizeSlicingEncodingTask);
+ if (uiSliceMode == SM_SIZELIMITED_SLICE) {
+ pTask = WELS_NEW_OP (CWelsConstrainedSizeSlicingEncodingTask (this, pEncCtx, idx),
+ CWelsConstrainedSizeSlicingEncodingTask);
} else {
- if (pEncCtx->pSvcParam->bUseLoadBalancing) {
- pTask = WELS_NEW_OP (CWelsLoadBalancingSlicingEncodingTask (this, pEncCtx, idx), CWelsLoadBalancingSlicingEncodingTask);
- } else {
- pTask = WELS_NEW_OP (CWelsSliceEncodingTask (this, pEncCtx, idx), CWelsSliceEncodingTask);
+ if (pEncCtx->pSvcParam->bUseLoadBalancing) {
+ pTask = WELS_NEW_OP (CWelsLoadBalancingSlicingEncodingTask (this, pEncCtx, idx), CWelsLoadBalancingSlicingEncodingTask);
+ } else {
+ pTask = WELS_NEW_OP (CWelsSliceEncodingTask (this, pEncCtx, idx), CWelsSliceEncodingTask);
+ }
}
- }
WELS_VERIFY_RETURN_IF (ENC_RETURN_MEMALLOCERR, NULL == pTask)
m_cEncodingTaskList[kiCurDid]->push_back (pTask);
}
- //printf ("CWelsTaskManageBase CreateTasks m_iThreadNum %d kiTaskCount=%d\n", m_iThreadNum, kiTaskCount);
+ //fprintf(stdout, "CWelsTaskManageBase CreateTasks m_iThreadNum %d kiTaskCount=%d\n", m_iThreadNum, kiTaskCount);
return ENC_RETURN_SUCCESS;
}
void CWelsTaskManageBase::DestroyTaskList (TASKLIST_TYPE* pTargetTaskList) {
- //printf ("CWelsTaskManageBase: pTargetTaskList size=%d m_iTotalTaskNum=%d\n", static_cast<int32_t> (pTargetTaskList->size()), m_iTotalTaskNum);
+ //fprintf(stdout, "CWelsTaskManageBase: pTargetTaskList size=%d m_iTotalTaskNum=%d\n", static_cast<int32_t> (pTargetTaskList->size()), m_iTotalTaskNum);
while (NULL != pTargetTaskList->begin()) {
CWelsBaseTask* pTask = pTargetTaskList->begin();
WELS_DELETE_OP (pTask);
@@ -176,7 +188,7 @@
m_pcAllTaskList[CWelsBaseTask::WELS_ENC_TASK_ENCODING][iDid] = NULL;
}
}
- //printf ("[MT] CWelsTaskManageBase() DestroyTasks, cleaned %d tasks\n", m_iTotalTaskNum);
+ //fprintf(stdout, "[MT] CWelsTaskManageBase() DestroyTasks, cleaned %d tasks\n", m_iTotalTaskNum);
}
void CWelsTaskManageBase::OnTaskMinusOne() {
@@ -184,9 +196,9 @@
m_iWaitTaskNum --;
if (m_iWaitTaskNum <= 0) {
WelsEventSignal (&m_hTaskEvent);
- //printf ("OnTaskMinusOne WelsEventSignal m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
+ //fprintf(stdout, "OnTaskMinusOne WelsEventSignal m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
}
- //printf ("OnTaskMinusOne m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
+ //fprintf(stdout, "OnTaskMinusOne m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
}
WelsErrorType CWelsTaskManageBase::OnTaskCancelled (WelsCommon::IWelsTask* pTask) {
@@ -199,10 +211,20 @@
return ENC_RETURN_SUCCESS;
}
+WelsErrorType CWelsTaskManageBase::OnTaskCancelled() {
+ OnTaskMinusOne();
+ return ENC_RETURN_SUCCESS;
+}
+
+WelsErrorType CWelsTaskManageBase::OnTaskExecuted() {
+ OnTaskMinusOne();
+ return ENC_RETURN_SUCCESS;
+}
+
WelsErrorType CWelsTaskManageBase::ExecuteTaskList (TASKLIST_TYPE** pTaskList) {
m_iWaitTaskNum = m_iTaskNum[m_iCurDid];
TASKLIST_TYPE* pTargetTaskList = (pTaskList[m_iCurDid]);
- //printf ("ExecuteTaskList m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
+ //fprintf(stdout, "ExecuteTaskList m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
if (0 == m_iWaitTaskNum) {
return ENC_RETURN_SUCCESS;
}
@@ -227,6 +249,10 @@
WelsErrorType CWelsTaskManageBase::ExecuteTasks (const CWelsBaseTask::ETaskType iTaskType) {
return ExecuteTaskList (m_pcAllTaskList[iTaskType]);
+}
+
+int32_t CWelsTaskManageBase::GetThreadPoolThreadNum() {
+ return m_pThreadPool->GetThreadNum();
}
// CWelsTaskManageOne is for test
--- a/test/common/WelsThreadPoolTest.cpp
+++ b/test/common/WelsThreadPoolTest.cpp
@@ -10,7 +10,7 @@
#include "WelsTask.h"
#include "WelsThreadPoolTest.h"
-#define TEST_TASK_NUM 20
+#define TEST_TASK_NUM 30
class CSimpleTask : public IWelsTask {
public:
@@ -36,11 +36,10 @@
uint32_t CSimpleTask::id = 0;
-
-TEST (CThreadPoolTest, CThreadPoolTest) {
+void* OneCallingFunc() {
CThreadPoolTest cThreadPoolTest;
CSimpleTask* aTasks[TEST_TASK_NUM];
- CWelsThreadPool cThreadPool (&cThreadPoolTest);
+ CWelsThreadPool* pThreadPool = & (CWelsThreadPool::AddReference (&cThreadPoolTest));
int32_t i;
for (i = 0; i < TEST_TASK_NUM; i++) {
@@ -48,7 +47,7 @@
}
for (i = 0; i < TEST_TASK_NUM; i++) {
- cThreadPool.QueueTask (aTasks[i]);
+ pThreadPool->QueueTask (aTasks[i]);
}
while (cThreadPoolTest.GetTaskCount() < TEST_TASK_NUM) {
@@ -58,5 +57,66 @@
for (i = 0; i < TEST_TASK_NUM; i++) {
delete aTasks[i];
}
+ pThreadPool->RemoveInstance();
+
+ return 0;
+}
+
+
+TEST (CThreadPoolTest, CThreadPoolTest) {
+ OneCallingFunc();
+
+ int iRet = CWelsThreadPool::SetThreadNum (8);
+ EXPECT_EQ (0, iRet);
+ EXPECT_FALSE (CWelsThreadPool::IsReferenced());
+
+ CWelsThreadPool* pThreadPool = & (CWelsThreadPool::AddReference (NULL));
+ EXPECT_TRUE(pThreadPool->IsReferenced());
+ EXPECT_EQ (8, pThreadPool->GetThreadNum());
+
+ iRet = CWelsThreadPool::SetThreadNum (4);
+ EXPECT_TRUE (0 != iRet);
+ EXPECT_EQ (8, pThreadPool->GetThreadNum());
+
+ pThreadPool->RemoveInstance();
+
+ iRet = CWelsThreadPool::SetThreadNum (4);
+ EXPECT_EQ (0, iRet);
+ pThreadPool = & (CWelsThreadPool::AddReference (NULL));
+ EXPECT_TRUE (pThreadPool->IsReferenced());
+ EXPECT_EQ (4, pThreadPool->GetThreadNum());
+ pThreadPool->RemoveInstance();
+
+ EXPECT_FALSE (CWelsThreadPool::IsReferenced());
+}
+
+
+TEST (CThreadPoolTest, CThreadPoolTestMulti) {
+ int iCallingNum = 10;
+ WELS_THREAD_HANDLE mThreadID[30];
+ int i = 0;
+
+ for (i = 0; i < iCallingNum; i++) {
+ WelsThreadCreate (& (mThreadID[i]), (LPWELS_THREAD_ROUTINE)OneCallingFunc, NULL, 0);
+ WelsSleep (1);
+ }
+
+ for (i = iCallingNum; i < iCallingNum * 2; i++) {
+ WelsThreadCreate (& (mThreadID[i]), (LPWELS_THREAD_ROUTINE)OneCallingFunc, NULL, 0);
+ WelsSleep (1);
+ WelsThreadJoin (mThreadID[i]);
+ }
+
+ for (i = 0; i < iCallingNum; i++) {
+ WelsThreadJoin (mThreadID[i]);
+ }
+
+ for (i = iCallingNum * 2; i < iCallingNum * 3; i++) {
+ WelsThreadCreate (& (mThreadID[i]), (LPWELS_THREAD_ROUTINE)OneCallingFunc, NULL, 0);
+ WelsSleep (1);
+ WelsThreadJoin (mThreadID[i]);
+ }
+
+ EXPECT_FALSE (CWelsThreadPool::IsReferenced());
}
--- a/test/common/WelsThreadPoolTest.h
+++ b/test/common/WelsThreadPoolTest.h
@@ -28,7 +28,7 @@
return cmResultSuccess;
}
- virtual int OnTaskExecuted() {
+ virtual int32_t OnTaskExecuted() {
WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock);
m_iTaskCount ++;
//fprintf(stdout, "Task execute over count is %d\n", m_iTaskCount);
@@ -35,7 +35,7 @@
return cmResultSuccess;
}
- virtual int OnTaskCancelled() {
+ virtual int32_t OnTaskCancelled() {
WelsCommon::CWelsAutoLock cAutoLock (m_cTaskCountLock);
m_iTaskCount ++;
//fprintf(stdout, "Task execute cancelled count is %d\n", m_iTaskCount);