123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- // Copyright 2019 Google LLC
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // https://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- #include "content_stream.h"
- #include "dap/any.h"
- #include "dap/session.h"
- #include "chan.h"
- #include "json_serializer.h"
- #include "socket.h"
- #include <stdarg.h>
- #include <stdio.h>
- #include <atomic>
- #include <deque>
- #include <memory>
- #include <mutex>
- #include <thread>
- #include <unordered_map>
- #include <vector>
- namespace {
- class Impl : public dap::Session {
- public:
- void onError(const ErrorHandler& handler) override { handlers.put(handler); }
- void registerHandler(const dap::TypeInfo* typeinfo,
- const GenericRequestHandler& handler) override {
- handlers.put(typeinfo, handler);
- }
- void registerHandler(const dap::TypeInfo* typeinfo,
- const GenericEventHandler& handler) override {
- handlers.put(typeinfo, handler);
- }
- void registerHandler(const dap::TypeInfo* typeinfo,
- const GenericResponseSentHandler& handler) override {
- handlers.put(typeinfo, handler);
- }
- std::function<void()> getPayload() override {
- auto request = reader.read();
- if (request.size() > 0) {
- if (auto payload = processMessage(request)) {
- return payload;
- }
- }
- return {};
- }
- void connect(const std::shared_ptr<dap::Reader>& r,
- const std::shared_ptr<dap::Writer>& w) override {
- if (isBound.exchange(true)) {
- handlers.error("Session::connect called twice");
- return;
- }
- reader = dap::ContentReader(r);
- writer = dap::ContentWriter(w);
- }
- void startProcessingMessages(
- const ClosedHandler& onClose /* = {} */) override {
- if (isProcessingMessages.exchange(true)) {
- handlers.error("Session::startProcessingMessages() called twice");
- return;
- }
- recvThread = std::thread([this, onClose] {
- while (reader.isOpen()) {
- if (auto payload = getPayload()) {
- inbox.put(std::move(payload));
- }
- }
- if (onClose) {
- onClose();
- }
- });
- dispatchThread = std::thread([this] {
- while (auto payload = inbox.take()) {
- payload.value()();
- }
- });
- }
- bool send(const dap::TypeInfo* requestTypeInfo,
- const dap::TypeInfo* responseTypeInfo,
- const void* request,
- const GenericResponseHandler& responseHandler) override {
- int seq = nextSeq++;
- handlers.put(seq, responseTypeInfo, responseHandler);
- dap::json::Serializer s;
- if (!s.object([&](dap::FieldSerializer* fs) {
- return fs->field("seq", dap::integer(seq)) &&
- fs->field("type", "request") &&
- fs->field("command", requestTypeInfo->name()) &&
- fs->field("arguments", [&](dap::Serializer* s) {
- return requestTypeInfo->serialize(s, request);
- });
- })) {
- return false;
- }
- return send(s.dump());
- }
- bool send(const dap::TypeInfo* typeinfo, const void* event) override {
- dap::json::Serializer s;
- if (!s.object([&](dap::FieldSerializer* fs) {
- return fs->field("seq", dap::integer(nextSeq++)) &&
- fs->field("type", "event") &&
- fs->field("event", typeinfo->name()) &&
- fs->field("body", [&](dap::Serializer* s) {
- return typeinfo->serialize(s, event);
- });
- })) {
- return false;
- }
- return send(s.dump());
- }
- ~Impl() {
- inbox.close();
- reader.close();
- writer.close();
- if (recvThread.joinable()) {
- recvThread.join();
- }
- if (dispatchThread.joinable()) {
- dispatchThread.join();
- }
- }
- private:
- using Payload = std::function<void()>;
- class EventHandlers {
- public:
- void put(const ErrorHandler& handler) {
- std::unique_lock<std::mutex> lock(errorMutex);
- errorHandler = handler;
- }
- void error(const char* format, ...) {
- va_list vararg;
- va_start(vararg, format);
- std::unique_lock<std::mutex> lock(errorMutex);
- errorLocked(format, vararg);
- va_end(vararg);
- }
- std::pair<const dap::TypeInfo*, GenericRequestHandler> request(
- const std::string& name) {
- std::unique_lock<std::mutex> lock(requestMutex);
- auto it = requestMap.find(name);
- return (it != requestMap.end()) ? it->second : decltype(it->second){};
- }
- void put(const dap::TypeInfo* typeinfo,
- const GenericRequestHandler& handler) {
- std::unique_lock<std::mutex> lock(requestMutex);
- auto added =
- requestMap
- .emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
- .second;
- if (!added) {
- errorfLocked("Request handler for '%s' already registered",
- typeinfo->name().c_str());
- }
- }
- std::pair<const dap::TypeInfo*, GenericResponseHandler> response(
- int64_t seq) {
- std::unique_lock<std::mutex> lock(responseMutex);
- auto responseIt = responseMap.find(seq);
- if (responseIt == responseMap.end()) {
- errorfLocked("Unknown response with sequence %d", seq);
- return {};
- }
- auto out = std::move(responseIt->second);
- responseMap.erase(seq);
- return out;
- }
- void put(int seq,
- const dap::TypeInfo* typeinfo,
- const GenericResponseHandler& handler) {
- std::unique_lock<std::mutex> lock(responseMutex);
- auto added =
- responseMap.emplace(seq, std::make_pair(typeinfo, handler)).second;
- if (!added) {
- errorfLocked("Response handler for sequence %d already registered",
- seq);
- }
- }
- std::pair<const dap::TypeInfo*, GenericEventHandler> event(
- const std::string& name) {
- std::unique_lock<std::mutex> lock(eventMutex);
- auto it = eventMap.find(name);
- return (it != eventMap.end()) ? it->second : decltype(it->second){};
- }
- void put(const dap::TypeInfo* typeinfo,
- const GenericEventHandler& handler) {
- std::unique_lock<std::mutex> lock(eventMutex);
- auto added =
- eventMap.emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
- .second;
- if (!added) {
- errorfLocked("Event handler for '%s' already registered",
- typeinfo->name().c_str());
- }
- }
- GenericResponseSentHandler responseSent(const dap::TypeInfo* typeinfo) {
- std::unique_lock<std::mutex> lock(responseSentMutex);
- auto it = responseSentMap.find(typeinfo);
- return (it != responseSentMap.end()) ? it->second
- : decltype(it->second){};
- }
- void put(const dap::TypeInfo* typeinfo,
- const GenericResponseSentHandler& handler) {
- std::unique_lock<std::mutex> lock(responseSentMutex);
- auto added = responseSentMap.emplace(typeinfo, handler).second;
- if (!added) {
- errorfLocked("Response sent handler for '%s' already registered",
- typeinfo->name().c_str());
- }
- }
- private:
- void errorfLocked(const char* format, ...) {
- va_list vararg;
- va_start(vararg, format);
- errorLocked(format, vararg);
- va_end(vararg);
- }
- void errorLocked(const char* format, va_list args) {
- char buf[2048];
- vsnprintf(buf, sizeof(buf), format, args);
- if (errorHandler) {
- errorHandler(buf);
- }
- }
- std::mutex errorMutex;
- ErrorHandler errorHandler;
- std::mutex requestMutex;
- std::unordered_map<std::string,
- std::pair<const dap::TypeInfo*, GenericRequestHandler>>
- requestMap;
- std::mutex responseMutex;
- std::unordered_map<int64_t,
- std::pair<const dap::TypeInfo*, GenericResponseHandler>>
- responseMap;
- std::mutex eventMutex;
- std::unordered_map<std::string,
- std::pair<const dap::TypeInfo*, GenericEventHandler>>
- eventMap;
- std::mutex responseSentMutex;
- std::unordered_map<const dap::TypeInfo*, GenericResponseSentHandler>
- responseSentMap;
- }; // EventHandlers
- Payload processMessage(const std::string& str) {
- auto d = dap::json::Deserializer(str);
- dap::string type;
- if (!d.field("type", &type)) {
- handlers.error("Message missing string 'type' field");
- return {};
- }
- dap::integer sequence = 0;
- if (!d.field("seq", &sequence)) {
- handlers.error("Message missing number 'seq' field");
- return {};
- }
- if (type == "request") {
- return processRequest(&d, sequence);
- } else if (type == "event") {
- return processEvent(&d);
- } else if (type == "response") {
- processResponse(&d);
- return {};
- } else {
- handlers.error("Unknown message type '%s'", type.c_str());
- }
- return {};
- }
- Payload processRequest(dap::json::Deserializer* d, dap::integer sequence) {
- dap::string command;
- if (!d->field("command", &command)) {
- handlers.error("Request missing string 'command' field");
- return {};
- }
- const dap::TypeInfo* typeinfo;
- GenericRequestHandler handler;
- std::tie(typeinfo, handler) = handlers.request(command);
- if (!typeinfo) {
- handlers.error("No request handler registered for command '%s'",
- command.c_str());
- return {};
- }
- auto data = new uint8_t[typeinfo->size()];
- typeinfo->construct(data);
- if (!d->field("arguments", [&](dap::Deserializer* d) {
- return typeinfo->deserialize(d, data);
- })) {
- handlers.error("Failed to deserialize request");
- typeinfo->destruct(data);
- delete[] data;
- return {};
- }
- return [=] {
- handler(
- data,
- [=](const dap::TypeInfo* typeinfo, const void* data) {
- // onSuccess
- dap::json::Serializer s;
- s.object([&](dap::FieldSerializer* fs) {
- return fs->field("seq", dap::integer(nextSeq++)) &&
- fs->field("type", "response") &&
- fs->field("request_seq", sequence) &&
- fs->field("success", dap::boolean(true)) &&
- fs->field("command", command) &&
- fs->field("body", [&](dap::Serializer* s) {
- return typeinfo->serialize(s, data);
- });
- });
- send(s.dump());
- if (auto handler = handlers.responseSent(typeinfo)) {
- handler(data, nullptr);
- }
- },
- [=](const dap::TypeInfo* typeinfo, const dap::Error& error) {
- // onError
- dap::json::Serializer s;
- s.object([&](dap::FieldSerializer* fs) {
- return fs->field("seq", dap::integer(nextSeq++)) &&
- fs->field("type", "response") &&
- fs->field("request_seq", sequence) &&
- fs->field("success", dap::boolean(false)) &&
- fs->field("command", command) &&
- fs->field("message", error.message);
- });
- send(s.dump());
- if (auto handler = handlers.responseSent(typeinfo)) {
- handler(nullptr, &error);
- }
- });
- typeinfo->destruct(data);
- delete[] data;
- };
- }
- Payload processEvent(dap::json::Deserializer* d) {
- dap::string event;
- if (!d->field("event", &event)) {
- handlers.error("Event missing string 'event' field");
- return {};
- }
- const dap::TypeInfo* typeinfo;
- GenericEventHandler handler;
- std::tie(typeinfo, handler) = handlers.event(event);
- if (!typeinfo) {
- handlers.error("No event handler registered for event '%s'",
- event.c_str());
- return {};
- }
- auto data = new uint8_t[typeinfo->size()];
- typeinfo->construct(data);
- // "body" is an optional field for some events, such as "Terminated Event".
- bool body_ok = true;
- d->field("body", [&](dap::Deserializer* d) {
- if (!typeinfo->deserialize(d, data)) {
- body_ok = false;
- }
- return true;
- });
- if (!body_ok) {
- handlers.error("Failed to deserialize event '%s' body", event.c_str());
- typeinfo->destruct(data);
- delete[] data;
- return {};
- }
- return [=] {
- handler(data);
- typeinfo->destruct(data);
- delete[] data;
- };
- }
- void processResponse(const dap::Deserializer* d) {
- dap::integer requestSeq = 0;
- if (!d->field("request_seq", &requestSeq)) {
- handlers.error("Response missing int 'request_seq' field");
- return;
- }
- const dap::TypeInfo* typeinfo;
- GenericResponseHandler handler;
- std::tie(typeinfo, handler) = handlers.response(requestSeq);
- if (!typeinfo) {
- handlers.error("Unknown response with sequence %d", requestSeq);
- return;
- }
- dap::boolean success = false;
- if (!d->field("success", &success)) {
- handlers.error("Response missing int 'success' field");
- return;
- }
- if (success) {
- auto data = std::unique_ptr<uint8_t[]>(new uint8_t[typeinfo->size()]);
- typeinfo->construct(data.get());
- // "body" field in Response is an optional field.
- d->field("body", [&](const dap::Deserializer* d) {
- return typeinfo->deserialize(d, data.get());
- });
- handler(data.get(), nullptr);
- typeinfo->destruct(data.get());
- } else {
- std::string message;
- if (!d->field("message", &message)) {
- handlers.error("Failed to deserialize message");
- return;
- }
- auto error = dap::Error("%s", message.c_str());
- handler(nullptr, &error);
- }
- }
- bool send(const std::string& s) {
- std::unique_lock<std::mutex> lock(sendMutex);
- if (!writer.isOpen()) {
- handlers.error("Send failed as the writer is closed");
- return false;
- }
- return writer.write(s);
- }
- std::atomic<bool> isBound = {false};
- std::atomic<bool> isProcessingMessages = {false};
- dap::ContentReader reader;
- dap::ContentWriter writer;
- std::atomic<bool> shutdown = {false};
- EventHandlers handlers;
- std::thread recvThread;
- std::thread dispatchThread;
- dap::Chan<Payload> inbox;
- std::atomic<uint32_t> nextSeq = {1};
- std::mutex sendMutex;
- };
- } // anonymous namespace
- namespace dap {
- Error::Error(const std::string& message) : message(message) {}
- Error::Error(const char* msg, ...) {
- char buf[2048];
- va_list vararg;
- va_start(vararg, msg);
- vsnprintf(buf, sizeof(buf), msg, vararg);
- va_end(vararg);
- message = buf;
- }
- Session::~Session() = default;
- std::unique_ptr<Session> Session::create() {
- return std::unique_ptr<Session>(new Impl());
- }
- } // namespace dap
|