Selaa lähdekoodia

Redis now usable as a message queue

Grant Limberg 5 vuotta sitten
vanhempi
sitoutus
563655a1a4
3 muutettua tiedostoa jossa 102 lisäystä ja 17 poistoa
  1. 96 13
      controller/PostgreSQL.cpp
  2. 4 3
      controller/PostgreSQL.hpp
  3. 2 1
      make-mac.mk

+ 96 - 13
controller/PostgreSQL.cpp

@@ -68,6 +68,10 @@ std::string join(const std::vector<std::string> &elements, const char * const se
 
 using namespace ZeroTier;
 
+using Attrs = std::vector<std::pair<std::string, std::string>>;
+using Item = std::pair<std::string, Attrs>;
+using ItemStream = std::vector<Item>;
+
 PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc)
 	: DB()
 	, _myId(myId)
@@ -124,9 +128,9 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, R
 		opts.db = 0;
 		poolOpts.size = 10;
 		if (_rc->clusterMode) {
-			_cluster = new sw::redis::RedisCluster(opts, poolOpts);
+			_cluster = std::make_shared<sw::redis::RedisCluster>(opts, poolOpts);
 		} else {
-			_redis = new sw::redis::Redis(opts, poolOpts);
+			_redis = std::make_shared<sw::redis::Redis>(opts, poolOpts);
 		}
 	}
 
@@ -145,17 +149,15 @@ PostgreSQL::~PostgreSQL()
 	_run = 0;
 	std::this_thread::sleep_for(std::chrono::milliseconds(100));
 
-	
-
 	_heartbeatThread.join();
 	_membersDbWatcher.join();
 	_networksDbWatcher.join();
+	_commitQueue.stop();
 	for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) {
 		_commitThread[i].join();
 	}
 	_onlineNotificationThread.join();
-	delete _redis;
-	delete _cluster;
+	fprintf(stderr, "~PostgreSQL() done\n");
 }
 
 
@@ -651,6 +653,7 @@ void PostgreSQL::heartbeat()
 
 	PQfinish(conn);
 	conn = NULL;
+	fprintf(stderr, "Exited heartbeat thread\n");
 }
 
 void PostgreSQL::membersDbWatcher()
@@ -664,10 +667,10 @@ void PostgreSQL::membersDbWatcher()
 
 	initializeMembers(conn);
 
-	if (false) {
-		// PQfinish(conn);
-		// conn = NULL;
-		// _membersWatcher_RabbitMQ();
+	if (_rc) {
+		PQfinish(conn);
+		conn = NULL;
+		_membersWatcher_Redis();
 	} else {
 		_membersWatcher_Postgres(conn);
 		PQfinish(conn);
@@ -722,9 +725,47 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
 	}
 }
 
-void PostgreSQL::_membersWatcher_Reids() {
-	char buff[11] = {0};
+void PostgreSQL::_membersWatcher_Redis() {
+	char buf[11] = {0};
+	std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}";
 	
+	while (_run == 1) {
+		json tmp;
+		std::unordered_map<std::string, ItemStream> result;
+		if (_rc->clusterMode) {
+			_cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+		} else {
+			_redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+		}
+		if (!result.empty()) {
+			for (auto element : result) {
+				fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
+				for (auto rec : element.second) {
+					std::string id = rec.first;
+					auto attrs = rec.second;
+					fprintf(stdout, "Record ID: %s\n", id.c_str());
+					fprintf(stdout, "attrs len: %lu\n", attrs.size());
+					for (auto a : attrs) {
+						fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
+						try {
+							tmp = json::parse(a.second);
+							json &ov = tmp["old_val"];
+							json &nv = tmp["new_val"];
+							json oldConfig, newConfig;
+							if (ov.is_object()) oldConfig = ov;
+							if (nv.is_object()) newConfig = nv;
+							if (oldConfig.is_object()||newConfig.is_object()) {
+								_memberChanged(oldConfig,newConfig,(this->_ready >= 2));
+							}
+						} catch (...) {
+							fprintf(stderr, "json parse error in networkWatcher_Redis\n");
+						}
+					}
+				}
+			}
+		}
+	}
+	fprintf(stderr, "membersWatcher ended\n");
 }
 
 void PostgreSQL::networksDbWatcher()
@@ -795,7 +836,48 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
 }
 
 void PostgreSQL::_networksWatcher_Redis() {
-
+	char buf[11] = {0};
+	std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}";
+	
+	while (_run == 1) {
+		json tmp;
+		std::unordered_map<std::string, ItemStream> result;
+		if (_rc->clusterMode) {
+			_cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+		} else {
+			_redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
+		}
+		
+		if (!result.empty()) {
+			for (auto element : result) {
+
+				fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
+				for (auto rec : element.second) {
+					std::string id = rec.first;
+					auto attrs = rec.second;
+					fprintf(stdout, "Record ID: %s\n", id.c_str());
+					fprintf(stdout, "attrs len: %lu\n", attrs.size());
+					for (auto a : attrs) {
+						fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
+						try {
+							tmp = json::parse(a.second);
+							json &ov = tmp["old_val"];
+							json &nv = tmp["new_val"];
+							json oldConfig, newConfig;
+							if (ov.is_object()) oldConfig = ov;
+							if (nv.is_object()) newConfig = nv;
+							if (oldConfig.is_object()||newConfig.is_object()) {
+								_networkChanged(oldConfig,newConfig,(this->_ready >= 2));
+							}
+						} catch (...) {
+							fprintf(stderr, "json parse error in networkWatcher_Redis\n");
+						}
+					}
+				}
+			}
+		}
+	}
+	fprintf(stderr, "networksWatcher ended\n");
 }
 
 void PostgreSQL::commitThread()
@@ -1293,6 +1375,7 @@ void PostgreSQL::commitThread()
 		fprintf(stderr, "ERROR: %s commitThread should still be running! Exiting Controller.\n", _myAddressStr.c_str());
 		exit(7);
 	}
+	fprintf(stderr, "commitThread finished\n");
 }
 
 void PostgreSQL::onlineNotificationThread()

+ 4 - 3
controller/PostgreSQL.hpp

@@ -20,6 +20,7 @@
 
 #define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4
 
+#include <memory>
 #include <redis++/redis++.h>
 
 extern "C" {
@@ -64,7 +65,7 @@ private:
 	void networksDbWatcher();
 	void _networksWatcher_Postgres(PGconn *conn);
 
-	void _membersWatcher_Reids();
+	void _membersWatcher_Redis();
 	void _networksWatcher_Redis();
 
 	void commitThread();
@@ -100,8 +101,8 @@ private:
 	int _listenPort;
 
 	RedisConfig *_rc;
-	sw::redis::Redis *_redis;
-	sw::redis::RedisCluster *_cluster;
+	std::shared_ptr<sw::redis::Redis> _redis;
+	std::shared_ptr<sw::redis::RedisCluster> _cluster;
 };
 
 } // namespace ZeroTier

+ 2 - 1
make-mac.mk

@@ -28,9 +28,10 @@ include objects.mk
 ONE_OBJS+=osdep/MacEthernetTap.o osdep/MacKextEthernetTap.o ext/http-parser/http_parser.o
 
 ifeq ($(ZT_CONTROLLER),1)
-	LIBS+=-L/usr/local/opt/libpq/lib -lpq -Lext/redis-plus-plus-1.1.1/install/macos/lib -lredis++ -Lext/hiredis-0.14.1/lib/macos -lhiredis
+	LIBS+=-L/usr/local/opt/libpq/lib -lpq ext/redis-plus-plus-1.1.1/install/macos/lib/libredis++.a ext/hiredis-0.14.1/lib/macos/libhiredis.a
 	DEFS+=-DZT_CONTROLLER_USE_LIBPQ -DZT_CONTROLLER_USE_REDIS -DZT_CONTROLLER 
 	INCLUDES+=-I/usr/local/opt/libpq/include -Iext/hiredis-0.14.1/include/ -Iext/redis-plus-plus-1.1.1/install/macos/include/sw/
+	
 endif
 
 # Official releases are signed with our Apple cert and apply software updates by default