Use epoll facilities

This commit is contained in:
2019-06-27 21:38:32 +02:00
parent 500f64e2f9
commit 19b67484db
11 changed files with 218 additions and 206 deletions

View File

@@ -6,22 +6,9 @@
#include <cstdio>
#include <sstream>
void ConnectionOperator::SendResponse(ClientSocket const & clientSocket, Http::Response const & response) const
std::vector<char> ConnectionOperator::HandleNewConnection(int fd)
{
auto bytesToSend = response.Serialize();
try
{
clientSocket.WriteBytes(bytesToSend);
}
catch(std::runtime_error & e)
{
logger.Error("Error writing data to clientSocket");
}
}
void ConnectionOperator::HandleNewConnection(ClientSocket const & newClient)
{
auto requestBytes = newClient.ReadBytes();
auto requestBytes = Socket::ReadBytes(fd, 512);
Http::Request request;
Http::Response response;
try
@@ -37,8 +24,8 @@ void ConnectionOperator::HandleNewConnection(ClientSocket const & newClient)
logger.Error(ss.str());
response.code = HttpResponse::Code::BAD_REQUEST;
SendResponse(newClient, response);
return;
return response.Serialize();
}
for(size_t i = 0; i < middlewares.size(); ++i)
@@ -57,12 +44,11 @@ void ConnectionOperator::HandleNewConnection(ClientSocket const & newClient)
logger.Error(ss.str());
response.code = HttpResponse::Code::NOT_IMPLEMENTED;
SendResponse(newClient, response);
return;
return response.Serialize();
}
auto bytesToSend = response.Serialize();
newClient.WriteBytes(bytesToSend);
return response.Serialize();
}
ConnectionOperator::ConnectionOperator(Logger & _logger, ServerConfiguration const & serverConfiguration)

View File

@@ -2,7 +2,7 @@
#include "../logger.hpp"
#include "../middleware/base.hpp"
#include <memory>
#include "socket/clientsocket.hpp"
#include "socket.hpp"
#include <string>
#include <vector>
@@ -12,10 +12,8 @@ private:
Logger & logger;
std::vector<std::unique_ptr<Middleware::BaseMiddleware>> middlewares;
void SendResponse(ClientSocket const & clientSocket, Http::Response const & response) const;
public:
void HandleNewConnection(ClientSocket const & newClient);
std::vector<char> HandleNewConnection(int fd);
ConnectionOperator(Logger & logger, ServerConfiguration const & serverConfiguration);
};

View File

@@ -1,32 +1,119 @@
#include "../logger.hpp"
#include "configuration.hpp"
#include "server.hpp"
#include <set>
#include <sstream>
#include <stdexcept>
#include <sys/epoll.h>
#include <unistd.h>
void HttpServer::HandleEpollInEvent(int fd)
{
if (fd != listeningSocketFileDescriptor)
{
logger.Info("EPOLLIN Attempted to handle a non registered file descriptor");
return;
}
unsigned sockaddrSize = sizeof(sockaddr_in);
int const connectionFileDescriptor = accept(
listeningSocketFileDescriptor,
reinterpret_cast<sockaddr *>(&socketAddress),
&sockaddrSize);
socketWriteMap[connectionFileDescriptor] = connectionOperator.HandleNewConnection(connectionFileDescriptor);
epoll_event event;
event.data.fd = connectionFileDescriptor;
event.events = EPOLLOUT | EPOLLONESHOT;
if (epoll_ctl(pollFileDescriptor, EPOLL_CTL_ADD, connectionFileDescriptor, &event) < 0)
{
logger.Error("Error registering file descriptor for EPOLLOUT");
}
}
void HttpServer::HandleEpollOutEvent(int fd)
{
auto result = socketWriteMap.find(fd);
if (result == socketWriteMap.end())
{
logger.Error("EPOLLOUT Event for unexpected fd");
return;
}
Socket::WriteBytes(fd, result->second);
close(fd);
}
void HttpServer::Execute()
{
std::vector<epoll_event> epollEvents;
epollEvents.resize(1000);
while(isOpen)
{
try
int eventsHappened = epoll_wait(pollFileDescriptor, epollEvents.data(), static_cast<int>(epollEvents.size()), -1);
for (int i = 0; i < eventsHappened; ++i)
{
ClientSocket newClient = listeningSocket.AcceptNextConnection();
connectionOperator.HandleNewConnection(newClient);
}
catch (std::runtime_error & e)
{
logger.Info("ClientSocket dropped on accept");
int const fd = epollEvents[i].data.fd;
if (epollEvents[i].events & EPOLLIN)
{
HandleEpollInEvent(fd);
}
else
{
HandleEpollOutEvent(fd);
}
}
}
}
HttpServer::HttpServer(Logger & _logger, ServerConfiguration const & serverConfiguration)
: logger(_logger),
listeningSocket(serverConfiguration.GetPort()),
pollFileDescriptor(-1),
listeningSocketFileDescriptor(-1),
connectionOperator(_logger, serverConfiguration),
isOpen(true)
{
pollFileDescriptor = epoll_create1(0);
if (pollFileDescriptor < 0)
{
throw std::runtime_error("Error creating epoll file descriptor");
}
socketAddress.sin_family = AF_INET;
socketAddress.sin_addr.s_addr = INADDR_ANY;
socketAddress.sin_port = htons(serverConfiguration.GetPort());
listeningSocketFileDescriptor = ListeningSocket::Create(socketAddress, 10000);
if (listeningSocketFileDescriptor < 0)
{
throw std::runtime_error("Error creating listening socket");
}
epoll_event event;
event.data.fd = listeningSocketFileDescriptor;
event.events = EPOLLIN;
if (epoll_ctl(pollFileDescriptor, EPOLL_CTL_ADD, listeningSocketFileDescriptor, &event) < 0)
{
throw std::runtime_error("Error registering listening socket with epoll facilities");
}
std::stringstream ss;
ss << "Listening on port " << serverConfiguration.GetPort();
logger.Info(ss.str());
}
HttpServer::~HttpServer()
{
if (listeningSocketFileDescriptor >= 0)
{
close(listeningSocketFileDescriptor);
}
if (pollFileDescriptor >= 0)
{
close(pollFileDescriptor);
}
}

View File

@@ -1,20 +1,28 @@
#pragma once
#include "../logger.hpp"
#include "connectionoperator.hpp"
#include "socket/listeningsocket.hpp"
#include <map>
class HttpServer
{
private:
Logger & logger;
ListeningSocket listeningSocket;
int pollFileDescriptor;
sockaddr_in socketAddress;
int listeningSocketFileDescriptor;
std::map<int, std::vector<char>> socketWriteMap;
ConnectionOperator connectionOperator;
bool isOpen;
void HandleEpollInEvent(int fd);
void HandleEpollOutEvent(int fd);
public:
void Execute();
HttpServer(Logger & logger, ServerConfiguration const & serverConfiguration);
~HttpServer();
HttpServer(HttpServer & other) = delete;
HttpServer & operator=(HttpServer & other) = delete;
};

85
src/server/socket.cpp Normal file
View File

@@ -0,0 +1,85 @@
#include "socket.hpp"
#include <stdexcept>
#include <unistd.h>
namespace ListeningSocket
{
int Create(sockaddr_in & socketAddress, int const connectionLimit)
{
int socketFileDescriptor = socket(AF_INET, SOCK_STREAM, 0);
if (socketFileDescriptor < 0)
{
return -1;
}
int enable = 1;
if (setsockopt(socketFileDescriptor, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
{
close(socketFileDescriptor);
return -1;
}
int const bindResult = bind(
socketFileDescriptor,
reinterpret_cast<sockaddr*>(&socketAddress),
sizeof(sockaddr_in));
if (bindResult < 0)
{
close(socketFileDescriptor);
return -1;
}
int const listenResult = listen(socketFileDescriptor, connectionLimit);
if (listenResult < 0)
{
close(socketFileDescriptor);
return -1;
}
return socketFileDescriptor;
}
}
namespace Socket
{
std::vector<char> ReadBytes(int fd, size_t limit)
{
size_t const readChunkSize = 128;
std::vector<char> buffer;
ssize_t totalBytesRead = 0;
ssize_t bytesRead = 0;
do
{
buffer.resize(buffer.size() + readChunkSize);
bytesRead = read(fd, &buffer[totalBytesRead], readChunkSize);
if (bytesRead < 0)
{
throw std::runtime_error("Error reading from filedescriptor");
}
totalBytesRead += bytesRead;
} while (bytesRead == readChunkSize && bytesRead < limit);
buffer.resize(totalBytesRead);
return buffer;
}
size_t WriteBytes(int fd, std::vector<char> const & bytes)
{
ssize_t totalBytesWritten = 0;
size_t const sizeToWrite = bytes.size();
while (totalBytesWritten < sizeToWrite)
{
ssize_t bytesWritten = write(fd, &bytes[totalBytesWritten], sizeToWrite - totalBytesWritten);
if (bytesWritten <= 0)
{
throw std::runtime_error("Error writing to filedescriptor");
}
totalBytesWritten += bytesWritten;
}
return totalBytesWritten;
}
}

16
src/server/socket.hpp Normal file
View File

@@ -0,0 +1,16 @@
#pragma once
#include <netinet/in.h>
#include <sys/socket.h>
#include <vector>
namespace ListeningSocket
{
int Create(sockaddr_in & socketAddress, int const connectionLimit);
}
namespace Socket
{
std::vector<char> ReadBytes(int fd, size_t limit);
size_t WriteBytes(int fd, std::vector<char> const & bytes);
}

View File

@@ -1,68 +0,0 @@
#include "clientsocket.hpp"
#include <stdexcept>
#include <unistd.h>
std::vector<char> ClientSocket::ReadBytes(size_t limit) const
{
size_t const readChunkSize = 128;
std::vector<char> buffer;
ssize_t totalBytesRead = 0;
ssize_t bytesRead = 0;
do
{
buffer.resize(buffer.size() + readChunkSize);
bytesRead = read(fileDescriptor, &buffer[totalBytesRead], readChunkSize);
if (bytesRead < 0)
{
throw std::runtime_error("Error reading from filedescriptor");
}
totalBytesRead += bytesRead;
} while (bytesRead == readChunkSize && bytesRead < limit);
buffer.resize(totalBytesRead);
return buffer;
}
size_t ClientSocket::WriteBytes(std::vector<char> const & bytes) const
{
ssize_t totalBytesWritten = 0;
size_t const sizeToWrite = bytes.size();
while (totalBytesWritten < sizeToWrite)
{
ssize_t bytesWritten = write(fileDescriptor, &bytes[totalBytesWritten], sizeToWrite - totalBytesWritten);
if (bytesWritten <= 0)
{
throw std::runtime_error("Error writing to filedescriptor");
}
totalBytesWritten += bytesWritten;
}
return totalBytesWritten;
}
ClientSocket::ClientSocket(int _fileDescriptor)
: fileDescriptor(_fileDescriptor)
{
if (_fileDescriptor < 0)
{
throw std::runtime_error("clientSocket created with invalid file descriptor");
}
}
ClientSocket::~ClientSocket()
{
if (fileDescriptor >= 0)
{
close(fileDescriptor);
}
}
ClientSocket::ClientSocket(ClientSocket && other)
{
fileDescriptor = other.fileDescriptor;
other.fileDescriptor = -1;
}

View File

@@ -1,23 +0,0 @@
#pragma once
#include <cstdint>
#include <vector>
class ClientSocket
{
private:
int fileDescriptor;
public:
// Parameter limit is a multiple of 128
std::vector<char> ReadBytes(size_t limit = 512) const;
size_t WriteBytes(std::vector<char> const & bytes) const;
ClientSocket(int _fileDescriptor);
~ClientSocket();
ClientSocket(ClientSocket && other);
ClientSocket(ClientSocket & other) = delete;
ClientSocket & operator=(ClientSocket & other) = delete;
};

View File

@@ -1,60 +0,0 @@
#include "listeningsocket.hpp"
#include <stdexcept>
#include <unistd.h>
int const connectionLimit = 10;
ClientSocket ListeningSocket::AcceptNextConnection()
{
unsigned sockaddrSize = sizeof(sockaddr_in);
int connectionFileDescriptor = accept(
socketFileDescriptor,
reinterpret_cast<sockaddr *>(&socketAddress),
&sockaddrSize);
return ClientSocket(connectionFileDescriptor);
}
ListeningSocket::ListeningSocket(int const port)
: socketFileDescriptor(-1),
socketAddress()
{
socketFileDescriptor = socket(AF_INET, SOCK_STREAM, 0);
if (socketFileDescriptor < 0)
{
throw std::runtime_error("socket creation error");
}
int enable = 1;
if (setsockopt(socketFileDescriptor, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0)
{
throw std::runtime_error("setsockopt SO_REUSEADDR failed");
}
socketAddress.sin_family = AF_INET;
socketAddress.sin_addr.s_addr = INADDR_ANY;
socketAddress.sin_port = htons(port);
int const bindResult = bind(
socketFileDescriptor,
reinterpret_cast<sockaddr*>(&socketAddress),
sizeof(sockaddr_in));
if (bindResult < 0)
{
throw std::runtime_error("socket bind error code " + std::to_string(errno));
}
int const listenResult = listen(socketFileDescriptor, connectionLimit);
if (listenResult < 0)
{
throw std::runtime_error("socket listening error");
}
}
ListeningSocket::~ListeningSocket()
{
if (socketFileDescriptor >= 0)
{
close(socketFileDescriptor);
}
}

View File

@@ -1,20 +0,0 @@
#pragma once
#include "clientsocket.hpp"
#include <sys/socket.h>
#include <netinet/in.h>
class ListeningSocket
{
private:
int socketFileDescriptor;
sockaddr_in socketAddress;
public:
ClientSocket AcceptNextConnection();
ListeningSocket(int const port);
~ListeningSocket();
ListeningSocket(ListeningSocket & other) = delete;
ListeningSocket & operator=(ListeningSocket & other) = delete;
};