123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776 |
- /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
- file Copyright.txt or https://cmake.org/licensing for details. */
- #include "cmWorkerPool.h"
- #include <algorithm>
- #include <array>
- #include <condition_variable>
- #include <cstddef>
- #include <deque>
- #include <functional>
- #include <mutex>
- #include <thread>
- #include <cm/memory>
- #include <cm3p/uv.h>
- #include "cmRange.h"
- #include "cmStringAlgorithms.h"
- #include "cmUVHandlePtr.h"
- /**
- * @brief libuv pipe buffer class
- */
- class cmUVPipeBuffer
- {
- public:
- using DataRange = cmRange<const char*>;
- using DataFunction = std::function<void(DataRange)>;
- /// On error the ssize_t argument is a non zero libuv error code
- using EndFunction = std::function<void(ssize_t)>;
- /**
- * Reset to construction state
- */
- void reset();
- /**
- * Initializes uv_pipe(), uv_stream() and uv_handle()
- * @return true on success
- */
- bool init(uv_loop_t* uv_loop);
- /**
- * Start reading
- * @return true on success
- */
- bool startRead(DataFunction dataFunction, EndFunction endFunction);
- //! libuv pipe
- uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); }
- //! uv_pipe() casted to libuv stream
- uv_stream_t* uv_stream() const
- {
- return static_cast<uv_stream_t*>(this->UVPipe_);
- }
- //! uv_pipe() casted to libuv handle
- uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); }
- private:
- // -- Libuv callbacks
- static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
- uv_buf_t* buf);
- static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
- cm::uv_pipe_ptr UVPipe_;
- std::vector<char> Buffer_;
- DataFunction DataFunction_;
- EndFunction EndFunction_;
- };
- void cmUVPipeBuffer::reset()
- {
- if (this->UVPipe_.get()) {
- this->EndFunction_ = nullptr;
- this->DataFunction_ = nullptr;
- this->Buffer_.clear();
- this->Buffer_.shrink_to_fit();
- this->UVPipe_.reset();
- }
- }
- bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
- {
- this->reset();
- if (!uv_loop) {
- return false;
- }
- int ret = this->UVPipe_.init(*uv_loop, 0, this);
- return (ret == 0);
- }
- bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
- EndFunction endFunction)
- {
- if (!this->UVPipe_.get()) {
- return false;
- }
- if (!dataFunction || !endFunction) {
- return false;
- }
- this->DataFunction_ = std::move(dataFunction);
- this->EndFunction_ = std::move(endFunction);
- int ret = uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc,
- &cmUVPipeBuffer::UVData);
- return (ret == 0);
- }
- void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
- uv_buf_t* buf)
- {
- auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data);
- pipe.Buffer_.resize(suggestedSize);
- buf->base = pipe.Buffer_.data();
- buf->len = static_cast<unsigned long>(pipe.Buffer_.size());
- }
- void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
- const uv_buf_t* buf)
- {
- auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data);
- if (nread > 0) {
- if (buf->base) {
- // Call data function
- pipe.DataFunction_(DataRange(buf->base, buf->base + nread));
- }
- } else if (nread < 0) {
- // Save the end function on the stack before resetting the pipe
- EndFunction efunc;
- efunc.swap(pipe.EndFunction_);
- // Reset pipe before calling the end function
- pipe.reset();
- // Call end function
- efunc((nread == UV_EOF) ? 0 : nread);
- }
- }
- /**
- * @brief External process management class
- */
- class cmUVReadOnlyProcess
- {
- public:
- // -- Types
- //! @brief Process settings
- struct SetupT
- {
- std::string WorkingDirectory;
- std::vector<std::string> Command;
- cmWorkerPool::ProcessResultT* Result = nullptr;
- bool MergedOutput = false;
- };
- // -- Const accessors
- SetupT const& Setup() const { return this->Setup_; }
- cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; }
- bool IsStarted() const { return this->IsStarted_; }
- bool IsFinished() const { return this->IsFinished_; }
- // -- Runtime
- void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
- std::vector<std::string> const& command,
- std::string const& workingDirectory = std::string());
- bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
- private:
- // -- Libuv callbacks
- static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
- void UVPipeOutData(cmUVPipeBuffer::DataRange data) const;
- void UVPipeOutEnd(ssize_t error);
- void UVPipeErrData(cmUVPipeBuffer::DataRange data) const;
- void UVPipeErrEnd(ssize_t error);
- void UVTryFinish();
- // -- Setup
- SetupT Setup_;
- // -- Runtime
- bool IsStarted_ = false;
- bool IsFinished_ = false;
- std::function<void()> FinishedCallback_;
- std::vector<const char*> CommandPtr_;
- std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
- uv_process_options_t UVOptions_;
- cm::uv_process_ptr UVProcess_;
- cmUVPipeBuffer UVPipeOut_;
- cmUVPipeBuffer UVPipeErr_;
- };
- void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
- bool mergedOutput,
- std::vector<std::string> const& command,
- std::string const& workingDirectory)
- {
- this->Setup_.WorkingDirectory = workingDirectory;
- this->Setup_.Command = command;
- this->Setup_.Result = result;
- this->Setup_.MergedOutput = mergedOutput;
- }
- bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
- std::function<void()> finishedCallback)
- {
- if (this->IsStarted() || !this->Result()) {
- return false;
- }
- // Reset result before the start
- this->Result()->reset();
- // Fill command string pointers
- if (!this->Setup().Command.empty()) {
- this->CommandPtr_.reserve(this->Setup().Command.size() + 1);
- for (std::string const& arg : this->Setup().Command) {
- this->CommandPtr_.push_back(arg.c_str());
- }
- this->CommandPtr_.push_back(nullptr);
- } else {
- this->Result()->ErrorMessage = "Empty command";
- }
- if (!this->Result()->error()) {
- if (!this->UVPipeOut_.init(uv_loop)) {
- this->Result()->ErrorMessage = "libuv stdout pipe initialization failed";
- }
- }
- if (!this->Result()->error()) {
- if (!this->UVPipeErr_.init(uv_loop)) {
- this->Result()->ErrorMessage = "libuv stderr pipe initialization failed";
- }
- }
- if (!this->Result()->error()) {
- // -- Setup process stdio options
- // stdin
- this->UVOptionsStdIO_[0].flags = UV_IGNORE;
- this->UVOptionsStdIO_[0].data.stream = nullptr;
- // stdout
- this->UVOptionsStdIO_[1].flags =
- static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
- this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream();
- // stderr
- this->UVOptionsStdIO_[2].flags =
- static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
- this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream();
- // -- Setup process options
- std::fill_n(reinterpret_cast<char*>(&this->UVOptions_),
- sizeof(this->UVOptions_), 0);
- this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
- this->UVOptions_.file = this->CommandPtr_[0];
- this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data());
- this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str();
- this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
- #if UV_VERSION_MAJOR > 1 || !defined(CMAKE_USE_SYSTEM_LIBUV)
- this->UVOptions_.flags |= UV_PROCESS_WINDOWS_USE_PARENT_ERROR_MODE;
- #endif
- this->UVOptions_.stdio_count =
- static_cast<int>(this->UVOptionsStdIO_.size());
- this->UVOptions_.stdio = this->UVOptionsStdIO_.data();
- // -- Spawn process
- int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this);
- if (uvErrorCode != 0) {
- this->Result()->ErrorMessage = "libuv process spawn failed";
- if (const char* uvErr = uv_strerror(uvErrorCode)) {
- this->Result()->ErrorMessage += ": ";
- this->Result()->ErrorMessage += uvErr;
- }
- }
- }
- // -- Start reading from stdio streams
- if (!this->Result()->error()) {
- if (!this->UVPipeOut_.startRead(
- [this](cmUVPipeBuffer::DataRange range) {
- this->UVPipeOutData(range);
- },
- [this](ssize_t error) { this->UVPipeOutEnd(error); })) {
- this->Result()->ErrorMessage =
- "libuv start reading from stdout pipe failed";
- }
- }
- if (!this->Result()->error()) {
- if (!this->UVPipeErr_.startRead(
- [this](cmUVPipeBuffer::DataRange range) {
- this->UVPipeErrData(range);
- },
- [this](ssize_t error) { this->UVPipeErrEnd(error); })) {
- this->Result()->ErrorMessage =
- "libuv start reading from stderr pipe failed";
- }
- }
- if (!this->Result()->error()) {
- this->IsStarted_ = true;
- this->FinishedCallback_ = std::move(finishedCallback);
- } else {
- // Clear libuv handles and finish
- this->UVProcess_.reset();
- this->UVPipeOut_.reset();
- this->UVPipeErr_.reset();
- this->CommandPtr_.clear();
- }
- return this->IsStarted();
- }
- void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
- int termSignal)
- {
- auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
- if (proc.IsStarted() && !proc.IsFinished()) {
- // Set error message on demand
- proc.Result()->ExitStatus = exitStatus;
- proc.Result()->TermSignal = termSignal;
- if (proc.Result()->ErrorMessage.empty()) {
- if (termSignal != 0) {
- proc.Result()->ErrorMessage = cmStrCat(
- "Process was terminated by signal ", proc.Result()->TermSignal);
- } else if (exitStatus != 0) {
- proc.Result()->ErrorMessage = cmStrCat(
- "Process failed with return value ", proc.Result()->ExitStatus);
- }
- }
- // Reset process handle
- proc.UVProcess_.reset();
- // Try finish
- proc.UVTryFinish();
- }
- }
- void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const
- {
- this->Result()->StdOut.append(data.begin(), data.end());
- }
- void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
- {
- // Process pipe error
- if ((error != 0) && !this->Result()->error()) {
- this->Result()->ErrorMessage = cmStrCat(
- "Reading from stdout pipe failed with libuv error code ", error);
- }
- // Try finish
- this->UVTryFinish();
- }
- void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const
- {
- std::string* str = this->Setup_.MergedOutput ? &this->Result()->StdOut
- : &this->Result()->StdErr;
- str->append(data.begin(), data.end());
- }
- void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
- {
- // Process pipe error
- if ((error != 0) && !this->Result()->error()) {
- this->Result()->ErrorMessage = cmStrCat(
- "Reading from stderr pipe failed with libuv error code ", error);
- }
- // Try finish
- this->UVTryFinish();
- }
- void cmUVReadOnlyProcess::UVTryFinish()
- {
- // There still might be data in the pipes after the process has finished.
- // Therefore check if the process is finished AND all pipes are closed
- // before signaling the worker thread to continue.
- if ((this->UVProcess_.get()) || (this->UVPipeOut_.uv_pipe()) ||
- (this->UVPipeErr_.uv_pipe())) {
- return;
- }
- this->IsFinished_ = true;
- this->FinishedCallback_();
- }
- /**
- * @brief Worker pool worker thread
- */
- class cmWorkerPoolWorker
- {
- public:
- cmWorkerPoolWorker(uv_loop_t& uvLoop);
- ~cmWorkerPoolWorker();
- cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
- cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
- /**
- * Set the internal thread
- */
- void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); }
- /**
- * Run an external process
- */
- bool RunProcess(cmWorkerPool::ProcessResultT& result,
- std::vector<std::string> const& command,
- std::string const& workingDirectory);
- private:
- // -- Libuv callbacks
- static void UVProcessStart(uv_async_t* handle);
- void UVProcessFinished();
- // -- Process management
- struct
- {
- std::mutex Mutex;
- cm::uv_async_ptr Request;
- std::condition_variable Condition;
- std::unique_ptr<cmUVReadOnlyProcess> ROP;
- } Proc_;
- // -- System thread
- std::thread Thread_;
- };
- cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
- {
- this->Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
- }
- cmWorkerPoolWorker::~cmWorkerPoolWorker()
- {
- if (this->Thread_.joinable()) {
- this->Thread_.join();
- }
- }
- bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
- std::vector<std::string> const& command,
- std::string const& workingDirectory)
- {
- if (command.empty()) {
- return false;
- }
- // Create process instance
- {
- std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
- this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
- this->Proc_.ROP->setup(&result, true, command, workingDirectory);
- }
- // Send asynchronous process start request to libuv loop
- this->Proc_.Request.send();
- // Wait until the process has been finished and destroyed
- {
- std::unique_lock<std::mutex> ulock(this->Proc_.Mutex);
- while (this->Proc_.ROP) {
- this->Proc_.Condition.wait(ulock);
- }
- }
- return !result.error();
- }
- void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
- {
- auto* worker = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
- bool startFailed = false;
- {
- auto& Proc = worker->Proc_;
- std::lock_guard<std::mutex> lock(Proc.Mutex);
- if (Proc.ROP && !Proc.ROP->IsStarted()) {
- startFailed = !Proc.ROP->start(
- handle->loop, [worker] { worker->UVProcessFinished(); });
- }
- }
- // Clean up if starting of the process failed
- if (startFailed) {
- worker->UVProcessFinished();
- }
- }
- void cmWorkerPoolWorker::UVProcessFinished()
- {
- std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
- if (this->Proc_.ROP &&
- (this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) {
- this->Proc_.ROP.reset();
- }
- // Notify idling thread
- this->Proc_.Condition.notify_one();
- }
- /**
- * @brief Private worker pool internals
- */
- class cmWorkerPoolInternal
- {
- public:
- // -- Constructors
- cmWorkerPoolInternal(cmWorkerPool* pool);
- ~cmWorkerPoolInternal();
- /**
- * Runs the libuv loop.
- */
- bool Process();
- /**
- * Clear queue and abort threads.
- */
- void Abort();
- /**
- * Push a job to the queue and notify a worker.
- */
- bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
- /**
- * Worker thread main loop method.
- */
- void Work(unsigned int workerIndex);
- // -- Request slots
- static void UVSlotBegin(uv_async_t* handle);
- static void UVSlotEnd(uv_async_t* handle);
- // -- UV loop
- std::unique_ptr<uv_loop_t> UVLoop;
- cm::uv_async_ptr UVRequestBegin;
- cm::uv_async_ptr UVRequestEnd;
- // -- Thread pool and job queue
- std::mutex Mutex;
- bool Processing = false;
- bool Aborting = false;
- bool FenceProcessing = false;
- unsigned int WorkersRunning = 0;
- unsigned int WorkersIdle = 0;
- unsigned int JobsProcessing = 0;
- std::deque<cmWorkerPool::JobHandleT> Queue;
- std::condition_variable Condition;
- std::condition_variable ConditionFence;
- std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
- // -- References
- cmWorkerPool* Pool = nullptr;
- };
- void cmWorkerPool::ProcessResultT::reset()
- {
- this->ExitStatus = 0;
- this->TermSignal = 0;
- if (!this->StdOut.empty()) {
- this->StdOut.clear();
- this->StdOut.shrink_to_fit();
- }
- if (!this->StdErr.empty()) {
- this->StdErr.clear();
- this->StdErr.shrink_to_fit();
- }
- if (!this->ErrorMessage.empty()) {
- this->ErrorMessage.clear();
- this->ErrorMessage.shrink_to_fit();
- }
- }
- cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
- : Pool(pool)
- {
- // Initialize libuv loop
- uv_disable_stdio_inheritance();
- this->UVLoop = cm::make_unique<uv_loop_t>();
- uv_loop_init(this->UVLoop.get());
- }
- cmWorkerPoolInternal::~cmWorkerPoolInternal()
- {
- uv_loop_close(this->UVLoop.get());
- }
- bool cmWorkerPoolInternal::Process()
- {
- // Reset state flags
- this->Processing = true;
- this->Aborting = false;
- // Initialize libuv asynchronous request
- this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin,
- this);
- this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd,
- this);
- // Send begin request
- this->UVRequestBegin.send();
- // Run libuv loop
- bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0);
- // Update state flags
- this->Processing = false;
- this->Aborting = false;
- return success;
- }
- void cmWorkerPoolInternal::Abort()
- {
- // Clear all jobs and set abort flag
- std::lock_guard<std::mutex> guard(this->Mutex);
- if (!this->Aborting) {
- // Register abort and clear queue
- this->Aborting = true;
- this->Queue.clear();
- this->Condition.notify_all();
- }
- }
- inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
- {
- std::lock_guard<std::mutex> guard(this->Mutex);
- if (this->Aborting) {
- return false;
- }
- // Append the job to the queue
- this->Queue.emplace_back(std::move(jobHandle));
- // Notify an idle worker if there's one
- if (this->WorkersIdle != 0) {
- this->Condition.notify_one();
- }
- // Return success
- return true;
- }
- void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
- {
- auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
- // Create worker threads
- {
- unsigned int const num = gint.Pool->ThreadCount();
- // Create workers
- gint.Workers.reserve(num);
- for (unsigned int ii = 0; ii != num; ++ii) {
- gint.Workers.emplace_back(
- cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
- }
- // Start worker threads
- for (unsigned int ii = 0; ii != num; ++ii) {
- gint.Workers[ii]->SetThread(
- std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
- }
- }
- // Destroy begin request
- gint.UVRequestBegin.reset();
- }
- void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
- {
- auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
- // Join and destroy worker threads
- gint.Workers.clear();
- // Destroy end request
- gint.UVRequestEnd.reset();
- }
- void cmWorkerPoolInternal::Work(unsigned int workerIndex)
- {
- cmWorkerPool::JobHandleT jobHandle;
- std::unique_lock<std::mutex> uLock(this->Mutex);
- // Increment running workers count
- ++this->WorkersRunning;
- // Enter worker main loop
- while (true) {
- // Abort on request
- if (this->Aborting) {
- break;
- }
- // Wait for new jobs on the main CV
- if (this->Queue.empty()) {
- ++this->WorkersIdle;
- this->Condition.wait(uLock);
- --this->WorkersIdle;
- continue;
- }
- // If there is a fence currently active or waiting,
- // sleep on the main CV and try again.
- if (this->FenceProcessing) {
- this->Condition.wait(uLock);
- continue;
- }
- // Pop next job from queue
- jobHandle = std::move(this->Queue.front());
- this->Queue.pop_front();
- // Check for fence jobs
- bool raisedFence = false;
- if (jobHandle->IsFence()) {
- this->FenceProcessing = true;
- raisedFence = true;
- // Wait on the Fence CV until all pending jobs are done.
- while (this->JobsProcessing != 0 && !this->Aborting) {
- this->ConditionFence.wait(uLock);
- }
- // When aborting, explicitly kick all threads alive once more.
- if (this->Aborting) {
- this->FenceProcessing = false;
- this->Condition.notify_all();
- break;
- }
- }
- // Unlocked scope for job processing
- ++this->JobsProcessing;
- {
- uLock.unlock();
- jobHandle->Work(this->Pool, workerIndex); // Process job
- jobHandle.reset(); // Destroy job
- uLock.lock();
- }
- --this->JobsProcessing;
- // If this was the thread that entered fence processing
- // originally, notify all idling workers that the fence
- // is done.
- if (raisedFence) {
- this->FenceProcessing = false;
- this->Condition.notify_all();
- }
- // If fence processing is still not done, notify the
- // the fencing worker when all active jobs are done.
- if (this->FenceProcessing && this->JobsProcessing == 0) {
- this->ConditionFence.notify_all();
- }
- }
- // Decrement running workers count
- if (--this->WorkersRunning == 0) {
- // Last worker thread about to finish. Send libuv event.
- this->UVRequestEnd.send();
- }
- }
- cmWorkerPool::JobT::~JobT() = default;
- bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
- std::vector<std::string> const& command,
- std::string const& workingDirectory)
- {
- // Get worker by index
- auto* worker = this->Pool_->Int_->Workers.at(this->WorkerIndex_).get();
- return worker->RunProcess(result, command, workingDirectory);
- }
- cmWorkerPool::cmWorkerPool()
- : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
- {
- }
- cmWorkerPool::~cmWorkerPool() = default;
- void cmWorkerPool::SetThreadCount(unsigned int threadCount)
- {
- if (!this->Int_->Processing) {
- this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
- }
- }
- bool cmWorkerPool::Process(void* userData)
- {
- // Setup user data
- this->UserData_ = userData;
- // Run libuv loop
- bool success = this->Int_->Process();
- // Clear user data
- this->UserData_ = nullptr;
- // Return
- return success;
- }
- bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
- {
- return this->Int_->PushJob(std::move(jobHandle));
- }
- void cmWorkerPool::Abort()
- {
- this->Int_->Abort();
- }
|