Browse Source

multiple auth threads

mom040267 11 years ago
parent
commit
c3da54c292

+ 1 - 1
src/apps/relay/dbdrivers/dbd_mysql.c

@@ -240,6 +240,7 @@ static MYSQL *get_mydb_connection(void) {
 					mydbconnection=NULL;
 				} else if(!donot_print_connection_success) {
 					TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "MySQL DB connection success: %s\n",pud->userdb);
+					donot_print_connection_success = 1;
 				}
 			}
 			MyconninfoFree(co);
@@ -876,7 +877,6 @@ static int mysql_list_realm_options(u08bits *realm) {
   
 static void mysql_auth_ping(void * rch) {
 	UNUSED_ARG(rch);
-	donot_print_connection_success = 1;
 	MYSQL * myc = get_mydb_connection();
 	if(myc) {
 		char statement[TURN_LONG_STRING_SIZE];

+ 1 - 1
src/apps/relay/dbdrivers/dbd_pgsql.c

@@ -77,6 +77,7 @@ static PGconn *get_pqdb_connection(void) {
 					TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "Cannot open PostgreSQL DB connection: <%s>, runtime error\n",pud->userdb);
 				} else if(!donot_print_connection_success){
 					TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "PostgreSQL DB connection success: %s\n",pud->userdb);
+					donot_print_connection_success = 1;
 				}
 			}
 		}
@@ -633,7 +634,6 @@ static int pgsql_list_realm_options(u08bits *realm) {
   
 static void pgsql_auth_ping(void * rch) {
 	UNUSED_ARG(rch);
-	donot_print_connection_success = 1;
 	PGconn * pqc = get_pqdb_connection();
 	if(pqc) {
 		char statement[TURN_LONG_STRING_SIZE];

+ 2 - 1
src/apps/relay/dbdrivers/dbd_redis.c

@@ -254,6 +254,7 @@ redis_context_handle get_redis_async_connection(struct event_base *base, const c
 				TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "Cannot initialize Redis DB connection\n");
 			} else if (is_redis_asyncconn_good(ret) && !donot_print_connection_success) {
 				TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "Redis DB async connection to be used: %s\n", connection_string);
+				donot_print_connection_success = 1;
 			}
 			RyconninfoFree(co);
 		}
@@ -348,6 +349,7 @@ static redisContext *get_redis_connection(void) {
 				TURN_LOG_FUNC(TURN_LOG_LEVEL_ERROR, "Cannot initialize Redis DB connection\n");
 			} else if (!donot_print_connection_success) {
 				TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "Redis DB sync connection success: %s\n", pud->userdb);
+				donot_print_connection_success = 1;
 			}
 
 			RyconninfoFree(co);
@@ -1024,7 +1026,6 @@ static int redis_list_realm_options(u08bits *realm) {
 }
   
 static void redis_auth_ping(void * rch) {
-	donot_print_connection_success = 1;
 	redisContext *rc = get_redis_connection();
 	if(rc) {
 		turnFreeRedisReply(redisCommand(rc, "keys turn/origin/*"));

+ 1 - 0
src/apps/relay/dbdrivers/dbd_sqlite.c

@@ -135,6 +135,7 @@ static sqlite3 * get_sqlite_connection(void) {
 		} else if(!donot_print_connection_success){
 			init_sqlite_database(sqliteconnection);
 			TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO, "SQLite DB connection success: %s\n",pud->userdb);
+			donot_print_connection_success = 1;
 		}
 		if(sqliteconnection) {
 			(void) pthread_setspecific(connection_key, sqliteconnection);

+ 9 - 8
src/apps/relay/mainrelay.c

@@ -120,8 +120,9 @@ LOW_DEFAULT_PORTS_BOUNDARY,HIGH_DEFAULT_PORTS_BOUNDARY,0,0,0,"",
 /////////////// MISC PARAMS ////////////////
 0,0,0,0,0,SHATYPE_SHA1,':',0,0,TURN_CREDENTIALS_NONE,0,0,0,0,0,0,
 ///////////// Users DB //////////////
-{ (TURN_USERDB_TYPE)0, {"\0"}, {0,NULL,NULL, {NULL,0}} }
-
+{ (TURN_USERDB_TYPE)0, {"\0"}, {0,NULL,NULL, {NULL,0}} },
+///////////// CPUs //////////////////
+DEFAULT_CPUS_NUMBER
 };
 
 //////////////// OpenSSL Init //////////////////////
@@ -1801,14 +1802,14 @@ int main(int argc, char **argv)
 #if defined(_SC_NPROCESSORS_ONLN)
 
 	{
-		 long cpus = (long)sysconf(_SC_NPROCESSORS_CONF);
+		 turn_params.cpus = (long)sysconf(_SC_NPROCESSORS_CONF);
 
-		 if(cpus<1)
-			 cpus = 1;
-		 else if(cpus>MAX_NUMBER_OF_GENERAL_RELAY_SERVERS)
-			 cpus = MAX_NUMBER_OF_GENERAL_RELAY_SERVERS;
+		 if(turn_params.cpus<DEFAULT_CPUS_NUMBER)
+			 turn_params.cpus = DEFAULT_CPUS_NUMBER;
+		 else if(turn_params.cpus>MAX_NUMBER_OF_GENERAL_RELAY_SERVERS)
+			 turn_params.cpus = MAX_NUMBER_OF_GENERAL_RELAY_SERVERS;
 
-		 turn_params.general_relay_servers_number = (turnserver_id)cpus;
+		 turn_params.general_relay_servers_number = (turnserver_id)turn_params.cpus;
 	}
 
 #endif

+ 6 - 0
src/apps/relay/mainrelay.h

@@ -100,6 +100,8 @@ extern "C" {
 #define TURNSERVER_ID_BOUNDARY_BETWEEN_TCP_AND_UDP MAX_NUMBER_OF_GENERAL_RELAY_SERVERS
 #define TURNSERVER_ID_BOUNDARY_BETWEEN_UDP_AND_TCP TURNSERVER_ID_BOUNDARY_BETWEEN_TCP_AND_UDP
 
+#define DEFAULT_CPUS_NUMBER (2)
+
 /////////// TYPES ///////////////////////////////////
 
 enum _DH_KEY_SIZE {
@@ -304,6 +306,10 @@ typedef struct _turn_params_ {
 
   default_users_db_t default_users_db;
 
+/////// CPUs //////////////
+
+  unsigned long cpus;
+
 } turn_params_t;
 
 extern turn_params_t turn_params;

+ 66 - 26
src/apps/relay/netengine.c

@@ -39,7 +39,10 @@ static pthread_barrier_t barrier;
 
 ////////////// Auth Server ////////////////
 
+typedef unsigned char authserver_id;
+
 struct auth_server {
+	authserver_id id;
 	struct event_base* event_base;
 	struct bufferevent *in_buf;
 	struct bufferevent *out_buf;
@@ -47,7 +50,9 @@ struct auth_server {
 	redis_context_handle rch;
 };
 
-static struct auth_server authserver = {NULL,NULL,NULL,0,NULL};
+#define MIN_AUTHSERVER_NUMBER (3)
+static authserver_id authserver_number = MIN_AUTHSERVER_NUMBER;
+static struct auth_server authserver[256];
 
 //////////////////////////////////////////////
 
@@ -366,9 +371,20 @@ static void allocate_relay_addrs_ports(void) {
 
 static int handle_relay_message(relay_server_handle rs, struct message_to_relay *sm);
 
+static pthread_mutex_t auth_message_counter_mutex = PTHREAD_MUTEX_INITIALIZER;
+static authserver_id auth_message_counter = 1;
+
 void send_auth_message_to_auth_server(struct auth_message *am)
 {
-	struct evbuffer *output = bufferevent_get_output(authserver.out_buf);
+	pthread_mutex_lock(&auth_message_counter_mutex);
+	if(auth_message_counter>=authserver_number) auth_message_counter = 1;
+	else if(auth_message_counter<1) auth_message_counter = 1;
+	authserver_id sn = auth_message_counter++;
+	pthread_mutex_unlock(&auth_message_counter_mutex);
+
+	printf("%s: 111.111: %d\n",__FUNCTION__,(int)sn);
+
+	struct evbuffer *output = bufferevent_get_output(authserver[sn].out_buf);
 	if(evbuffer_add(output,am,sizeof(struct auth_message))<0) {
 		fprintf(stderr,"%s: Weird buffer error\n",__FUNCTION__);
 	}
@@ -1703,35 +1719,48 @@ static void* run_auth_server_thread(void *arg)
 
 	struct auth_server *as = (struct auth_server*)arg;
 
-	ns_bzero(as,sizeof(struct auth_server));
+	authserver_id id = as->id;
 
-	as->event_base = turn_event_base_new();
-	TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO,"IO method (auth thread): %s\n",event_base_get_method(as->event_base));
+	if(id == 0) {
 
-	struct bufferevent *pair[2];
-
-	bufferevent_pair_new(as->event_base, TURN_BUFFEREVENTS_OPTIONS, pair);
-	as->in_buf = pair[0];
-	as->out_buf = pair[1];
-	bufferevent_setcb(as->in_buf, auth_server_receive_message, NULL, NULL, as);
-	bufferevent_enable(as->in_buf, EV_READ);
+		barrier_wait();
 
-#if !defined(TURN_NO_HIREDIS)
-	as->rch = get_redis_async_connection(as->event_base, turn_params.redis_statsdb, 1);
+		while(run_auth_server_flag) {
+			reread_realms();
+			update_white_and_black_lists();
+#if defined(DB_TEST)
+			run_db_test();
 #endif
+			sleep(5);
+		}
 
-	struct event_base *eb = as->event_base;
+	} else {
 
-	barrier_wait();
+		ns_bzero(as,sizeof(struct auth_server));
 
-	while(run_auth_server_flag) {
-		reread_realms();
-		update_white_and_black_lists();
-		auth_ping(as->rch);
-		run_events(eb,NULL);
-#if defined(DB_TEST)
-		run_db_test();
+		as->id = id;
+
+		as->event_base = turn_event_base_new();
+		TURN_LOG_FUNC(TURN_LOG_LEVEL_INFO,"IO method (auth thread): %s\n",event_base_get_method(as->event_base));
+
+		struct bufferevent *pair[2];
+
+		bufferevent_pair_new(as->event_base, TURN_BUFFEREVENTS_OPTIONS, pair);
+		as->in_buf = pair[0];
+		as->out_buf = pair[1];
+		bufferevent_setcb(as->in_buf, auth_server_receive_message, NULL, NULL, as);
+		bufferevent_enable(as->in_buf, EV_READ);
+
+#if !defined(TURN_NO_HIREDIS)
+		as->rch = get_redis_async_connection(as->event_base, turn_params.redis_statsdb, 1);
 #endif
+
+		barrier_wait();
+
+		while(run_auth_server_flag) {
+			auth_ping(as->rch);
+			run_events(as->event_base,NULL);
+		}
 	}
 
 	return arg;
@@ -1781,11 +1810,15 @@ void setup_server(void)
 
 	pthread_mutex_init(&mutex_bps, NULL);
 
+	authserver_number = 1 + (authserver_id)(turn_params.cpus / 2);
+
+	if(authserver_number < MIN_AUTHSERVER_NUMBER) authserver_number = MIN_AUTHSERVER_NUMBER;
+
 #if !defined(TURN_NO_THREAD_BARRIERS)
 
-	/* relay threads plus auth thread plus main listener thread */
+	/* relay threads plus auth threads plus main listener thread */
 	/* udp address listener thread(s) will start later */
-	barrier_count = turn_params.general_relay_servers_number+2;
+	barrier_count = turn_params.general_relay_servers_number+authserver_number+1;
 
 	if(use_cli) {
 		barrier_count += 1;
@@ -1828,7 +1861,14 @@ void setup_server(void)
 		}
 	}
 
-	setup_auth_server(&authserver);
+	{
+		authserver_id sn = 0;
+		for(sn = 0; sn < authserver_number;++sn) {
+			authserver[sn].id = sn;
+			setup_auth_server(&(authserver[sn]));
+		}
+	}
+
 	if(use_cli)
 		setup_cli_server();
 

+ 20 - 20
src/apps/relay/userdb.c

@@ -108,37 +108,37 @@ void create_default_realm()
 	o_to_realm = ur_string_map_create(turn_free_simple);
 	default_realm_params_ptr = &_default_realm_params;
 	realms = ur_string_map_create(NULL);
-	ur_string_map_lock(realms);
+	lock_realms();
 	default_realm_params_ptr->status.alloc_counters =  ur_string_map_create(NULL);
-	ur_string_map_unlock(realms);
+	unlock_realms();
 }
 
 void get_default_realm_options(realm_options_t* ro)
 {
 	if(ro) {
-		ur_string_map_lock(realms);
+		lock_realms();
 		ns_bcopy(&(default_realm_params_ptr->options),ro,sizeof(realm_options_t));
-		ur_string_map_unlock(realms);
+		unlock_realms();
 	}
 }
 
 void set_default_realm_name(char *realm) {
-	ur_string_map_lock(realms);
+	lock_realms();
 	ur_string_map_value_type value = (ur_string_map_value_type)default_realm_params_ptr;
 	STRCPY(default_realm_params_ptr->options.name,realm);
 	ur_string_map_put(realms, (ur_string_map_key_type)default_realm_params_ptr->options.name, value);
 	add_to_secrets_list(&realms_list, realm);
-	ur_string_map_unlock(realms);
+	unlock_realms();
 }
 
 realm_params_t* get_realm(char* name)
 {
 	if(name && name[0]) {
-		ur_string_map_lock(realms);
+		lock_realms();
 		ur_string_map_value_type value = 0;
 		ur_string_map_key_type key = (ur_string_map_key_type)name;
 		if (ur_string_map_get(realms, key, &value)) {
-			ur_string_map_unlock(realms);
+			unlock_realms();
 			return (realm_params_t*)value;
 		} else {
 			realm_params_t *ret = (realm_params_t*)turn_malloc(sizeof(realm_params_t));
@@ -148,7 +148,7 @@ realm_params_t* get_realm(char* name)
 			ur_string_map_put(realms, key, value);
 			ret->status.alloc_counters =  ur_string_map_create(NULL);
 			add_to_secrets_list(&realms_list, name);
-			ur_string_map_unlock(realms);
+			unlock_realms();
 			return ret;
 		}
 	}
@@ -158,9 +158,9 @@ realm_params_t* get_realm(char* name)
 
 int get_realm_data(char* name, realm_params_t* rp)
 {
-	ur_string_map_lock(realms);
+	lock_realms();
 	ns_bcopy(get_realm(name),rp,sizeof(realm_params_t));
-	ur_string_map_unlock(realms);
+	unlock_realms();
 	return 0;
 }
 
@@ -193,20 +193,20 @@ void get_realm_options_by_name(char *realm, realm_options_t* ro)
 int change_total_quota(char *realm, int value)
 {
 	int ret = value;
-	ur_string_map_lock(realms);
+	lock_realms();
 	realm_params_t* rp = get_realm(realm);
 	rp->options.perf_options.total_quota = value;
-	ur_string_map_unlock(realms);
+	unlock_realms();
 	return ret;
 }
 
 int change_user_quota(char *realm, int value)
 {
 	int ret = value;
-	ur_string_map_lock(realms);
+	lock_realms();
 	realm_params_t* rp = get_realm(realm);
 	rp->options.perf_options.user_quota = value;
-	ur_string_map_unlock(realms);
+	unlock_realms();
 	return ret;
 }
 
@@ -1285,16 +1285,16 @@ void reread_realms(void)
 {
 	{
 		realm_params_t* defrp = get_realm(NULL);
-		ur_string_map_lock(realms);
+		lock_realms();
 		defrp->options.perf_options.max_bps = turn_params.max_bps;
 		defrp->options.perf_options.total_quota = turn_params.total_quota;
 		defrp->options.perf_options.user_quota = turn_params.user_quota;
-		ur_string_map_unlock(realms);
+		unlock_realms();
 	}
 
-  const turn_dbdriver_t * dbd = get_dbdriver();
-  if (dbd && dbd->reread_realms) {
-    (*dbd->reread_realms)(&realms_list);
+	const turn_dbdriver_t * dbd = get_dbdriver();
+	if (dbd && dbd->reread_realms) {
+		(*dbd->reread_realms)(&realms_list);
 	}
 }