/********************************************************************** Filename : GThreads_orbis.cpp Content : Orbis thread support Created : Feb 2013 Authors : Dzmitry Malyshau, Oscar Valer Copyright : (c) 2013 Rockstar Games Corp. All Rights Reserved. Licensees may use this file in accordance with the valid Scaleform Commercial License Agreement provided with the software. This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR ANY PURPOSE. **********************************************************************/ #if RSG_ORBIS // RAGE - RSG_ORBIS #include "GThreads.h" #include "GHash.h" #ifndef GFC_NO_THREADSUPPORT #include "GTimer.h" #include #include #include // RAGE - for sprintf #define ORBIS_THREAD_STACK_SIZE 0x20000 // *** Classes Implemented class GMutex; class GWaitCondition; class GThread; // *** Mutex implementation // Interface used internally in a mutex class GMutex_AreadyLockedAcquireInterface: public GAcquireInterface { public: // Mutex we belong to GMutex *pMutex; GMutex_AreadyLockedAcquireInterface() { pMutex = 0; } // Acquire interface implementation virtual bool CanAcquire(); virtual bool TryAcquire(); virtual bool TryAcquireCommit(); virtual bool TryAcquireCancel(); // GInterface - no implementation virtual void AddRef() {} virtual void Release(UInt flags = 0) {} }; // Acquire interface implementation bool GMutex_AreadyLockedAcquireInterface::CanAcquire() { return true; } bool GMutex_AreadyLockedAcquireInterface::TryAcquire() { return pMutex->TryAcquire(); } bool GMutex_AreadyLockedAcquireInterface::TryAcquireCommit() { return pMutex->TryAcquireCommit(); } bool GMutex_AreadyLockedAcquireInterface::TryAcquireCancel() { return pMutex->TryAcquireCancel(); } // *** Internal mutex implementation structure class GMutexImpl: public GNewOverrideBase { ScePthreadMutex MutexId; ScePthread LockedBy; bool Recursive; UInt LockCount; GMutex_AreadyLockedAcquireInterface AreadyLockedAcquire; friend class GWaitConditionImpl; public: GMutexImpl(GMutex* pmutex, bool recursive = true); ~GMutexImpl(); // Locking functions void Lock(); bool TryLock(); void Unlock(GMutex* pmutex); bool IsLockedByAnotherThread() const; bool IsSignaled() const; GAcquireInterface* GetAcquireInterface(GMutex* pmutex); }; // Constructor and destructor GMutexImpl::GMutexImpl(GMutex* pmutex, bool recursive) { AreadyLockedAcquire.pMutex = pmutex; Recursive = recursive; LockCount = 0; ScePthreadMutexattr mutexAttr; scePthreadMutexattrInit(&mutexAttr); if (Recursive) scePthreadMutexattrSettype(&mutexAttr, SCE_PTHREAD_MUTEX_RECURSIVE); int err = scePthreadMutexInit(&MutexId, &mutexAttr, NULL); GASSERT(err == SCE_OK); scePthreadMutexattrDestroy(&mutexAttr); GUNUSED(err); } GMutexImpl::~GMutexImpl() { int err = scePthreadMutexDestroy(&MutexId); GASSERT(err == SCE_OK); GUNUSED(err); } // Lock and try lock void GMutexImpl::Lock() { while(scePthreadMutexLock(&MutexId) != SCE_OK); LockCount++; LockedBy = scePthreadSelf(); } bool GMutexImpl::TryLock() { if(scePthreadMutexTrylock(&MutexId) == SCE_OK) { LockCount++; LockedBy = scePthreadSelf(); return true; } return false; } // Unlock void GMutexImpl::Unlock(GMutex* pmutex) { const ScePthread thrId = scePthreadSelf(); GASSERT(thrId == LockedBy && LockCount > 0); GUNUSED(thrId); UInt lockCount; LockCount--; lockCount = LockCount; GMutex::CallableHandlers handlers; pmutex->GetCallableHandlers(&handlers); scePthreadMutexUnlock(&MutexId); if(lockCount == 0) handlers.CallWaitHandlers(); } bool GMutexImpl::IsLockedByAnotherThread() const { if(LockCount == 0) return false; const ScePthread thrId = scePthreadSelf(); if(thrId != LockedBy) return true; return false; } bool GMutexImpl::IsSignaled() const { return LockCount == 0; } // Obtain the acquisition interface GAcquireInterface* GMutexImpl::GetAcquireInterface(GMutex* pmutex) { if(LockCount && !IsLockedByAnotherThread()) return &AreadyLockedAcquire; return pmutex; } // *** Actual implementation of GMutex GMutex::GMutex(bool recursive, bool multiWait): GWaitable(multiWait) { pImpl = new GMutexImpl(this, recursive); } GMutex::~GMutex() { delete pImpl; } // Lock, try lock and unlock void GMutex::Lock() { pImpl->Lock(); } bool GMutex::TryLock() { return pImpl->TryLock(); } void GMutex::Unlock() { pImpl->Unlock(this); } bool GMutex::IsLockedByAnotherThread() { return pImpl->IsLockedByAnotherThread(); } bool GMutex::IsSignaled() const { return pImpl->IsSignaled(); } // Obtain the acquisition interface GAcquireInterface* GMutex::GetAcquireInterface() { return pImpl->GetAcquireInterface(this); } // Acquire interface implementation bool GMutex::CanAcquire() { return !IsLockedByAnotherThread(); } bool GMutex::TryAcquire() { return TryLock(); } bool GMutex::TryAcquireCommit() { return true; } bool GMutex::TryAcquireCancel() { Unlock(); return true; } // ***** Wait Condition implementation // Internal implementation class class GWaitConditionImpl: public GNewOverrideBase { ScePthreadMutex MutexId; ScePthreadCond CondvId; public: GWaitConditionImpl(); ~GWaitConditionImpl(); // Release mutex and wait for condition. The mutex is re-aqured after the wait bool Wait(GMutex *pmutex, UInt delay = GFC_WAIT_INFINITE); // Notify a condition, releasing at one object waiting // and releasing all objects waiting void Notify(); void NotifyAll(); }; // Constructor and destructor GWaitConditionImpl::GWaitConditionImpl() { static ScePthreadMutexattr mutexAttr = NULL; if (!mutexAttr) { scePthreadMutexattrInit( &mutexAttr ); } int err = scePthreadMutexInit( &MutexId, &mutexAttr,NULL ); GASSERT(err == SCE_OK); GUNUSED(err); static ScePthreadCondattr condvAttr = NULL; if (!condvAttr) { scePthreadCondattrInit( &condvAttr ); } err = scePthreadCondInit( &CondvId, &condvAttr, NULL ); GASSERT(err == SCE_OK); } GWaitConditionImpl::~GWaitConditionImpl() { int err = scePthreadCondDestroy( &CondvId ); GASSERT(err == SCE_OK); GUNUSED(err); err = scePthreadMutexDestroy( &MutexId ); GASSERT(err == SCE_OK); } // Release mutex and wait for condition. The mutex is re-aqured after the wait bool GWaitConditionImpl::Wait(GMutex *pmutex, UInt delay) { bool ret = true; UInt lockCount = pmutex->pImpl->LockCount; // Mutex must have been locked if(lockCount == 0) return false; scePthreadMutexLock( &MutexId ); if(pmutex->pImpl->Recursive) { // Release the recursive mutex N times pmutex->pImpl->LockCount = 0; for(UInt i = 0; i < lockCount; i++) { scePthreadMutexUnlock( &pmutex->pImpl->MutexId ); } pmutex->CallWaitHandlers(); } else { pmutex->pImpl->LockCount = 0; scePthreadMutexUnlock( &pmutex->pImpl->MutexId ); pmutex->CallWaitHandlers(); } if(delay == GFC_WAIT_INFINITE) { scePthreadCondWait( &CondvId, &MutexId ); } else { // Timeout period in microseconds SceKernelUseconds timeout = delay * 1000; const int err = scePthreadCondTimedwait( &CondvId, &MutexId, timeout ); GASSERT(err == SCE_OK || err == SCE_KERNEL_ERROR_ETIMEDOUT); if(err) ret = false; } scePthreadMutexUnlock( &MutexId ); // Re-aquire the mutex for(UInt i = 0; i < lockCount; i++) pmutex->Lock(); return ret; } // Notify a condition, releasing the least object in a queue void GWaitConditionImpl::Notify() { scePthreadMutexLock( &MutexId ); scePthreadCondSignal( &CondvId ); scePthreadMutexUnlock( &MutexId ); } // Notify a condition, releasing all objects waiting void GWaitConditionImpl::NotifyAll() { scePthreadMutexLock( &MutexId ); scePthreadCondBroadcast( &CondvId ); scePthreadMutexUnlock( &MutexId ); } // *** Actual implementation of GWaitCondition GWaitCondition::GWaitCondition() { pImpl = new GWaitConditionImpl; } GWaitCondition::~GWaitCondition() { delete pImpl; } // Wait for condition bool GWaitCondition::Wait(GMutex *pmutex, UInt delay) { return pImpl->Wait(pmutex, delay); } // Notification void GWaitCondition::Notify() { pImpl->Notify(); } void GWaitCondition::NotifyAll() { pImpl->NotifyAll(); } // *** GThread constructors and destructor // RAGE - this is so I can init some TLS variables void* GThread::EngineInitCallbackData = NULL; void (*GThread::EngineInitCallbackFn)(void*) = NULL; GThread::GThread(UPInt stackSize, int processor): GWaitable(1) { CreateParams params; params.stackSize = stackSize; params.processor = processor; Init(params); } GThread::GThread(GThread::ThreadFn threadFunction, void* userHandle, UPInt stackSize, int processor, GThread::ThreadState initialState): GWaitable(1) { CreateParams params(threadFunction, userHandle, stackSize, processor, initialState); Init(params); } GThread::GThread(const CreateParams& params): GWaitable(1) { Init(params); } void GThread::Init(const CreateParams& params) { ThreadFlags = 0; ThreadHandle = 0; ExitCode = 0; SuspendCount = 0; StackSize = params.stackSize; Processor = params.processor; Priority = params.priority; ThreadFunction = params.threadFunction; UserHandle = params.userHandle; if(params.initialState != NotRunning) Start(params.initialState); } GThread::~GThread() { ThreadHandle = 0; } // *** Overridable user functions // Default Run implementation SInt GThread::Run() { return (ThreadFunction) ? ThreadFunction(this, UserHandle) : 0; } void GThread::OnExit() { } // Finishes the thread and releases internal reference to it void GThread::FinishAndRelease() { CallableHandlers handlers; GetCallableHandlers(&handlers); ThreadFlags &= (UInt32)~(GFC_THREAD_STARTED); ThreadFlags |= GFC_THREAD_FINISHED; Release(); handlers.CallWaitHandlers(); } // *** ThreadList - used to tack all created threads class GThreadList: public GNewOverrideBase { struct ThreadHashOp { size_t operator()(const GThread* ptr) { return (((size_t)ptr) >> 6) ^ (size_t)ptr; } }; GHashSet ThreadSet; GMutex ThreadMutex; GWaitCondition ThreadsEmpty; ScePthread RootThreadId; static GThreadList* volatile pRunningThreads; void addThread(GThread *pthr) { GMutex::Locker lock(&ThreadMutex); ThreadSet.Add(pthr); } void removeThread(GThread *pthr) { GMutex::Locker lock(&ThreadMutex); ThreadSet.Remove(pthr); if(ThreadSet.GetSize() == 0) ThreadsEmpty.Notify(); } void finishAllThreads() { // Only original root thread can call this const ScePthread thrId = scePthreadSelf(); GASSERT(thrId == RootThreadId); GUNUSED(thrId); GMutex::Locker lock(&ThreadMutex); while(ThreadSet.GetSize() != 0) ThreadsEmpty.Wait(&ThreadMutex); } public: GThreadList() { RootThreadId = scePthreadSelf(); } ~GThreadList() {} static void AddRunningThread(GThread *pthr) { if(!pRunningThreads) { pRunningThreads = new GThreadList; GASSERT(pRunningThreads); } pRunningThreads->addThread(pthr); } static void RemoveRunningThread(GThread *pthr) { GASSERT(pRunningThreads); pRunningThreads->removeThread(pthr); } static void FinishAllThreads() { if(pRunningThreads) { pRunningThreads->finishAllThreads(); delete pRunningThreads; pRunningThreads = 0; } } }; // By default, we have no thread list GThreadList* volatile GThreadList::pRunningThreads = 0; void GThread::FinishAllThreads() { GThreadList::FinishAllThreads(); } SInt GThread::PRun() { if(ThreadFlags & GFC_THREAD_START_SUSPENDED) { Suspend(); ThreadFlags &= (UInt32)~GFC_THREAD_START_SUSPENDED; } ExitCode = Run(); return ExitCode; } bool GThread::GetExitFlag() const { return (ThreadFlags & GFC_THREAD_EXIT) != 0; } void GThread::SetExitFlag(bool exitFlag) { if(exitFlag) ThreadFlags |= GFC_THREAD_EXIT; else ThreadFlags &= (UInt32) ~GFC_THREAD_EXIT; } // State functions bool GThread::IsFinished() const { return (ThreadFlags & GFC_THREAD_FINISHED) != 0; } bool GThread::IsSuspended() const { return SuspendCount > 0; } GThread::ThreadState GThread::GetThreadState() const { if(IsSuspended()) return Suspended; if(ThreadFlags & GFC_THREAD_STARTED) return Running; return NotRunning; } // ***** Thread management // The actual first function called on thread start void* GThread_OrbisThreadStartFn(void* phandle) { GThread *const pthr = (GThread*)phandle; // RAGE - added this so we can init TLS data if (GThread::EngineInitCallbackFn) { GThread::EngineInitCallbackFn(GThread::EngineInitCallbackData); } SInt ret = pthr->PRun(); pthr->Exit(ret); return NULL; } /* static */ int GThread::GetOSPriority(ThreadPriority p) //static inline int MapToSystemPrority(GThread::ThreadPriority p) { switch(p) { case GThread::CriticalPriority: return SCE_KERNEL_PRIO_FIFO_HIGHEST; case GThread::HighestPriority: return (SCE_KERNEL_PRIO_FIFO_DEFAULT + 2*SCE_KERNEL_PRIO_FIFO_HIGHEST)/3; case GThread::AboveNormalPriority: return (SCE_KERNEL_PRIO_FIFO_DEFAULT*2 + SCE_KERNEL_PRIO_FIFO_HIGHEST)/3; case GThread::NormalPriority: return SCE_KERNEL_PRIO_FIFO_DEFAULT; case GThread::BelowNormalPriority: return (SCE_KERNEL_PRIO_FIFO_DEFAULT*2 + SCE_KERNEL_PRIO_FIFO_LOWEST)/3; case GThread::LowestPriority: return (SCE_KERNEL_PRIO_FIFO_DEFAULT + 2*SCE_KERNEL_PRIO_FIFO_LOWEST)/3; case GThread::IdlePriority: return SCE_KERNEL_PRIO_FIFO_LOWEST; } return 1000; } bool GThread::Start(ThreadState initialState) { if(initialState == NotRunning) return false; // If the thread is already running then wait until its // finished to begin running this thread if((GetThreadState() != NotRunning) && !Wait()) return false; // Default values of stack size and thread priority size_t stackSize = ORBIS_THREAD_STACK_SIZE; int thrPrio = GThread::GetOSPriority(NormalPriority); ExitCode = 0; SuspendCount = 0; ThreadFlags = (initialState == Running) ? 0 : GFC_THREAD_START_SUSPENDED; // AddRef to us until the thread is finished AddRef(); GThreadList::AddRunningThread(this); if(StackSize != ORBIS_THREAD_STACK_SIZE || Priority != NormalPriority) { stackSize = StackSize; thrPrio = GThread::GetOSPriority(Priority); } // RAGE - synthesize unique thread name so SN Tuner doesn't complain about duplicate threads. // A better fix would be to expose the thread name to the creator. ScePthreadAttr Attrib; scePthreadAttrInit( &Attrib ); scePthreadAttrSetstacksize( &Attrib, stackSize ); scePthreadAttrSetinheritsched( &Attrib, SCE_PTHREAD_EXPLICIT_SCHED ); // scePthreadAttrSetaffinity( &Attrib, 1 << Processor); scePthreadAttrSetschedpolicy( &Attrib, SCE_KERNEL_SCHED_RR ); SceKernelSchedParam schedParam = { thrPrio }; scePthreadAttrSetschedparam( &Attrib, &schedParam ); char tname[32]; sprintf(tname,"GThread %p",this); int err = scePthreadCreate( &ThreadHandle, NULL, >hread_OrbisThreadStartFn, this, tname ); GASSERT(err == SCE_OK); scePthreadAttrDestroy( &Attrib ); if(err) { ThreadFlags = 0; Release(); GThreadList::RemoveRunningThread(this); return false; } return true; } // Suspend doesn't supported bool GThread::Suspend() { return false; } // Resume doesn't supported bool GThread::Resume() { return false; } // Quits with an exit code void GThread::Exit(SInt exitCode) { OnExit(); FinishAndRelease(); GThreadList::RemoveRunningThread(this); scePthreadExit((void*)(size_t)exitCode); } // *** Sleep functions bool GThread::Sleep(UInt secs) { sceKernelSleep(secs); return true; } bool GThread::MSleep(UInt msecs) { sceKernelUsleep(msecs * 1000); return true; } int GThread::GetCPUCount() { return true; } #endif // GFC_NO_THREADSUPPORT #endif // RAGE - Orbis only