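Porting pbrt's parallelization to Ray Tracing in One Weekend

Reimplementing all of pbrt is daunting enough to make most people give up, so my approach is to start from the Ray Tracing in One Weekend series and modify it step by step.

The first problem to solve is that Ray Tracing in One Weekend renders too slowly, because it processes the image one pixel at a time on a single thread.

pbrt's approach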

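  1. In pbrt, every task, whether a 2D tiled loop or a 1D loop, derives from ParallelJob. ParallelJob holds a single static thread pool, so in principle this is the only pool that ever exists.
  2. Jobs are added to the thread pool as nodes of a linked list.
  3. The neat part is easiest to describe with an analogy. Think of the thread pool as a staffing company and each job as a client company. The client tells the pool how its work is split up: how many workers it can use, how many large tasks (chunks) each worker handles on average, how many small tasks make up one chunk, and how many small tasks there are in total.
  4. Each worker walks the job list and checks whether a job is still short of people. If it is, the worker claims one chunk of that job and releases the job lock so that other workers can keep looking. Once the chunk is done, the worker goes back to look for more work.
  5. The advantage is this: if every small task were handed out individually, threads would spend most of their time picking tasks rather than running them. So a worker that takes on a job should do a meaningful share of it, but not all of it; the rest is left for other workers. That is why jobs are divided into chunks.
  6. Note that the pool works on one job at a time: workers take chunks from the first job in the list that still has work instead of spreading across several jobs.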
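
The chunk-claiming idea in the list above can be sketched in a few lines. This is a simplified illustration and not pbrt's code: ChunkedParallelFor is a hypothetical helper, it works on a 1D range, and it spawns its threads on every call instead of keeping a persistent pool. What it does share with the real design is that the lock is held only while a chunk is being claimed, and that the calling thread works alongside the others.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not from pbrt): runs func(begin, end) over [0, count)
// in chunks of chunkSize, using nThreads threads including the caller.
inline void ChunkedParallelFor(int64_t count, int64_t chunkSize, int nThreads,
                               const std::function<void(int64_t, int64_t)>& func) {
    assert(chunkSize > 0 && nThreads >= 1);
    std::mutex mutex;
    int64_t next = 0;  // first index nobody has claimed yet; guarded by mutex

    auto work = [&]() {
        for (;;) {
            int64_t begin, end;
            {
                // Hold the lock only long enough to claim one chunk.
                std::lock_guard<std::mutex> lock(mutex);
                if (next >= count) return;
                begin = next;
                end = std::min(next + chunkSize, count);
                next = end;
            }
            func(begin, end);  // the expensive part runs without the lock
        }
    };

    std::vector<std::thread> workers;
    for (int i = 0; i < nThreads - 1; i++) workers.emplace_back(work);
    work();  // the calling thread takes chunks too
    for (auto& t : workers) t.join();
}
```

With chunkSize set to 1 this degenerates into one lock acquisition per element, which is the overhead that chunking is meant to avoid.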

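Combining it with Ray Tracing in One Weekend

First, the per-pixel write in Ray Tracing in One Weekend. Each pixel is written MSAA-style, with sqrt_spp x sqrt_spp samples per pixel: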

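ParallelFor2D(image, [&](Point2i p) {
    color pixel_color(0, 0, 0);
    for (int sampleu = 0; sampleu < sqrt_spp; sampleu++) {
        for (int samplev = 0; samplev < sqrt_spp; samplev++) {
            Ray r = get_ray(p.x, p.y, sampleu, samplev);
            // ... trace r and accumulate its color into pixel_color
        }
    }
    // ... write pixel_color to the image
});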

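This is where the work gets distributed: the image is split into a number of large tasks (tiles), each containing many small per-pixel tasks. The per-pixel overload simply wraps the per-tile one: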

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(Point2i)>& func)
{
    ParallelFor2D(extent, [&](const Bounds2i& b) {
        for (const Point2i& p : b)
        {
            func(p);
        }
    });
}
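
The range-based for over a Bounds2i only compiles if the bounds type provides begin() and end(). The post does not show vecmath.h, so the following is a minimal sketch of what such an iterator could look like. Point2i and Bounds2i here are assumed stand-ins, not the author's or pbrt's definitions, and PointsIn is a hypothetical helper added just to make the visiting order easy to check.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Assumed stand-ins for the types in vecmath.h (not shown in the post).
struct Point2i { int x, y; };

struct Bounds2i {
    Point2i pMin, pMax;  // pMax is exclusive

    bool IsEmpty() const { return pMin.x >= pMax.x || pMin.y >= pMax.y; }
    int Area() const {
        return IsEmpty() ? 0 : (pMax.x - pMin.x) * (pMax.y - pMin.y);
    }

    // Forward iterator that visits every integer point in row-major order.
    struct Iterator {
        Point2i p;
        int xMin, xMax;
        Point2i operator*() const { return p; }
        Iterator& operator++() {
            if (++p.x == xMax) { p.x = xMin; ++p.y; }
            return *this;
        }
        bool operator!=(const Iterator& o) const {
            return p.x != o.p.x || p.y != o.p.y;
        }
    };

    Iterator begin() const { return Iterator{pMin, pMin.x, pMax.x}; }
    Iterator end() const {
        // An empty box must give begin() == end() so the loop body never runs.
        if (IsEmpty()) return begin();
        return Iterator{Point2i{pMin.x, pMax.y}, pMin.x, pMax.x};
    }
};

// Hypothetical helper: collects the visited points in iteration order.
inline std::vector<Point2i> PointsIn(const Bounds2i& b) {
    std::vector<Point2i> pts;
    for (const Point2i& p : b) pts.push_back(p);
    // The iterator must visit exactly Area() points.
    assert(pts.size() == static_cast<std::size_t>(b.Area()));
    return pts;
}
```

Row-major order keeps the pixels of one tile close together in memory when the image buffer is stored row by row.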

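pbrt deliberately oversubscribes the available threads. It takes the total area of the loop (the number of small tasks), divides it by 8 times the number of running threads to get the area each of those 8x "virtual threads" should cover, derives a tile edge length from that, and clamps it to [1, 32]. The goal is a lower bound on how much work a thread takes at once: tiles are small enough that no thread is stuck with a huge share while others sit idle, and large enough that threads are not constantly fetching new tiles. In other words, it improves load balancing.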

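// Note: as written, this clamps the per-chunk pixel count itself. Take
// std::sqrt of the quotient first if an edge length is intended.
int tileSize = std::clamp(
    (int)(extent.Diagonal().x * extent.Diagonal().y / (8 * RunningThreads())),
    1, 32);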
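
To make the arithmetic concrete, here is a small sketch that compares the expression as it appears in this post with a variant that takes the square root first, which is what the description of an "edge length" suggests. Both function names are hypothetical, and the width, height and thread count are plain parameters instead of being read from Bounds2i and RunningThreads().

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// Tile size as written in this post: clamps the per-chunk pixel count directly.
inline int TileSizeAsWritten(int width, int height, int threads) {
    assert(width > 0 && height > 0 && threads > 0);
    return std::clamp(width * height / (8 * threads), 1, 32);
}

// Variant that takes the square root first, so the result is an edge length.
inline int TileSizeWithSqrt(int width, int height, int threads) {
    assert(width > 0 && height > 0 && threads > 0);
    int pixelsPerChunk = width * height / (8 * threads);
    return std::clamp(static_cast<int>(std::sqrt(pixelsPerChunk)), 1, 32);
}
```

For a 400x225 image on 16 threads, the per-chunk area is 90000 / 128 = 703 pixels. The expression as written clamps that to 32, while the square-root variant gives a 26x26 tile. Without the square root, almost any realistic image size ends up at the upper clamp of 32.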

ThreadPool

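  1. This class is where threads come to pick up work. One instance is maintained globally, and it owns the linked list of jobs.
  2. The pool guards its job list with a single mutex. Any thread, including the main thread, can acquire it, but only one thread at a time may touch the job list and the jobs in it.
  3. The pool has to be created up front. It is served by a number of worker threads plus the main thread, and the two kinds are told apart by isEnqueuingThread. Because the main thread also does work, the pool only spawns n-1 threads.
  4. Every thread ends up in a dispatch function, WorkOrWait. It checks what kind of thread is calling and, for a worker thread, whether the pool has been disabled. It then searches the pool's job list for a job that still has work. If it finds one, it calls RunStep to hand the current thread a piece of that job; otherwise the thread waits on the condition variable.
  5. Once a job has handed out its last chunk, it must be removed from the list. Since all threads gang up on one job at a time, removing a job means it is time to look for the next one.
  6. Finally, when the pool is destroyed, we have to make sure that every thread has finished its work.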
class ThreadPool
{
public:
    explicit ThreadPool(int nThreads);
    ~ThreadPool();
    size_t size() const { return threads.size(); }
    // Adds a job to the front of the list and returns the still-held lock.
    std::unique_lock<std::mutex> AddToJobList(ParallelJob* job);
    // Must be called with the pool mutex held.
    void RemoveFromJobList(ParallelJob* job);
    void WorkOrWait(std::unique_lock<std::mutex>* lock, bool isEnqueuingThread);

private:
    void Worker();

private:
    std::vector<std::thread> threads;
    ParallelJob* jobLists = nullptr;
    mutable std::mutex mutex;
    bool shutdownThreads = false;
    bool disabled = false;
    std::condition_variable condition;
};

ThreadPool::ThreadPool(int nThreads)
{
    // The calling thread also does work, so only nThreads - 1 workers are spawned.
    for (int i = 0; i < nThreads - 1; i++)
    {
        threads.push_back(std::thread(&ThreadPool::Worker, this));
    }
}

// Each worker thread loops here until the pool shuts down.
void ThreadPool::Worker()
{
    std::unique_lock<std::mutex> lock(mutex);
    while (!shutdownThreads)
    {
        WorkOrWait(&lock, false);
    }
}

std::unique_lock<std::mutex> ThreadPool::AddToJobList(ParallelJob* job)
{
    std::unique_lock<std::mutex> lock(mutex);
    if (jobLists)
    {
        jobLists->prev = job;
    }
    job->next = jobLists;
    jobLists = job;
    condition.notify_all(); // wake workers that are waiting for work
    return lock;
}

void ThreadPool::WorkOrWait(std::unique_lock<std::mutex>* lock, bool isEnqueuingThread)
{
    if (!lock->owns_lock())
    {
        // assert("...") on a bare string literal never fires, hence false && "..."
        assert(false && "lock is not owned by current thread");
        exit(-1);
    }
    // Return if this is a worker thread and the thread pool is disabled
    if (!isEnqueuingThread && disabled)
    {
        condition.wait(*lock); // release lock and block itself, waiting for notify
        return;
    }
    ParallelJob* job = jobLists;
    while (job && !job->HaveWork())
    {
        job = job->next;
    }
    if (job)
    {
        job->activeWorkers++;
        job->RunStep(lock);
        // RunStep must release the lock before it returns.
        assert(!lock->owns_lock() && "you need to release lock before return");
        lock->lock();
        job->activeWorkers--;
        if (job->Finished())
        {
            condition.notify_all();
        }
    }
    else
    {
        condition.wait(*lock);
    }
}

ThreadPool::~ThreadPool()
{
    if (threads.empty())
    {
        return;
    }
    {
        std::lock_guard<std::mutex> lock(mutex);
        shutdownThreads = true;
        condition.notify_all();
    }
    for (auto& thread : threads)
    {
        thread.join();
    }
}

void ThreadPool::RemoveFromJobList(ParallelJob* job)
{
    if (job->prev)
    {
        job->prev->next = job->next;
    }
    else
    {
        jobLists = job->next;
    }
    if (job->next)
    {
        job->next->prev = job->prev;
    }
    job->removed = true;
}
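
AddToJobList and RemoveFromJobList together form an intrusive doubly linked list: the prev/next pointers live inside the job itself, so adding and removing a job needs no allocation and takes constant time. The sketch below isolates just that list logic so it can be tested without threads. Node and JobList are hypothetical names used for illustration; in the real code the pointers are members of ParallelJob and the head pointer is ThreadPool::jobLists.

```cpp
#include <cassert>

// Hypothetical node type standing in for ParallelJob's prev/next members.
struct Node {
    explicit Node(int id) : id(id) {}
    int id;
    Node* prev = nullptr;
    Node* next = nullptr;
};

struct JobList {
    Node* head = nullptr;

    // Mirrors AddToJobList: push at the front in O(1).
    void PushFront(Node* n) {
        assert(n && !n->prev && !n->next);
        if (head) head->prev = n;
        n->next = head;
        head = n;
    }

    // Mirrors RemoveFromJobList: unlink in O(1) from any position.
    void Remove(Node* n) {
        if (n->prev) {
            n->prev->next = n->next;
        } else {
            assert(head == n);  // a node without prev must be the head
            head = n->next;
        }
        if (n->next) n->next->prev = n->prev;
        n->prev = n->next = nullptr;
    }

    int Size() const {
        int count = 0;
        for (Node* n = head; n; n = n->next) count++;
        return count;
    }
};
```

Because new jobs are pushed at the front, WorkOrWait sees the most recently added job first when it scans for work.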

The main thread is the one that passes true for isEnqueuingThread:

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(const Bounds2i)>& func)
{
    if (extent.IsEmpty())
    {
        return;
    }
    if (extent.Area() == 1)
    {
        func(extent);
        return; // without this return the single pixel would be processed twice
    }
    int tileSize = std::clamp(
        (int)(extent.Diagonal().x * extent.Diagonal().y / (8 * RunningThreads())),
        1, 32);
    ParallelForLoop2D loop(extent, tileSize, func);
    // AddToJobList returns a std::unique_lock (not a unique_ptr) that is still held.
    std::unique_lock<std::mutex> lock = ParallelJob::threadPool->AddToJobList(&loop);
    // The enqueuing thread helps out until every tile is done.
    while (!loop.Finished())
    {
        ParallelJob::threadPool->WorkOrWait(&lock, true);
    }
}
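  1. When the main thread gets here, it holds the pool lock (returned by AddToJobList), finds that there is work to do, and calls the job's RunStep, so the current thread itself performs one piece of the job.
  2. The worker threads that were blocked on the condition variable are woken by the notify_all in AddToJobList and start taking pieces of the same job.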

Parallel2D

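  1. This class represents one top-level job. After creating such a job we add it to the pool's job list.
  2. The job carries the linked-list pointers (prev and next), which are what form the pool's queue.
  3. The job has to let the pool know how to run one piece of its work, whether it still needs threads, and whether it is finished.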
void ParallelForLoop2D::RunStep(std::unique_lock<std::mutex>* lock)
{
    // Claim the next chunkSize x chunkSize tile, clipped to the extent.
    Point2i end = nextStart + Vector2i(chunkSize, chunkSize);
    Bounds2i b = Intersect(Bounds2i(nextStart, end), extent);
    if (b.IsEmpty())
    {
        assert(false && "bounds is empty");
        lock->unlock(); // keep the contract that RunStep returns unlocked
        return;
    }
    // Advance to the next tile: left to right, then down one row of tiles.
    nextStart.x += chunkSize;
    if (nextStart.x >= extent.pMax.x)
    {
        nextStart.x = extent.pMin.x;
        nextStart.y += chunkSize;
    }
    if (!HaveWork())
    {
        threadPool->RemoveFromJobList(this);
    }
    // Run the tile without holding the pool lock.
    lock->unlock();
    func(b);
}

class ParallelJob
{
public:
    virtual ~ParallelJob() { assert(removed && "ParallelJob is being destroyed without being removed!"); }
    virtual bool HaveWork() const = 0;
    virtual void RunStep(std::unique_lock<std::mutex>* lock) = 0;
    bool Finished() const { return !HaveWork() && activeWorkers == 0; }
    virtual std::string ToString() const = 0;
    static ThreadPool* threadPool;

protected:
    std::string BasicToString() const
    {
        char resString[256];
        // %s expects a C string, so the bool is converted explicitly.
        snprintf(resString, sizeof(resString), "activeWorkers: %d removed: %s",
                 activeWorkers, removed ? "true" : "false");
        return std::string(resString);
    }

private:
    friend class ThreadPool;
    bool removed = false;
    int activeWorkers = 0;
    ParallelJob* prev = nullptr, *next = nullptr;
};

class ParallelForLoop2D : public ParallelJob
{
public:
    // Initializers are listed in member declaration order.
    ParallelForLoop2D(const Bounds2i& extent, int chunkSize, std::function<void(Bounds2i)> func)
        : func(std::move(func)), extent(extent), nextStart(extent.pMin), chunkSize(chunkSize) {}
    virtual std::string ToString() const override { return BasicToString(); }
    virtual bool HaveWork() const override { return nextStart.y < extent.pMax.y; }
    virtual void RunStep(std::unique_lock<std::mutex>* lock) override;

private:
    std::function<void(Bounds2i)> func;
    const Bounds2i extent;
    Point2i nextStart;
    int chunkSize;
};

// Worker threads plus the calling thread.
inline int RunningThreads()
{
    return ParallelJob::threadPool ? (1 + static_cast<int>(ParallelJob::threadPool->size())) : 1;
}
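
To see which tiles successive RunStep calls hand out, the stepping logic can be pulled into a plain function. The following is an illustrative sketch with hypothetical names (Tile, EnumerateTiles) that assumes the extent starts at (0, 0). It uses the same advance rule as RunStep (move right by chunkSize, wrap to the next row of tiles) and the same stopping test as HaveWork.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Half-open tile: [x0, x1) x [y0, y1).
struct Tile { int x0, y0, x1, y1; };

// Lists tiles in the order RunStep would claim them for the extent
// [0, width) x [0, height).
inline std::vector<Tile> EnumerateTiles(int width, int height, int chunkSize) {
    assert(chunkSize > 0);
    std::vector<Tile> tiles;
    int nextX = 0, nextY = 0;
    while (width > 0 && nextY < height) {  // same test as HaveWork()
        // Clip the chunkSize x chunkSize square against the extent.
        tiles.push_back({nextX, nextY,
                         std::min(nextX + chunkSize, width),
                         std::min(nextY + chunkSize, height)});
        nextX += chunkSize;
        if (nextX >= width) {
            nextX = 0;
            nextY += chunkSize;
        }
    }
    return tiles;
}
```

For a 70x40 extent with chunkSize 32 this yields 3 columns by 2 rows of tiles, and the tiles on the right and bottom edges are narrower than 32 because of the clipping.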

Results

The final render time dropped from a little over 390 ms to 59 ms, roughly a 6.6x speedup.

Full code

  1. parallel.h
#ifndef PARALLEL_H
#define PARALLEL_H
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <rtweekend.h>
#include "vecmath.h"
#include <stdio.h>
#include <cassert>

inline int AvaliableCores()
{
    return std::max<int>(1, std::thread::hardware_concurrency());
}

class ParallelJob;

class ThreadPool
{
public:
    explicit ThreadPool(int nThreads);
    ~ThreadPool();
    size_t size() const { return threads.size(); }
    std::unique_lock<std::mutex> AddToJobList(ParallelJob* job);
    void RemoveFromJobList(ParallelJob* job);
    void WorkOrWait(std::unique_lock<std::mutex>* lock, bool isEnqueuingThread);

private:
    void Worker();

private:
    std::vector<std::thread> threads;
    ParallelJob* jobLists = nullptr;
    mutable std::mutex mutex;
    bool shutdownThreads = false;
    bool disabled = false;
    std::condition_variable condition;
};

class ParallelJob
{
public:
    virtual ~ParallelJob() { assert(removed && "ParallelJob is being destroyed without being removed!"); }
    virtual bool HaveWork() const = 0;
    virtual void RunStep(std::unique_lock<std::mutex>* lock) = 0;
    bool Finished() const { return !HaveWork() && activeWorkers == 0; }
    virtual std::string ToString() const = 0;
    static ThreadPool* threadPool;

protected:
    std::string BasicToString() const
    {
        char resString[256];
        // %s expects a C string, so the bool is converted explicitly.
        snprintf(resString, sizeof(resString), "activeWorkers: %d removed: %s",
                 activeWorkers, removed ? "true" : "false");
        return std::string(resString);
    }

private:
    friend class ThreadPool;
    bool removed = false;
    int activeWorkers = 0;
    ParallelJob* prev = nullptr, *next = nullptr;
};

class ParallelForLoop2D : public ParallelJob
{
public:
    // Initializers are listed in member declaration order.
    ParallelForLoop2D(const Bounds2i& extent, int chunkSize, std::function<void(Bounds2i)> func)
        : func(std::move(func)), extent(extent), nextStart(extent.pMin), chunkSize(chunkSize) {}
    virtual std::string ToString() const override { return BasicToString(); }
    virtual bool HaveWork() const override { return nextStart.y < extent.pMax.y; }
    virtual void RunStep(std::unique_lock<std::mutex>* lock) override;

private:
    std::function<void(Bounds2i)> func;
    const Bounds2i extent;
    Point2i nextStart;
    int chunkSize;
};

// Worker threads plus the calling thread.
inline int RunningThreads()
{
    return ParallelJob::threadPool ? (1 + static_cast<int>(ParallelJob::threadPool->size())) : 1;
}

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(const Bounds2i)>& func)
{
    if (extent.IsEmpty())
    {
        return;
    }
    if (extent.Area() == 1)
    {
        func(extent);
        return; // without this return the single pixel would be processed twice
    }
    int tileSize = std::clamp(
        (int)(extent.Diagonal().x * extent.Diagonal().y / (8 * RunningThreads())),
        1, 32);
    ParallelForLoop2D loop(extent, tileSize, func);
    std::unique_lock<std::mutex> lock = ParallelJob::threadPool->AddToJobList(&loop);
    // The enqueuing thread helps out until every tile is done.
    while (!loop.Finished())
    {
        ParallelJob::threadPool->WorkOrWait(&lock, true);
    }
}

inline void ParallelFor2D(const Bounds2i& extent, const std::function<void(Point2i)>& func)
{
    ParallelFor2D(extent, [&](const Bounds2i& b) {
        for (const Point2i& p : b)
        {
            func(p);
        }
    });
}
#endif
  2. parallel.cpp
#include "parallel.h"

ThreadPool::ThreadPool(int nThreads)
{
    // The calling thread also does work, so only nThreads - 1 workers are spawned.
    for (int i = 0; i < nThreads - 1; i++)
    {
        threads.push_back(std::thread(&ThreadPool::Worker, this));
    }
}

ThreadPool *ParallelJob::threadPool = new ThreadPool(AvaliableCores());

// Each worker thread loops here until the pool shuts down.
void ThreadPool::Worker()
{
    std::unique_lock<std::mutex> lock(mutex);
    while (!shutdownThreads)
    {
        WorkOrWait(&lock, false);
    }
}

std::unique_lock<std::mutex> ThreadPool::AddToJobList(ParallelJob* job)
{
    std::unique_lock<std::mutex> lock(mutex);
    if (jobLists)
    {
        jobLists->prev = job;
    }
    job->next = jobLists;
    jobLists = job;
    condition.notify_all(); // wake workers that are waiting for work
    return lock;
}

void ThreadPool::WorkOrWait(std::unique_lock<std::mutex>* lock, bool isEnqueuingThread)
{
    if (!lock->owns_lock())
    {
        // assert("...") on a bare string literal never fires, hence false && "..."
        assert(false && "lock is not owned by current thread");
        exit(-1);
    }
    // Return if this is a worker thread and the thread pool is disabled
    if (!isEnqueuingThread && disabled)
    {
        condition.wait(*lock); // release lock and block itself, waiting for notify
        return;
    }
    ParallelJob* job = jobLists;
    while (job && !job->HaveWork())
    {
        job = job->next;
    }
    if (job)
    {
        job->activeWorkers++;
        job->RunStep(lock);
        // RunStep must release the lock before it returns.
        assert(!lock->owns_lock() && "you need to release lock before return");
        lock->lock();
        job->activeWorkers--;
        if (job->Finished())
        {
            condition.notify_all();
        }
    }
    else
    {
        condition.wait(*lock);
    }
}

ThreadPool::~ThreadPool()
{
    if (threads.empty())
    {
        return;
    }
    {
        std::lock_guard<std::mutex> lock(mutex);
        shutdownThreads = true;
        condition.notify_all();
    }
    for (auto& thread : threads)
    {
        thread.join();
    }
}

void ThreadPool::RemoveFromJobList(ParallelJob* job)
{
    if (job->prev)
    {
        job->prev->next = job->next;
    }
    else
    {
        jobLists = job->next;
    }
    if (job->next)
    {
        job->next->prev = job->prev;
    }
    job->removed = true;
}

void ParallelForLoop2D::RunStep(std::unique_lock<std::mutex>* lock)
{
    // Claim the next chunkSize x chunkSize tile, clipped to the extent.
    Point2i end = nextStart + Vector2i(chunkSize, chunkSize);
    Bounds2i b = Intersect(Bounds2i(nextStart, end), extent);
    if (b.IsEmpty())
    {
        assert(false && "bounds is empty");
        lock->unlock(); // keep the contract that RunStep returns unlocked
        return;
    }
    // Advance to the next tile: left to right, then down one row of tiles.
    nextStart.x += chunkSize;
    if (nextStart.x >= extent.pMax.x)
    {
        nextStart.x = extent.pMin.x;
        nextStart.y += chunkSize;
    }
    if (!HaveWork())
    {
        threadPool->RemoveFromJobList(this);
    }
    // Run the tile without holding the pool lock.
    lock->unlock();
    func(b);
}