cmWorkerPool.cxx 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773
  1. /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
  2. file Copyright.txt or https://cmake.org/licensing for details. */
  3. #include "cmWorkerPool.h"
  4. #include <algorithm>
  5. #include <array>
  6. #include <condition_variable>
  7. #include <cstddef>
  8. #include <deque>
  9. #include <functional>
  10. #include <mutex>
  11. #include <thread>
  12. #include <cm/memory>
  13. #include <cm3p/uv.h>
  14. #include "cmRange.h"
  15. #include "cmStringAlgorithms.h"
  16. #include "cmUVHandlePtr.h"
  17. /**
  18. * @brief libuv pipe buffer class
  19. */
  20. class cmUVPipeBuffer
  21. {
  22. public:
  23. using DataRange = cmRange<const char*>;
  24. using DataFunction = std::function<void(DataRange)>;
  25. /// On error the ssize_t argument is a non zero libuv error code
  26. using EndFunction = std::function<void(ssize_t)>;
  27. /**
  28. * Reset to construction state
  29. */
  30. void reset();
  31. /**
  32. * Initializes uv_pipe(), uv_stream() and uv_handle()
  33. * @return true on success
  34. */
  35. bool init(uv_loop_t* uv_loop);
  36. /**
  37. * Start reading
  38. * @return true on success
  39. */
  40. bool startRead(DataFunction dataFunction, EndFunction endFunction);
  41. //! libuv pipe
  42. uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); }
  43. //! uv_pipe() casted to libuv stream
  44. uv_stream_t* uv_stream() const
  45. {
  46. return static_cast<uv_stream_t*>(this->UVPipe_);
  47. }
  48. //! uv_pipe() casted to libuv handle
  49. uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); }
  50. private:
  51. // -- Libuv callbacks
  52. static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
  53. uv_buf_t* buf);
  54. static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
  55. cm::uv_pipe_ptr UVPipe_;
  56. std::vector<char> Buffer_;
  57. DataFunction DataFunction_;
  58. EndFunction EndFunction_;
  59. };
  60. void cmUVPipeBuffer::reset()
  61. {
  62. if (this->UVPipe_.get()) {
  63. this->EndFunction_ = nullptr;
  64. this->DataFunction_ = nullptr;
  65. this->Buffer_.clear();
  66. this->Buffer_.shrink_to_fit();
  67. this->UVPipe_.reset();
  68. }
  69. }
  70. bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
  71. {
  72. this->reset();
  73. if (!uv_loop) {
  74. return false;
  75. }
  76. int ret = this->UVPipe_.init(*uv_loop, 0, this);
  77. return (ret == 0);
  78. }
  79. bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
  80. EndFunction endFunction)
  81. {
  82. if (!this->UVPipe_.get()) {
  83. return false;
  84. }
  85. if (!dataFunction || !endFunction) {
  86. return false;
  87. }
  88. this->DataFunction_ = std::move(dataFunction);
  89. this->EndFunction_ = std::move(endFunction);
  90. int ret = uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc,
  91. &cmUVPipeBuffer::UVData);
  92. return (ret == 0);
  93. }
  94. void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
  95. uv_buf_t* buf)
  96. {
  97. auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data);
  98. pipe.Buffer_.resize(suggestedSize);
  99. buf->base = pipe.Buffer_.data();
  100. buf->len = static_cast<unsigned long>(pipe.Buffer_.size());
  101. }
  102. void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
  103. const uv_buf_t* buf)
  104. {
  105. auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data);
  106. if (nread > 0) {
  107. if (buf->base) {
  108. // Call data function
  109. pipe.DataFunction_(DataRange(buf->base, buf->base + nread));
  110. }
  111. } else if (nread < 0) {
  112. // Save the end function on the stack before resetting the pipe
  113. EndFunction efunc;
  114. efunc.swap(pipe.EndFunction_);
  115. // Reset pipe before calling the end function
  116. pipe.reset();
  117. // Call end function
  118. efunc((nread == UV_EOF) ? 0 : nread);
  119. }
  120. }
  121. /**
  122. * @brief External process management class
  123. */
  124. class cmUVReadOnlyProcess
  125. {
  126. public:
  127. // -- Types
  128. //! @brief Process settings
  129. struct SetupT
  130. {
  131. std::string WorkingDirectory;
  132. std::vector<std::string> Command;
  133. cmWorkerPool::ProcessResultT* Result = nullptr;
  134. bool MergedOutput = false;
  135. };
  136. // -- Const accessors
  137. SetupT const& Setup() const { return this->Setup_; }
  138. cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; }
  139. bool IsStarted() const { return this->IsStarted_; }
  140. bool IsFinished() const { return this->IsFinished_; }
  141. // -- Runtime
  142. void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
  143. std::vector<std::string> const& command,
  144. std::string const& workingDirectory = std::string());
  145. bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
  146. private:
  147. // -- Libuv callbacks
  148. static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
  149. void UVPipeOutData(cmUVPipeBuffer::DataRange data) const;
  150. void UVPipeOutEnd(ssize_t error);
  151. void UVPipeErrData(cmUVPipeBuffer::DataRange data) const;
  152. void UVPipeErrEnd(ssize_t error);
  153. void UVTryFinish();
  154. // -- Setup
  155. SetupT Setup_;
  156. // -- Runtime
  157. bool IsStarted_ = false;
  158. bool IsFinished_ = false;
  159. std::function<void()> FinishedCallback_;
  160. std::vector<const char*> CommandPtr_;
  161. std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
  162. uv_process_options_t UVOptions_;
  163. cm::uv_process_ptr UVProcess_;
  164. cmUVPipeBuffer UVPipeOut_;
  165. cmUVPipeBuffer UVPipeErr_;
  166. };
  167. void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
  168. bool mergedOutput,
  169. std::vector<std::string> const& command,
  170. std::string const& workingDirectory)
  171. {
  172. this->Setup_.WorkingDirectory = workingDirectory;
  173. this->Setup_.Command = command;
  174. this->Setup_.Result = result;
  175. this->Setup_.MergedOutput = mergedOutput;
  176. }
  177. bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
  178. std::function<void()> finishedCallback)
  179. {
  180. if (this->IsStarted() || !this->Result()) {
  181. return false;
  182. }
  183. // Reset result before the start
  184. this->Result()->reset();
  185. // Fill command string pointers
  186. if (!this->Setup().Command.empty()) {
  187. this->CommandPtr_.reserve(this->Setup().Command.size() + 1);
  188. for (std::string const& arg : this->Setup().Command) {
  189. this->CommandPtr_.push_back(arg.c_str());
  190. }
  191. this->CommandPtr_.push_back(nullptr);
  192. } else {
  193. this->Result()->ErrorMessage = "Empty command";
  194. }
  195. if (!this->Result()->error()) {
  196. if (!this->UVPipeOut_.init(uv_loop)) {
  197. this->Result()->ErrorMessage = "libuv stdout pipe initialization failed";
  198. }
  199. }
  200. if (!this->Result()->error()) {
  201. if (!this->UVPipeErr_.init(uv_loop)) {
  202. this->Result()->ErrorMessage = "libuv stderr pipe initialization failed";
  203. }
  204. }
  205. if (!this->Result()->error()) {
  206. // -- Setup process stdio options
  207. // stdin
  208. this->UVOptionsStdIO_[0].flags = UV_IGNORE;
  209. this->UVOptionsStdIO_[0].data.stream = nullptr;
  210. // stdout
  211. this->UVOptionsStdIO_[1].flags =
  212. static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
  213. this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream();
  214. // stderr
  215. this->UVOptionsStdIO_[2].flags =
  216. static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
  217. this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream();
  218. // -- Setup process options
  219. std::fill_n(reinterpret_cast<char*>(&this->UVOptions_),
  220. sizeof(this->UVOptions_), 0);
  221. this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
  222. this->UVOptions_.file = this->CommandPtr_[0];
  223. this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data());
  224. this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str();
  225. this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
  226. this->UVOptions_.stdio_count =
  227. static_cast<int>(this->UVOptionsStdIO_.size());
  228. this->UVOptions_.stdio = this->UVOptionsStdIO_.data();
  229. // -- Spawn process
  230. int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this);
  231. if (uvErrorCode != 0) {
  232. this->Result()->ErrorMessage = "libuv process spawn failed";
  233. if (const char* uvErr = uv_strerror(uvErrorCode)) {
  234. this->Result()->ErrorMessage += ": ";
  235. this->Result()->ErrorMessage += uvErr;
  236. }
  237. }
  238. }
  239. // -- Start reading from stdio streams
  240. if (!this->Result()->error()) {
  241. if (!this->UVPipeOut_.startRead(
  242. [this](cmUVPipeBuffer::DataRange range) {
  243. this->UVPipeOutData(range);
  244. },
  245. [this](ssize_t error) { this->UVPipeOutEnd(error); })) {
  246. this->Result()->ErrorMessage =
  247. "libuv start reading from stdout pipe failed";
  248. }
  249. }
  250. if (!this->Result()->error()) {
  251. if (!this->UVPipeErr_.startRead(
  252. [this](cmUVPipeBuffer::DataRange range) {
  253. this->UVPipeErrData(range);
  254. },
  255. [this](ssize_t error) { this->UVPipeErrEnd(error); })) {
  256. this->Result()->ErrorMessage =
  257. "libuv start reading from stderr pipe failed";
  258. }
  259. }
  260. if (!this->Result()->error()) {
  261. this->IsStarted_ = true;
  262. this->FinishedCallback_ = std::move(finishedCallback);
  263. } else {
  264. // Clear libuv handles and finish
  265. this->UVProcess_.reset();
  266. this->UVPipeOut_.reset();
  267. this->UVPipeErr_.reset();
  268. this->CommandPtr_.clear();
  269. }
  270. return this->IsStarted();
  271. }
  272. void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
  273. int termSignal)
  274. {
  275. auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
  276. if (proc.IsStarted() && !proc.IsFinished()) {
  277. // Set error message on demand
  278. proc.Result()->ExitStatus = exitStatus;
  279. proc.Result()->TermSignal = termSignal;
  280. if (proc.Result()->ErrorMessage.empty()) {
  281. if (termSignal != 0) {
  282. proc.Result()->ErrorMessage = cmStrCat(
  283. "Process was terminated by signal ", proc.Result()->TermSignal);
  284. } else if (exitStatus != 0) {
  285. proc.Result()->ErrorMessage = cmStrCat(
  286. "Process failed with return value ", proc.Result()->ExitStatus);
  287. }
  288. }
  289. // Reset process handle
  290. proc.UVProcess_.reset();
  291. // Try finish
  292. proc.UVTryFinish();
  293. }
  294. }
  295. void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const
  296. {
  297. this->Result()->StdOut.append(data.begin(), data.end());
  298. }
  299. void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
  300. {
  301. // Process pipe error
  302. if ((error != 0) && !this->Result()->error()) {
  303. this->Result()->ErrorMessage = cmStrCat(
  304. "Reading from stdout pipe failed with libuv error code ", error);
  305. }
  306. // Try finish
  307. this->UVTryFinish();
  308. }
  309. void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const
  310. {
  311. std::string* str = this->Setup_.MergedOutput ? &this->Result()->StdOut
  312. : &this->Result()->StdErr;
  313. str->append(data.begin(), data.end());
  314. }
  315. void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
  316. {
  317. // Process pipe error
  318. if ((error != 0) && !this->Result()->error()) {
  319. this->Result()->ErrorMessage = cmStrCat(
  320. "Reading from stderr pipe failed with libuv error code ", error);
  321. }
  322. // Try finish
  323. this->UVTryFinish();
  324. }
  325. void cmUVReadOnlyProcess::UVTryFinish()
  326. {
  327. // There still might be data in the pipes after the process has finished.
  328. // Therefore check if the process is finished AND all pipes are closed
  329. // before signaling the worker thread to continue.
  330. if ((this->UVProcess_.get()) || (this->UVPipeOut_.uv_pipe()) ||
  331. (this->UVPipeErr_.uv_pipe())) {
  332. return;
  333. }
  334. this->IsFinished_ = true;
  335. this->FinishedCallback_();
  336. }
  337. /**
  338. * @brief Worker pool worker thread
  339. */
  340. class cmWorkerPoolWorker
  341. {
  342. public:
  343. cmWorkerPoolWorker(uv_loop_t& uvLoop);
  344. ~cmWorkerPoolWorker();
  345. cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
  346. cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
  347. /**
  348. * Set the internal thread
  349. */
  350. void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); }
  351. /**
  352. * Run an external process
  353. */
  354. bool RunProcess(cmWorkerPool::ProcessResultT& result,
  355. std::vector<std::string> const& command,
  356. std::string const& workingDirectory);
  357. private:
  358. // -- Libuv callbacks
  359. static void UVProcessStart(uv_async_t* handle);
  360. void UVProcessFinished();
  361. // -- Process management
  362. struct
  363. {
  364. std::mutex Mutex;
  365. cm::uv_async_ptr Request;
  366. std::condition_variable Condition;
  367. std::unique_ptr<cmUVReadOnlyProcess> ROP;
  368. } Proc_;
  369. // -- System thread
  370. std::thread Thread_;
  371. };
  372. cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
  373. {
  374. this->Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
  375. }
  376. cmWorkerPoolWorker::~cmWorkerPoolWorker()
  377. {
  378. if (this->Thread_.joinable()) {
  379. this->Thread_.join();
  380. }
  381. }
  382. bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
  383. std::vector<std::string> const& command,
  384. std::string const& workingDirectory)
  385. {
  386. if (command.empty()) {
  387. return false;
  388. }
  389. // Create process instance
  390. {
  391. std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
  392. this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
  393. this->Proc_.ROP->setup(&result, true, command, workingDirectory);
  394. }
  395. // Send asynchronous process start request to libuv loop
  396. this->Proc_.Request.send();
  397. // Wait until the process has been finished and destroyed
  398. {
  399. std::unique_lock<std::mutex> ulock(this->Proc_.Mutex);
  400. while (this->Proc_.ROP) {
  401. this->Proc_.Condition.wait(ulock);
  402. }
  403. }
  404. return !result.error();
  405. }
  406. void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
  407. {
  408. auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
  409. bool startFailed = false;
  410. {
  411. auto& Proc = wrk->Proc_;
  412. std::lock_guard<std::mutex> lock(Proc.Mutex);
  413. if (Proc.ROP && !Proc.ROP->IsStarted()) {
  414. startFailed =
  415. !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
  416. }
  417. }
  418. // Clean up if starting of the process failed
  419. if (startFailed) {
  420. wrk->UVProcessFinished();
  421. }
  422. }
  423. void cmWorkerPoolWorker::UVProcessFinished()
  424. {
  425. std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
  426. if (this->Proc_.ROP &&
  427. (this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) {
  428. this->Proc_.ROP.reset();
  429. }
  430. // Notify idling thread
  431. this->Proc_.Condition.notify_one();
  432. }
  433. /**
  434. * @brief Private worker pool internals
  435. */
  436. class cmWorkerPoolInternal
  437. {
  438. public:
  439. // -- Constructors
  440. cmWorkerPoolInternal(cmWorkerPool* pool);
  441. ~cmWorkerPoolInternal();
  442. /**
  443. * Runs the libuv loop.
  444. */
  445. bool Process();
  446. /**
  447. * Clear queue and abort threads.
  448. */
  449. void Abort();
  450. /**
  451. * Push a job to the queue and notify a worker.
  452. */
  453. bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
  454. /**
  455. * Worker thread main loop method.
  456. */
  457. void Work(unsigned int workerIndex);
  458. // -- Request slots
  459. static void UVSlotBegin(uv_async_t* handle);
  460. static void UVSlotEnd(uv_async_t* handle);
  461. // -- UV loop
  462. std::unique_ptr<uv_loop_t> UVLoop;
  463. cm::uv_async_ptr UVRequestBegin;
  464. cm::uv_async_ptr UVRequestEnd;
  465. // -- Thread pool and job queue
  466. std::mutex Mutex;
  467. bool Processing = false;
  468. bool Aborting = false;
  469. bool FenceProcessing = false;
  470. unsigned int WorkersRunning = 0;
  471. unsigned int WorkersIdle = 0;
  472. unsigned int JobsProcessing = 0;
  473. std::deque<cmWorkerPool::JobHandleT> Queue;
  474. std::condition_variable Condition;
  475. std::condition_variable ConditionFence;
  476. std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
  477. // -- References
  478. cmWorkerPool* Pool = nullptr;
  479. };
  480. void cmWorkerPool::ProcessResultT::reset()
  481. {
  482. this->ExitStatus = 0;
  483. this->TermSignal = 0;
  484. if (!this->StdOut.empty()) {
  485. this->StdOut.clear();
  486. this->StdOut.shrink_to_fit();
  487. }
  488. if (!this->StdErr.empty()) {
  489. this->StdErr.clear();
  490. this->StdErr.shrink_to_fit();
  491. }
  492. if (!this->ErrorMessage.empty()) {
  493. this->ErrorMessage.clear();
  494. this->ErrorMessage.shrink_to_fit();
  495. }
  496. }
  497. cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
  498. : Pool(pool)
  499. {
  500. // Initialize libuv loop
  501. uv_disable_stdio_inheritance();
  502. this->UVLoop = cm::make_unique<uv_loop_t>();
  503. uv_loop_init(this->UVLoop.get());
  504. }
  505. cmWorkerPoolInternal::~cmWorkerPoolInternal()
  506. {
  507. uv_loop_close(this->UVLoop.get());
  508. }
  509. bool cmWorkerPoolInternal::Process()
  510. {
  511. // Reset state flags
  512. this->Processing = true;
  513. this->Aborting = false;
  514. // Initialize libuv asynchronous request
  515. this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin,
  516. this);
  517. this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd,
  518. this);
  519. // Send begin request
  520. this->UVRequestBegin.send();
  521. // Run libuv loop
  522. bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0);
  523. // Update state flags
  524. this->Processing = false;
  525. this->Aborting = false;
  526. return success;
  527. }
  528. void cmWorkerPoolInternal::Abort()
  529. {
  530. // Clear all jobs and set abort flag
  531. std::lock_guard<std::mutex> guard(this->Mutex);
  532. if (!this->Aborting) {
  533. // Register abort and clear queue
  534. this->Aborting = true;
  535. this->Queue.clear();
  536. this->Condition.notify_all();
  537. }
  538. }
  539. inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
  540. {
  541. std::lock_guard<std::mutex> guard(this->Mutex);
  542. if (this->Aborting) {
  543. return false;
  544. }
  545. // Append the job to the queue
  546. this->Queue.emplace_back(std::move(jobHandle));
  547. // Notify an idle worker if there's one
  548. if (this->WorkersIdle != 0) {
  549. this->Condition.notify_one();
  550. }
  551. // Return success
  552. return true;
  553. }
  554. void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
  555. {
  556. auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
  557. // Create worker threads
  558. {
  559. unsigned int const num = gint.Pool->ThreadCount();
  560. // Create workers
  561. gint.Workers.reserve(num);
  562. for (unsigned int ii = 0; ii != num; ++ii) {
  563. gint.Workers.emplace_back(
  564. cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
  565. }
  566. // Start worker threads
  567. for (unsigned int ii = 0; ii != num; ++ii) {
  568. gint.Workers[ii]->SetThread(
  569. std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
  570. }
  571. }
  572. // Destroy begin request
  573. gint.UVRequestBegin.reset();
  574. }
  575. void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
  576. {
  577. auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
  578. // Join and destroy worker threads
  579. gint.Workers.clear();
  580. // Destroy end request
  581. gint.UVRequestEnd.reset();
  582. }
  583. void cmWorkerPoolInternal::Work(unsigned int workerIndex)
  584. {
  585. cmWorkerPool::JobHandleT jobHandle;
  586. std::unique_lock<std::mutex> uLock(this->Mutex);
  587. // Increment running workers count
  588. ++this->WorkersRunning;
  589. // Enter worker main loop
  590. while (true) {
  591. // Abort on request
  592. if (this->Aborting) {
  593. break;
  594. }
  595. // Wait for new jobs on the main CV
  596. if (this->Queue.empty()) {
  597. ++this->WorkersIdle;
  598. this->Condition.wait(uLock);
  599. --this->WorkersIdle;
  600. continue;
  601. }
  602. // If there is a fence currently active or waiting,
  603. // sleep on the main CV and try again.
  604. if (this->FenceProcessing) {
  605. this->Condition.wait(uLock);
  606. continue;
  607. }
  608. // Pop next job from queue
  609. jobHandle = std::move(this->Queue.front());
  610. this->Queue.pop_front();
  611. // Check for fence jobs
  612. bool raisedFence = false;
  613. if (jobHandle->IsFence()) {
  614. this->FenceProcessing = true;
  615. raisedFence = true;
  616. // Wait on the Fence CV until all pending jobs are done.
  617. while (this->JobsProcessing != 0 && !this->Aborting) {
  618. this->ConditionFence.wait(uLock);
  619. }
  620. // When aborting, explicitly kick all threads alive once more.
  621. if (this->Aborting) {
  622. this->FenceProcessing = false;
  623. this->Condition.notify_all();
  624. break;
  625. }
  626. }
  627. // Unlocked scope for job processing
  628. ++this->JobsProcessing;
  629. {
  630. uLock.unlock();
  631. jobHandle->Work(this->Pool, workerIndex); // Process job
  632. jobHandle.reset(); // Destroy job
  633. uLock.lock();
  634. }
  635. --this->JobsProcessing;
  636. // If this was the thread that entered fence processing
  637. // originally, notify all idling workers that the fence
  638. // is done.
  639. if (raisedFence) {
  640. this->FenceProcessing = false;
  641. this->Condition.notify_all();
  642. }
  643. // If fence processing is still not done, notify the
  644. // the fencing worker when all active jobs are done.
  645. if (this->FenceProcessing && this->JobsProcessing == 0) {
  646. this->ConditionFence.notify_all();
  647. }
  648. }
  649. // Decrement running workers count
  650. if (--this->WorkersRunning == 0) {
  651. // Last worker thread about to finish. Send libuv event.
  652. this->UVRequestEnd.send();
  653. }
  654. }
  655. cmWorkerPool::JobT::~JobT() = default;
  656. bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
  657. std::vector<std::string> const& command,
  658. std::string const& workingDirectory)
  659. {
  660. // Get worker by index
  661. auto* wrk = this->Pool_->Int_->Workers.at(this->WorkerIndex_).get();
  662. return wrk->RunProcess(result, command, workingDirectory);
  663. }
  664. cmWorkerPool::cmWorkerPool()
  665. : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
  666. {
  667. }
  668. cmWorkerPool::~cmWorkerPool() = default;
  669. void cmWorkerPool::SetThreadCount(unsigned int threadCount)
  670. {
  671. if (!this->Int_->Processing) {
  672. this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
  673. }
  674. }
  675. bool cmWorkerPool::Process(void* userData)
  676. {
  677. // Setup user data
  678. this->UserData_ = userData;
  679. // Run libuv loop
  680. bool success = this->Int_->Process();
  681. // Clear user data
  682. this->UserData_ = nullptr;
  683. // Return
  684. return success;
  685. }
  686. bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
  687. {
  688. return this->Int_->PushJob(std::move(jobHandle));
  689. }
  690. void cmWorkerPool::Abort()
  691. {
  692. this->Int_->Abort();
  693. }