【GAD翻译馆】任务系统2.0:实现无锁的工作窃取(四):parrallel_for
译者:王磊(未来的未来) 审校:崔国军(飞扬971)
让我们从我们上次离开的地方继续,今天我们将讨论如何构建高层次的算法,比如将parallel_for用于我们的任务系统。
这个系列教程中的其他文章。
第一部分:介绍新任务系统的基础知识,并总结了任务窃取。
第二部分:详细介绍了线程本地分配机制。
第三部分:讨论了任务窃取队列的无锁实现。。
基本的想法
目前,我们所能做的就是将任务推送到系统中,让系统负责在所有可用内核之间进行负载均衡。 到目前为止,这个系统并没有意识到一些任务可能会更小,有些任务可能会更大,并且他们可能需要花费不同的时间来完成。
游戏编程中的一个非常常见的情况是为固定数量的元素执行特定的任务,举个简单的例子来说,对数组/向量/范围的所有元素应用相同的变换。在这种情况下,转换可以是剔除10.000个边界框,为1.000个角色更新骨骼的曾是,或是对100.000个粒子做动画。不管任务具体是什么,我们可以(也应该)利用一些数据上的并行性。
这是parallel_for等高级算法的起点。
从概念上讲,parallel_for背后的想法非常简单。给定一定范围内的元素务应该将被成几乎相等的部分,并通过将新创建的任务派到系统中来分配单个工作量。对于每个创建的新范围,都应该调用用户提供的函数,以便在给定的元素范围内执行实际的工作。
作为一个例子,让我们考虑下如何去更新粒子。使用最简单的形式,这可能只不过是一个接受数据数组的自由函数,带有存储在数组中的元素的数量:
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
void UpdateParticles(Particle* particles, unsigned int count);
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
假设我们有100,000个粒子需要更新,我们可以通过产生一个只需执行以下操作的任务来更新同一个核上的所有粒子:
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
UpdateParticles(particles, 100000);
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
如果我们想将更新分成4个大小相同的部分,我们该怎么办? 对于这种函数来说这个事情很简单:
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
UpdateParticles(particles, 25000);
UpdateParticles(particles + 25000, 25000);
UpdateParticles(particles + 50000, 25000);
UpdateParticles(particles + 75000, 25000);
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
将每个调用放入一个单独的任务之中可以自动地将工作负载分配给4个内核(如果可用的话)。 请注意,由于我们定义UpdateParticles()函数的方式,这非常的容易。
当然,所有这些事情都是手工操作的会是单调乏味,并且容易出错,所以这应该是parallel_for的责任。
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
Job* parallel_for(Particles* data, unsigned int count, void (*function)(Particles*, unsigned int))
{
const parallel_for_job_data jobData = { data, count, function };
Job* job = jobSystem::CreateJob(&jobs::parallel_for_job, jobData);
return job;
}
struct parallel_for_job_data
{
Particles* data;
unsigned int count;
void (*function)(Particles*, unsigned int);
};
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
这个实现简单地创建一个称为jobs :: parallel_for_job的新(根)任务,它负责递归地将范围划分为更小的范围。 然后返回的任务可以运行并由用户进行等待:、
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
Job* job = parallel_for(g_particles, 100000, &UpdateParticles)
jobSystem::Run(job);
jobSystem::Wait(job);
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
parallel_for_job本身也只有少数几行代码:
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
void parallel_for_job(Job* job, const void* jobData)
{
const parallel_for_job_data* data = static_cast<const parallel_for_job_data*>(jobData);
if (data->count > 256)
{
// split in two
const unsigned int leftCount = data->count / 2u;
const parallel_for_job_data leftData(data->data, leftCount, data->function);
Job* left = jobSystem::CreateJobAsChild(job, &jobs::parallel_for_job, leftData);
jobSystem::Run(left);
const unsigned int rightCount = data->count - leftCount;
const parallel_for_job_data rightData(data->data + leftCount, rightCount, data->function);
Job* right = jobSystem::CreateJobAsChild(job, &jobs::parallel_for_job, rightData);
jobSystem::Run(right);
}
else
{
// execute the function on the range of data
(data->function)(data->data, data->count);
}
}
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
可以看出,只要在该范围内剩余的元素超过256个,任务就将给定的范围分成两半。对于每一个新创建的范围,都会产生一个新的任务,作为给定任务的一个子任务,使用分而治之的策略有效地将初始范围切割成更小的片段。请注意,范围的分裂是与这些(和其他)任务的执行交错进行的的,这比一个任务做全部的分裂工作更有效率。
当然,在这个实现中至少有两件事情需要改变:
一个更通用的实现
通过为parallel_for函数和任务本身引入模板参数,可以实现对任何类型的范围进行接受。 此外,parallel_for_job_data还需要能够保存任意类型的指针。
类似地,可以通过由用户提供的附加模板参数将拆分策略送到parallel_for算法中。 这允许用户基于要执行的任务的内容而从不同的策略中进行选择。
通过增加上述两个参数,代码就变成了以下内容:
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
template <typename T, typename S>
Job* parallel_for(T* data, unsigned int count, void (*function)(T*, unsigned int), const S& splitter)
{
typedef parallel_for_job_data<T, S> JobData;
const JobData jobData(data, count, function, splitter);
Job* job = jobSystem::CreateJob(&jobs::parallel_for_job<JobData>, jobData);
return job;
}
template <typename T, typename S>
struct parallel_for_job_data
{
typedef T DataType;
typedef S SplitterType;
parallel_for_job_data(DataType* data, unsigned int count, void (*function)(DataType*, unsigned int), const SplitterType& splitter)
: data(data)
, count(count)
, function(function)
, splitter(splitter)
{
}
DataType* data;
unsigned int count;
void (*function)(DataType*, unsigned int);
SplitterType splitter;
};
template <typename JobData>
void parallel_for_job(Job* job, const void* jobData)
{
const JobData* data = static_cast<const JobData*>(jobData);
const JobData::SplitterType& splitter = data->splitter;
if (splitter.Split<JobData::DataType>(data->count))
{
// split in two
const unsigned int leftCount = data->count / 2u;
const JobData leftData(data->data, leftCount, data->function, splitter);
Job* left = jobSystem::CreateJobAsChild(job, &jobs::parallel_for_job<JobData>, leftData);
jobSystem::Run(left);
const unsigned int rightCount = data->count - leftCount;
const JobData rightData(data->data + leftCount, rightCount, data->function, splitter);
Job* right = jobSystem::CreateJobAsChild(job, &jobs::parallel_for_job<JobData>, rightData);
jobSystem::Run(right);
}
else
{
// execute the function on the range of data
(data->function)(data->data, data->count);
}
}
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
现在parallel_for的实现能够处理任意数据的范围,也能接受拆分策略作为附加参数。 一个有效的策略实现只需要提供一个Split()函数来决定给定的范围是否应该被进一步拆分,以下面两个实现为例:
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
class CountSplitter
{
public:
explicit CountSplitter(unsigned int count)
: m_count(count)
{
}
template <typename T>
inline bool Split(unsigned int count) const
{
return (count > m_count);
}
private:
unsigned int m_count;
};
class DataSizeSplitter
{
public:
explicit DataSizeSplitter(unsigned int size)
: m_size(size)
{
}
template <typename T>
inline bool Split(unsigned int count) const
{
return (count*sizeof(T) > m_size);
}
private:
unsigned int m_size;
};
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
CountSplitter仅仅根据元素的数量来分割一个范围,就像我们在前面的例子中看到的一样。
另一方面,DataSizeSplitter也是一个兼顾任务集大小的策略。举个简单的例子来说,在L1高速缓存大小为32KB的平台上,可以使用DataSizeSplitter(32 * 1024)来确保只要任务集不再适合L1高速缓存的大小,任务的范围就会被拆分。 这里的重要特点是,它可以跨所有类型的任务进行工作,无论他们是在处理粒子,动画还是剔除数据 – 拆分策略都会自动完成这些工作。
parallel_for的调用和以前一样,除了增加一个额外的splitter参数:
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
Job* job = parallel_for(g_particles, 100000, &UpdateParticles, DataSizeSplitter(32*1024));
jobSystem::Run(job);
jobSystem::Wait(job);
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
下一步的工作
到目前为止,parallel_for的实现只能调用自由函数。对这个系统进行升级应该很容易,它应该也可以接受成员函、,lambdas、std :: function等等。
展望
这个系列教程的下一篇文章可能将是这个系列教程的最后一部分,它将详细介绍如何处理任务之间的依赖关系。
【版权声明】
原文作者未做权利声明,视为共享知识产权进入公共领域,自动获得授权。