【GAD翻译馆】任务系统2.0:实现无锁的工作窃取(一):基础
译者:王磊(未来的未来) 审校:张华栋(wcby)
早在2012年,我就介绍了Molecule引擎中的任务调度器的实现。 从那以后,已经过去了三年的时间,现在是时候给旧的系统做一个应有的提升了。
对新任务系统的要求如下:
l 基础实现需要很简单。任务自己本身可能很愚蠢,但应该有可能建立高层次的算法,比如说是在顶部的实现上实现并行化。
l 任务系统需要实现自动负载平衡。
l 性能改进应该通过在适用的情况下逐步用无锁的方案替换系统的某些部分来完成。
l 系统需要支持“动态并行性”:在任务仍然运行的时候,必须能够改变任务的父子关系并添加依赖关系。这是允许高级原语(比如像是parallel_for)将给定的任务负载动态分割为更小的任务所必需的。
今天,我们将看下新任务系统的基本实现,使用锁/临界区。 即使在使用锁的时候,我也想在lock-free之前指出这里面的一些缺陷。
非常基础的实现
与旧的任务调度器类似,我们的新任务系统基本上以如下方式进行工作:
l 我们有N个工作线程,不断从队列中获取任务,并执行这些任务。
l 对于N个核的情况,我们创建N-1个工作线程。
l 主线程也被认为是一个工作线程,可以帮助执行任务。
这一次,有一个主要的区别:我们的任务系统现在实现了一个被称为“任务窃取”的概念,这意味着每个工作线程都有自己的任务队列,而不是使用一个所有任务被推入的全局队列。使用全局任务队列会产生很多竞争,特别是在涉及多个线程的时候。
“任务窃取“是一个简单而有效的概念:
l 新的任务总是被压入调用线程的队列中。
l 当工作线程想要任务的时候,它首先尝试从自己的队列中弹出一个任务。如果队列中没有任何任务,则线程将尝试从另一个工作线程的队列中窃取任务。
l 操作Push()和Pop()仅由拥有这个队列的工作线程调用。
l 操作Steal()仅由不拥有队列的工作线程调用。
最后两项是非常重要的,并导致以下结论:
l 操作Push()和Pop()可以在队列的一端(私端)工作,而操作Steal()在另一端(公共端)工作。
l 私有端可以以后进先出(LIFO)方式来工作,以便更好地利用缓存,而公共端以先入先出(FIFO)方式工作,以实现更好的负载平衡。
在谈到任务窃取的时候,这种双端数据结构通常被称为任务窃取队列/双端队列。 这种任务窃取队列的一个非常重要的好处是可以以无锁的方式实现这一点。
在C ++中,一个基本的实现可能如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | // main function of each worker thread while (workerThreadActive) { Job* job = GetJob(); if (job) { Execute(job); } } Job* GetJob( void ) { WorkStealingQueue* queue = GetWorkerThreadQueue(); Job* job = queue->Pop(); if (IsEmptyJob(job)) { // this is not a valid job because our own queue is empty, so try stealing from some other queue unsigned int randomIndex = GenerateRandomNumber(0, g_workerThreadCount+1); WorkStealingQueue* stealQueue = g_jobQueues[randomIndex]; if (stealQueue == queue) { // don't try to steal from ourselves Yield(); return nullptr; } Job* stolenJob = stealQueue->Steal(); if (IsEmptyJob(stolenJob)) { // we couldn't steal a job from the other queue either, so we just yield our time slice for now Yield(); return nullptr; } return stolenJob; } return job; } void Execute(Job* job) { (job->function)(job, job->data); Finish(job); } |
我在此刻故意省略了Finish()函数中的细节 - 稍后会讨论它。
什么是任务?
服从“保持简单”的要求,一个任务至少需要存储两个东西:一个指向被执行函数的指针,一个可选的父任务。
此外,我们需要一个计数器,让我们跟踪那些未完成的任务来处理父/子关系。为了避免假的共享,我们添加填充以确保任务对象占用至少一个整个缓存行:
1 2 3 4 5 6 7 | struct Job { JobFunction function; Job* parent; int32_t unfinishedJobs; // atomic char padding[]; }; |
请注意,未完成的任务成员被标记为原子。在Molecule引擎中,它是通过使用Windows操作系统上的Interlocked *函数来改变的。 使用C ++ 11的话,你可以使用std :: atomic类型。 我在这里也忽略了填充数组的大小,因为这个填充数组的大小会依据32位操作系统和64位操作系统而不同,并且由于涉及多个sizeof()运算符而使得代码复杂不直观。
与任务关联的任务函数接受两个参数:它所属的任务以及与任务相关的数据。
1 | typedef void (*JobFunction)(Job*, const void *); |
将数据与任务相关联
旧的任务调度器里面有一个东西我不太喜欢,那就是用户不得不持有任务的数据,直到任务完成。任务数据可以存储在堆栈中的时候,这不是问题,但有时候这会导致在堆中做不必要的内存分配,而这个堆我希望从新系统中删除掉。
幸运的是,存储属于任务的数据有一个简单的解决方案:我们可以将这些数据存储在我们的Job结构中!
填充数组是存储任务数据的理想选择。 这个数组没有用,我们还是需要它,为什么不把它用得好呢? 在Molecule引擎中,只要给定的数据符合大小的话,与任务相关的数据就会被填充到填充数组中 - 这是通过编译时检查来确保的。 如果数据太大而无法正确存储,用户可以随时从堆中分配数据,只需将指向数据的指针传递给任务系统就可以了。
添加任务
将任务推入系统总是分两步进行:首先是创建一个任务。 其次,这个任务被添加到系统中。 将这个操作分成两部分,使我们可以实现动态的并行化,这是前面提到的要求之一。
在C ++中,任务是使用以下任一函数创建的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | Job* CreateJob(JobFunction function) { Job* job = AllocateJob(); job->function = function; job->parent = nullptr; job->unfinishedJobs = 1; return job; } Job* CreateJobAsChild(Job* parent, JobFunction function) { atomic::Increment(&parent->unfinishedJobs); Job* job = AllocateJob(); job->function = function; job->parent = parent; job->unfinishedJobs = 1; return job; } |
请注意,我省略了那个接受额外数据的函数,这些额外数据是拷贝到填充数组的。现在,AllocateJob()函数通过调用new来简单地分配和返回一个新的Job对象。
可以看出,当创建一个任务作为一个已经存在的任务的子任务的时候,父任务的未完成任务这个成员变量会自动增大。这个操作需要以原子方式来完成,因为其他线程可能会将不同的任务添加为同一个任务的子项,从而导致数据的竞争。
向系统中添加新创建的任务是通过调用Run()函数来完成的:
1 2 3 4 5 | void Run(Job* job) { WorkStealingQueue* queue = GetWorkerThreadQueue(); queue->Push(job); } |
等待一个任务
当然,一旦我们为系统增加了一任务,我们需要能够检查这些任务是否完成,并在此期间做一些有意义的事情。这个事情是通过调用Wait()函数来完成的:
1 2 3 4 5 6 7 8 9 10 11 12 | void Wait( const Job* job) { // wait until the job has completed. in the meantime, work on any other job. while (!HasJobCompleted(job)) { Job* nextJob = GetJob(); if (nextJob) { Execute(nextJob); } } } |
通过将未完成的任务与0进行比较来确定任务是否已完成。如果计数器大于0,则任务本身或其任何子任务尚未完成。 如果计数器为零,则所有相关的任务都已完成。
实践该系统
下面的简单例子创建了一堆添加到系统中的单个空任务:
1 2 3 4 5 6 7 8 9 10 | void empty_job(Job*, const void *) { } for (unsigned int i=0; i < N; ++i) { Job* job = jobSystem::CreateJob(&empty_job); jobSystem::Run(job); jobSystem::Wait(job); } |
当然,这种做法是低效的,因为我们创建,运行,并孤立地等待每个任务。 尽管如此,这也是衡量任务系统创建,增加和运行任务方面的开销的一个很好的考验。
另一个例子再次创建单个任务,但将其作为一个根任务的子任务运行:
1 2 3 4 5 6 7 8 | Job* root = jobSystem::CreateJob(&empty_job); for (unsigned int i=0; i < N; ++i) { Job* job = jobSystem::CreateJobAsChild(root, &empty_job); jobSystem::Run(job); } jobSystem::Run(root); jobSystem::Wait(root); |
这样做效率更高,因为创建任务和运行任务现在可以与执行已经在系统中的任务并行完成。
完成任务和删除任务
我们差不多完成了。 我们仍然需要通过告诉父任务执行已经完成来正确完成任务。 我们需要删除我们分配的所有任务。
你可能会试图写这样的Finish()函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 | void Finish(Job* job) { const int32_t unfinishedJobs = atomic::Decrement(&job->unfinishedJobs); if (unfinishedJobs == 0) { if (job->parent) { Finish(job->parent); } delete job; } } |
我们首先以原子的方式来减少我们的未完成任务的计数器。 如前所述,一旦这个计数器达到0,说明这个任务和它的所有子任务都完成了,所以我们需要告诉我们的父任务这个信息。在这之后,我们可以删除这个任务,因为我们不再需要它了。然而,这里有一个不那么微妙的错误 - 你能发现它吗?
问题是我们现在不允许删除这个任务。 仍然有线程在等待这个确切的任务,并会调用HasJobCompleted()来检查这个任务是否已经完成。这将导致线程访问不再属于此进程的内存,导致访问冲突或读取垃圾值。
一个解决办法是推迟到以后的时间再删除这个任务,但你仍然要小心这个问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | void Finish(Job* job) { const int32_t unfinishedJobs = atomic::Decrement(&job->unfinishedJobs); if (unfinishedJobs == 0) { const int32_t index = atomic::Increment(&g_jobToDeleteCount); g_jobsToDelete[index-1] = job; if (job->parent) { Finish(job->parent); } } } |
我已经插入了代码,将要删除的任务存储在全局数组中,但这么做仍然是错误的。 错误的原因是,只要完成任务的线程减少了未完成任务这个成员变量的值,线程可能被抢先清空。如果你不够幸运的话,而这个特定任务又是上面例子中的根任务,那么继续尝试删除存储在数组中的所有任务将是灾难性的。
当然,有一种方法可以安全地做到这一点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | void Finish(Job* job) { const int32_t unfinishedJobs = atomic::Decrement(&job->unfinishedJobs); if (unfinishedJobs == 0) { const int32_t index = atomic::Increment(&g_jobToDeleteCount); g_jobsToDelete[index-1] = job; if (job->parent) { Finish(job->parent); } atomic::Decrement(&job->unfinishedJobs); } } |
请注意,该代码在将任务添加到全局数组后已经递减了计数器,并且已经通知了父任务。一个任务的完成现在通过未完成任务值为-1而不是0来通知。在根任务完成执行之后,删除为该帧分配的所有任务是安全的。
在这种特殊情况下,如果不使用原子指令,将unfinishedJobs设置为-1是安全的,但代码需要额外的编译器屏障(以及其他平台上的内存屏障)才能保证正确。
实现的细节
这里有关于如何实现这篇文章中提到的一些东西的一些注意事项:
l 对工作线程队列的访问可以通过使用线程本地索引来完成。在Windows / MSVC上,可以通过使用__declspec(thread)或是TlsAlloc来完成。
l 通过使用_mm_pause,Sleep(1),Sleep(0)或其他变量可以完成对线程时间片的占用。但是,您应该始终确保在没有任何操作的情况下,工作线程不会占用100%的中央处理器时间。事件,信号量或条件变量可用于这个情况的处理。
性能
使用上述任务系统,我进行了两个测试来测试系统的性能和开销。第一个测试会创建65000个单独运行的空任务,如上面的例子所示。第二个测试也创造了65000个任务,但是通过使用几个parallel_for循环递归地将他们的任务分成更小的任务。
性能测试采用主频为3.4 GHz的IntelCore i7-2600K CPU,具有4个带超线程(8个逻辑内核)的物理内核。
运行时间如下:
单任务:18.5毫秒。
parallel_for:5.3 毫秒。
这里有几件事值得注意:
l 使用parallel_for对于实际工作负载更具有代表性,因为创建和添加任务可以并行完成。
l 任务系统使用新建和删除来分配任务,这不是非常有效的做法。
l 任务窃取队列的实现使用了锁。
展望
下一次,我们将看看如何去掉创建和删除任务,简化我们的Finish()函数的过程。之后,我们将解决任务窃取队列的无锁实现问题。 最后但并非最不重要的是,我们将看看如何实现一些更高级的算法,比如将parallel_for用于这个工作系统。
我保证我们会把运行时间减少到目前运行时间的很小一部分。
声明
这篇文章假设了系统使用了x86架构和强大的内存模型。 如果您没有意识到这有什么潜在的影响的话,那么在其他平台上工作的时候,最好使用C ++ 11和std :: atomic以及顺序一致性。
【版权声明】
原文作者未做权利声明,视为共享知识产权进入公共领域,自动获得授权。