Ark Server API (ASA) - Wiki
QueuedThreadPoolWrapper.h
1// Copyright Epic Games, Inc. All Rights Reserved.
2
3#pragma once
4
5#include "Async/Fundamental/Scheduler.h"
6#include "Async/Fundamental/Task.h"
7#include "Async/TaskGraphInterfaces.h"
8#include "Containers/Array.h"
9#include "Containers/Map.h"
10#include "CoreMinimal.h"
11#include "Experimental/ConcurrentLinearAllocator.h"
12#include "Experimental/Containers/FAAArrayQueue.h"
13#include "Experimental/Misc/ExecutionResource.h"
14#include "GenericPlatform/GenericPlatformAffinity.h"
15#include "HAL/CriticalSection.h"
16#include "HAL/Platform.h"
17#include "HAL/PlatformAffinity.h"
18#include "HAL/PlatformCrt.h"
19#include "HAL/PlatformProcess.h"
20#include "HAL/ThreadSafeCounter.h"
21#include "HAL/UnrealMemory.h"
22#include "Misc/AssertionMacros.h"
23#include "Misc/IQueuedWork.h"
24#include "Misc/MemStack.h"
25#include "Misc/ScopeLock.h"
27#include "ScopeRWLock.h"
28#include "Stats/Stats2.h"
29#include "Templates/Atomic.h"
30#include "Templates/Function.h"
31#include "Templates/RefCounting.h"
32
33#include <atomic>
34
35/** ThreadPool wrapper implementation allowing to schedule
36 * up to MaxConcurrency tasks at a time making sub-partitioning
37 * another thread-pool a breeze and allowing more fine-grained control
38 * over scheduling by effectively giving another set of priorities.
39 */
40class FQueuedThreadPoolWrapper : public FQueuedThreadPool
41{
42public:
43 /**
44 * InWrappedQueuedThreadPool Underlying thread pool to schedule task to.
45 * InMaxConcurrency Maximum number of concurrent tasks allowed, -1 will limit concurrency to number of threads available in the underlying thread pool.
46 * InPriorityMapper Thread-safe function used to map any priority from this Queue to the priority that should be used when scheduling the task on the underlying thread pool.
47 */
48 FQueuedThreadPoolWrapper(FQueuedThreadPool* InWrappedQueuedThreadPool, int32 InMaxConcurrency = -1, TFunction<EQueuedWorkPriority(EQueuedWorkPriority)> InPriorityMapper = [](EQueuedWorkPriority InPriority) { return InPriority; });
49 ~FQueuedThreadPoolWrapper();
50
51 /**
52 * Queued task are not scheduled against the wrapped thread-pool until resumed
53 */
54 void Pause();
55
56 /**
57 * Resume a specified amount of queued work, or -1 to unpause.
58 */
59 void Resume(int32 InNumQueuedWork = -1);
60
61 /**
62 * Dynamically adjust the maximum number of concurrent tasks, -1 for unlimited.
63 */
64 void SetMaxConcurrency(int32 MaxConcurrency = -1);
65
66 void AddQueuedWork(IQueuedWork* InQueuedWork, EQueuedWorkPriority InPriority = EQueuedWorkPriority::Normal) override;
67 bool RetractQueuedWork(IQueuedWork* InQueuedWork) override;
68 int32 GetNumThreads() const override;
69 int32 GetCurrentConcurrency() const { return CurrentConcurrency.load(std::memory_order_relaxed); }
70
71protected:
72 class FScheduledWork : public IQueuedWork, public IExecutionResource
73 {
74 public:
75 FScheduledWork();
76
77 FScheduledWork(const FScheduledWork&) = delete;
78 FScheduledWork& operator=(const FScheduledWork&) = delete;
79
80 FScheduledWork(const FScheduledWork&&) = delete;
81 FScheduledWork& operator=(const FScheduledWork&&) = delete;
82
83 ~FScheduledWork() override;
84
85 private:
86 uint32 AddRef() const override
87 {
88 return uint32(NumRefs.Increment());
89 }
90
91 uint32 Release() const override
92 {
93 uint32 Refs = uint32(NumRefs.Decrement());
94
95 // When the last ref is released, we call the schedule function of the parent pool
96 // so that OnUnschedule can release any resources acquired by the OnSchedule function and
97 // the scheduling of the next work items can proceed.
98 if (Refs == 0)
99 {
100 ParentPool->Schedule(const_cast<FScheduledWork*>(this));
101 }
102 return Refs;
103 }
104
105 uint32 GetRefCount() const override
106 {
107 return uint32(NumRefs.GetValue());
108 }
109
110 void Assign(FQueuedThreadPoolWrapper* InParentPool, IQueuedWork* InWork, EQueuedWorkPriority InPriority)
111 {
112 check(GetRefCount() == 0);
113 ParentPool = InParentPool;
114 Work = InWork;
115 Priority = InPriority;
116 AddRef();
117 }
118
119 void DoThreadedWork() override
120 {
121 {
122 // Add this object as an execution context that can be retrieved via
123 // FExecutionResourceContext::Get() if a task needs to hold on the
124 // resources acquired (i.e. Concurrency Limit, Memory Pressure, etc...)
125 // longer than for the DoThreadedWork() scope.
126 FExecutionResourceContextScope ExecutionContextScope(this);
127
128 Work->DoThreadedWork();
129 }
130
131 Release();
132 }
133
134 void Abandon() override
135 {
136 Work->Abandon();
137
138 Release();
139 }
140
141 EQueuedWorkFlags GetQueuedWorkFlags() const override
142 {
143 return Work->GetQueuedWorkFlags();
144 }
145
146 int64 GetRequiredMemory() const override
147 {
148 return Work->GetRequiredMemory();
149 }
150
151 IQueuedWork* GetInnerWork() const
152 {
153 return Work;
154 }
155
156 EQueuedWorkPriority GetPriority() const
157 {
158 return Priority;
159 }
160
161 void Reset()
162 {
163 Work = nullptr;
164 }
165
166 friend class FQueuedThreadPoolWrapper;
167 FQueuedThreadPoolWrapper* ParentPool;
168 IQueuedWork* Work;
169 EQueuedWorkPriority Priority;
170 mutable FThreadSafeCounter NumRefs;
171 };
172
173 // A critical section is used since we need reentrancy support from the same thread
174 FCriticalSection Lock;
175 FThreadPoolPriorityQueue QueuedWork;
176
177 // Can be overriden to dynamically control the maximum concurrency
178 virtual int32 GetMaxConcurrency() const { return MaxConcurrency.load(std::memory_order_relaxed); }
179
180 // Can be overriden to know when work has been scheduled.
181 virtual void OnScheduled(const IQueuedWork*) {}
182
183 // Can be overriden to know when work has been unscheduled.
184 virtual void OnUnscheduled(const IQueuedWork*) {}
185
186 // Can be overriden to allocate a more specialized version if needed.
188private:
190 bool CanSchedule(EQueuedWorkPriority Priority) const;
191 bool Create(uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority, const TCHAR* Name) override;
192 void Destroy() override;
193 void Schedule(FScheduledWork* Work = nullptr);
194 void ReleaseWorkNoLock(FScheduledWork* Work);
195 bool TryRetractWorkNoLock(EQueuedWorkPriority InPriority);
196
197 FScheduledWork* AllocateWork(IQueuedWork* InnerWork, EQueuedWorkPriority Priority);
198
199 FQueuedThreadPool* WrappedQueuedThreadPool;
200 TArray<FScheduledWork*> WorkPool;
201 TFunction<EQueuedWorkPriority(EQueuedWorkPriority)> PriorityMapper;
202 std::atomic<int32> MaxConcurrency;
203 TMap<IQueuedWork*, FScheduledWork*> ScheduledWork;
204 std::atomic<int32> CurrentConcurrency;
205 EQueuedWorkPriority WrappedQueuePriority;
206 bool bIsScheduling = false;
207};
208
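To make the wrapper's intent concrete, here is a minimal usage sketch. It is not part of the header: it assumes the engine's global GThreadPool is available and that MyWork points at some existing IQueuedWork implementation; both names are placeholders for whatever a real call site would use.

// Sub-partition the global pool: at most 2 of our tasks run at once, and they are
// demoted to Low priority when handed to the wrapped pool.
FQueuedThreadPoolWrapper* SubPool = new FQueuedThreadPoolWrapper(
	GThreadPool,
	/*InMaxConcurrency*/ 2,
	[](EQueuedWorkPriority) { return EQueuedWorkPriority::Low; });

SubPool->Pause();                // queued work is held back while paused
SubPool->AddQueuedWork(MyWork);  // MyWork : IQueuedWork* (placeholder)
SubPool->Resume(1);              // release a single queued item...
SubPool->Resume();               // ...or release everything and unpause
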
209/** ThreadPool wrapper implementation allowing to schedule
210 * up to MaxConcurrency tasks at a time making sub-partitioning
211 * another thread-pool a breeze and allowing more fine-grained control
212 * over scheduling by giving full control of task reordering.
213 */
214class FQueuedThreadPoolDynamicWrapper : public FQueuedThreadPoolWrapper
215{
216public:
217 /**
218 * InWrappedQueuedThreadPool Underlying thread pool to schedule task to.
219 * InMaxConcurrency Maximum number of concurrent tasks allowed, -1 will limit concurrency to number of threads available in the underlying thread pool.
220 * InPriorityMapper Thread-safe function used to map any priority from this Queue to the priority that should be used when scheduling the task on the underlying thread pool.
221 */
222 FQueuedThreadPoolDynamicWrapper(FQueuedThreadPool* InWrappedQueuedThreadPool, int32 InMaxConcurrency = -1, TFunction<EQueuedWorkPriority(EQueuedWorkPriority)> InPriorityMapper = [](EQueuedWorkPriority InPriority) { return InPriority; })
223 : FQueuedThreadPoolWrapper(InWrappedQueuedThreadPool, InMaxConcurrency, InPriorityMapper)
224 {
225 }
226
227 void AddQueuedWork(IQueuedWork* InQueuedWork, EQueuedWorkPriority InPriority = EQueuedWorkPriority::Normal) override
228 {
229 // Override priority to make sure all elements are in the same buckets and can then be sorted all together.
230 FQueuedThreadPoolWrapper::AddQueuedWork(InQueuedWork, EQueuedWorkPriority::Normal);
231 }
232
233 /**
234 * Apply sort predicate to reorder the queued tasks
235 */
236 void Sort(TFunctionRef<bool(const IQueuedWork* Lhs, const IQueuedWork* Rhs)> Predicate)
237 {
238 FScopeLock ScopeLock(&Lock);
239 QueuedWork.Sort(EQueuedWorkPriority::Normal, Predicate);
240 }
241};
242
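Again purely as an illustration (not part of the header), the dynamic wrapper's reordering hook could be used like this; FMyQueuedWork, PendingItems, and GetEstimatedCost() are hypothetical names standing in for real work items.

FQueuedThreadPoolDynamicWrapper* DynamicPool =
	new FQueuedThreadPoolDynamicWrapper(GThreadPool, /*InMaxConcurrency*/ 4);

// Queue a batch of (hypothetical) work items...
for (FMyQueuedWork* Item : PendingItems)
{
	DynamicPool->AddQueuedWork(Item);
}

// ...then reorder the whole queue before it reaches the wrapped pool. Because the
// wrapper forces every item into the Normal bucket, one predicate sorts everything.
DynamicPool->Sort([](const IQueuedWork* Lhs, const IQueuedWork* Rhs)
{
	return static_cast<const FMyQueuedWork*>(Lhs)->GetEstimatedCost()
	     < static_cast<const FMyQueuedWork*>(Rhs)->GetEstimatedCost();
});
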
243/** ThreadPool wrapper implementation allowing to schedule thread-pool tasks on the task graph.
244 */
245class FQueuedThreadPoolTaskGraphWrapper final : public FQueuedThreadPool
246{
247public:
248 /**
249 * InPriorityMapper Thread-safe function used to map any priority from this Queue to the priority that should be used when scheduling the task on the task graph.
250 */
251 FQueuedThreadPoolTaskGraphWrapper(TFunction<ENamedThreads::Type (EQueuedWorkPriority)> InPriorityMapper = nullptr)
252 : TaskCount(0)
253 , bIsExiting(0)
254 {
255 if (InPriorityMapper)
256 {
257 PriorityMapper = InPriorityMapper;
258 }
259 else
260 {
261 PriorityMapper = [this](EQueuedWorkPriority InPriority) { return GetDefaultPriorityMapping(InPriority); };
262 }
263 }
264
265 /**
266 * InDesiredThread The task-graph desired thread and priority.
267 */
268 FQueuedThreadPoolTaskGraphWrapper(ENamedThreads::Type InDesiredThread)
269 : TaskCount(0)
270 , bIsExiting(0)
271 {
272 PriorityMapper = [InDesiredThread](EQueuedWorkPriority InPriority) { return InDesiredThread; };
273 }
274
275 ~FQueuedThreadPoolTaskGraphWrapper()
276 {
277 Destroy();
278 }
279private:
280 void AddQueuedWork(IQueuedWork* InQueuedWork, EQueuedWorkPriority InPriority = EQueuedWorkPriority::Normal) override
281 {
282 check(bIsExiting == false);
283 TaskCount++;
284 FFunctionGraphTask::CreateAndDispatchWhenReady(
285 [this, InQueuedWork](ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent)
286 {
287 FMemMark Mark(FMemStack::Get());
288 InQueuedWork->DoThreadedWork();
289 OnTaskCompleted(InQueuedWork);
290 },
291 QUICK_USE_CYCLE_STAT(FQueuedThreadPoolTaskGraphWrapper, STATGROUP_ThreadPoolAsyncTasks),
292 nullptr,
293 PriorityMapper(InPriority)
294 );
295 }
296
297 bool RetractQueuedWork(IQueuedWork* InQueuedWork) override
298 {
299 // The task graph doesn't support retraction for now
300 return false;
301 }
302
303 void OnTaskCompleted(IQueuedWork* InQueuedWork)
304 {
305 --TaskCount;
306 }
307
308 int32 GetNumThreads() const override
309 {
310 return FTaskGraphInterface::Get().GetNumWorkerThreads();
311 }
312
313 ENamedThreads::Type GetDefaultPriorityMapping(EQueuedWorkPriority InQueuedWorkPriority)
314 {
315 ENamedThreads::Type DesiredThread = ENamedThreads::AnyNormalThreadNormalTask;
316 if (InQueuedWorkPriority > EQueuedWorkPriority::Normal)
317 {
318 DesiredThread = ENamedThreads::AnyBackgroundThreadNormalTask;
319 }
320 else if (InQueuedWorkPriority < EQueuedWorkPriority::Normal)
321 {
322 DesiredThread = ENamedThreads::AnyHiPriThreadNormalTask;
323 }
324 return DesiredThread;
325 }
326protected:
327 bool Create(uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority, const TCHAR* Name) override
328 {
329 return true;
330 }
331
332 void Destroy() override
333 {
334 bIsExiting = true;
335
336 if (LowLevelTasks::FScheduler::Get().IsWorkerThread())
337 {
338 LowLevelTasks::BusyWaitUntil([this]() { return TaskCount == 0; });
339 }
340 else
341 {
342 while (TaskCount != 0)
343 {
344 FPlatformProcess::Sleep(0.01f);
345 }
346 }
347 }
348private:
349 TFunction<ENamedThreads::Type (EQueuedWorkPriority)> PriorityMapper;
350 TAtomic<uint32> TaskCount;
351 TAtomic<bool> bIsExiting;
352};
353
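For reference, a work item routed through the task-graph wrapper might look like the sketch below; FLogLineWork is a made-up IQueuedWork implementation, and the self-deleting lifetime policy is just one common convention, not something this header mandates.

// Hypothetical work item (illustration only).
class FLogLineWork : public IQueuedWork
{
public:
	explicit FLogLineWork(FString InLine) : Line(MoveTemp(InLine)) {}

	void DoThreadedWork() override
	{
		UE_LOG(LogTemp, Log, TEXT("%s"), *Line);
		delete this;   // the item cleans itself up once it has run
	}

	void Abandon() override
	{
		delete this;   // the pool is shutting down before the work ran
	}

private:
	FString Line;
};

// The wrapper is used through the FQueuedThreadPool interface; note that it does
// not support retraction, so RetractQueuedWork() on it always returns false.
FQueuedThreadPool* TaskGraphPool = new FQueuedThreadPoolTaskGraphWrapper();
TaskGraphPool->AddQueuedWork(new FLogLineWork(TEXT("Hello from the task graph")), EQueuedWorkPriority::Low);
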
354/** ThreadPool wrapper implementation allowing to schedule thread-pool tasks on the low level backend which is also used by the taskgraph.
355*/
356class FQueuedLowLevelThreadPool : public FQueuedThreadPool
357{
358 /* Internal data of the scheduler used for cancellation */
359 struct FQueuedWorkInternalData : public IQueuedWorkInternalData
360 {
361 LowLevelTasks::FTask Task;
362
363 virtual bool Retract()
364 {
365 return Task.TryCancel();
366 }
367 };
368public:
369 /**
370 * InPriorityMapper Thread-safe function used to map any priority from this Queue to the priority that should be used when scheduling the task on the underlying thread pool.
371 **/
372 FQueuedLowLevelThreadPool(TFunction<EQueuedWorkPriority(EQueuedWorkPriority)> InPriorityMapper = [](EQueuedWorkPriority InPriority) { return InPriority; }, LowLevelTasks::FScheduler* InScheduler = &LowLevelTasks::FScheduler::Get())
373 : PriorityMapper(InPriorityMapper), Scheduler(InScheduler)
374 {
375 }
376
377 ~FQueuedLowLevelThreadPool()
378 {
379 Destroy();
380 }
381
382 void* operator new(size_t size)
383 {
384 return FMemory::Malloc(size, 128);
385 }
386
387 void operator delete(void* ptr)
388 {
389 FMemory::Free(ptr);
390 }
391
392 /**
393 * Queued task are not scheduled against the wrapped thread-pool until resumed
394 */
395 void Pause()
396 {
397 bIsPaused = true;
398 }
399
400 /**
401 * Resume a specified amount of queued work, or -1 to unpause.
402 */
403 void Resume(int32 InNumQueuedWork = -1)
404 {
405 for (uint32 i = 0; i < uint32(InNumQueuedWork); i++)
406 {
407 FQueuedWorkInternalData* QueuedWork = Dequeue();
408 if (!QueuedWork)
409 {
410 break;
411 }
412 TaskCount.fetch_add(1, std::memory_order_acquire);
413 verifySlow(Scheduler->TryLaunch(QueuedWork->Task, LowLevelTasks::EQueuePreference::GlobalQueuePreference));
414 }
415
416 if (InNumQueuedWork == -1)
417 {
418 bIsPaused = false;
419 }
420
421 bool bWakeUpWorker = true;
422 ScheduleTasks(bWakeUpWorker);
423 }
424
425private:
426 void ScheduleTasks(bool &bWakeUpWorker)
427 {
428 while (!bIsPaused)
429 {
430 FQueuedWorkInternalData* QueuedWork = Dequeue();
431 if (QueuedWork)
432 {
433 bWakeUpWorker |= LowLevelTasks::FSchedulerTls::IsBusyWaiting();
434 verifySlow(Scheduler->TryLaunch(QueuedWork->Task, bWakeUpWorker ? LowLevelTasks::EQueuePreference::GlobalQueuePreference : LowLevelTasks::EQueuePreference::LocalQueuePreference, bWakeUpWorker));
435 TaskCount.fetch_add(1, std::memory_order_acquire);
436 bWakeUpWorker = true;
437 }
438 else
439 {
440 break;
441 }
442 }
443 }
444
445 void FinalizeExecution()
446 {
447 TaskCount.fetch_sub(1, std::memory_order_release);
448 bool bWakeUpWorker = false;
449 ScheduleTasks(bWakeUpWorker);
450 }
451
452 void AddQueuedWork(IQueuedWork* InQueuedWork, EQueuedWorkPriority InPriority = EQueuedWorkPriority::Normal) override
453 {
454 check(bIsExiting == false);
455
456 FQueuedWorkInternalData* QueuedWorkInternalData = new FQueuedWorkInternalData();
457 InQueuedWork->InternalData = QueuedWorkInternalData;
458 EQueuedWorkPriority Priority = PriorityMapper(InPriority);
459
460 LowLevelTasks::ETaskPriority TaskPriorityMapper[int(EQueuedWorkPriority::Count)] = { LowLevelTasks::ETaskPriority::High, LowLevelTasks::ETaskPriority::High, LowLevelTasks::ETaskPriority::BackgroundHigh, LowLevelTasks::ETaskPriority::BackgroundNormal, LowLevelTasks::ETaskPriority::BackgroundLow, LowLevelTasks::ETaskPriority::BackgroundLow };
461 LowLevelTasks::ETaskPriority TaskPriority = TaskPriorityMapper[int(Priority)];
462 LowLevelTasks::ETaskFlags Flags = (InQueuedWork->GetQueuedWorkFlags() & EQueuedWorkFlags::DoNotRunInsideBusyWait) == EQueuedWorkFlags::None ? LowLevelTasks::ETaskFlags::DefaultFlags : (LowLevelTasks::ETaskFlags::DefaultFlags & ~LowLevelTasks::ETaskFlags::AllowBusyWaiting);
463
464 QueuedWorkInternalData->Task.Init(TEXT("FQueuedLowLevelThreadPoolTask"), TaskPriority, [InQueuedWork, InternalData = InQueuedWork->InternalData, Deleter = LowLevelTasks::TDeleter<FQueuedLowLevelThreadPool, &FQueuedLowLevelThreadPool::FinalizeExecution>{ this }]
465 {
466 FMemMark Mark(FMemStack::Get());
467 InQueuedWork->DoThreadedWork();
468 }, Flags);
469
470 if (!bIsPaused)
471 {
472 TaskCount.fetch_add(1, std::memory_order_acquire);
473 verifySlow(Scheduler->TryLaunch(QueuedWorkInternalData->Task, LowLevelTasks::EQueuePreference::GlobalQueuePreference));
474 }
475 else
476 {
477 Enqueue(Priority, QueuedWorkInternalData);
478 }
479 }
480
481 bool RetractQueuedWork(IQueuedWork* InQueuedWork) override
482 {
483 bool bCancelled = false;
484 if(InQueuedWork->InternalData.IsValid())
485 {
486 bCancelled = InQueuedWork->InternalData->Retract();
487 InQueuedWork->InternalData = nullptr;
488 }
489
490 bool bWakeUpWorker = true;
491 ScheduleTasks(bWakeUpWorker);
492 return bCancelled;
493 }
494
495 int32 GetNumThreads() const override
496 {
497 return Scheduler->GetNumWorkers();
498 }
499
500protected:
501 bool Create(uint32 InNumQueuedThreads, uint32 InStackSize, EThreadPriority InThreadPriority, const TCHAR* InName) override
502 {
503 return true;
504 }
505
506 void Destroy() override
507 {
508 bIsExiting = true;
509
510 while (true)
511 {
512 FQueuedWorkInternalData* QueuedWork = Dequeue();
513 if (!QueuedWork)
514 {
515 break;
516 }
517
518 verify(QueuedWork->Retract());
519 TaskCount++;
520 verifySlow(Scheduler->TryLaunch(QueuedWork->Task, LowLevelTasks::EQueuePreference::GlobalQueuePreference));
521 }
522
523 if (Scheduler->IsWorkerThread())
524 {
525 Scheduler->BusyWaitUntil([this]() { return TaskCount == 0; });
526 }
527 else
528 {
529 while (TaskCount != 0)
530 {
531 FPlatformProcess::Sleep(0.01f);
532 }
533 }
534 }
535
536private:
537 TFunction<EQueuedWorkPriority(EQueuedWorkPriority)> PriorityMapper;
538
539 FQueuedWorkInternalData* Dequeue()
540 {
541 for (int32 i = 0; i < int32(EQueuedWorkPriority::Count); i++)
542 {
543 FQueuedWorkInternalData* QueuedWork = PendingWork[i].dequeue();
544 if (QueuedWork)
545 {
546 return QueuedWork;
547 }
548 }
549 return nullptr;
550 }
551
552 void Enqueue(EQueuedWorkPriority Priority, FQueuedWorkInternalData* Item)
553 {
554 PendingWork[int32(Priority)].enqueue(Item);
555 }
556
557 FAAArrayQueue<FQueuedWorkInternalData> PendingWork[int32(EQueuedWorkPriority::Count)];
558 LowLevelTasks::FScheduler* Scheduler;
559
560 std::atomic_uint TaskCount{0};
561 std::atomic_bool bIsExiting{false};
562 std::atomic_bool bIsPaused{false};
563};
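
Finally, a sketch of how the low-level-backed pool might be configured (illustrative only; SomeWork stands for any IQueuedWork pointer, and clamping everything to Lowest priority is just an example policy, not something this file prescribes).

// Route queued work through the shared low-level task backend, demoting every
// request to the lowest queued-work priority.
FQueuedThreadPool* BackgroundPool = new FQueuedLowLevelThreadPool(
	[](EQueuedWorkPriority) { return EQueuedWorkPriority::Lowest; });

BackgroundPool->AddQueuedWork(SomeWork);

// Unlike the task-graph wrapper, this pool supports retraction: if the underlying
// LowLevelTasks task has not started yet, TryCancel() succeeds and the work never runs.
if (BackgroundPool->RetractQueuedWork(SomeWork))
{
	// SomeWork was cancelled before execution; the caller still owns it.
}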