5#include "Async/Fundamental/Scheduler.h"
6#include "Async/Fundamental/Task.h"
7#include "Async/TaskGraphInterfaces.h"
8#include "Containers/Array.h"
9#include "Containers/Map.h"
11#include "Experimental/ConcurrentLinearAllocator.h"
12#include "Experimental/Containers/FAAArrayQueue.h"
13#include "Experimental/Misc/ExecutionResource.h"
14#include "GenericPlatform/GenericPlatformAffinity.h"
15#include "HAL/CriticalSection.h"
16#include "HAL/Platform.h"
17#include "HAL/PlatformAffinity.h"
18#include "HAL/PlatformCrt.h"
19#include "HAL/PlatformProcess.h"
20#include "HAL/ThreadSafeCounter.h"
21#include "HAL/UnrealMemory.h"
22#include "Misc/AssertionMacros.h"
23#include "Misc/IQueuedWork.h"
24#include "Misc/MemStack.h"
25#include "Misc/ScopeLock.h"
28#include "Stats/Stats2.h"
29#include "Templates/Atomic.h"
30#include "Templates/Function.h"
31#include "Templates/RefCounting.h"
36
37
38
39
44
45
46
47
52
53
57
58
59 void Resume(int32 InNumQueuedWork = -1);
62
63
112 check(GetRefCount() == 0);
126 FExecutionResourceContextScope ExecutionContextScope(
this);
178 virtual int32
GetMaxConcurrency()
const {
return MaxConcurrency.load(std::memory_order_relaxed); }
191 bool Create(uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority,
const TCHAR* Name)
override;
210
211
212
213
218
219
220
221
234
235
239 QueuedWork.Sort(EQueuedWorkPriority::Normal, Predicate);
244
249
250
255 if (InPriorityMapper)
257 PriorityMapper = InPriorityMapper;
261 PriorityMapper = [
this](EQueuedWorkPriority InPriority) {
return GetDefaultPriorityMapping(InPriority); };
266
267
272 PriorityMapper = [InDesiredThread](EQueuedWorkPriority InPriority) {
return InDesiredThread; };
282 check(bIsExiting ==
false);
284 FFunctionGraphTask::CreateAndDispatchWhenReady(
285 [
this, InQueuedWork](ENamedThreads::Type CurrentThread,
const FGraphEventRef& MyCompletionGraphEvent)
287 FMemMark Mark(FMemStack::Get());
288 InQueuedWork->DoThreadedWork();
289 OnTaskCompleted(InQueuedWork);
291 QUICK_USE_CYCLE_STAT(FQueuedThreadPoolTaskGraphWrapper, STATGROUP_ThreadPoolAsyncTasks),
293 PriorityMapper(InPriority)
310 return FTaskGraphInterface::Get().GetNumWorkerThreads();
315 ENamedThreads::Type DesiredThread = ENamedThreads::AnyNormalThreadNormalTask;
318 DesiredThread = ENamedThreads::AnyBackgroundThreadNormalTask;
322 DesiredThread = ENamedThreads::AnyHiPriThreadNormalTask;
324 return DesiredThread;
327 bool Create(uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority,
const TCHAR* Name)
override
336 if (LowLevelTasks::FScheduler::Get().IsWorkerThread())
338 LowLevelTasks::BusyWaitUntil([
this]() {
return TaskCount == 0; });
342 while (TaskCount != 0)
344 FPlatformProcess::Sleep(0.01f);
355
365 return Task.TryCancel();
370
371
393
394
401
402
405 for (uint32 i = 0; i < uint32(InNumQueuedWork); i++)
412 TaskCount.fetch_add(1, std::memory_order_acquire);
413 verifySlow(Scheduler->TryLaunch(QueuedWork->Task, LowLevelTasks::EQueuePreference::GlobalQueuePreference));
416 if (InNumQueuedWork == -1)
421 bool bWakeUpWorker =
true;
430 FQueuedWorkInternalData* QueuedWork = Dequeue();
433 bWakeUpWorker |= LowLevelTasks::FSchedulerTls::IsBusyWaiting();
434 verifySlow(Scheduler->TryLaunch(QueuedWork->Task, bWakeUpWorker ? LowLevelTasks::EQueuePreference::GlobalQueuePreference : LowLevelTasks::EQueuePreference::LocalQueuePreference, bWakeUpWorker));
435 TaskCount.fetch_add(1, std::memory_order_acquire);
436 bWakeUpWorker =
true;
447 TaskCount.fetch_sub(1, std::memory_order_release);
448 bool bWakeUpWorker =
false;
454 check(bIsExiting ==
false);
457 InQueuedWork->InternalData = QueuedWorkInternalData;
460 LowLevelTasks::ETaskPriority TaskPriorityMapper[
int(EQueuedWorkPriority::Count)] = { LowLevelTasks::ETaskPriority::High, LowLevelTasks::ETaskPriority::High, LowLevelTasks::ETaskPriority::BackgroundHigh, LowLevelTasks::ETaskPriority::BackgroundNormal, LowLevelTasks::ETaskPriority::BackgroundLow, LowLevelTasks::ETaskPriority::BackgroundLow };
461 LowLevelTasks::ETaskPriority TaskPriority = TaskPriorityMapper[
int(Priority)];
462 LowLevelTasks::ETaskFlags Flags = (InQueuedWork->GetQueuedWorkFlags() & EQueuedWorkFlags::DoNotRunInsideBusyWait) == EQueuedWorkFlags::None ? LowLevelTasks::ETaskFlags::DefaultFlags : (LowLevelTasks::ETaskFlags::DefaultFlags & ~LowLevelTasks::ETaskFlags::AllowBusyWaiting);
464 QueuedWorkInternalData->Task.Init(
TEXT(
"FQueuedLowLevelThreadPoolTask"), TaskPriority, [InQueuedWork, InternalData = InQueuedWork->InternalData, Deleter = LowLevelTasks::TDeleter<FQueuedLowLevelThreadPool, &FQueuedLowLevelThreadPool::FinalizeExecution>{
this }]
466 FMemMark Mark(FMemStack::Get());
467 InQueuedWork->DoThreadedWork();
472 TaskCount.fetch_add(1, std::memory_order_acquire);
473 verifySlow(Scheduler->TryLaunch(QueuedWorkInternalData->Task, LowLevelTasks::EQueuePreference::GlobalQueuePreference));
483 bool bCancelled =
false;
484 if(InQueuedWork->InternalData.IsValid())
486 bCancelled = InQueuedWork->InternalData->Retract();
487 InQueuedWork->InternalData =
nullptr;
490 bool bWakeUpWorker =
true;
497 return Scheduler->GetNumWorkers();
501 bool Create(uint32 InNumQueuedThreads, uint32 InStackSize, EThreadPriority InThreadPriority,
const TCHAR* InName)
override
520 verifySlow(Scheduler->TryLaunch(QueuedWork->Task, LowLevelTasks::EQueuePreference::GlobalQueuePreference));
523 if (Scheduler->IsWorkerThread())
525 Scheduler->BusyWaitUntil([
this]() {
return TaskCount == 0; });
529 while (TaskCount != 0)
531 FPlatformProcess::Sleep(0.01f);
554 PendingWork[int32(Priority)].enqueue(Item);
FWindowsCriticalSection FCriticalSection
std::atomic_bool bIsExiting
void Enqueue(EQueuedWorkPriority Priority, FQueuedWorkInternalData *Item)
FQueuedLowLevelThreadPool(TFunction< EQueuedWorkPriority(EQueuedWorkPriority)> InPriorityMapper=[](EQueuedWorkPriority InPriority) { return InPriority;}, LowLevelTasks::FScheduler *InScheduler=&LowLevelTasks::FScheduler::Get())
int32 GetNumThreads() const override
std::atomic_uint TaskCount
TFunction< EQueuedWorkPriority(EQueuedWorkPriority)> PriorityMapper
bool Create(uint32 InNumQueuedThreads, uint32 InStackSize, EThreadPriority InThreadPriority, const TCHAR *InName) override
bool RetractQueuedWork(IQueuedWork *InQueuedWork) override
std::atomic_bool bIsPaused
FAAArrayQueue< FQueuedWorkInternalData > PendingWork[int32(EQueuedWorkPriority::Count)]
void AddQueuedWork(IQueuedWork *InQueuedWork, EQueuedWorkPriority InPriority=EQueuedWorkPriority::Normal) override
~FQueuedLowLevelThreadPool()
void Resume(int32 InNumQueuedWork=-1)
void ScheduleTasks(bool &bWakeUpWorker)
void operator delete(void *ptr)
FQueuedWorkInternalData * Dequeue()
void * operator new(size_t size)
LowLevelTasks::FScheduler * Scheduler
void Sort(TFunctionRef< bool(const IQueuedWork *Lhs, const IQueuedWork *Rhs)> Predicate)
FQueuedThreadPoolDynamicWrapper(FQueuedThreadPool *InWrappedQueuedThreadPool, int32 InMaxConcurrency=-1, TFunction< EQueuedWorkPriority(EQueuedWorkPriority)> InPriorityMapper=[](EQueuedWorkPriority InPriority) { return InPriority;})
void AddQueuedWork(IQueuedWork *InQueuedWork, EQueuedWorkPriority InPriority=EQueuedWorkPriority::Normal) override
void OnTaskCompleted(IQueuedWork *InQueuedWork)
TAtomic< bool > bIsExiting
bool RetractQueuedWork(IQueuedWork *InQueuedWork) override
TAtomic< uint32 > TaskCount
~FQueuedThreadPoolTaskGraphWrapper()
int32 GetNumThreads() const override
FQueuedThreadPoolTaskGraphWrapper(ENamedThreads::Type InDesiredThread)
FQueuedThreadPoolTaskGraphWrapper(TFunction< ENamedThreads::Type(EQueuedWorkPriority)> InPriorityMapper=nullptr)
bool Create(uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority, const TCHAR *Name) override
void AddQueuedWork(IQueuedWork *InQueuedWork, EQueuedWorkPriority InPriority=EQueuedWorkPriority::Normal) override
TFunction< ENamedThreads::Type(EQueuedWorkPriority)> PriorityMapper
ENamedThreads::Type GetDefaultPriorityMapping(EQueuedWorkPriority InQueuedWorkPriority)
FQueuedThreadPoolWrapper * ParentPool
void Assign(FQueuedThreadPoolWrapper *InParentPool, IQueuedWork *InWork, EQueuedWorkPriority InPriority)
IQueuedWork * GetInnerWork() const
uint32 GetRefCount() const override
uint32 AddRef() const override
EQueuedWorkPriority GetPriority() const
FThreadSafeCounter NumRefs
FScheduledWork(const FScheduledWork &)=delete
void DoThreadedWork() override
int64 GetRequiredMemory() const override
uint32 Release() const override
EQueuedWorkPriority Priority
~FScheduledWork() override
EQueuedWorkFlags GetQueuedWorkFlags() const override
FScheduledWork & operator=(const FScheduledWork &&)=delete
FScheduledWork(const FScheduledWork &&)=delete
FScheduledWork & operator=(const FScheduledWork &)=delete
void Schedule(FScheduledWork *Work=nullptr)
~FQueuedThreadPoolWrapper()
FQueuedThreadPool * WrappedQueuedThreadPool
void AddQueuedWork(IQueuedWork *InQueuedWork, EQueuedWorkPriority InPriority=EQueuedWorkPriority::Normal) override
virtual int32 GetMaxConcurrency() const
FQueuedThreadPoolWrapper(FQueuedThreadPool *InWrappedQueuedThreadPool, int32 InMaxConcurrency=-1, TFunction< EQueuedWorkPriority(EQueuedWorkPriority)> InPriorityMapper=[](EQueuedWorkPriority InPriority) { return InPriority;})
virtual void OnScheduled(const IQueuedWork *)
virtual void OnUnscheduled(const IQueuedWork *)
TFunction< EQueuedWorkPriority(EQueuedWorkPriority)> PriorityMapper
std::atomic< int32 > CurrentConcurrency
bool RetractQueuedWork(IQueuedWork *InQueuedWork) override
int32 GetNumThreads() const override
bool CanSchedule(EQueuedWorkPriority Priority) const
bool Create(uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority, const TCHAR *Name) override
virtual FScheduledWork * AllocateScheduledWork()
TMap< IQueuedWork *, FScheduledWork * > ScheduledWork
void SetMaxConcurrency(int32 MaxConcurrency=-1)
FScheduledWork * AllocateWork(IQueuedWork *InnerWork, EQueuedWorkPriority Priority)
void Resume(int32 InNumQueuedWork=-1)
TArray< FScheduledWork * > WorkPool
EQueuedWorkPriority WrappedQueuePriority
void ReleaseWorkNoLock(FScheduledWork *Work)
std::atomic< int32 > MaxConcurrency
bool TryRetractWorkNoLock(EQueuedWorkPriority InPriority)
int32 GetCurrentConcurrency() const
FThreadPoolPriorityQueue QueuedWork
UE_NODISCARD_CTOR FScopeLock(FCriticalSection *InSynchObject)
virtual int64 GetRequiredMemory() const
virtual void DoThreadedWork()=0
virtual EQueuedWorkFlags GetQueuedWorkFlags() const
static void Free(void *Original)
static void * Malloc(SIZE_T Count, uint32 Alignment=DEFAULT_ALIGNMENT)
LowLevelTasks::FTask Task