cmWorkerPool.cxx 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776
  1. /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
  2. file Copyright.txt or https://cmake.org/licensing for details. */
  3. #include "cmWorkerPool.h"
  4. #include <algorithm>
  5. #include <array>
  6. #include <condition_variable>
  7. #include <cstddef>
  8. #include <deque>
  9. #include <functional>
  10. #include <mutex>
  11. #include <thread>
  12. #include <cm/memory>
  13. #include <cm3p/uv.h>
  14. #include "cmRange.h"
  15. #include "cmStringAlgorithms.h"
  16. #include "cmUVHandlePtr.h"
  17. #include "cmUVSignalHackRAII.h" // IWYU pragma: keep
  18. /**
  19. * @brief libuv pipe buffer class
  20. */
  21. class cmUVPipeBuffer
  22. {
  23. public:
  24. using DataRange = cmRange<const char*>;
  25. using DataFunction = std::function<void(DataRange)>;
  26. /// On error the ssize_t argument is a non zero libuv error code
  27. using EndFunction = std::function<void(ssize_t)>;
  28. public:
  29. /**
  30. * Reset to construction state
  31. */
  32. void reset();
  33. /**
  34. * Initializes uv_pipe(), uv_stream() and uv_handle()
  35. * @return true on success
  36. */
  37. bool init(uv_loop_t* uv_loop);
  38. /**
  39. * Start reading
  40. * @return true on success
  41. */
  42. bool startRead(DataFunction dataFunction, EndFunction endFunction);
  43. //! libuv pipe
  44. uv_pipe_t* uv_pipe() const { return UVPipe_.get(); }
  45. //! uv_pipe() casted to libuv stream
  46. uv_stream_t* uv_stream() const { return static_cast<uv_stream_t*>(UVPipe_); }
  47. //! uv_pipe() casted to libuv handle
  48. uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(UVPipe_); }
  49. private:
  50. // -- Libuv callbacks
  51. static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
  52. uv_buf_t* buf);
  53. static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
  54. private:
  55. cm::uv_pipe_ptr UVPipe_;
  56. std::vector<char> Buffer_;
  57. DataFunction DataFunction_;
  58. EndFunction EndFunction_;
  59. };
  60. void cmUVPipeBuffer::reset()
  61. {
  62. if (UVPipe_.get() != nullptr) {
  63. EndFunction_ = nullptr;
  64. DataFunction_ = nullptr;
  65. Buffer_.clear();
  66. Buffer_.shrink_to_fit();
  67. UVPipe_.reset();
  68. }
  69. }
  70. bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
  71. {
  72. reset();
  73. if (uv_loop == nullptr) {
  74. return false;
  75. }
  76. int ret = UVPipe_.init(*uv_loop, 0, this);
  77. return (ret == 0);
  78. }
  79. bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
  80. EndFunction endFunction)
  81. {
  82. if (UVPipe_.get() == nullptr) {
  83. return false;
  84. }
  85. if (!dataFunction || !endFunction) {
  86. return false;
  87. }
  88. DataFunction_ = std::move(dataFunction);
  89. EndFunction_ = std::move(endFunction);
  90. int ret = uv_read_start(uv_stream(), &cmUVPipeBuffer::UVAlloc,
  91. &cmUVPipeBuffer::UVData);
  92. return (ret == 0);
  93. }
  94. void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
  95. uv_buf_t* buf)
  96. {
  97. auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data);
  98. pipe.Buffer_.resize(suggestedSize);
  99. buf->base = pipe.Buffer_.data();
  100. buf->len = static_cast<unsigned long>(pipe.Buffer_.size());
  101. }
  102. void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
  103. const uv_buf_t* buf)
  104. {
  105. auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data);
  106. if (nread > 0) {
  107. if (buf->base != nullptr) {
  108. // Call data function
  109. pipe.DataFunction_(DataRange(buf->base, buf->base + nread));
  110. }
  111. } else if (nread < 0) {
  112. // Save the end function on the stack before resetting the pipe
  113. EndFunction efunc;
  114. efunc.swap(pipe.EndFunction_);
  115. // Reset pipe before calling the end function
  116. pipe.reset();
  117. // Call end function
  118. efunc((nread == UV_EOF) ? 0 : nread);
  119. }
  120. }
  121. /**
  122. * @brief External process management class
  123. */
  124. class cmUVReadOnlyProcess
  125. {
  126. public:
  127. // -- Types
  128. //! @brief Process settings
  129. struct SetupT
  130. {
  131. std::string WorkingDirectory;
  132. std::vector<std::string> Command;
  133. cmWorkerPool::ProcessResultT* Result = nullptr;
  134. bool MergedOutput = false;
  135. };
  136. public:
  137. // -- Const accessors
  138. SetupT const& Setup() const { return Setup_; }
  139. cmWorkerPool::ProcessResultT* Result() const { return Setup_.Result; }
  140. bool IsStarted() const { return IsStarted_; }
  141. bool IsFinished() const { return IsFinished_; }
  142. // -- Runtime
  143. void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
  144. std::vector<std::string> const& command,
  145. std::string const& workingDirectory = std::string());
  146. bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
  147. private:
  148. // -- Libuv callbacks
  149. static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
  150. void UVPipeOutData(cmUVPipeBuffer::DataRange data);
  151. void UVPipeOutEnd(ssize_t error);
  152. void UVPipeErrData(cmUVPipeBuffer::DataRange data);
  153. void UVPipeErrEnd(ssize_t error);
  154. void UVTryFinish();
  155. private:
  156. // -- Setup
  157. SetupT Setup_;
  158. // -- Runtime
  159. bool IsStarted_ = false;
  160. bool IsFinished_ = false;
  161. std::function<void()> FinishedCallback_;
  162. std::vector<const char*> CommandPtr_;
  163. std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
  164. uv_process_options_t UVOptions_;
  165. cm::uv_process_ptr UVProcess_;
  166. cmUVPipeBuffer UVPipeOut_;
  167. cmUVPipeBuffer UVPipeErr_;
  168. };
  169. void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
  170. bool mergedOutput,
  171. std::vector<std::string> const& command,
  172. std::string const& workingDirectory)
  173. {
  174. Setup_.WorkingDirectory = workingDirectory;
  175. Setup_.Command = command;
  176. Setup_.Result = result;
  177. Setup_.MergedOutput = mergedOutput;
  178. }
  179. bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
  180. std::function<void()> finishedCallback)
  181. {
  182. if (IsStarted() || (Result() == nullptr)) {
  183. return false;
  184. }
  185. // Reset result before the start
  186. Result()->reset();
  187. // Fill command string pointers
  188. if (!Setup().Command.empty()) {
  189. CommandPtr_.reserve(Setup().Command.size() + 1);
  190. for (std::string const& arg : Setup().Command) {
  191. CommandPtr_.push_back(arg.c_str());
  192. }
  193. CommandPtr_.push_back(nullptr);
  194. } else {
  195. Result()->ErrorMessage = "Empty command";
  196. }
  197. if (!Result()->error()) {
  198. if (!UVPipeOut_.init(uv_loop)) {
  199. Result()->ErrorMessage = "libuv stdout pipe initialization failed";
  200. }
  201. }
  202. if (!Result()->error()) {
  203. if (!UVPipeErr_.init(uv_loop)) {
  204. Result()->ErrorMessage = "libuv stderr pipe initialization failed";
  205. }
  206. }
  207. if (!Result()->error()) {
  208. // -- Setup process stdio options
  209. // stdin
  210. UVOptionsStdIO_[0].flags = UV_IGNORE;
  211. UVOptionsStdIO_[0].data.stream = nullptr;
  212. // stdout
  213. UVOptionsStdIO_[1].flags =
  214. static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
  215. UVOptionsStdIO_[1].data.stream = UVPipeOut_.uv_stream();
  216. // stderr
  217. UVOptionsStdIO_[2].flags =
  218. static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
  219. UVOptionsStdIO_[2].data.stream = UVPipeErr_.uv_stream();
  220. // -- Setup process options
  221. std::fill_n(reinterpret_cast<char*>(&UVOptions_), sizeof(UVOptions_), 0);
  222. UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
  223. UVOptions_.file = CommandPtr_[0];
  224. UVOptions_.args = const_cast<char**>(CommandPtr_.data());
  225. UVOptions_.cwd = Setup_.WorkingDirectory.c_str();
  226. UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
  227. UVOptions_.stdio_count = static_cast<int>(UVOptionsStdIO_.size());
  228. UVOptions_.stdio = UVOptionsStdIO_.data();
  229. // -- Spawn process
  230. int uvErrorCode = UVProcess_.spawn(*uv_loop, UVOptions_, this);
  231. if (uvErrorCode != 0) {
  232. Result()->ErrorMessage = "libuv process spawn failed";
  233. if (const char* uvErr = uv_strerror(uvErrorCode)) {
  234. Result()->ErrorMessage += ": ";
  235. Result()->ErrorMessage += uvErr;
  236. }
  237. }
  238. }
  239. // -- Start reading from stdio streams
  240. if (!Result()->error()) {
  241. if (!UVPipeOut_.startRead(
  242. [this](cmUVPipeBuffer::DataRange range) {
  243. this->UVPipeOutData(range);
  244. },
  245. [this](ssize_t error) { this->UVPipeOutEnd(error); })) {
  246. Result()->ErrorMessage = "libuv start reading from stdout pipe failed";
  247. }
  248. }
  249. if (!Result()->error()) {
  250. if (!UVPipeErr_.startRead(
  251. [this](cmUVPipeBuffer::DataRange range) {
  252. this->UVPipeErrData(range);
  253. },
  254. [this](ssize_t error) { this->UVPipeErrEnd(error); })) {
  255. Result()->ErrorMessage = "libuv start reading from stderr pipe failed";
  256. }
  257. }
  258. if (!Result()->error()) {
  259. IsStarted_ = true;
  260. FinishedCallback_ = std::move(finishedCallback);
  261. } else {
  262. // Clear libuv handles and finish
  263. UVProcess_.reset();
  264. UVPipeOut_.reset();
  265. UVPipeErr_.reset();
  266. CommandPtr_.clear();
  267. }
  268. return IsStarted();
  269. }
  270. void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
  271. int termSignal)
  272. {
  273. auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
  274. if (proc.IsStarted() && !proc.IsFinished()) {
  275. // Set error message on demand
  276. proc.Result()->ExitStatus = exitStatus;
  277. proc.Result()->TermSignal = termSignal;
  278. if (!proc.Result()->error()) {
  279. if (termSignal != 0) {
  280. proc.Result()->ErrorMessage = cmStrCat(
  281. "Process was terminated by signal ", proc.Result()->TermSignal);
  282. } else if (exitStatus != 0) {
  283. proc.Result()->ErrorMessage = cmStrCat(
  284. "Process failed with return value ", proc.Result()->ExitStatus);
  285. }
  286. }
  287. // Reset process handle
  288. proc.UVProcess_.reset();
  289. // Try finish
  290. proc.UVTryFinish();
  291. }
  292. }
  293. void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data)
  294. {
  295. Result()->StdOut.append(data.begin(), data.end());
  296. }
  297. void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
  298. {
  299. // Process pipe error
  300. if ((error != 0) && !Result()->error()) {
  301. Result()->ErrorMessage = cmStrCat(
  302. "Reading from stdout pipe failed with libuv error code ", error);
  303. }
  304. // Try finish
  305. UVTryFinish();
  306. }
  307. void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data)
  308. {
  309. std::string* str =
  310. Setup_.MergedOutput ? &Result()->StdOut : &Result()->StdErr;
  311. str->append(data.begin(), data.end());
  312. }
  313. void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
  314. {
  315. // Process pipe error
  316. if ((error != 0) && !Result()->error()) {
  317. Result()->ErrorMessage = cmStrCat(
  318. "Reading from stderr pipe failed with libuv error code ", error);
  319. }
  320. // Try finish
  321. UVTryFinish();
  322. }
  323. void cmUVReadOnlyProcess::UVTryFinish()
  324. {
  325. // There still might be data in the pipes after the process has finished.
  326. // Therefore check if the process is finished AND all pipes are closed
  327. // before signaling the worker thread to continue.
  328. if ((UVProcess_.get() != nullptr) || (UVPipeOut_.uv_pipe() != nullptr) ||
  329. (UVPipeErr_.uv_pipe() != nullptr)) {
  330. return;
  331. }
  332. IsFinished_ = true;
  333. FinishedCallback_();
  334. }
  335. /**
  336. * @brief Worker pool worker thread
  337. */
  338. class cmWorkerPoolWorker
  339. {
  340. public:
  341. cmWorkerPoolWorker(uv_loop_t& uvLoop);
  342. ~cmWorkerPoolWorker();
  343. cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
  344. cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
  345. /**
  346. * Set the internal thread
  347. */
  348. void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); }
  349. /**
  350. * Run an external process
  351. */
  352. bool RunProcess(cmWorkerPool::ProcessResultT& result,
  353. std::vector<std::string> const& command,
  354. std::string const& workingDirectory);
  355. private:
  356. // -- Libuv callbacks
  357. static void UVProcessStart(uv_async_t* handle);
  358. void UVProcessFinished();
  359. private:
  360. // -- Process management
  361. struct
  362. {
  363. std::mutex Mutex;
  364. cm::uv_async_ptr Request;
  365. std::condition_variable Condition;
  366. std::unique_ptr<cmUVReadOnlyProcess> ROP;
  367. } Proc_;
  368. // -- System thread
  369. std::thread Thread_;
  370. };
  371. cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
  372. {
  373. Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
  374. }
  375. cmWorkerPoolWorker::~cmWorkerPoolWorker()
  376. {
  377. if (Thread_.joinable()) {
  378. Thread_.join();
  379. }
  380. }
  381. bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
  382. std::vector<std::string> const& command,
  383. std::string const& workingDirectory)
  384. {
  385. if (command.empty()) {
  386. return false;
  387. }
  388. // Create process instance
  389. {
  390. std::lock_guard<std::mutex> lock(Proc_.Mutex);
  391. Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
  392. Proc_.ROP->setup(&result, true, command, workingDirectory);
  393. }
  394. // Send asynchronous process start request to libuv loop
  395. Proc_.Request.send();
  396. // Wait until the process has been finished and destroyed
  397. {
  398. std::unique_lock<std::mutex> ulock(Proc_.Mutex);
  399. while (Proc_.ROP) {
  400. Proc_.Condition.wait(ulock);
  401. }
  402. }
  403. return !result.error();
  404. }
  405. void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
  406. {
  407. auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
  408. bool startFailed = false;
  409. {
  410. auto& Proc = wrk->Proc_;
  411. std::lock_guard<std::mutex> lock(Proc.Mutex);
  412. if (Proc.ROP && !Proc.ROP->IsStarted()) {
  413. startFailed =
  414. !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
  415. }
  416. }
  417. // Clean up if starting of the process failed
  418. if (startFailed) {
  419. wrk->UVProcessFinished();
  420. }
  421. }
  422. void cmWorkerPoolWorker::UVProcessFinished()
  423. {
  424. std::lock_guard<std::mutex> lock(Proc_.Mutex);
  425. if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
  426. Proc_.ROP.reset();
  427. }
  428. // Notify idling thread
  429. Proc_.Condition.notify_one();
  430. }
  431. /**
  432. * @brief Private worker pool internals
  433. */
  434. class cmWorkerPoolInternal
  435. {
  436. public:
  437. // -- Constructors
  438. cmWorkerPoolInternal(cmWorkerPool* pool);
  439. ~cmWorkerPoolInternal();
  440. /**
  441. * Runs the libuv loop.
  442. */
  443. bool Process();
  444. /**
  445. * Clear queue and abort threads.
  446. */
  447. void Abort();
  448. /**
  449. * Push a job to the queue and notify a worker.
  450. */
  451. bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
  452. /**
  453. * Worker thread main loop method.
  454. */
  455. void Work(unsigned int workerIndex);
  456. // -- Request slots
  457. static void UVSlotBegin(uv_async_t* handle);
  458. static void UVSlotEnd(uv_async_t* handle);
  459. public:
  460. // -- UV loop
  461. #ifdef CMAKE_UV_SIGNAL_HACK
  462. std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
  463. #endif
  464. std::unique_ptr<uv_loop_t> UVLoop;
  465. cm::uv_async_ptr UVRequestBegin;
  466. cm::uv_async_ptr UVRequestEnd;
  467. // -- Thread pool and job queue
  468. std::mutex Mutex;
  469. bool Processing = false;
  470. bool Aborting = false;
  471. bool FenceProcessing = false;
  472. unsigned int WorkersRunning = 0;
  473. unsigned int WorkersIdle = 0;
  474. unsigned int JobsProcessing = 0;
  475. std::deque<cmWorkerPool::JobHandleT> Queue;
  476. std::condition_variable Condition;
  477. std::condition_variable ConditionFence;
  478. std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
  479. // -- References
  480. cmWorkerPool* Pool = nullptr;
  481. };
  482. void cmWorkerPool::ProcessResultT::reset()
  483. {
  484. ExitStatus = 0;
  485. TermSignal = 0;
  486. if (!StdOut.empty()) {
  487. StdOut.clear();
  488. StdOut.shrink_to_fit();
  489. }
  490. if (!StdErr.empty()) {
  491. StdErr.clear();
  492. StdErr.shrink_to_fit();
  493. }
  494. if (!ErrorMessage.empty()) {
  495. ErrorMessage.clear();
  496. ErrorMessage.shrink_to_fit();
  497. }
  498. }
  499. cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
  500. : Pool(pool)
  501. {
  502. // Initialize libuv loop
  503. uv_disable_stdio_inheritance();
  504. #ifdef CMAKE_UV_SIGNAL_HACK
  505. UVHackRAII = cm::make_unique<cmUVSignalHackRAII>();
  506. #endif
  507. UVLoop = cm::make_unique<uv_loop_t>();
  508. uv_loop_init(UVLoop.get());
  509. }
  510. cmWorkerPoolInternal::~cmWorkerPoolInternal()
  511. {
  512. uv_loop_close(UVLoop.get());
  513. }
  514. bool cmWorkerPoolInternal::Process()
  515. {
  516. // Reset state flags
  517. Processing = true;
  518. Aborting = false;
  519. // Initialize libuv asynchronous request
  520. UVRequestBegin.init(*UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this);
  521. UVRequestEnd.init(*UVLoop, &cmWorkerPoolInternal::UVSlotEnd, this);
  522. // Send begin request
  523. UVRequestBegin.send();
  524. // Run libuv loop
  525. bool success = (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0);
  526. // Update state flags
  527. Processing = false;
  528. Aborting = false;
  529. return success;
  530. }
  531. void cmWorkerPoolInternal::Abort()
  532. {
  533. // Clear all jobs and set abort flag
  534. std::lock_guard<std::mutex> guard(Mutex);
  535. if (!Aborting) {
  536. // Register abort and clear queue
  537. Aborting = true;
  538. Queue.clear();
  539. Condition.notify_all();
  540. }
  541. }
  542. inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
  543. {
  544. std::lock_guard<std::mutex> guard(Mutex);
  545. if (Aborting) {
  546. return false;
  547. }
  548. // Append the job to the queue
  549. Queue.emplace_back(std::move(jobHandle));
  550. // Notify an idle worker if there's one
  551. if (WorkersIdle != 0) {
  552. Condition.notify_one();
  553. }
  554. // Return success
  555. return true;
  556. }
  557. void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
  558. {
  559. auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
  560. // Create worker threads
  561. {
  562. unsigned int const num = gint.Pool->ThreadCount();
  563. // Create workers
  564. gint.Workers.reserve(num);
  565. for (unsigned int ii = 0; ii != num; ++ii) {
  566. gint.Workers.emplace_back(
  567. cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
  568. }
  569. // Start worker threads
  570. for (unsigned int ii = 0; ii != num; ++ii) {
  571. gint.Workers[ii]->SetThread(
  572. std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
  573. }
  574. }
  575. // Destroy begin request
  576. gint.UVRequestBegin.reset();
  577. }
  578. void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
  579. {
  580. auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
  581. // Join and destroy worker threads
  582. gint.Workers.clear();
  583. // Destroy end request
  584. gint.UVRequestEnd.reset();
  585. }
  586. void cmWorkerPoolInternal::Work(unsigned int workerIndex)
  587. {
  588. cmWorkerPool::JobHandleT jobHandle;
  589. std::unique_lock<std::mutex> uLock(Mutex);
  590. // Increment running workers count
  591. ++WorkersRunning;
  592. // Enter worker main loop
  593. while (true) {
  594. // Abort on request
  595. if (Aborting) {
  596. break;
  597. }
  598. // Wait for new jobs on the main CV
  599. if (Queue.empty()) {
  600. ++WorkersIdle;
  601. Condition.wait(uLock);
  602. --WorkersIdle;
  603. continue;
  604. }
  605. // If there is a fence currently active or waiting,
  606. // sleep on the main CV and try again.
  607. if (FenceProcessing) {
  608. Condition.wait(uLock);
  609. continue;
  610. }
  611. // Pop next job from queue
  612. jobHandle = std::move(Queue.front());
  613. Queue.pop_front();
  614. // Check for fence jobs
  615. bool raisedFence = false;
  616. if (jobHandle->IsFence()) {
  617. FenceProcessing = true;
  618. raisedFence = true;
  619. // Wait on the Fence CV until all pending jobs are done.
  620. while (JobsProcessing != 0 && !Aborting) {
  621. ConditionFence.wait(uLock);
  622. }
  623. // When aborting, explicitly kick all threads alive once more.
  624. if (Aborting) {
  625. FenceProcessing = false;
  626. Condition.notify_all();
  627. break;
  628. }
  629. }
  630. // Unlocked scope for job processing
  631. ++JobsProcessing;
  632. {
  633. uLock.unlock();
  634. jobHandle->Work(Pool, workerIndex); // Process job
  635. jobHandle.reset(); // Destroy job
  636. uLock.lock();
  637. }
  638. --JobsProcessing;
  639. // If this was the thread that entered fence processing
  640. // originally, notify all idling workers that the fence
  641. // is done.
  642. if (raisedFence) {
  643. FenceProcessing = false;
  644. Condition.notify_all();
  645. }
  646. // If fence processing is still not done, notify the
  647. // the fencing worker when all active jobs are done.
  648. if (FenceProcessing && JobsProcessing == 0) {
  649. ConditionFence.notify_all();
  650. }
  651. }
  652. // Decrement running workers count
  653. if (--WorkersRunning == 0) {
  654. // Last worker thread about to finish. Send libuv event.
  655. UVRequestEnd.send();
  656. }
  657. }
  658. cmWorkerPool::JobT::~JobT() = default;
  659. bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
  660. std::vector<std::string> const& command,
  661. std::string const& workingDirectory)
  662. {
  663. // Get worker by index
  664. auto* wrk = Pool_->Int_->Workers.at(WorkerIndex_).get();
  665. return wrk->RunProcess(result, command, workingDirectory);
  666. }
  667. cmWorkerPool::cmWorkerPool()
  668. : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
  669. {
  670. }
  671. cmWorkerPool::~cmWorkerPool() = default;
  672. void cmWorkerPool::SetThreadCount(unsigned int threadCount)
  673. {
  674. if (!Int_->Processing) {
  675. ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
  676. }
  677. }
  678. bool cmWorkerPool::Process(void* userData)
  679. {
  680. // Setup user data
  681. UserData_ = userData;
  682. // Run libuv loop
  683. bool success = Int_->Process();
  684. // Clear user data
  685. UserData_ = nullptr;
  686. // Return
  687. return success;
  688. }
  689. bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
  690. {
  691. return Int_->PushJob(std::move(jobHandle));
  692. }
  693. void cmWorkerPool::Abort()
  694. {
  695. Int_->Abort();
  696. }