Browse Source

add PollSet class (backported from develop)

Guenter Obiltschnig 8 years ago
parent
commit
341aed39fe

+ 2 - 1
Net/Makefile

@@ -31,7 +31,8 @@ objects = \
 	NTPClient NTPEventArgs NTPPacket \
 	RemoteSyslogChannel RemoteSyslogListener SMTPChannel \
 	WebSocket WebSocketImpl \
-	OAuth10Credentials OAuth20Credentials
+	OAuth10Credentials OAuth20Credentials \
+	PollSet
 
 target         = PocoNet
 target_version = $(LIBVERSION)

+ 86 - 0
Net/include/Poco/Net/PollSet.h

@@ -0,0 +1,86 @@
+//
+// PollSet.h
+//
+// Library: Net
+// Package: Sockets
+// Module:  PollSet
+//
+// Definition of the PollSet class.
+//
+// Copyright (c) 2016, Applied Informatics Software Engineering GmbH.
+// All rights reserved.
+//
+// SPDX-License-Identifier:	BSL-1.0
+//
+
+
+#ifndef Net_PollSet_INCLUDED
+#define Net_PollSet_INCLUDED
+
+
+#include "Poco/Net/Socket.h"
+#include <map>
+
+
+namespace Poco {
+namespace Net {
+
+
+class PollSetImpl;
+
+
+class Net_API PollSet
+	/// A set of sockets that can be efficiently polled as a whole.
+	///
+	/// If supported, PollSet is implemented using epoll (Linux) or
+	/// poll (BSD) APIs. A fallback implementation using select()
+	/// is also provided.
+{
+public:
+	enum Mode
+	{
+		POLL_READ  = 0x01,
+		POLL_WRITE = 0x02,
+		POLL_ERROR = 0x04
+	};
+
+	typedef std::map<Poco::Net::Socket, int> SocketModeMap;
+
+	PollSet();
+		/// Creates an empty PollSet.
+
+	~PollSet();
+		/// Destroys the PollSet.
+
+	void add(const Poco::Net::Socket& socket, int mode);
+		/// Adds the given socket to the set, for polling with
+		/// the given mode, which can be an OR'd combination of
+		/// POLL_READ, POLL_WRITE and POLL_ERROR.
+
+	void remove(const Poco::Net::Socket& socket);
+		/// Removes the given socket from the set.
+
+	void update(const Poco::Net::Socket& socket, int mode);
+		/// Updates the mode of the given socket.
+
+	void clear();
+		/// Removes all sockets from the PollSet.
+
+	SocketModeMap poll(const Poco::Timespan& timeout);
+		/// Waits until the state of at least one of the PollSet's sockets
+		/// changes accordingly to its mode, or the timeout expires.
+		/// Returns a PollMap containing the sockets that have had
+		/// their state changed.
+
+private:
+	PollSetImpl* _pImpl;
+
+	PollSet(const PollSet&);
+	PollSet& operator = (const PollSet&);
+};
+
+
+} } // namespace Poco::Net
+
+
+#endif // Net_PollSet_INCLUDED

+ 55 - 54
Net/include/Poco/Net/SocketImpl.h

@@ -31,7 +31,7 @@ namespace Net {
 
 class Net_API SocketImpl: public Poco::RefCountedObject
 	/// This class encapsulates the Berkeley sockets API.
-	/// 
+	///
 	/// Subclasses implement specific socket types like
 	/// stream or datagram sockets.
 	///
@@ -56,9 +56,9 @@ public:
 		/// with the client.
 		///
 		/// The client socket's address is returned in clientAddr.
-	
+
 	virtual void connect(const SocketAddress& address);
-		/// Initializes the socket and establishes a connection to 
+		/// Initializes the socket and establishes a connection to
 		/// the TCP server at the given address.
 		///
 		/// Can also be used for UDP sockets. In this case, no
@@ -66,14 +66,14 @@ public:
 		/// packets are restricted to the specified address.
 
 	virtual void connect(const SocketAddress& address, const Poco::Timespan& timeout);
-		/// Initializes the socket, sets the socket timeout and 
+		/// Initializes the socket, sets the socket timeout and
 		/// establishes a connection to the TCP server at the given address.
 
 	virtual void connectNB(const SocketAddress& address);
-		/// Initializes the socket and establishes a connection to 
+		/// Initializes the socket and establishes a connection to
 		/// the TCP server at the given address. Prior to opening the
 		/// connection the socket is set to nonblocking mode.
-	
+
 	virtual void bind(const SocketAddress& address, bool reuseAddress = false);
 		/// Bind a local address to the socket.
 		///
@@ -149,14 +149,14 @@ public:
 
 	virtual void shutdownReceive();
 		/// Shuts down the receiving part of the socket connection.
-		
+
 	virtual void shutdownSend();
 		/// Shuts down the sending part of the socket connection.
-		
+
 	virtual void shutdown();
 		/// Shuts down both the receiving and the sending part
 		/// of the socket connection.
-	
+
 	virtual int sendBytes(const void* buffer, int length, int flags = 0);
 		/// Sends the contents of the given buffer through
 		/// the socket.
@@ -166,7 +166,7 @@ public:
 		///
 		/// Certain socket implementations may also return a negative
 		/// value denoting a certain condition.
-	
+
 	virtual int receiveBytes(void* buffer, int length, int flags = 0);
 		/// Receives data from the socket and stores it
 		/// in buffer. Up to length bytes are received.
@@ -175,21 +175,21 @@ public:
 		///
 		/// Certain socket implementations may also return a negative
 		/// value denoting a certain condition.
-	
+
 	virtual int sendTo(const void* buffer, int length, const SocketAddress& address, int flags = 0);
 		/// Sends the contents of the given buffer through
 		/// the socket to the given address.
 		///
 		/// Returns the number of bytes sent, which may be
 		/// less than the number of bytes specified.
-	
+
 	virtual int receiveFrom(void* buffer, int length, SocketAddress& address, int flags = 0);
 		/// Receives data from the socket and stores it
 		/// in buffer. Up to length bytes are received.
 		/// Stores the address of the sender in address.
 		///
 		/// Returns the number of bytes received.
-	
+
 	virtual void sendUrgent(unsigned char data);
 		/// Sends one byte of urgent data through
 		/// the socket.
@@ -198,24 +198,24 @@ public:
 		///
 		/// The preferred way for a socket to receive urgent data
 		/// is by enabling the SO_OOBINLINE option.
-	
+
 	virtual int available();
 		/// Returns the number of bytes available that can be read
 		/// without causing the socket to block.
-		
+
 	virtual bool poll(const Poco::Timespan& timeout, int mode);
-		/// Determines the status of the socket, using a 
+		/// Determines the status of the socket, using a
 		/// call to select().
-		/// 
+		///
 		/// The mode argument is constructed by combining the values
 		/// of the SelectMode enumeration.
 		///
 		/// Returns true if the next operation corresponding to
 		/// mode will not block, false otherwise.
-		
+
 	virtual void setSendBufferSize(int size);
 		/// Sets the size of the send buffer.
-		
+
 	virtual int getSendBufferSize();
 		/// Returns the size of the send buffer.
 		///
@@ -225,7 +225,7 @@ public:
 
 	virtual void setReceiveBufferSize(int size);
 		/// Sets the size of the receive buffer.
-		
+
 	virtual int getReceiveBufferSize();
 		/// Returns the size of the receive buffer.
 		///
@@ -235,7 +235,7 @@ public:
 
 	virtual void setSendTimeout(const Poco::Timespan& timeout);
 		/// Sets the send timeout for the socket.
-	
+
 	virtual Poco::Timespan getSendTimeout();
 		/// Returns the send timeout for the socket.
 		///
@@ -248,20 +248,20 @@ public:
 		///
 		/// On systems that do not support SO_RCVTIMEO, a
 		/// workaround using poll() is provided.
-	
+
 	virtual Poco::Timespan getReceiveTimeout();
 		/// Returns the receive timeout for the socket.
 		///
 		/// The returned timeout may be different than the
 		/// timeout previously set with setReceiveTimeout(),
 		/// as the system is free to adjust the value.
-	
+
 	virtual SocketAddress address();
 		/// Returns the IP address and port number of the socket.
-		
+
 	virtual SocketAddress peerAddress();
 		/// Returns the IP address and port number of the peer socket.
-	
+
 	void setOption(int level, int option, int value);
 		/// Sets the socket option specified by level and option
 		/// to the given integer value.
@@ -273,64 +273,64 @@ public:
 	void setOption(int level, int option, unsigned char value);
 		/// Sets the socket option specified by level and option
 		/// to the given integer value.
-		
+
 	void setOption(int level, int option, const Poco::Timespan& value);
 		/// Sets the socket option specified by level and option
 		/// to the given time value.
-		
+
 	void setOption(int level, int option, const IPAddress& value);
 		/// Sets the socket option specified by level and option
 		/// to the given time value.
-		
+
 	virtual void setRawOption(int level, int option, const void* value, poco_socklen_t length);
 		/// Sets the socket option specified by level and option
 		/// to the given time value.
-		
+
 	void getOption(int level, int option, int& value);
-		/// Returns the value of the socket option 
+		/// Returns the value of the socket option
 		/// specified by level and option.
 
 	void getOption(int level, int option, unsigned& value);
-		/// Returns the value of the socket option 
+		/// Returns the value of the socket option
 		/// specified by level and option.
 
 	void getOption(int level, int option, unsigned char& value);
-		/// Returns the value of the socket option 
+		/// Returns the value of the socket option
 		/// specified by level and option.
 
 	void getOption(int level, int option, Poco::Timespan& value);
-		/// Returns the value of the socket option 
+		/// Returns the value of the socket option
 		/// specified by level and option.
-	
+
 	void getOption(int level, int option, IPAddress& value);
-		/// Returns the value of the socket option 
+		/// Returns the value of the socket option
 		/// specified by level and option.
 
 	virtual void getRawOption(int level, int option, void* value, poco_socklen_t& length);
-		/// Returns the value of the socket option 
-		/// specified by level and option.	
-	
+		/// Returns the value of the socket option
+		/// specified by level and option.
+
 	void setLinger(bool on, int seconds);
 		/// Sets the value of the SO_LINGER socket option.
-		
+
 	void getLinger(bool& on, int& seconds);
 		/// Returns the value of the SO_LINGER socket option.
-	
+
 	void setNoDelay(bool flag);
 		/// Sets the value of the TCP_NODELAY socket option.
-		
+
 	bool getNoDelay();
 		/// Returns the value of the TCP_NODELAY socket option.
-	
+
 	void setKeepAlive(bool flag);
 		/// Sets the value of the SO_KEEPALIVE socket option.
-		
+
 	bool getKeepAlive();
 		/// Returns the value of the SO_KEEPALIVE socket option.
-	
+
 	void setReuseAddress(bool flag);
 		/// Sets the value of the SO_REUSEADDR socket option.
-	
+
 	bool getReuseAddress();
 		/// Returns the value of the SO_REUSEADDR socket option.
 
@@ -338,22 +338,22 @@ public:
 		/// Sets the value of the SO_REUSEPORT socket option.
 		/// Does nothing if the socket implementation does not
 		/// support SO_REUSEPORT.
-	
+
 	bool getReusePort();
 		/// Returns the value of the SO_REUSEPORT socket option.
 		///
 		/// Returns false if the socket implementation does not
 		/// support SO_REUSEPORT.
-		
+
 	void setOOBInline(bool flag);
 		/// Sets the value of the SO_OOBINLINE socket option.
-	
+
 	bool getOOBInline();
 		/// Returns the value of the SO_OOBINLINE socket option.
-	
+
 	void setBroadcast(bool flag);
 		/// Sets the value of the SO_BROADCAST socket option.
-		
+
 	bool getBroadcast();
 		/// Returns the value of the SO_BROADCAST socket option.
 
@@ -397,7 +397,7 @@ public:
 protected:
 	SocketImpl();
 		/// Creates a SocketImpl.
-		
+
 	SocketImpl(poco_socket_t sockfd);
 		/// Creates a SocketImpl using the given native socket.
 
@@ -443,22 +443,23 @@ protected:
 
 	static void error(int code);
 		/// Throws an appropriate exception for the given error code.
-		
+
 	static void error(int code, const std::string& arg);
 		/// Throws an appropriate exception for the given error code.
 
 private:
 	SocketImpl(const SocketImpl&);
 	SocketImpl& operator = (const SocketImpl&);
-	
+
 	poco_socket_t _sockfd;
 	Poco::Timespan _recvTimeout;
 	Poco::Timespan _sndTimeout;
 	bool          _blocking;
 	bool          _isBrokenTimeout;
-	
+
 	friend class Socket;
 	friend class SecureSocketImpl;
+	friend class PollSetImpl;
 };
 
 

+ 518 - 0
Net/src/PollSet.cpp

@@ -0,0 +1,518 @@
+//
+// PollSet.cpp
+//
+// Library: Net
+// Package: Sockets
+// Module:  PollSet
+//
+// Copyright (c) 2016, Applied Informatics Software Engineering GmbH.
+// All rights reserved.
+//
+// SPDX-License-Identifier:	BSL-1.0
+//
+
+
+#include "Poco/Net/PollSet.h"
+#include "Poco/Net/SocketImpl.h"
+#include "Poco/Mutex.h"
+#include <set>
+
+
+#if defined(_WIN32) && _WIN32_WINNT >= 0x0600
+#ifndef POCO_HAVE_FD_POLL
+#define POCO_HAVE_FD_POLL 1
+#endif
+#elif defined(POCO_OS_FAMILY_BSD)
+#ifndef POCO_HAVE_FD_POLL
+#define POCO_HAVE_FD_POLL 1
+#endif
+#endif
+
+
+#if defined(POCO_HAVE_FD_EPOLL)
+#include <sys/epoll.h>
+#elif defined(POCO_HAVE_FD_POLL)
+#ifndef _WIN32
+#include <poll.h>
+#endif
+#endif
+
+
+namespace Poco {
+namespace Net {
+
+
+#if defined(POCO_HAVE_FD_EPOLL)
+
+
+//
+// Linux implementation using epoll
+//
+class PollSetImpl
+{
+public:
+	PollSetImpl():
+		_epollfd(-1),
+		_events(1024)
+	{
+		_epollfd = epoll_create(1);
+		if (_epollfd < 0)
+		{
+			SocketImpl::error();
+		}
+	}
+
+	~PollSetImpl()
+	{
+		if (_epollfd >= 0)
+			::close(_epollfd);
+	}
+
+	void add(const Socket& socket, int mode)
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		poco_socket_t fd = socket.impl()->sockfd();
+		struct epoll_event ev;
+		ev.events = 0;
+		if (mode & PollSet::POLL_READ)
+			ev.events |= EPOLLIN;
+		if (mode & PollSet::POLL_WRITE)
+			ev.events |= EPOLLOUT;
+		if (mode & PollSet::POLL_ERROR)
+			ev.events |= EPOLLERR;
+		ev.data.ptr = socket.impl();
+		int err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev);
+		if (err) SocketImpl::error();
+
+		_socketMap[socket.impl()] = socket;
+	}
+
+	void remove(const Socket& socket)
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		poco_socket_t fd = socket.impl()->sockfd();
+		struct epoll_event ev;
+		ev.events = 0;
+		ev.data.ptr = 0;
+		int err = epoll_ctl(_epollfd, EPOLL_CTL_DEL, fd, &ev);
+		if (err) SocketImpl::error();
+
+		_socketMap.erase(socket.impl());
+	}
+
+	void update(const Socket& socket, int mode)
+	{
+		poco_socket_t fd = socket.impl()->sockfd();
+		struct epoll_event ev;
+		ev.events = 0;
+		if (mode & PollSet::POLL_READ)
+			ev.events |= EPOLLIN;
+		if (mode & PollSet::POLL_WRITE)
+			ev.events |= EPOLLOUT;
+		if (mode & PollSet::POLL_ERROR)
+			ev.events |= EPOLLERR;
+		ev.data.ptr = socket.impl();
+		int err = epoll_ctl(_epollfd, EPOLL_CTL_MOD, fd, &ev);
+		if (err)
+		{
+			SocketImpl::error();
+		}
+	}
+
+	void clear()
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		::close(_epollfd);
+		_socketMap.clear();
+		_epollfd = epoll_create(1);
+		if (_epollfd < 0)
+		{
+			SocketImpl::error();
+		}
+	}
+
+	PollSet::SocketModeMap poll(const Poco::Timespan& timeout)
+	{
+		PollSet::SocketModeMap result;
+
+		if (_socketMap.empty()) return result;
+
+		Poco::Timespan remainingTime(timeout);
+		int rc;
+		do
+		{
+			Poco::Timestamp start;
+			rc = epoll_wait(_epollfd, &_events[0], _events.size(), remainingTime.totalMilliseconds());
+			if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
+			{
+				Poco::Timestamp end;
+				Poco::Timespan waited = end - start;
+				if (waited < remainingTime)
+					remainingTime -= waited;
+				else
+					remainingTime = 0;
+			}
+		}
+		while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
+		if (rc < 0) SocketImpl::error();
+
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		for (int i = 0; i < rc; i++)
+		{
+			std::map<void*, Socket>::iterator it = _socketMap.find(_events[i].data.ptr);
+			if (it != _socketMap.end())
+			{
+				if (_events[i].events & EPOLLIN)
+					result[it->second] |= PollSet::POLL_READ;
+				if (_events[i].events & EPOLLOUT)
+					result[it->second] |= PollSet::POLL_WRITE;
+				if (_events[i].events & EPOLLERR)
+					result[it->second] |= PollSet::POLL_ERROR;
+			}
+		}
+
+		return result;
+	}
+
+private:
+	Poco::FastMutex _mutex;
+	int _epollfd;
+	std::map<void*, Socket> _socketMap;
+	std::vector<struct epoll_event> _events;
+};
+
+
+#elif defined(POCO_HAVE_FD_POLL)
+
+
+//
+// BSD implementation using poll
+//
+class PollSetImpl
+{
+public:
+	void add(const Socket& socket, int mode)
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		poco_socket_t fd = socket.impl()->sockfd();
+		_addMap[fd] = mode;
+		_removeSet.erase(fd);
+		_socketMap[fd] = socket;
+	}
+
+	void remove(const Socket& socket)
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		poco_socket_t fd = socket.impl()->sockfd();
+		_removeSet.insert(fd);
+		_addMap.erase(fd);
+		_socketMap.erase(fd);
+	}
+
+	void update(const Socket& socket, int mode)
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		poco_socket_t fd = socket.impl()->sockfd();
+		for (std::vector<pollfd>::iterator it = _pollfds.begin(); it != _pollfds.end(); ++it)
+		{
+			if (it->fd == fd)
+			{
+				it->events = 0;
+				if (mode & PollSet::POLL_READ)
+					it->events |= POLLIN;
+				if (mode & PollSet::POLL_WRITE)
+					it->events |= POLLOUT;
+			}
+		}
+	}
+
+	void clear()
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		_socketMap.clear();
+		_addMap.clear();
+		_removeSet.clear();
+		_pollfds.clear();
+	}
+
+	PollSet::SocketModeMap poll(const Poco::Timespan& timeout)
+	{
+		PollSet::SocketModeMap result;
+		{
+			Poco::FastMutex::ScopedLock lock(_mutex);
+
+			if (!_removeSet.empty())
+			{
+				for (std::vector<pollfd>::iterator it = _pollfds.begin(); it != _pollfds.end();)
+				{
+					if (_removeSet.find(it->fd) != _removeSet.end())
+					{
+						it = _pollfds.erase(it);
+					}
+					else ++it;
+				}
+				_removeSet.clear();
+			}
+
+			_pollfds.reserve(_pollfds.size() + _addMap.size());
+			for (std::map<poco_socket_t, int>::iterator it = _addMap.begin(); it != _addMap.end(); ++it)
+			{
+				pollfd pfd;
+				pfd.fd = it->first;
+				pfd.events = 0;
+				pfd.revents = 0;
+				if (it->second & PollSet::POLL_READ)
+					pfd.events |= POLLIN;
+				if (it->second & PollSet::POLL_WRITE)
+					pfd.events |= POLLOUT;
+
+				_pollfds.push_back(pfd);
+			}
+			_addMap.clear();
+		}
+
+		if (_pollfds.empty()) return result;
+
+		Poco::Timespan remainingTime(timeout);
+		int rc;
+		do
+		{
+			Poco::Timestamp start;
+#ifdef _WIN32
+			rc = WSAPoll(&_pollfds[0], _pollfds.size(), static_cast<INT>(timeout.totalMilliseconds()));
+#else
+			rc = ::poll(&_pollfds[0], _pollfds.size(), timeout.totalMilliseconds());
+#endif
+			if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
+			{
+				Poco::Timestamp end;
+				Poco::Timespan waited = end - start;
+				if (waited < remainingTime)
+					remainingTime -= waited;
+				else
+					remainingTime = 0;
+			}
+		}
+		while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
+		if (rc < 0) SocketImpl::error();
+
+		{
+			Poco::FastMutex::ScopedLock lock(_mutex);
+
+			if (!_socketMap.empty())
+			{
+				for (std::vector<pollfd>::iterator it = _pollfds.begin(); it != _pollfds.end(); ++it)
+				{
+					std::map<poco_socket_t, Socket>::const_iterator its = _socketMap.find(it->fd);
+					if (its != _socketMap.end())
+					{
+						if (it->revents & POLLIN)
+							result[its->second] |= PollSet::POLL_READ;
+						if (it->revents & POLLOUT)
+							result[its->second] |= PollSet::POLL_WRITE;
+						if (it->revents & POLLERR)
+							result[its->second] |= PollSet::POLL_ERROR;
+					}
+					it->revents = 0;
+				}
+			}
+		}
+
+		return result;
+	}
+
+private:
+	Poco::FastMutex _mutex;
+	std::map<poco_socket_t, Socket> _socketMap;
+	std::map<poco_socket_t, int> _addMap;
+	std::set<poco_socket_t> _removeSet;
+	std::vector<pollfd> _pollfds;
+};
+
+
+#else
+
+
+//
+// Fallback implementation using select()
+//
+class PollSetImpl
+{
+public:
+	void add(const Socket& socket, int mode)
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		_map[socket] = mode;
+	}
+
+	void remove(const Socket& socket)
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		_map.erase(socket);
+	}
+
+	void update(const Socket& socket, int mode)
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		_map[socket] = mode;
+	}
+
+	void clear()
+	{
+		Poco::FastMutex::ScopedLock lock(_mutex);
+
+		_map.clear();
+	}
+
+	PollSet::SocketModeMap poll(const Poco::Timespan& timeout)
+	{
+		fd_set fdRead;
+		fd_set fdWrite;
+		fd_set fdExcept;
+		int nfd = 0;
+
+		FD_ZERO(&fdRead);
+		FD_ZERO(&fdWrite);
+		FD_ZERO(&fdExcept);
+
+		{
+			Poco::FastMutex::ScopedLock lock(_mutex);
+
+			for (PollSet::SocketModeMap::const_iterator it = _map.begin(); it != _map.end(); ++it)
+			{
+				poco_socket_t fd = it->first.impl()->sockfd();
+				if (fd != POCO_INVALID_SOCKET && it->second)
+				{
+					if (int(fd) > nfd) nfd = int(fd);
+
+					if (it->second & PollSet::POLL_READ)
+					{
+						FD_SET(fd, &fdRead);
+					}
+					if (it->second & PollSet::POLL_WRITE)
+					{
+						FD_SET(fd, &fdWrite);
+					}
+					if (it->second & PollSet::POLL_ERROR)
+					{
+						FD_SET(fd, &fdExcept);
+					}
+				}
+			}
+		}
+
+		PollSet::SocketModeMap result;
+		if (nfd == 0) return result;
+
+		Poco::Timespan remainingTime(timeout);
+		int rc;
+		do
+		{
+			struct timeval tv;
+			tv.tv_sec  = (long) remainingTime.totalSeconds();
+			tv.tv_usec = (long) remainingTime.useconds();
+			Poco::Timestamp start;
+			rc = ::select(nfd + 1, &fdRead, &fdWrite, &fdExcept, &tv);
+			if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
+			{
+				Poco::Timestamp end;
+				Poco::Timespan waited = end - start;
+				if (waited < remainingTime)
+					remainingTime -= waited;
+				else
+					remainingTime = 0;
+			}
+		}
+		while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
+		if (rc < 0) SocketImpl::error();
+
+		{
+			Poco::FastMutex::ScopedLock lock(_mutex);
+
+			for (PollSet::SocketModeMap::const_iterator it = _map.begin(); it != _map.end(); ++it)
+			{
+				poco_socket_t fd = it->first.impl()->sockfd();
+				if (fd != POCO_INVALID_SOCKET)
+				{
+					if (FD_ISSET(fd, &fdRead))
+					{
+						result[it->first] |= PollSet::POLL_READ;
+					}
+					if (FD_ISSET(fd, &fdWrite))
+					{
+						result[it->first] |= PollSet::POLL_WRITE;
+					}
+					if (FD_ISSET(fd, &fdExcept))
+					{
+						result[it->first] |= PollSet::POLL_ERROR;
+					}
+				}
+			}
+		}
+
+		return result;
+	}
+
+private:
+	Poco::FastMutex _mutex;
+	PollSet::SocketModeMap _map;
+};
+
+
+#endif
+
+
+PollSet::PollSet():
+	_pImpl(new PollSetImpl)
+{
+}
+
+
+PollSet::~PollSet()
+{
+	delete _pImpl;
+}
+
+
+void PollSet::add(const Socket& socket, int mode)
+{
+	_pImpl->add(socket, mode);
+}
+
+
+void PollSet::remove(const Socket& socket)
+{
+	_pImpl->remove(socket);
+}
+
+
+void PollSet::update(const Socket& socket, int mode)
+{
+	_pImpl->update(socket, mode);
+}
+
+
+void PollSet::clear()
+{
+	_pImpl->clear();
+}
+
+
+PollSet::SocketModeMap PollSet::poll(const Poco::Timespan& timeout)
+{
+	return _pImpl->poll(timeout);
+}
+
+
+} } // namespace Poco::Net

+ 2 - 1
Net/testsuite/Makefile

@@ -26,7 +26,8 @@ objects = \
 	NTPClientTest NTPClientTestSuite \
 	WebSocketTest WebSocketTestSuite \
 	SyslogTest \
-	OAuth10CredentialsTest OAuth20CredentialsTest OAuthTestSuite
+	OAuth10CredentialsTest OAuth20CredentialsTest OAuthTestSuite \
+	PollSetTest
 
 target         = testrunner
 target_version = 1

+ 135 - 0
Net/testsuite/src/PollSetTest.cpp

@@ -0,0 +1,135 @@
+//
+// PollSetTest.cpp
+//
+// Copyright (c) 2016, Applied Informatics Software Engineering GmbH.
+// and Contributors.
+//
+// SPDX-License-Identifier:	BSL-1.0
+//
+
+
+#include "PollSetTest.h"
+#include "CppUnit/TestCaller.h"
+#include "CppUnit/TestSuite.h"
+#include "EchoServer.h"
+#include "Poco/Net/StreamSocket.h"
+#include "Poco/Net/ServerSocket.h"
+#include "Poco/Net/SocketAddress.h"
+#include "Poco/Net/NetException.h"
+#include "Poco/Net/PollSet.h"
+#include "Poco/Stopwatch.h"
+
+
+using Poco::Net::Socket;
+using Poco::Net::StreamSocket;
+using Poco::Net::ServerSocket;
+using Poco::Net::SocketAddress;
+using Poco::Net::ConnectionRefusedException;
+using Poco::Net::PollSet;
+using Poco::Timespan;
+using Poco::Stopwatch;
+
+
+PollSetTest::PollSetTest(const std::string& name): CppUnit::TestCase(name)
+{
+}
+
+
+PollSetTest::~PollSetTest()
+{
+}
+
+
+void PollSetTest::testPoll()
+{
+	EchoServer echoServer1;
+	EchoServer echoServer2;
+	StreamSocket ss1;
+	StreamSocket ss2;
+
+	ss1.connect(SocketAddress("127.0.0.1", echoServer1.port()));
+	ss2.connect(SocketAddress("127.0.0.1", echoServer2.port()));
+
+	PollSet ps;
+	ps.add(ss1, PollSet::POLL_READ);
+
+	// nothing readable
+	Stopwatch sw;
+	sw.start();
+	Timespan timeout(1000000);
+	assert (ps.poll(timeout).empty());
+	assert (sw.elapsed() >= 900000);
+	sw.restart();
+
+	ps.add(ss2, PollSet::POLL_READ);
+
+	// ss1 must be writable, if polled for
+	ps.update(ss1, PollSet::POLL_READ | PollSet::POLL_WRITE);
+	PollSet::SocketModeMap sm = ps.poll(timeout);
+	assert (sm.find(ss1) != sm.end());
+	assert (sm.find(ss2) == sm.end());
+	assert (sm.find(ss1)->second == PollSet::POLL_WRITE);
+	assert (sw.elapsed() < 100000);
+
+	ps.update(ss1, PollSet::POLL_READ);
+
+	ss1.sendBytes("hello", 5);
+	char buffer[256];
+	sw.restart();
+	sm = ps.poll(timeout);
+	assert (sm.find(ss1) != sm.end());
+	assert (sm.find(ss2) == sm.end());
+	assert (sm.find(ss1)->second == PollSet::POLL_READ);
+	assert (sw.elapsed() < 100000);
+
+	int n = ss1.receiveBytes(buffer, sizeof(buffer));
+	assert (n == 5);
+	assert (std::string(buffer, n) == "hello");
+
+
+	ss2.sendBytes("HELLO", 5);
+	sw.restart();
+	sm = ps.poll(timeout);
+	assert (sm.find(ss1) == sm.end());
+	assert (sm.find(ss2) != sm.end());
+	assert (sm.find(ss2)->second == PollSet::POLL_READ);
+	assert (sw.elapsed() < 100000);
+
+	n = ss2.receiveBytes(buffer, sizeof(buffer));
+	assert (n == 5);
+	assert (std::string(buffer, n) == "HELLO");
+
+	ps.remove(ss2);
+
+	ss2.sendBytes("HELLO", 5);
+	sw.restart();
+	sm = ps.poll(timeout);
+	assert (sm.empty());
+
+	n = ss2.receiveBytes(buffer, sizeof(buffer));
+	assert (n == 5);
+	assert (std::string(buffer, n) == "HELLO");
+
+	ss1.close();
+	ss2.close();
+}
+
+
+void PollSetTest::setUp()
+{
+}
+
+
+void PollSetTest::tearDown()
+{
+}
+
+
+CppUnit::Test* PollSetTest::suite()
+{
+	CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("PollSetTest");
+
+	CppUnit_addTest(pSuite, PollSetTest, testPoll);
+
+	return pSuite;
+}

+ 36 - 0
Net/testsuite/src/PollSetTest.h

@@ -0,0 +1,36 @@
+//
+// PollSetTest.h
+//
+// Definition of the PollSetTest class.
+//
+// Copyright (c) 2016, Applied Informatics Software Engineering GmbH.
+// and Contributors.
+//
+// SPDX-License-Identifier:	BSL-1.0
+//
+
+
+#ifndef PollSetTest_INCLUDED
+#define PollSetTest_INCLUDED
+
+
+#include "Poco/Net/Net.h"
+#include "CppUnit/TestCase.h"
+
+
+class PollSetTest: public CppUnit::TestCase
+{
+public:
+	PollSetTest(const std::string& name);
+	~PollSetTest();
+
+	void testPoll();
+
+	void setUp();
+	void tearDown();
+
+	static CppUnit::Test* suite();
+};
+
+
+#endif // PollSetTest_INCLUDED

+ 2 - 0
Net/testsuite/src/SocketsTestSuite.cpp

@@ -15,6 +15,7 @@
 #include "MulticastSocketTest.h"
 #include "DialogSocketTest.h"
 #include "RawSocketTest.h"
+#include "PollSetTest.h"
 
 
 CppUnit::Test* SocketsTestSuite::suite()
@@ -29,5 +30,6 @@ CppUnit::Test* SocketsTestSuite::suite()
 #ifdef POCO_NET_HAS_INTERFACE
 	pSuite->addTest(MulticastSocketTest::suite());
 #endif
+	pSuite->addTest(PollSetTest::suite());
 	return pSuite;
 }