| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518 |
- /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
- file Copyright.txt or https://cmake.org/licensing for details. */
- #include "cmUVJobServerClient.h"
- #include <cassert>
- #include <utility>
- #ifndef _WIN32
- # include <cstdio>
- # include <string>
- # include <vector>
- # include <fcntl.h>
- # include <unistd.h>
- #endif
- #include <cm/memory>
- #include <cm/optional>
- #include <cm/string_view>
- #include "cmRange.h"
- #include "cmStringAlgorithms.h"
- #include "cmSystemTools.h"
- #include "cmUVHandlePtr.h"
- class cmUVJobServerClient::Impl
- {
- public:
- uv_loop_t& Loop;
- cm::uv_idle_ptr ImplicitToken;
- std::function<void()> OnToken;
- std::function<void(int)> OnDisconnect;
- // The number of tokens held by this client.
- unsigned int HeldTokens = 0;
- // The number of tokens we need to receive from the job server.
- unsigned int NeedTokens = 0;
- Impl(uv_loop_t& loop);
- virtual ~Impl();
- virtual void SendToken() = 0;
- virtual void StartReceivingTokens() = 0;
- virtual void StopReceivingTokens() = 0;
- void RequestToken();
- void ReleaseToken();
- void RequestExplicitToken();
- void DecrementNeedTokens();
- void HoldToken();
- void RequestImplicitToken();
- void ReleaseImplicitToken();
- void ReceivedToken();
- void Disconnected(int status);
- };
- cmUVJobServerClient::Impl::Impl(uv_loop_t& loop)
- : Loop(loop)
- {
- this->ImplicitToken.init(this->Loop, this);
- }
- cmUVJobServerClient::Impl::~Impl() = default;
- void cmUVJobServerClient::Impl::RequestToken()
- {
- if (this->HeldTokens == 0 && !uv_is_active(this->ImplicitToken)) {
- this->RequestImplicitToken();
- } else {
- this->RequestExplicitToken();
- }
- }
- void cmUVJobServerClient::Impl::ReleaseToken()
- {
- assert(this->HeldTokens > 0);
- --this->HeldTokens;
- if (this->HeldTokens == 0) {
- // This was the token implicitly owned by our process.
- this->ReleaseImplicitToken();
- } else {
- // This was a token we received from the job server. Send it back.
- this->SendToken();
- }
- }
- void cmUVJobServerClient::Impl::RequestExplicitToken()
- {
- ++this->NeedTokens;
- this->StartReceivingTokens();
- }
- void cmUVJobServerClient::Impl::DecrementNeedTokens()
- {
- assert(this->NeedTokens > 0);
- --this->NeedTokens;
- if (this->NeedTokens == 0) {
- this->StopReceivingTokens();
- }
- }
- void cmUVJobServerClient::Impl::HoldToken()
- {
- ++this->HeldTokens;
- if (this->OnToken) {
- this->OnToken();
- } else {
- this->ReleaseToken();
- }
- }
- void cmUVJobServerClient::Impl::RequestImplicitToken()
- {
- assert(this->HeldTokens == 0);
- this->ImplicitToken.start([](uv_idle_t* handle) {
- uv_idle_stop(handle);
- auto* self = static_cast<Impl*>(handle->data);
- self->HoldToken();
- });
- }
- void cmUVJobServerClient::Impl::ReleaseImplicitToken()
- {
- assert(this->HeldTokens == 0);
- // Use the implicit token in place of receiving one from the job server.
- if (this->NeedTokens > 0) {
- this->DecrementNeedTokens();
- this->RequestImplicitToken();
- }
- }
- void cmUVJobServerClient::Impl::ReceivedToken()
- {
- this->DecrementNeedTokens();
- this->HoldToken();
- }
- void cmUVJobServerClient::Impl::Disconnected(int status)
- {
- if (this->OnDisconnect) {
- this->OnDisconnect(status);
- }
- }
- //---------------------------------------------------------------------------
- // Implementation on POSIX platforms.
- // https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
- #ifndef _WIN32
- namespace {
- class ImplPosix : public cmUVJobServerClient::Impl
- {
- public:
- enum class Connection
- {
- None,
- FDs,
- FIFO,
- };
- Connection Conn = Connection::None;
- cm::uv_pipe_ptr ConnRead;
- cm::uv_pipe_ptr ConnWrite;
- cm::uv_pipe_ptr ConnFIFO;
- std::shared_ptr<std::function<void(int)>> OnWrite;
- void Connect();
- void ConnectFDs(int rfd, int wfd);
- void ConnectFIFO(const char* path);
- void Disconnect(int status);
- cm::uv_pipe_ptr OpenFD(int fd);
- uv_stream_t* GetWriter() const;
- uv_stream_t* GetReader() const;
- static void OnAllocateCB(uv_handle_t* handle, size_t suggested_size,
- uv_buf_t* buf);
- static void OnReadCB(uv_stream_t* stream, ssize_t nread,
- const uv_buf_t* buf);
- void OnAllocate(size_t suggested_size, uv_buf_t* buf);
- void OnRead(ssize_t nread, const uv_buf_t* buf);
- char ReadBuf = '.';
- bool ReceivingTokens = false;
- bool IsConnected() const;
- void SendToken() override;
- void StartReceivingTokens() override;
- void StopReceivingTokens() override;
- ImplPosix(uv_loop_t& loop);
- ~ImplPosix() override;
- };
- ImplPosix::ImplPosix(uv_loop_t& loop)
- : Impl(loop)
- , OnWrite(std::make_shared<std::function<void(int)>>([this](int status) {
- if (status != 0) {
- this->Disconnect(status);
- }
- }))
- {
- this->Connect();
- }
- ImplPosix::~ImplPosix()
- {
- this->Disconnect(0);
- }
- void ImplPosix::Connect()
- {
- // --jobserver-auth= for gnu make versions >= 4.2
- // --jobserver-fds= for gnu make versions < 4.2
- // -J for bsd make
- static const std::vector<cm::string_view> prefixes = {
- "--jobserver-auth=", "--jobserver-fds=", "-J"
- };
- cm::optional<std::string> makeflags = cmSystemTools::GetEnvVar("MAKEFLAGS");
- if (!makeflags) {
- return;
- }
- // Look for the *last* occurrence of jobserver flags.
- cm::optional<std::string> auth;
- std::vector<std::string> args;
- cmSystemTools::ParseUnixCommandLine(makeflags->c_str(), args);
- for (cm::string_view arg : cmReverseRange(args)) {
- for (cm::string_view prefix : prefixes) {
- if (cmHasPrefix(arg, prefix)) {
- auth = cmTrimWhitespace(arg.substr(prefix.length()));
- break;
- }
- }
- if (auth) {
- break;
- }
- }
- if (!auth) {
- return;
- }
- // fifo:PATH
- if (cmHasLiteralPrefix(*auth, "fifo:")) {
- ConnectFIFO(auth->substr(cmStrLen("fifo:")).c_str());
- return;
- }
- // reader,writer
- int reader;
- int writer;
- if (std::sscanf(auth->c_str(), "%d,%d", &reader, &writer) == 2) {
- ConnectFDs(reader, writer);
- }
- }
- cm::uv_pipe_ptr ImplPosix::OpenFD(int fd)
- {
- // Create a CLOEXEC duplicate so `uv_pipe_ptr` can close it
- // without closing the original file descriptor, which our
- // child processes might want to use too.
- cm::uv_pipe_ptr p;
- int fd_dup = dup(fd);
- if (fd_dup < 0) {
- return p;
- }
- if (fcntl(fd_dup, F_SETFD, FD_CLOEXEC) == -1) {
- close(fd_dup);
- return p;
- }
- p.init(this->Loop, 0, this);
- if (uv_pipe_open(p, fd_dup) < 0) {
- close(fd_dup);
- }
- return p;
- }
- void ImplPosix::ConnectFDs(int rfd, int wfd)
- {
- cm::uv_pipe_ptr connRead = this->OpenFD(rfd);
- cm::uv_pipe_ptr connWrite = this->OpenFD(wfd);
- // Verify that the read end is readable and the write end is writable.
- if (!connRead || !uv_is_readable(connRead) || //
- !connWrite || !uv_is_writable(connWrite)) {
- return;
- }
- this->ConnRead = std::move(connRead);
- this->ConnWrite = std::move(connWrite);
- this->Conn = Connection::FDs;
- }
- void ImplPosix::ConnectFIFO(const char* path)
- {
- int fd = open(path, O_RDWR);
- if (fd < 0) {
- return;
- }
- cm::uv_pipe_ptr connFIFO;
- connFIFO.init(this->Loop, 0, this);
- if (uv_pipe_open(connFIFO, fd) != 0) {
- close(fd);
- return;
- }
- // Verify that the fifo is both readable and writable.
- if (!connFIFO || !uv_is_readable(connFIFO) || !uv_is_writable(connFIFO)) {
- return;
- }
- this->ConnFIFO = std::move(connFIFO);
- this->Conn = Connection::FIFO;
- }
- void ImplPosix::Disconnect(int status)
- {
- if (this->Conn == Connection::None) {
- return;
- }
- this->StopReceivingTokens();
- switch (this->Conn) {
- case Connection::FDs:
- this->ConnRead.reset();
- this->ConnWrite.reset();
- break;
- case Connection::FIFO:
- this->ConnFIFO.reset();
- break;
- default:
- break;
- }
- this->Conn = Connection::None;
- if (status != 0) {
- this->Disconnected(status);
- }
- }
- uv_stream_t* ImplPosix::GetWriter() const
- {
- switch (this->Conn) {
- case Connection::FDs:
- return this->ConnWrite;
- case Connection::FIFO:
- return this->ConnFIFO;
- default:
- return nullptr;
- }
- }
- uv_stream_t* ImplPosix::GetReader() const
- {
- switch (this->Conn) {
- case Connection::FDs:
- return this->ConnRead;
- case Connection::FIFO:
- return this->ConnFIFO;
- default:
- return nullptr;
- }
- }
- void ImplPosix::OnAllocateCB(uv_handle_t* handle, size_t suggested_size,
- uv_buf_t* buf)
- {
- auto* self = static_cast<ImplPosix*>(handle->data);
- self->OnAllocate(suggested_size, buf);
- }
- void ImplPosix::OnReadCB(uv_stream_t* stream, ssize_t nread,
- const uv_buf_t* buf)
- {
- auto* self = static_cast<ImplPosix*>(stream->data);
- self->OnRead(nread, buf);
- }
- void ImplPosix::OnAllocate(size_t /*suggested_size*/, uv_buf_t* buf)
- {
- *buf = uv_buf_init(&this->ReadBuf, 1);
- }
- void ImplPosix::OnRead(ssize_t nread, const uv_buf_t* /*buf*/)
- {
- if (nread == 0) {
- return;
- }
- if (nread < 0) {
- auto status = static_cast<int>(nread);
- this->Disconnect(status);
- return;
- }
- assert(nread == 1);
- this->ReceivedToken();
- }
- bool ImplPosix::IsConnected() const
- {
- return this->Conn != Connection::None;
- }
- void ImplPosix::SendToken()
- {
- if (this->Conn == Connection::None) {
- return;
- }
- static char token = '.';
- uv_buf_t const buf = uv_buf_init(&token, sizeof(token));
- int status = cm::uv_write(this->GetWriter(), &buf, 1, this->OnWrite);
- if (status != 0) {
- this->Disconnect(status);
- }
- }
- void ImplPosix::StartReceivingTokens()
- {
- if (this->Conn == Connection::None) {
- return;
- }
- if (this->ReceivingTokens) {
- return;
- }
- int status = uv_read_start(this->GetReader(), &ImplPosix::OnAllocateCB,
- &ImplPosix::OnReadCB);
- if (status != 0) {
- this->Disconnect(status);
- return;
- }
- this->ReceivingTokens = true;
- }
- void ImplPosix::StopReceivingTokens()
- {
- if (this->Conn == Connection::None) {
- return;
- }
- if (!this->ReceivingTokens) {
- return;
- }
- this->ReceivingTokens = false;
- uv_read_stop(this->GetReader());
- }
- }
- #endif
- //---------------------------------------------------------------------------
- // Implementation of public interface.
- cmUVJobServerClient::cmUVJobServerClient(std::unique_ptr<Impl> impl)
- : Impl_(std::move(impl))
- {
- }
- cmUVJobServerClient::~cmUVJobServerClient() = default;
- cmUVJobServerClient::cmUVJobServerClient(cmUVJobServerClient&&) noexcept =
- default;
- cmUVJobServerClient& cmUVJobServerClient::operator=(
- cmUVJobServerClient&&) noexcept = default;
- void cmUVJobServerClient::RequestToken()
- {
- this->Impl_->RequestToken();
- }
- void cmUVJobServerClient::ReleaseToken()
- {
- this->Impl_->ReleaseToken();
- }
- int cmUVJobServerClient::GetHeldTokens() const
- {
- return this->Impl_->HeldTokens;
- }
- int cmUVJobServerClient::GetNeedTokens() const
- {
- return this->Impl_->NeedTokens;
- }
- cm::optional<cmUVJobServerClient> cmUVJobServerClient::Connect(
- uv_loop_t& loop, std::function<void()> onToken,
- std::function<void(int)> onDisconnect)
- {
- #if defined(_WIN32)
- // FIXME: Windows job server client not yet implemented.
- static_cast<void>(loop);
- static_cast<void>(onToken);
- static_cast<void>(onDisconnect);
- #else
- auto impl = cm::make_unique<ImplPosix>(loop);
- if (impl && impl->IsConnected()) {
- impl->OnToken = std::move(onToken);
- impl->OnDisconnect = std::move(onDisconnect);
- return cmUVJobServerClient(std::move(impl));
- }
- #endif
- return cm::nullopt;
- }
|