shithub: openh264

Download patch

ref: fa268249779551a81948f468f2deb9cedf4fdfe2
parent: 73cccfda3e32f08365d0a00f575ab14bec5bc9ad
parent: c51a4ff09427aff7c4963bc1beb0f5a8cc06c7c2
author: ruil2 <ruil2@cisco.com>
date: Thu Apr 27 05:30:24 EDT 2017

Merge pull request #2729 from sijchen/fixmt1

[Encoder] fix potential deadlock problem in multi-threading

--- a/codec/common/inc/WelsThread.h
+++ b/codec/common/inc/WelsThread.h
@@ -82,7 +82,7 @@
   }
 
   void SignalThread() {
-    WelsEventSignal (&m_hEvent);
+    WelsEventSignal (&m_hEvent, &m_hMutex, &m_iConVar);
   }
 
  private:
@@ -91,6 +91,7 @@
   CWelsLock           m_cLockStatus;
   bool                m_bRunning;
   bool                m_bEndFlag;
+  int                 m_iConVar;
 
   DISALLOW_COPY_AND_ASSIGN (CWelsThread);
 };
--- a/codec/common/inc/WelsThreadLib.h
+++ b/codec/common/inc/WelsThreadLib.h
@@ -112,8 +112,8 @@
 WELS_THREAD_ERROR_CODE    WelsEventOpen (WELS_EVENT* p_event, const char* event_name = NULL);
 WELS_THREAD_ERROR_CODE    WelsEventClose (WELS_EVENT* event, const char* event_name = NULL);
 
-WELS_THREAD_ERROR_CODE    WelsEventSignal (WELS_EVENT* event);
-WELS_THREAD_ERROR_CODE    WelsEventWait (WELS_EVENT* event,WELS_MUTEX *pMutex = NULL);
+WELS_THREAD_ERROR_CODE    WelsEventSignal (WELS_EVENT* event,WELS_MUTEX *pMutex, int* iCondition);
+WELS_THREAD_ERROR_CODE    WelsEventWait (WELS_EVENT* event,WELS_MUTEX *pMutex, int& iCondition);
 WELS_THREAD_ERROR_CODE    WelsEventWaitWithTimeOut (WELS_EVENT* event, uint32_t dwMilliseconds,WELS_MUTEX *pMutex = NULL);
 WELS_THREAD_ERROR_CODE    WelsMultipleEventsWaitSingleBlocking (uint32_t nCount, WELS_EVENT* event_list,
     WELS_EVENT* master_event = NULL,WELS_MUTEX *pMutex = NULL);
--- a/codec/common/src/WelsThread.cpp
+++ b/codec/common/src/WelsThread.cpp
@@ -49,7 +49,7 @@
 
   WelsEventOpen (&m_hEvent);
   WelsMutexInit(&m_hMutex);
-
+  m_iConVar = 1;
 }
 
 CWelsThread::~CWelsThread() {
@@ -60,13 +60,14 @@
 
 void CWelsThread::Thread() {
   while (true) {
-    WelsEventWait (&m_hEvent,&m_hMutex);
+    WelsEventWait (&m_hEvent,&m_hMutex,m_iConVar);
 
     if (GetEndFlag()) {
       break;
     }
 
-    ExecuteTask();
+    m_iConVar = 1;
+    ExecuteTask();//in ExecuteTask there will be OnTaskStop which opens the potential new Signaling of next run, so the setting of m_iConVar = 1 should be before ExecuteTask()
   }
 
   SetRunning (false);
@@ -105,7 +106,7 @@
 
   SetEndFlag (true);
 
-  WelsEventSignal (&m_hEvent);
+  WelsEventSignal (&m_hEvent,&m_hMutex,&m_iConVar);
   WelsThreadJoin (m_hThread);
   return;
 }
--- a/codec/common/src/WelsThreadLib.cpp
+++ b/codec/common/src/WelsThreadLib.cpp
@@ -140,7 +140,7 @@
   return WELS_THREAD_ERROR_OK;
 }
 
-WELS_THREAD_ERROR_CODE    WelsEventSignal (WELS_EVENT* event) {
+WELS_THREAD_ERROR_CODE    WelsEventSignal (WELS_EVENT* event, WELS_MUTEX *pMutex, int* iCondition) {
   if (SetEvent (*event)) {
     return WELS_THREAD_ERROR_OK;
   }
@@ -147,7 +147,7 @@
   return WELS_THREAD_ERROR_GENERAL;
 }
 
-WELS_THREAD_ERROR_CODE    WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex) {
+WELS_THREAD_ERROR_CODE    WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex, int& iCondition) {
   return WaitForSingleObject (*event, INFINITE);
 }
 
@@ -327,11 +327,21 @@
   usleep (dwMilliSecond * 1000);
 }
 
-WELS_THREAD_ERROR_CODE   WelsEventSignal (WELS_EVENT* event) {
+WELS_THREAD_ERROR_CODE   WelsEventSignal (WELS_EVENT* event, WELS_MUTEX *pMutex, int* iCondition) {
   WELS_THREAD_ERROR_CODE err = 0;
+  //fprintf( stderr, "before signal it, event=%x iCondition= %d..\n", event, *iCondition );
 #ifdef __APPLE__
+  WelsMutexLock (pMutex);
+  (*iCondition) --;
+  WelsMutexUnlock (pMutex);
+  if ((*iCondition) <= 0) {
   err = pthread_cond_signal (event);
+  //fprintf( stderr, "signal it, event=%x iCondition= %d..\n",event, *iCondition );
+
+  }
 #else
+    (*iCondition) --;
+    if ((*iCondition) <= 0) {
 //  int32_t val = 0;
 //  sem_getvalue(event, &val);
 //  fprintf( stderr, "before signal it, val= %d..\n",val );
@@ -338,14 +348,23 @@
   if (event != NULL)
     err = sem_post (*event);
 //  sem_getvalue(event, &val);
-//  fprintf( stderr, "after signal it, val= %d..\n",val );
+    //fprintf( stderr, "signal it, event=%x iCondition= %d..\n",event, *iCondition );
+    }
 #endif
+  //fprintf( stderr, "after signal it, event=%x  iCondition= %d..\n",event, *iCondition );
   return err;
 }
 
-WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex) {
+WELS_THREAD_ERROR_CODE WelsEventWait (WELS_EVENT* event, WELS_MUTEX* pMutex, int& iCondition) {
 #ifdef __APPLE__
-  return pthread_cond_wait (event, pMutex);
+  int err = 0;
+  WelsMutexLock(pMutex);
+  //fprintf( stderr, "WelsEventWait event %x %d..\n", event, iCondition );
+  while (iCondition>0) {
+    err = pthread_cond_wait (event, pMutex);
+  }
+  WelsMutexUnlock(pMutex);
+  return err;
 #else
   return sem_wait (*event); // blocking until signaled
 #endif
--- a/codec/common/src/WelsThreadPool.cpp
+++ b/codec/common/src/WelsThreadPool.cpp
@@ -94,7 +94,7 @@
     }
   }
 
-  //fprintf(stdout, "m_iRefCount=%d, iMaxThreadNum=%d\n", m_iRefCount, m_iMaxThreadNum);
+  ////fprintf(stdout, "m_iRefCount=%d, iMaxThreadNum=%d\n", m_iRefCount, m_iMaxThreadNum);
 
   ++ m_iRefCount;
   //fprintf(stdout, "m_iRefCount2=%d\n", m_iRefCount);
@@ -135,9 +135,10 @@
   RemoveThreadFromBusyList (pThread);
   AddThreadToIdleQueue (pThread);
 
-  if (pTask->GetSink()) {
-    pTask->GetSink()->OnTaskExecuted();
+  if (pTask && pTask->GetSink()) {
     //fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, pTask->GetSink());
+    pTask->GetSink()->OnTaskExecuted();
+    ////fprintf(stdout, "CWelsThreadPool::OnTaskStop 1: Task %x at Thread %x Finished, m_pSink=%x\n", pTask, pThread, pTask->GetSink());
   }
   //if (m_pSink) {
   //  m_pSink->OnTaskExecuted (pTask);
@@ -222,13 +223,20 @@
   CWelsTaskThread* pThread = NULL;
   IWelsTask*    pTask = NULL;
   while (GetWaitedTaskNum() > 0) {
+    //fprintf(stdout, "ThreadPool:  ExecuteTask: waiting task %d\n", GetWaitedTaskNum());
     pThread = GetIdleThread();
     if (pThread == NULL) {
+      //fprintf(stdout, "ThreadPool:  ExecuteTask: no IdleThread\n");
+
       break;
     }
     pTask = GetWaitedTask();
     //fprintf(stdout, "ThreadPool:  ExecuteTask = %x at thread %x\n", pTask, pThread);
-    pThread->SetTask (pTask);
+    if (pTask) {
+      pThread->SetTask (pTask);
+    } else {
+      AddThreadToIdleQueue (pThread);
+    }
   }
 }
 
@@ -309,11 +317,12 @@
 CWelsTaskThread*   CWelsThreadPool::GetIdleThread() {
   CWelsAutoLock cLock (m_cLockIdleTasks);
 
-  //fprintf(stdout, "CWelsThreadPool::GetIdleThread=%d\n", m_cIdleThreads->size());
-  if (m_cIdleThreads->size() == 0) {
+  if (NULL == m_cIdleThreads || m_cIdleThreads->size() == 0) {
     return NULL;
   }
 
+  //fprintf(stdout, "CWelsThreadPool::GetIdleThread=%d\n", m_cIdleThreads->size());
+
   CWelsTaskThread* pThread = m_cIdleThreads->begin();
   m_cIdleThreads->pop_front();
   return pThread;
@@ -320,21 +329,21 @@
 }
 
 int32_t  CWelsThreadPool::GetBusyThreadNum() {
-  return m_cBusyThreads->size();
+  return (m_cBusyThreads?m_cBusyThreads->size():0);
 }
 
 int32_t  CWelsThreadPool::GetIdleThreadNum() {
-  return m_cIdleThreads->size();
+  return (m_cIdleThreads?m_cIdleThreads->size():0);
 }
 
 int32_t  CWelsThreadPool::GetWaitedTaskNum() {
-  return m_cWaitedTasks->size();
+  return (m_cWaitedTasks?m_cWaitedTasks->size():0);
 }
 
 IWelsTask* CWelsThreadPool::GetWaitedTask() {
   CWelsAutoLock lock (m_cLockWaitedTasks);
 
-  if (m_cWaitedTasks->size() == 0) {
+  if (NULL==m_cWaitedTasks || m_cWaitedTasks->size() == 0) {
     return NULL;
   }
 
@@ -347,6 +356,9 @@
 
 void  CWelsThreadPool::ClearWaitedTasks() {
   CWelsAutoLock cLock (m_cLockWaitedTasks);
+  if (NULL == m_cWaitedTasks) {
+    return;
+  }
   IWelsTask* pTask = NULL;
   while (0 != m_cWaitedTasks->size()) {
     pTask = m_cWaitedTasks->begin();
--- a/codec/encoder/core/src/wels_task_management.cpp
+++ b/codec/encoder/core/src/wels_task_management.cpp
@@ -195,12 +195,17 @@
 }
 
 void  CWelsTaskManageBase::OnTaskMinusOne() {
+  //fprintf(stdout, "OnTaskMinusOne event %x m_iWaitTaskNum=%d\n", &m_hEventMutex, m_iWaitTaskNum);
   WelsCommon::CWelsAutoLock cAutoLock (m_cWaitTaskNumLock);
+  WelsEventSignal (&m_hTaskEvent,&m_hEventMutex, &m_iWaitTaskNum);
+  /*WelsMutexLock(&m_hEventMutex);
   m_iWaitTaskNum --;
+  WelsMutexUnlock(&m_hEventMutex);
+
   if (m_iWaitTaskNum <= 0) {
     WelsEventSignal (&m_hTaskEvent);
-    //fprintf(stdout, "OnTaskMinusOne WelsEventSignal m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
-  }
+    fprintf(stdout, "OnTaskMinusOne WelsEventSignal m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
+  }*/
   //fprintf(stdout, "OnTaskMinusOne m_iWaitTaskNum=%d\n", m_iWaitTaskNum);
 }
 
@@ -228,7 +233,8 @@
     m_pThreadPool->QueueTask (pTargetTaskList->getNode (iIdx));
     iIdx ++;
   }
-  WelsEventWait (&m_hTaskEvent, &m_hEventMutex);
+
+  WelsEventWait (&m_hTaskEvent,&m_hEventMutex, m_iWaitTaskNum);
 
   return ENC_RETURN_SUCCESS;
 }