shithub: openh264

ref: 99faf1ec4a47c5362a6bde552d6097166e2fda16
dir: /codec/WelsThreadLib/src/WelsThreadLib.cpp/

View raw version
/*!
 * \copy
 *     Copyright (c)  2009-2013, Cisco Systems
 *     All rights reserved.
 *
 *     Redistribution and use in source and binary forms, with or without
 *     modification, are permitted provided that the following conditions
 *     are met:
 *
 *        * Redistributions of source code must retain the above copyright
 *          notice, this list of conditions and the following disclaimer.
 *
 *        * Redistributions in binary form must reproduce the above copyright
 *          notice, this list of conditions and the following disclaimer in
 *          the documentation and/or other materials provided with the
 *          distribution.
 *
 *     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *     "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *     LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 *     FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 *     COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 *     INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 *     BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 *     LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 *     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 *     LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 *     ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 *     POSSIBILITY OF SUCH DAMAGE.
 *
 *
 * \file	WelsThreadLib.c
 *
 * \brief	Interfaces introduced in thread programming
 *
 * \date	11/17/2009 Created
 *
 *************************************************************************************
 */


#include "WelsThreadLib.h"
#include <stdio.h>

#ifdef  WIN32

/*!
 * \brief	Suspend the calling thread (WIN32 build).
 *
 * \param	dwMilliseconds	interval in milliseconds, handed unchanged to Sleep()
 */
void WelsSleep( uint32_t dwMilliseconds )
{
	const uint32_t kuiIntervalMs = dwMilliseconds;

	Sleep( kuiIntervalMs );
}

/*!
 * \brief	Initialize a mutex (WIN32 build).
 *
 * \param	mutex	object passed to InitializeCriticalSection()
 *
 * \return	always WELS_THREAD_ERROR_OK
 */
WELS_THREAD_ERROR_CODE    WelsMutexInit( WELS_MUTEX   * mutex )
{
	WELS_THREAD_ERROR_CODE iRet = WELS_THREAD_ERROR_OK;

	InitializeCriticalSection( mutex );
	return iRet;
}

/*!
 * \brief	Acquire a mutex (WIN32 build).
 *
 * \param	mutex	object passed to EnterCriticalSection()
 *
 * \return	always WELS_THREAD_ERROR_OK
 */
WELS_THREAD_ERROR_CODE    WelsMutexLock( WELS_MUTEX   * mutex )
{
	WELS_THREAD_ERROR_CODE iRet = WELS_THREAD_ERROR_OK;

	EnterCriticalSection( mutex );
	return iRet;
}

/*!
 * \brief	Release a mutex (WIN32 build).
 *
 * \param	mutex	object passed to LeaveCriticalSection()
 *
 * \return	always WELS_THREAD_ERROR_OK
 */
WELS_THREAD_ERROR_CODE    WelsMutexUnlock( WELS_MUTEX * mutex )
{
	WELS_THREAD_ERROR_CODE iRet = WELS_THREAD_ERROR_OK;

	LeaveCriticalSection( mutex );
	return iRet;
}

/*!
 * \brief	Destroy a mutex (WIN32 build).
 *
 * \param	mutex	object passed to DeleteCriticalSection()
 *
 * \return	always WELS_THREAD_ERROR_OK
 */
WELS_THREAD_ERROR_CODE    WelsMutexDestroy( WELS_MUTEX * mutex )
{
	WELS_THREAD_ERROR_CODE iRet = WELS_THREAD_ERROR_OK;

	DeleteCriticalSection( mutex );
	return iRet;
}

/*!
 * \brief	Create an event object (WIN32 build).
 *
 * \param	event	receives the new handle; left untouched when creation fails
 *
 * \return	WELS_THREAD_ERROR_OK on success, WELS_THREAD_ERROR_GENERIAL when CreateEvent() returns NULL
 */
WELS_THREAD_ERROR_CODE    WelsEventInit( WELS_EVENT  *  event )
{
	// arguments: default security, bManualReset = FALSE, bInitialState = FALSE, no name
	WELS_EVENT hCreated = CreateEvent( NULL, FALSE, FALSE, NULL );

	if ( NULL != hCreated )
	{
		*event = hCreated;
		return WELS_THREAD_ERROR_OK;
	}
	return WELS_THREAD_ERROR_GENERIAL;
}

/*!
 * \brief	Signal an event (WIN32 build).
 *
 * \param	event	pointer to the handle passed to SetEvent()
 *
 * \return	WELS_THREAD_ERROR_OK when SetEvent() succeeds, otherwise WELS_THREAD_ERROR_GENERIAL
 */
WELS_THREAD_ERROR_CODE    WelsEventSignal( WELS_EVENT * event )
{
	if ( !SetEvent( *event ) )
	{
		return WELS_THREAD_ERROR_GENERIAL;
	}
	return WELS_THREAD_ERROR_OK;
}

/*!
 * \brief	Reset an event (WIN32 build).
 *
 * \param	event	pointer to the handle passed to ResetEvent()
 *
 * \return	WELS_THREAD_ERROR_OK when ResetEvent() succeeds, otherwise WELS_THREAD_ERROR_GENERIAL
 */
WELS_THREAD_ERROR_CODE    WelsEventReset( WELS_EVENT * event )
{
	if ( !ResetEvent( *event ) )
	{
		return WELS_THREAD_ERROR_GENERIAL;
	}
	return WELS_THREAD_ERROR_OK;
}

/*!
 * \brief	Wait on an event without a time limit (WIN32 build).
 *
 * \param	event	pointer to the handle to wait on
 *
 * \return	the value returned by WaitForSingleObject()
 */
WELS_THREAD_ERROR_CODE    WelsEventWait( WELS_EVENT * event )
{
	WELS_THREAD_ERROR_CODE iWaitResult = WaitForSingleObject( *event, INFINITE );

	return iWaitResult;
}

/*!
 * \brief	Wait on an event with a time limit (WIN32 build).
 *
 * \param	event			pointer to the handle to wait on
 * \param	dwMilliseconds	time-out forwarded unchanged to WaitForSingleObject()
 *
 * \return	the value returned by WaitForSingleObject()
 */
WELS_THREAD_ERROR_CODE    WelsEventWaitWithTimeOut( WELS_EVENT * event, uint32_t dwMilliseconds )
{
	WELS_THREAD_ERROR_CODE iWaitResult = WaitForSingleObject( *event, dwMilliseconds );

	return iWaitResult;
}

/*!
 * \brief	Wait on a list of events with bWaitAll = FALSE (WIN32 build).
 *
 * \param	nCount			number of handles in event_list
 * \param	event_list		array of event handles
 * \param	dwMilliseconds	time-out forwarded unchanged to WaitForMultipleObjects()
 *
 * \return	the value returned by WaitForMultipleObjects()
 */
WELS_THREAD_ERROR_CODE    WelsMultipleEventsWaitSingleBlocking(	uint32_t nCount,
																WELS_EVENT *event_list,
																uint32_t dwMilliseconds )
{
	WELS_THREAD_ERROR_CODE iWaitResult = WaitForMultipleObjects( nCount, event_list, FALSE, dwMilliseconds );

	return iWaitResult;
}

/*!
 * \brief	Wait on a list of events with bWaitAll = TRUE (WIN32 build).
 *
 * \param	nCount		number of handles in event_list
 * \param	event_list	array of event handles
 *
 * \return	the value returned by WaitForMultipleObjects()
 */
WELS_THREAD_ERROR_CODE    WelsMultipleEventsWaitAllBlocking( uint32_t nCount, WELS_EVENT *event_list )
{
	const uint32_t kuiTimeOut = (uint32_t)-1;	// all bits set
	WELS_THREAD_ERROR_CODE iWaitResult = WaitForMultipleObjects( nCount, event_list, TRUE, kuiTimeOut );

	return iWaitResult;
}

/*!
 * \brief	Close an event handle and clear the caller's copy (WIN32 build).
 *
 * \param	event	pointer to the handle; set to NULL on return
 *
 * \return	always WELS_THREAD_ERROR_OK (the CloseHandle() result is not inspected)
 */
WELS_THREAD_ERROR_CODE    WelsEventDestroy( WELS_EVENT * event )
{
	WELS_EVENT hClosing = *event;

	*event = NULL;
	CloseHandle( hClosing );

	return WELS_THREAD_ERROR_OK;
}


/*!
 * \brief	Start a new thread (WIN32 build).
 *
 * \param	thread	receives the thread handle; left untouched when creation fails
 * \param	routine	entry point handed to CreateThread()
 * \param	arg		argument handed to the entry point
 * \param	attr	not used by this implementation
 *
 * \return	WELS_THREAD_ERROR_OK on success, WELS_THREAD_ERROR_GENERIAL when CreateThread() returns NULL
 */
WELS_THREAD_ERROR_CODE    WelsThreadCreate( WELS_THREAD_HANDLE * thread,  LPWELS_THREAD_ROUTINE  routine, 
										   void * arg, WELS_THREAD_ATTR attr)
{
	WELS_THREAD_HANDLE hNewThread = CreateThread( NULL, 0, routine, arg, 0, NULL );

	if ( NULL != hNewThread )
	{
		*thread = hNewThread;
		return WELS_THREAD_ERROR_OK;
	}
	return WELS_THREAD_ERROR_GENERIAL;
}

/*!
 * \brief	Placeholder on WIN32: performs no work.
 *
 * \return	always WELS_THREAD_ERROR_OK
 */
WELS_THREAD_ERROR_CODE	  WelsSetThreadCancelable()
{
	WELS_THREAD_ERROR_CODE iRet = WELS_THREAD_ERROR_OK;	// nothing to configure in this build

	return iRet;
}

/*!
 * \brief	Wait without a time limit on a thread handle (WIN32 build).
 *
 * \param	thread	handle passed to WaitForSingleObject()
 *
 * \return	always WELS_THREAD_ERROR_OK (the wait result is not inspected)
 */
WELS_THREAD_ERROR_CODE    WelsThreadJoin( WELS_THREAD_HANDLE  thread )
{
	WELS_THREAD_ERROR_CODE iRet = WELS_THREAD_ERROR_OK;

	WaitForSingleObject( thread, INFINITE );
	return iRet;
}

/*!
 * \brief	Placeholder on WIN32: performs no work.
 *
 * \param	thread	ignored
 *
 * \return	always WELS_THREAD_ERROR_OK
 */
WELS_THREAD_ERROR_CODE    WelsThreadCancel( WELS_THREAD_HANDLE  thread )
{
	WELS_THREAD_ERROR_CODE iRet = WELS_THREAD_ERROR_OK;	// the handle is not touched

	return iRet;
}


/*!
 * \brief	Close a thread handle and clear the caller's copy (WIN32 build).
 *
 * \param	thread	pointer to the handle; a NULL pointer is accepted and ignored
 *
 * \return	always WELS_THREAD_ERROR_OK
 */
WELS_THREAD_ERROR_CODE    WelsThreadDestroy( WELS_THREAD_HANDLE *thread )
{
	if ( NULL == thread )
	{
		return WELS_THREAD_ERROR_OK;
	}

	CloseHandle( *thread );
	*thread = NULL;

	return WELS_THREAD_ERROR_OK;
}

/*!
 * \brief	Obtain a handle for the calling thread (WIN32 build).
 *
 * \return	the value returned by GetCurrentThread()
 */
WELS_THREAD_HANDLE        WelsThreadSelf()
{
	WELS_THREAD_HANDLE hSelf = GetCurrentThread();

	return hSelf;
}

/*!
 * \brief	Report the processor count (WIN32 build).
 *
 * \param	pInfo	output; ProcessorCount receives dwNumberOfProcessors from GetSystemInfo()
 *
 * \return	always WELS_THREAD_ERROR_OK
 */
WELS_THREAD_ERROR_CODE    WelsQueryLogicalProcessInfo(WelsLogicalProcessInfo * pInfo)
{
	SYSTEM_INFO sSysInfo;

	GetSystemInfo( &sSysInfo );
	pInfo->ProcessorCount = sSysInfo.dwNumberOfProcessors;

	return WELS_THREAD_ERROR_OK;
}

#elif   defined(__GNUC__)

#ifdef MACOS
#include <CoreServices/CoreServices.h>
//#include <Gestalt.h>
#endif//MACOS

static int32_t  SystemCall(const str_t * pCmd, str_t * pRes, int32_t iSize)
{
    int32_t fd[2];
    int32_t iPid;
    int32_t iCount;
    int32_t left;
    str_t * p = NULL;
    int32_t iMaxLen = iSize - 1;
    memset(pRes, 0, iSize);

    if( pipe(fd) ){
        return -1;
    }

    if( (iPid = fork()) == 0 ){
        int32_t  fd2[2];
        if( pipe(fd2) ){
            return -1;
        }
        close(STDOUT_FILENO);
        dup2(fd2[1],STDOUT_FILENO);
        close(fd[0]);
        close(fd2[1]);
        system(pCmd);
        read(fd2[0], pRes, iMaxLen);
        write(fd[1], pRes, strlen(pRes));	// confirmed_safe_unsafe_usage
        close(fd2[0]);
		close(fd[1]);
        exit(0);
    }
    close(fd[1]);
    p = pRes;
    left = iMaxLen;
    while( (iCount = read(fd[0], p, left)) ){
        p += iCount;
        left -= iCount;
        if( left <=0 ) break;   
    }
    close(fd[0]);
    return 0;
}

/*!
 * \brief	Suspend the calling thread (POSIX build).
 *
 * The interval is slept in slices below one second: POSIX allows usleep() to
 * reject arguments of 1000000 microseconds or more, and a single
 * dwMilliseconds * 1000 product would wrap around in 32 bits for long intervals.
 *
 * \param	dwMilliseconds	interval in milliseconds
 */
void WelsSleep( uint32_t dwMilliseconds )
{
	const uint32_t kuiSliceMs	= 999;	// largest slice that stays below 1000000 us
	uint32_t uiLeftMs			= dwMilliseconds;

	while ( uiLeftMs > kuiSliceMs )
	{
		if ( usleep( kuiSliceMs * 1000 ) != 0 )
			return;	// interrupted or rejected: give up early, as a single usleep() call would
		uiLeftMs -= kuiSliceMs;
	}

	usleep( uiLeftMs * 1000 );	// microseconds
}

/*!
 * \brief	Start a new thread (POSIX build).
 *
 * A temporary attribute object is configured with PTHREAD_SCOPE_SYSTEM and
 * SCHED_FIFO, used for pthread_create() and destroyed again on every path
 * once it has been initialized.
 *
 * \param	thread	receives the thread identifier from pthread_create()
 * \param	routine	entry point of the new thread
 * \param	arg		argument handed to the entry point
 * \param	attr	not used by this implementation
 *
 * \return	0 on success, otherwise the error code of the first pthread call that failed
 */
WELS_THREAD_ERROR_CODE    WelsThreadCreate( WELS_THREAD_HANDLE * thread,  LPWELS_THREAD_ROUTINE  routine, 
										   void * arg, WELS_THREAD_ATTR attr)
{
	pthread_attr_t at;
	WELS_THREAD_ERROR_CODE err = pthread_attr_init(&at);

	if ( err )
		return err;	// nothing was initialized, so nothing to destroy

	err = pthread_attr_setscope(&at, PTHREAD_SCOPE_SYSTEM);
	if ( 0 == err )
		err = pthread_attr_setschedpolicy(&at, SCHED_FIFO);
	if ( 0 == err )
		err = pthread_create( thread, &at, routine, arg );

	// release the attribute object on success and on failure alike
	pthread_attr_destroy(&at);

	return err;
}

/*!
 * \brief	Enable deferred cancellation for the calling thread (POSIX build).
 *
 * \return	0 on success, otherwise the error code of pthread_setcancelstate() or,
 *			if that call succeeded, of pthread_setcanceltype()
 */
WELS_THREAD_ERROR_CODE	  WelsSetThreadCancelable()
{
	const WELS_THREAD_ERROR_CODE kiStateRet = pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );

	if ( 0 != kiStateRet )
	{
		return kiStateRet;
	}
	return pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, NULL );
}

/*!
 * \brief	Wait for a thread to terminate (POSIX build).
 *
 * \param	thread	identifier passed to pthread_join(); the thread's exit value is discarded
 *
 * \return	the value returned by pthread_join()
 */
WELS_THREAD_ERROR_CODE    WelsThreadJoin( WELS_THREAD_HANDLE  thread )
{
	WELS_THREAD_ERROR_CODE iJoinRet = pthread_join( thread, NULL );

	return iJoinRet;
}

/*!
 * \brief	Request cancellation of a thread (POSIX build).
 *
 * \param	thread	identifier passed to pthread_cancel()
 *
 * \return	the value returned by pthread_cancel()
 */
WELS_THREAD_ERROR_CODE    WelsThreadCancel( WELS_THREAD_HANDLE  thread )
{
	WELS_THREAD_ERROR_CODE iCancelRet = pthread_cancel( thread );

	return iCancelRet;
}

/*!
 * \brief	Placeholder in the POSIX build: performs no work.
 *
 * \param	thread	ignored
 *
 * \return	always WELS_THREAD_ERROR_OK
 */
WELS_THREAD_ERROR_CODE    WelsThreadDestroy( WELS_THREAD_HANDLE *thread )
{
	WELS_THREAD_ERROR_CODE iRet = WELS_THREAD_ERROR_OK;	// the handle is not touched

	return iRet;
}

/*!
 * \brief	Obtain the identifier of the calling thread (POSIX build).
 *
 * \return	the value returned by pthread_self()
 */
WELS_THREAD_HANDLE        WelsThreadSelf()
{
	WELS_THREAD_HANDLE hSelf = pthread_self();

	return hSelf;
}

/*!
 * \brief	Initialize a mutex with default attributes (POSIX build).
 *
 * \param	mutex	object passed to pthread_mutex_init()
 *
 * \return	the value returned by pthread_mutex_init()
 */
WELS_THREAD_ERROR_CODE    WelsMutexInit( WELS_MUTEX   * mutex )
{
	WELS_THREAD_ERROR_CODE iRet = pthread_mutex_init( mutex, NULL );

	return iRet;
}

/*!
 * \brief	Acquire a mutex (POSIX build).
 *
 * \param	mutex	object passed to pthread_mutex_lock()
 *
 * \return	the value returned by pthread_mutex_lock()
 */
WELS_THREAD_ERROR_CODE    WelsMutexLock( WELS_MUTEX   * mutex )
{
	WELS_THREAD_ERROR_CODE iRet = pthread_mutex_lock( mutex );

	return iRet;
}

/*!
 * \brief	Release a mutex (POSIX build).
 *
 * \param	mutex	object passed to pthread_mutex_unlock()
 *
 * \return	the value returned by pthread_mutex_unlock()
 */
WELS_THREAD_ERROR_CODE    WelsMutexUnlock( WELS_MUTEX * mutex )
{
	WELS_THREAD_ERROR_CODE iRet = pthread_mutex_unlock( mutex );

	return iRet;
}

/*!
 * \brief	Destroy a mutex (POSIX build).
 *
 * \param	mutex	object passed to pthread_mutex_destroy()
 *
 * \return	the value returned by pthread_mutex_destroy()
 */
WELS_THREAD_ERROR_CODE    WelsMutexDestroy( WELS_MUTEX * mutex )
{
	WELS_THREAD_ERROR_CODE iRet = pthread_mutex_destroy( mutex );

	return iRet;
}

// NOTE: unnamed semaphores do not work well with POSIX threading models when running as a non-root user

/*!
 * \brief	Initialize an unnamed semaphore used as an event (POSIX build).
 *
 * \param	event	object passed to sem_init(); pshared = 0 and initial value = 0
 *
 * \return	the value returned by sem_init()
 */
WELS_THREAD_ERROR_CODE    WelsEventInit( WELS_EVENT *event )
{
	WELS_THREAD_ERROR_CODE iRet = sem_init( event, 0, 0 );

	return iRet;
}

/*!
 * \brief	Destroy a semaphore that was set up with sem_init() (POSIX build).
 *
 * \param	event	object passed to sem_destroy()
 *
 * \return	the value returned by sem_destroy()
 */
WELS_THREAD_ERROR_CODE   WelsEventDestroy( WELS_EVENT * event )
{
	WELS_THREAD_ERROR_CODE iRet = sem_destroy( event );	// counterpart of sem_init()

	return iRet;
}

/*!
 * \brief	Open (creating if necessary) a named semaphore used as an event (POSIX build).
 *
 * \param	p_event		receives the semaphore pointer; set to NULL when sem_open() fails
 * \param	event_name	name handed to sem_open(); unlinked again when sem_open() fails
 *
 * \return	WELS_THREAD_ERROR_OK on success, WELS_THREAD_ERROR_GENERIAL on a NULL argument
 *			or when sem_open() fails
 */
WELS_THREAD_ERROR_CODE    WelsEventOpen( WELS_EVENT **p_event, str_t *event_name )
{
	if ( NULL == p_event || NULL == event_name )
	{
		return WELS_THREAD_ERROR_GENERIAL;
	}

	// O_CREAT with owner read/write permission (0600), initial value 0
	*p_event = sem_open(event_name, O_CREAT,  (S_IRUSR | S_IWUSR), 0);

	if ( *p_event != (sem_t *)SEM_FAILED )
	{
		return WELS_THREAD_ERROR_OK;
	}

	sem_unlink( event_name );
	*p_event = NULL;
	return WELS_THREAD_ERROR_GENERIAL;
}
/*!
 * \brief	Close a semaphore obtained from sem_open() and optionally remove its name (POSIX build).
 *
 * \param	event		semaphore passed to sem_close()
 * \param	event_name	when not NULL, the name passed to sem_unlink()
 *
 * \return	the value returned by sem_close(); the sem_unlink() result is not reported
 */
WELS_THREAD_ERROR_CODE    WelsEventClose( WELS_EVENT *event, str_t *event_name )
{
	const WELS_THREAD_ERROR_CODE kiCloseRet = sem_close( event );	// counterpart of sem_open()

	if ( NULL != event_name )
	{
		sem_unlink( event_name );
	}
	return kiCloseRet;
}

/*!
 * \brief	Signal an event by posting its semaphore (POSIX build).
 *
 * \param	event	semaphore passed to sem_post()
 *
 * \return	the value returned by sem_post()
 */
WELS_THREAD_ERROR_CODE   WelsEventSignal( WELS_EVENT * event )
{
	const WELS_THREAD_ERROR_CODE kiPostRet = sem_post( event );

	return kiPostRet;
}
/*!
 * \brief	Reset an event to the non-signaled state (POSIX build).
 *
 * POSIX semaphores have no reset call, so pending posts are consumed one by
 * one with the non-blocking sem_trywait() until it stops succeeding. Unlike a
 * sem_close()/sem_init() pair, this is valid both for semaphores set up with
 * sem_init() and for semaphores obtained from sem_open(); both creation paths
 * exist in this file.
 *
 * \param	event	semaphore to drain
 *
 * \return	WELS_THREAD_ERROR_OK once the semaphore has been drained,
 *			WELS_THREAD_ERROR_GENERIAL when event is NULL
 */
WELS_THREAD_ERROR_CODE    WelsEventReset( WELS_EVENT * event )
{
	if ( event == NULL )
		return WELS_THREAD_ERROR_GENERIAL;

	// each successful sem_trywait() removes one pending signal; it never blocks
	while ( 0 == sem_trywait(event) )
	{
	}

	return WELS_THREAD_ERROR_OK;
}

/*!
 * \brief	Block until the event's semaphore can be acquired (POSIX build).
 *
 * \param	event	semaphore passed to sem_wait()
 *
 * \return	the value returned by sem_wait()
 */
WELS_THREAD_ERROR_CODE   WelsEventWait( WELS_EVENT * event )
{
	const WELS_THREAD_ERROR_CODE kiWaitRet = sem_wait( event );

	return kiWaitRet;
}

/*!
 * \brief	Wait on an event with a time limit (POSIX build).
 *
 * \param	event			semaphore to wait on
 * \param	dwMilliseconds	time-out in milliseconds; (uint32_t)-1 means wait without limit
 *
 * \return	the value returned by sem_wait(), sem_trywait() or sem_timedwait(),
 *			depending on the path taken (0 when the semaphore was acquired)
 */
WELS_THREAD_ERROR_CODE    WelsEventWaitWithTimeOut( WELS_EVENT * event, uint32_t dwMilliseconds )
{	
	if ( dwMilliseconds == (uint32_t)-1 )
	{
		// no time limit requested: plain blocking wait
		return sem_wait(event);
	}
	else
	{
#if defined(MACOS)
		// this branch avoids sem_timedwait(): poll once, sleep for the time-out, poll once more
		int32_t err = sem_trywait(event);

		if ( WELS_THREAD_ERROR_OK != err )
		{
			usleep( dwMilliseconds * 1000 );
			err = sem_trywait(event);
		}
		return err;
#else
		struct timespec ts;
		struct timeval tv;

		gettimeofday(&tv,0);

		// absolute deadline = now + dwMilliseconds
		ts.tv_sec = tv.tv_sec + dwMilliseconds /1000;
		ts.tv_nsec = tv.tv_usec*1000 + (dwMilliseconds % 1000) * 1000000;

		// carry into seconds: sem_timedwait() rejects tv_nsec >= 1000000000
		if ( ts.tv_nsec >= 1000000000 )
		{
			ts.tv_sec += 1;
			ts.tv_nsec -= 1000000000;
		}

		return sem_timedwait(event, &ts);
#endif//MACOS
	}
}

/*!
 * \brief	Block until any one event of a list is signaled (POSIX build, bWaitAll = FALSE).
 *
 * The events are polled in index order. Each event is tried with
 * sem_trywait(), and when that fails it is tried once more after a short
 * usleep(). After a full pass without success the thread sleeps briefly and
 * starts over.
 *
 * \param	nCount			number of entries in event_list
 * \param	event_list		array of semaphore pointers
 * \param	dwMilliseconds	not consulted by this implementation; the call polls until an event is acquired
 *
 * \return	WELS_THREAD_ERROR_WAIT_OBJECT_0 + index of the acquired event, or
 *			WELS_THREAD_ERROR_WAIT_FAILED when nCount is 0
 */
WELS_THREAD_ERROR_CODE    WelsMultipleEventsWaitSingleBlocking(	uint32_t nCount,
																WELS_EVENT **event_list,
																uint32_t dwMilliseconds )
{
	const uint32_t kuiRetryDelayUs = 2;	// pause between the two attempts on one event
	uint32_t uiEvent = 0;

	if ( 0 == nCount )
		return WELS_THREAD_ERROR_WAIT_FAILED;

	for ( ;; )
	{
		for ( uiEvent = 0; uiEvent < nCount; ++ uiEvent )
		{
			// first attempt
			if ( WELS_THREAD_ERROR_OK == sem_trywait( event_list[uiEvent] ) )
				return WELS_THREAD_ERROR_WAIT_OBJECT_0 + uiEvent;

			usleep( kuiRetryDelayUs );

			// second and last attempt before moving on to the next event
			if ( WELS_THREAD_ERROR_OK == sem_trywait( event_list[uiEvent] ) )
				return WELS_THREAD_ERROR_WAIT_OBJECT_0 + uiEvent;
		}
		usleep( 1 );	// switch to working threads
	}

	return WELS_THREAD_ERROR_WAIT_FAILED;
}

/*!
 * \brief	Block until every event of a list has been signaled (POSIX build, bWaitAll = TRUE).
 *
 * Each event is acquired with a blocking sem_wait(). A bit mask records which
 * events have already been acquired, so an event whose sem_wait() failed is
 * retried on the next pass while acquired ones are skipped.
 *
 * \param	nCount		number of entries in event_list; must be between 1 and 32, the width of the bit mask
 * \param	event_list	array of semaphore pointers
 *
 * \return	WELS_THREAD_ERROR_OK once all events have been acquired, or
 *			WELS_THREAD_ERROR_WAIT_FAILED when nCount is out of range
 */
WELS_THREAD_ERROR_CODE    WelsMultipleEventsWaitAllBlocking( uint32_t nCount, WELS_EVENT **event_list )
{
	uint32_t nIdx			= 0;
	uint32_t uiCountSignals	= 0;
	uint32_t uiSignalFlag	= 0;	// bit i is set once event i has been acquired
	
	if ( nCount == 0 || nCount > (sizeof(uint32_t)<<3) )
		return WELS_THREAD_ERROR_WAIT_FAILED;
	
	while (1)
	{
		for ( nIdx = 0; nIdx < nCount; ++ nIdx )
		{
			// unsigned shift: a plain int 1 << 31 would overflow for the 32nd event
			const uint32_t kuiBitwiseFlag = ((uint32_t)1 << nIdx);

			if ( (uiSignalFlag & kuiBitwiseFlag) == kuiBitwiseFlag )
				continue;	// already acquired on an earlier pass

			if ( WELS_THREAD_ERROR_OK != sem_wait(event_list[nIdx]) )
				continue;	// wait failed; this event is retried on the next pass

			uiSignalFlag |= kuiBitwiseFlag;
			++ uiCountSignals;
			if ( uiCountSignals >= nCount )
			{
				return WELS_THREAD_ERROR_OK;
			}
		}
	}
	
	return WELS_THREAD_ERROR_WAIT_FAILED;
}

/*!
 * \brief	Report the processor count (POSIX build).
 *
 * With LINUX defined, the count is taken from a shell pipeline that counts the
 * "processor" lines of /proc/cpuinfo. Otherwise it is queried through
 * Gestalt(gestaltCountOfCPUs). Whenever no positive count can be obtained, 1
 * is reported.
 *
 * \param	pInfo	output; ProcessorCount receives the detected count
 *
 * \return	WELS_THREAD_ERROR_OK, or WELS_THREAD_ERROR_GENERIAL when pInfo is NULL
 */
WELS_THREAD_ERROR_CODE    WelsQueryLogicalProcessInfo(WelsLogicalProcessInfo * pInfo)
{
#ifdef LINUX

#define   CMD_RES_SIZE    2048
	str_t pBuf[CMD_RES_SIZE] = { 0 };
	int32_t iCount = 0;

	if ( pInfo == NULL )
		return WELS_THREAD_ERROR_GENERIAL;

	// only parse the buffer when the command actually ran
	if ( 0 == SystemCall("cat /proc/cpuinfo | grep \"processor\" | wc -l", pBuf, CMD_RES_SIZE) )
		iCount = atoi(pBuf);

	// fall back to a single processor when nothing usable was read
	pInfo->ProcessorCount = ( iCount > 0 ) ? iCount : 1;

	return WELS_THREAD_ERROR_OK;
#undef   CMD_RES_SIZE

#else

	SInt32 cpunumber = 0;

	if ( pInfo == NULL )
		return WELS_THREAD_ERROR_GENERIAL;

	// a non-zero result means the query failed and cpunumber may not have been written
	if ( 0 != Gestalt(gestaltCountOfCPUs,&cpunumber) || cpunumber <= 0 )
		cpunumber = 1;

	pInfo->ProcessorCount	= cpunumber;

	return WELS_THREAD_ERROR_OK;

#endif//LINUX
}

#endif