浏览代码

rework commit thread & some connection pool borrowing issues

Grant Limberg 4 年之前
父节点
当前提交
ac0dc7844f
共有 2 个文件被更改,包括 19 次插入9 次删除
  1. 1 1
      controller/ConnectionPool.hpp
  2. 18 8
      controller/PostgreSQL.cpp

+ 1 - 1
controller/ConnectionPool.hpp

@@ -95,7 +95,7 @@ public:
 
 
         if(m_pool.size()==0){
         if(m_pool.size()==0){
             
             
-            if ((m_pool.size() + m_borrowed.size()) <= m_maxPoolSize) {
+            if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
                 try {
                 try {
                     std::shared_ptr<Connection> conn = m_factory->create();
                     std::shared_ptr<Connection> conn = m_factory->create();
                     m_borrowed.insert(conn);
                     m_borrowed.insert(conn);

+ 18 - 8
controller/PostgreSQL.cpp

@@ -1025,6 +1025,19 @@ void PostgreSQL::commitThread()
 			fprintf(stderr, "not an object\n");
 			fprintf(stderr, "not an object\n");
 			continue;
 			continue;
 		}
 		}
+
+		std::shared_ptr<PostgresConnection> c;
+		try {
+			c = _pool->borrow();
+		} catch (std::exception &e) {
+			fprintf(stderr, "ERROR: %s\n", e.what());
+			continue;
+		}
+
+		if (!c) {
+			fprintf(stderr, "Error getting database connection\n");
+			continue;
+		}
 		
 		
 		try {
 		try {
 			nlohmann::json *config = &(qitem.first);
 			nlohmann::json *config = &(qitem.first);
@@ -1032,7 +1045,6 @@ void PostgreSQL::commitThread()
 			if (objtype == "member") {
 			if (objtype == "member") {
 				// fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
 				// fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str());
 				try {
 				try {
-					auto c = _pool->borrow();
 					pqxx::work w(*c->c);
 					pqxx::work w(*c->c);
 
 
 					std::string memberId = (*config)["id"];
 					std::string memberId = (*config)["id"];
@@ -1097,11 +1109,13 @@ void PostgreSQL::commitThread()
 						fprintf(stderr, "%s: ipAssignError\n", _myAddressStr.c_str());
 						fprintf(stderr, "%s: ipAssignError\n", _myAddressStr.c_str());
 						delete config;
 						delete config;
 						config = nullptr;
 						config = nullptr;
+						w.abort();
+						_pool->unborrow(c);
+						c.reset();
 						continue;
 						continue;
 					}
 					}
 
 
 					w.commit();
 					w.commit();
-					_pool->unborrow(c);
 
 
 					const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
 					const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
 					const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL);
 					const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL);
@@ -1124,7 +1138,6 @@ void PostgreSQL::commitThread()
 			} else if (objtype == "network") {
 			} else if (objtype == "network") {
 				try {
 				try {
 					// fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
 					// fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str());
-					auto c = _pool->borrow();
 					pqxx::work w(*c->c);
 					pqxx::work w(*c->c);
 
 
 					std::string id = (*config)["id"];
 					std::string id = (*config)["id"];
@@ -1244,7 +1257,6 @@ void PostgreSQL::commitThread()
 						id, domain, s);
 						id, domain, s);
 
 
 					w.commit();
 					w.commit();
-					_pool->unborrow(c);
 
 
 					const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
 					const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL);
 					if (nwidInt) {
 					if (nwidInt) {
@@ -1264,7 +1276,6 @@ void PostgreSQL::commitThread()
 			} else if (objtype == "_delete_network") {
 			} else if (objtype == "_delete_network") {
 				// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
 				// fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str());
 				try {
 				try {
-					auto c = _pool->borrow();
 					pqxx::work w(*c->c);
 					pqxx::work w(*c->c);
 
 
 					std::string networkId = (*config)["nwid"];
 					std::string networkId = (*config)["nwid"];
@@ -1273,7 +1284,6 @@ void PostgreSQL::commitThread()
 						networkId);
 						networkId);
 
 
 					w.commit();
 					w.commit();
-					_pool->unborrow(c);
 				} catch (std::exception &e) {
 				} catch (std::exception &e) {
 					fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what());
 					fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what());
 				}
 				}
@@ -1281,7 +1291,6 @@ void PostgreSQL::commitThread()
 			} else if (objtype == "_delete_member") {
 			} else if (objtype == "_delete_member") {
 				// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
 				// fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str());
 				try {
 				try {
-					auto c = _pool->borrow();
 					pqxx::work w(*c->c);
 					pqxx::work w(*c->c);
 
 
 					std::string memberId = (*config)["id"];
 					std::string memberId = (*config)["id"];
@@ -1292,7 +1301,6 @@ void PostgreSQL::commitThread()
 						memberId, networkId);
 						memberId, networkId);
 
 
 					w.commit();
 					w.commit();
-					_pool->unborrow(c);
 				} catch (std::exception &e) {
 				} catch (std::exception &e) {
 					fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what());
 					fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what());
 				}
 				}
@@ -1302,6 +1310,8 @@ void PostgreSQL::commitThread()
 		} catch (std::exception &e) {
 		} catch (std::exception &e) {
 			fprintf(stderr, "%s ERROR: Error getting objtype: %s\n", _myAddressStr.c_str(), e.what());
 			fprintf(stderr, "%s ERROR: Error getting objtype: %s\n", _myAddressStr.c_str(), e.what());
 		}
 		}
+		_pool->unborrow(c);
+		c.reset();
 		std::this_thread::sleep_for(std::chrono::milliseconds(100));
 		std::this_thread::sleep_for(std::chrono::milliseconds(100));
 	}
 	}