| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 | /* * Copyright (c)2019 ZeroTier, Inc. * * Use of this software is governed by the Business Source License included * in the LICENSE.TXT file in the project's root directory. * * Change Date: 2023-01-01 * * On the date above, in accordance with the Business Source License, use * of this software will be governed by version 2.0 of the Apache License. *//****/#include "RabbitMQ.hpp"#ifdef ZT_CONTROLLER_USE_LIBPQ#include <amqp.h>#include <amqp_tcp_socket.h>#include <stdexcept>#include <cstring>namespace ZeroTier{RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)	: _mqc(cfg)	, _qName(queueName)	, _socket(NULL)	, _status(0){}RabbitMQ::~RabbitMQ(){	amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);	amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);	amqp_destroy_connection(_conn);}void RabbitMQ::init(){	struct timeval tval;	memset(&tval, 0, sizeof(struct timeval));	tval.tv_sec = 5;	fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);	_conn = amqp_new_connection();	_socket = amqp_tcp_socket_new(_conn);	if (!_socket) {		throw std::runtime_error("Can't create socket for RabbitMQ");	}	fprintf(stderr, "RabbitMQ: amqp://%s:%s@%s:%d\n", _mqc->username.c_str(), _mqc->password.c_str(), _mqc->host.c_str(), _mqc->port);	_status = amqp_socket_open_noblock(_socket, _mqc->host.c_str(), _mqc->port, &tval);	if (_status) {		throw std::runtime_error("Can't connect to RabbitMQ");	}	amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,		_mqc->username.c_str(), _mqc->password.c_str());	if (r.reply_type != AMQP_RESPONSE_NORMAL) {		throw std::runtime_error("RabbitMQ Login Error");	}	static int chan = 0;	{		Mutex::Lock l(_chan_m);		_channel = ++chan;	}	amqp_channel_open(_conn, _channel);	r = amqp_get_rpc_reply(_conn);	if(r.reply_type != AMQP_RESPONSE_NORMAL) {		throw std::runtime_error("Error opening communication channel");	}	_q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);	r = amqp_get_rpc_reply(_conn);	if (r.reply_type != AMQP_RESPONSE_NORMAL) {		throw std::runtime_error("Error declaring queue " + std::string(_qName));	}	amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);	r = amqp_get_rpc_reply(_conn);	if (r.reply_type != AMQP_RESPONSE_NORMAL) {		throw std::runtime_error("Error consuming queue " + std::string(_qName));	}	fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);}std::string RabbitMQ::consume(){	amqp_rpc_reply_t res;	amqp_envelope_t envelope;	amqp_maybe_release_buffers(_conn);	struct timeval timeout;	timeout.tv_sec = 1;	timeout.tv_usec = 0;	res = amqp_consume_message(_conn, &envelope, &timeout, 0);	if (res.reply_type != AMQP_RESPONSE_NORMAL) {		if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {			// timeout waiting for message.  Return empty string			return "";		} else {			throw std::runtime_error("Error getting message");		}	}	std::string msg(		(const char*)envelope.message.body.bytes,		envelope.message.body.len	);	amqp_destroy_envelope(&envelope);	return msg;}}#endif // ZT_CONTROLLER_USE_LIBPQ
 |