PubSubWriter.cpp 13 KB


  1. #include "PubSubWriter.hpp"
  2. #include "../../osdep/OSUtils.hpp"
  3. #include "CtlUtil.hpp"
  4. #include "member.pb.h"
  5. #include "member_status.pb.h"
  6. #include "network.pb.h"
  7. #include <chrono>
  8. #include <google/cloud/options.h>
  9. #include <google/cloud/pubsub/message.h>
  10. #include <google/cloud/pubsub/publisher.h>
  11. #include <google/cloud/pubsub/topic.h>
  12. #include <opentelemetry/trace/provider.h>
  13. namespace pubsub = ::google::cloud::pubsub;
  14. namespace ZeroTier {
  15. pbmessages::NetworkChange*
  16. networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork);
  17. pbmessages::MemberChange*
  18. memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember);
  19. PubSubWriter::PubSubWriter(std::string project, std::string topic, std::string controller_id)
  20. : _controller_id(controller_id)
  21. , _project(project)
  22. , _topic(topic)
  23. {
  24. fprintf(
  25. stderr, "PubSubWriter for controller %s project %s topic %s\n", controller_id.c_str(), project.c_str(),
  26. topic.c_str());
  27. GOOGLE_PROTOBUF_VERIFY_VERSION;
  28. // If PUBSUB_EMULATOR_HOST is set, create the topic if it doesn't exist
  29. const char* emulatorHost = std::getenv("PUBSUB_EMULATOR_HOST");
  30. if (emulatorHost != nullptr) {
  31. create_gcp_pubsub_topic_if_needed(project, topic);
  32. }
  33. auto options =
  34. ::google::cloud::Options {}
  35. .set<pubsub::RetryPolicyOption>(pubsub::LimitedTimeRetryPolicy(std::chrono::seconds(5)).clone())
  36. .set<pubsub::BackoffPolicyOption>(
  37. pubsub::ExponentialBackoffPolicy(std::chrono::milliseconds(100), std::chrono::seconds(2), 1.3).clone());
  38. auto publisher = pubsub::MakePublisherConnection(pubsub::Topic(project, topic), std::move(options));
  39. _publisher = std::make_shared<pubsub::Publisher>(std::move(publisher));
  40. }
  41. PubSubWriter::~PubSubWriter()
  42. {
  43. }
  44. bool PubSubWriter::publishMessage(const std::string& payload, const std::string& frontend)
  45. {
  46. fprintf(stderr, "Publishing message to %s\n", _topic.c_str());
  47. std::vector<std::pair<std::string, std::string> > attributes;
  48. attributes.emplace_back("controller_id", _controller_id);
  49. if (! frontend.empty()) {
  50. attributes.emplace_back("frontend", frontend);
  51. }
  52. auto msg = pubsub::MessageBuilder {}.SetData(payload).SetAttributes(attributes).Build();
  53. auto message_id = _publisher->Publish(std::move(msg)).get();
  54. if (! message_id) {
  55. fprintf(stderr, "Failed to publish message: %s\n", std::move(message_id).status().message().c_str());
  56. return false;
  57. }
  58. fprintf(stderr, "Published message to %s\n", _topic.c_str());
  59. return true;
  60. }
  61. bool PubSubWriter::publishNetworkChange(
  62. const nlohmann::json& oldNetwork,
  63. const nlohmann::json& newNetwork,
  64. const std::string& frontend)
  65. {
  66. fprintf(stderr, "Publishing network change\n");
  67. pbmessages::NetworkChange* nc = networkChangeFromJson(_controller_id, oldNetwork, newNetwork);
  68. std::string payload;
  69. if (! nc->SerializeToString(&payload)) {
  70. fprintf(stderr, "Failed to serialize NetworkChange protobuf message\n");
  71. delete nc;
  72. return false;
  73. }
  74. delete nc;
  75. return publishMessage(payload, frontend);
  76. }
  77. bool PubSubWriter::publishMemberChange(
  78. const nlohmann::json& oldMember,
  79. const nlohmann::json& newMember,
  80. const std::string& frontend)
  81. {
  82. fprintf(stderr, "Publishing member change\n");
  83. pbmessages::MemberChange* mc = memberChangeFromJson(_controller_id, oldMember, newMember);
  84. std::string payload;
  85. if (! mc->SerializeToString(&payload)) {
  86. fprintf(stderr, "Failed to serialize MemberChange protobuf message\n");
  87. delete mc;
  88. return false;
  89. }
  90. delete mc;
  91. return publishMessage(payload, frontend);
  92. }
  93. bool PubSubWriter::publishStatusChange(
  94. std::string frontend,
  95. std::string network_id,
  96. std::string node_id,
  97. std::string os,
  98. std::string arch,
  99. std::string version,
  100. int64_t last_seen)
  101. {
  102. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  103. auto tracer = provider->GetTracer("PubSubWriter");
  104. auto span = tracer->StartSpan("PubSubWriter::publishStatusChange");
  105. auto scope = tracer->WithActiveSpan(span);
  106. pbmessages::MemberStatus_MemberStatusMetadata* metadata = new pbmessages::MemberStatus_MemberStatusMetadata();
  107. metadata->set_controller_id(_controller_id);
  108. metadata->set_trace_id(""); // TODO: generate a trace ID
  109. pbmessages::MemberStatus ms;
  110. ms.set_network_id(network_id);
  111. ms.set_member_id(node_id);
  112. ms.set_os(os);
  113. ms.set_arch(arch);
  114. ms.set_version(version);
  115. ms.set_timestamp(last_seen);
  116. ms.set_allocated_metadata(metadata);
  117. std::string payload;
  118. if (! ms.SerializeToString(&payload)) {
  119. fprintf(stderr, "Failed to serialize StatusChange protobuf message\n");
  120. return false;
  121. }
  122. return publishMessage(payload, "");
  123. }
  124. pbmessages::NetworkChange_Network* networkFromJson(const nlohmann::json& j)
  125. {
  126. if (! j.is_object()) {
  127. return nullptr;
  128. }
  129. pbmessages::NetworkChange_Network* n = new pbmessages::NetworkChange_Network();
  130. try {
  131. n->set_network_id(j.value("id", ""));
  132. n->set_name(j.value("name", ""));
  133. n->set_capabilities(OSUtils::jsonDump(j.value("capabilities", "[]"), -1));
  134. n->set_creation_time(j.value("creationTime", 0));
  135. n->set_enable_broadcast(j.value("enableBroadcast", false));
  136. for (const auto& p : j["ipAssignmentPools"]) {
  137. if (p.is_object()) {
  138. auto pool = n->add_assignment_pools();
  139. pool->set_start_ip(p.value("ipRangeStart", ""));
  140. pool->set_end_ip(p.value("ipRangeEnd", ""));
  141. }
  142. }
  143. n->set_mtu(j.value("mtu", 2800));
  144. n->set_multicast_limit(j.value("multicastLimit", 32));
  145. n->set_is_private(j.value("private", true));
  146. n->set_remote_trace_level(j.value("remoteTraceLevel", 0));
  147. n->set_remote_trace_target(j.value("remoteTraceTarget", ""));
  148. n->set_revision(j.value("revision", 0));
  149. for (const auto& p : j["routes"]) {
  150. if (p.is_object()) {
  151. auto r = n->add_routes();
  152. r->set_target(p.value("target", ""));
  153. r->set_via(p.value("via", ""));
  154. }
  155. }
  156. n->set_rules("");
  157. n->set_tags(OSUtils::jsonDump(j.value("tags", "[]"), -1));
  158. pbmessages::NetworkChange_IPV4AssignMode* v4am = new pbmessages::NetworkChange_IPV4AssignMode();
  159. if (j["v4AssignMode"].is_object()) {
  160. v4am->set_zt(j["v4AssignMode"].value("zt", false));
  161. }
  162. n->set_allocated_ipv4_assign_mode(v4am);
  163. pbmessages::NetworkChange_IPV6AssignMode* v6am = new pbmessages::NetworkChange_IPV6AssignMode();
  164. if (j["v6AssignMode"].is_object()) {
  165. v6am->set_zt(j["v6AssignMode"].value("zt", false));
  166. v6am->set_six_plane(j["v6AssignMode"].value("6plane", false));
  167. v6am->set_rfc4193(j["v6AssignMode"].value("rfc4193", false));
  168. }
  169. n->set_allocated_ipv6_assign_mode(v6am);
  170. nlohmann::json jdns = j.value("dns", nullptr);
  171. if (jdns.is_object()) {
  172. pbmessages::NetworkChange_DNS* dns = new pbmessages::NetworkChange_DNS();
  173. dns->set_domain(jdns.value("domain", ""));
  174. for (const auto& s : jdns["servers"]) {
  175. if (s.is_string()) {
  176. auto server = dns->add_nameservers();
  177. *server = s;
  178. }
  179. }
  180. n->set_allocated_dns(dns);
  181. }
  182. n->set_sso_enabled(j.value("ssoEnabled", false));
  183. if (j.value("ssoEnabled", false)) {
  184. n->set_sso_provider(j.value("provider", ""));
  185. n->set_sso_client_id(j.value("clientId", ""));
  186. n->set_sso_authorization_endpoint(j.value("authorizationEndpoint", ""));
  187. n->set_sso_issuer(j.value("issuer", ""));
  188. n->set_sso_provider(j.value("provider", ""));
  189. }
  190. n->set_rules_source(j.value("rulesSource", ""));
  191. }
  192. catch (const std::exception& e) {
  193. fprintf(stderr, "Exception parsing network JSON: %s\n", e.what());
  194. delete n;
  195. return nullptr;
  196. }
  197. return n;
  198. }
  199. pbmessages::NetworkChange*
  200. networkChangeFromJson(std::string controllerID, const nlohmann::json& oldNetwork, const nlohmann::json& newNetwork)
  201. {
  202. pbmessages::NetworkChange* nc = new pbmessages::NetworkChange();
  203. nc->set_allocated_old(networkFromJson(oldNetwork));
  204. nc->set_allocated_new_(networkFromJson(newNetwork));
  205. nc->set_change_source(pbmessages::NetworkChange_ChangeSource::NetworkChange_ChangeSource_CONTROLLER);
  206. pbmessages::NetworkChange_NetworkChangeMetadata* metadata = new pbmessages::NetworkChange_NetworkChangeMetadata();
  207. metadata->set_controller_id(controllerID);
  208. metadata->set_trace_id(""); // TODO: generate a trace ID
  209. nc->set_allocated_metadata(metadata);
  210. return nc;
  211. }
  212. pbmessages::MemberChange_Member* memberFromJson(const nlohmann::json& j)
  213. {
  214. if (! j.is_object()) {
  215. fprintf(stderr, "memberFromJson: JSON is not an object\n");
  216. return nullptr;
  217. }
  218. fprintf(stderr, "memberFromJSON: %s\n", j.dump().c_str());
  219. pbmessages::MemberChange_Member* m = new pbmessages::MemberChange_Member();
  220. try {
  221. m->set_network_id(j.value("networkId", ""));
  222. m->set_device_id(j.value("id", ""));
  223. m->set_identity(j.value("identity", ""));
  224. m->set_authorized(j.value("authorized", false));
  225. if (j["ipAssignments"].is_array()) {
  226. fprintf(stderr, "memberFromJSON: has ipAssignments\n");
  227. for (const auto& addr : j["ipAssignments"]) {
  228. if (addr.is_string()) {
  229. auto a = m->add_ip_assignments();
  230. std::string address = addr.get<std::string>();
  231. *a = address;
  232. }
  233. }
  234. }
  235. else {
  236. fprintf(stderr, "memberFromJSON: no ipAssignments\n");
  237. }
  238. fprintf(stderr, "ipAssignments set\n");
  239. m->set_active_bridge(j.value("activeBridge", false));
  240. fprintf(stderr, "activeBridge set\n");
  241. if (j["tags"].is_array()) {
  242. fprintf(stderr, "memberFromJSON: has tags\n");
  243. nlohmann::json tags = j["tags"];
  244. std::string tagsStr = OSUtils::jsonDump(tags, -1);
  245. m->set_tags(tagsStr);
  246. fprintf(stderr, "tags set\n");
  247. }
  248. else {
  249. fprintf(stderr, "memberFromJSON: no tags\n");
  250. nlohmann::json tags = nlohmann::json::array();
  251. std::string tagsStr = OSUtils::jsonDump(tags, -1);
  252. m->set_tags(tagsStr);
  253. fprintf(stderr, "tags set\n");
  254. }
  255. if (j["capabilities"].is_array()) {
  256. fprintf(stderr, "memberFromJSON: has capabilities\n");
  257. nlohmann::json caps = j["capabilities"];
  258. std::string capsStr = OSUtils::jsonDump(caps, -1);
  259. m->set_capabilities(capsStr);
  260. fprintf(stderr, "capabilities set\n");
  261. }
  262. else {
  263. fprintf(stderr, "memberFromJSON: no capabilities\n");
  264. nlohmann::json caps = nlohmann::json::array();
  265. std::string capsStr = OSUtils::jsonDump(caps, -1);
  266. m->set_capabilities(capsStr);
  267. fprintf(stderr, "capabilities set\n");
  268. }
  269. m->set_creation_time(j.value("creationTime", 0));
  270. fprintf(stderr, "creationTime set\n");
  271. m->set_no_auto_assign_ips(j.value("noAutoAssignIps", false));
  272. fprintf(stderr, "noAutoAssignIps set\n");
  273. m->set_revision(j.value("revision", 0));
  274. fprintf(stderr, "revision set\n");
  275. m->set_last_authorized_time(j.value("lastAuthorizedTime", 0));
  276. fprintf(stderr, "lastAuthorizedTime set\n");
  277. m->set_last_deauthorized_time(j.value("lastDeauthorizedTime", 0));
  278. fprintf(stderr, "lastDeauthorizedTime set\n");
  279. m->set_last_authorized_credential_type(j.value("lastAuthorizedCredentialType", ""));
  280. fprintf(stderr, "lastAuthorizedCredentialType set\n");
  281. m->set_last_authorized_credential(j.value("lastAuthorizedCredential", ""));
  282. fprintf(stderr, "lastAuthorizedCredential set\n");
  283. m->set_version_major(j.value("versionMajor", 0));
  284. fprintf(stderr, "versionMajor set\n");
  285. m->set_version_minor(j.value("versionMinor", 0));
  286. fprintf(stderr, "versionMinor set\n");
  287. m->set_version_rev(j.value("versionRev", 0));
  288. fprintf(stderr, "versionRev set\n");
  289. m->set_version_protocol(j.value("versionProtocol", 0));
  290. fprintf(stderr, "versionProtocol set\n");
  291. m->set_remote_trace_level(j.value("remoteTraceLevel", 0));
  292. fprintf(stderr, "remoteTraceLevel set\n");
  293. m->set_remote_trace_target(j.value("remoteTraceTarget", ""));
  294. fprintf(stderr, "remoteTraceTarget set\n");
  295. m->set_sso_exempt(j.value("ssoExempt", false));
  296. fprintf(stderr, "ssoExempt set\n");
  297. m->set_auth_expiry_time(j.value("authExpiryTime", 0));
  298. fprintf(stderr, "authExpiryTime set\n");
  299. }
  300. catch (const std::exception& e) {
  301. fprintf(stderr, "Exception parsing member JSON: %s\n", e.what());
  302. delete m;
  303. return nullptr;
  304. }
  305. fprintf(stderr, "memberFromJSON complete\n");
  306. return m;
  307. }
  308. pbmessages::MemberChange*
  309. memberChangeFromJson(std::string controllerID, const nlohmann::json& oldMember, const nlohmann::json& newMember)
  310. {
  311. fprintf(stderr, "memberrChangeFromJson: old: %s\n", oldMember.dump().c_str());
  312. fprintf(stderr, "memberrChangeFromJson: new: %s\n", newMember.dump().c_str());
  313. pbmessages::MemberChange* mc = new pbmessages::MemberChange();
  314. pbmessages::MemberChange_Member* om = memberFromJson(oldMember);
  315. if (om != nullptr) {
  316. mc->set_allocated_old(om);
  317. }
  318. pbmessages::MemberChange_Member* nm = memberFromJson(newMember);
  319. if (nm != nullptr) {
  320. mc->set_allocated_new_(nm);
  321. }
  322. mc->set_change_source(pbmessages::MemberChange_ChangeSource::MemberChange_ChangeSource_CONTROLLER);
  323. pbmessages::MemberChange_MemberChangeMetadata* metadata = new pbmessages::MemberChange_MemberChangeMetadata();
  324. metadata->set_controller_id(controllerID);
  325. metadata->set_trace_id(""); // TODO: generate a trace ID
  326. mc->set_allocated_metadata(metadata);
  327. return mc;
  328. }
  329. } // namespace ZeroTier