SimpleEpollServer源码阅读

文章目录
  1. 1. AsyncServer
  2. 2. ClientDescriptor
  3. 3. ExampleClient
  4. 4. 参考资料

AsyncServer

首先看一下 Server 的 data member.

1
2
3
4
5
6
private:
int listen_fd_, epoll_fd_;
epoll_event *events_;
std::map<int, ClientDescriptor *> clients_;
uint32_t timeout_secs_;
time_t last_socket_check_;

AsynceServer 的构造函数如下, 首先初始化了 socket 连接, 绑定端口号,设置 listenfd, 又设置了 epoll 的 event 和 nonblocking。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
AsyncServer(const char *listen_addr, uint16_t listen_port, uint32_t timeout_secs) :
listen_fd_(-1),
epoll_fd_(-1),
timeout_secs_(timeout_secs),
last_socket_check_(0)
{
sockaddr_in sin = { 0 };

sin.sin_addr.s_addr = inet_addr(listen_addr);
sin.sin_family = AF_INET;
sin.sin_port = htons(listen_port);

listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);
if(listen_fd_ <= 0)
throw std::runtime_error("socket() failed, error code: " + std::to_string(errno));

if(bind(listen_fd_, reinterpret_cast<sockaddr *>(&sin), sizeof(sin)))
throw std::runtime_error("bind() failed, error code: " + std::to_string(errno));

if(SetNonblocking(listen_fd_) == false)
throw std::runtime_error("SetNonBlocking() failed, error code: " + std::to_string(errno));

if(listen(listen_fd_, SOMAXCONN) == -1)
throw std::runtime_error("listen() failed, error code: " + std::to_string(errno));

epoll_fd_ = epoll_create1(0);
if(epoll_fd_ == -1)
throw std::runtime_error("epoll_create1() failed, error code: " + std::to_string(errno));

epoll_event e_event;
e_event.events = EPOLLIN;
e_event.data.fd = listen_fd_;

if(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, listen_fd_, &e_event) == -1)
throw std::runtime_error("epoll_ctl() failed, error code: " + std::to_string(errno));

events_ = new epoll_event[64];
}

构造函数中的 SetNonblocking 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bool SetNonblocking(int fd)
{
int flags = fcntl(fd, F_GETFL, 0);
if(flags == -1)
return false;

flags |= O_NONBLOCK; // set nonblocking

if(fcntl(fd, F_SETFL, flags) == -1)
return false;

return true;
}

// 属于常规操作

析构函数比较简单,就是销毁资源, 包括 listen_fd, epoll_fd 和 events。

1
2
3
4
5
6
7
8
9
10
~AsyncServer()
{
if(listen_fd_ != -1)
close(listen_fd_);

if(epoll_fd_ != -1)
close(epoll_fd_);

delete[] events_;
}

核心逻辑在 EventLoop() 函数中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void EventLoop()
{
while(1)
{
int num_fds = epoll_wait(epoll_fd_, events_, 64, 1000);
// 最大事件数 64, 最多等待时间 1000 ms
if(num_fds != -1)
{
//iterate signaled fds
for(int i = 0; i < num_fds; i++)
{
//notifications on listening fd are incoming client connections
if(events_[i].data.fd == listen_fd_)
{
HandleAccept(); // 处理用户连接
} else {
HandleClient(events_[i]); //
}
}
}

//perform cleanup every second and remove timed-out sockets
// 处理空闲客户连接 采用心跳判断死活
if((last_socket_check_ + 1) < time(0) && clients_.size() > 0)
{
std::map<int, ClientDescriptor *>::iterator it = clients_.begin();
while(it != clients_.end())
{
ClientDescriptor *client = (*it).second;

if(!client->HeartBeat())
{
//if HeartBeat() returns false remove fd from map and close
it = clients_.erase(it);
client->ServerClose();
delete client;
} else {
it++;
}
}

last_socket_check_ = time(0); // 更新 check 时间
}
}
}

下面关注当有客户端尝试连接的时候调用的 HandleAccept() 函数。也就是说当 serverfd 上发生 EPOLLIN event 后,会调用 HandleAccept。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
bool HandleAccept()
{
sockaddr_in client_sin;
socklen_t sin_size = sizeof(client_sin);
ClientDescriptorType *client;

int client_fd = accept(listen_fd_, reinterpret_cast<sockaddr *>(&client_sin), &sin_size);
// 获取 client_fd
if(client_fd == -1)
{
printf("accept() failed, error code: %d\n", errno);
return false;
}
// 设为非阻塞
if(!SetNonblocking(client_fd))
{
printf("failed to put fd into non-blocking mode, error code: %d\n", errno);
return false;
}

//allocate and initialize a new descriptor for the client
client = new ClientDescriptorType(client_fd, client_sin.sin_addr,
ntohs(client_sin.sin_port),
timeout_secs_);

epoll_event ev;
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; //client events will be handled in edge-triggered mode 边缘触发模式
// EPOLLRDHUP 代表对端断开连接
ev.data.ptr = client; //we will pass client descriptor with every event

// 增加监听的时间
if(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &ev) == 1)
{
printf("epoll_ctl() failed, error code: %d\n", errno);
delete client;
return false;
}

//store new client descriptor into the map of clients
clients_[client_fd] = client;

printf("[+] new client: %s:%d\n", inet_ntoa(client_sin.sin_addr), ntohs(client_sin.sin_port));
return true;
}

当一个 client_fd 上发生 EPOLLIN 事件时,则需要调用 HandleClient() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//called whenever and EPOLLIN event occurs on a client fd
bool HandleClient(epoll_event ev)
{
//retrieve client descriptor address from the data parameter
ClientDescriptor *client = reinterpret_cast<ClientDescriptor *>(ev.data.ptr);

// 分别对三个不同事件进行处理
// EPOLLIN EPOLLRDHUP EPOLLOUT
//we got some data from the client
if(ev.events & EPOLLIN)
{
if(!client->ReadReady())
{
RemoveClient(client);
client->ServerClose();
delete client;
return false;
}
}

//the client closed the connection (should be after EPOLLIN as client can send data then close)
if(ev.events & EPOLLRDHUP)
{
RemoveClient(client);
client->ClientClose();
delete client;
return false;
}

//fd is ready to be written
if(ev.events & EPOLLOUT)
{
if(!client->WriteReady())
{
RemoveClient(client);
client->ServerClose();
delete client;
return false;
}
}

return true;
}

void RemoveClient(ClientDescriptor *client)
{
std::map<int, ClientDescriptor *>::iterator it = clients_.find(client->uid());
clients_.erase(it);
}

ClientDescriptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
This is a base client descriptor and virtual methods should be implemented by a derived class.
Returning false from any of the methods will result in Cleanup() being called and the client
descriptor being deconstructed immediately.
*/
class ClientDescriptor
{
public:
ClientDescriptor(int fd, in_addr client_addr, uint16_t client_port, uint32_t timeout) :
fd_(fd),
client_addr_(client_addr),
client_port_(client_port),
timeout_(timeout)
{
}

virtual ~ClientDescriptor()
{

}

//called when a client fd becomes available for writing
virtual bool ReadReady() { throw std::runtime_error("ReadReady() not implemented"); }

//called when a client fd becomes available for reading
virtual bool WriteReady() { throw std::runtime_error("WriteReady() not implemented"); }

//called periodically to check if fd is still alive (used to implement timeout)
virtual bool HeartBeat() { throw std::runtime_error("HeartBeat() not implemented"); }

//called when the server is done with the client and the fd should be closed
virtual void ServerClose() { throw std::runtime_error("ServerClose() not implemented"); }

//called if the connection was forcibly closed by the client
virtual void ClientClose() { throw std::runtime_error("ClientClose() not implemented"); }

//client's unique id
int uid() { return fd_; }

protected:
int fd_;
in_addr client_addr_;
uint16_t client_port_;
uint32_t timeout_;
};

ExampleClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <netinet/in.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include "ClientDescriptor.h"

class ExampleClient : public ClientDescriptor
{
public:
ExampleClient(int fd, in_addr client_addr, uint16_t client_port, uint32_t timeout) :
ClientDescriptor(fd, client_addr, client_port, timeout),
last_active_(time(0))
{
}

bool ReadReady();
bool WriteReady();
bool HeartBeat();
void ClientClose();
void ServerClose();

protected:
time_t last_active_;
};

bool ExampleClient::ReadReady()
{
char buffer[1024];
int bytes_read;
std::string data_buffer;

//we must drain the entire read buffer as we won't get another event until client sends more data
while(1)
{
bytes_read = recv(fd_, buffer, 1024, 0);
if(bytes_read <= 0)
break;

data_buffer.append(buffer, bytes_read);
}

//client triggered EPOLLIN but sent no data (usually due to remote socket being closed)
if(data_buffer.length() == 0)
return true;

printf("[i] client %s:%d said: %s\n", inet_ntoa(client_addr_), client_port_, data_buffer.c_str());

write(fd_, data_buffer.c_str(), data_buffer.size());

//update last active time to prevent timeout
last_active_ = time(0);

return true;
}

bool ExampleClient::WriteReady()
{
/*
during heavy network I/O fds can become unwritable and subsequent calls to write() / send() will fail,
in this case the data which failed to send should be stored in a buffer and the operation should be
retried when WriteReady() is called to signal the fd is writable again (this is up to you to implement).
*/
return true;
}

bool ExampleClient::HeartBeat()
{
//if no operations occurred during timeout period return false to signal server to close socket
if(static_cast<time_t>(last_active_ + timeout_) < time(0))
{
printf("[i] connection %s:%d has timed out\n", inet_ntoa(client_addr_), client_port_);
return false;
}

return true;
}

void ExampleClient::ClientClose()
{
close(fd_);

printf("[-] connection %s:%d closed by client\n", inet_ntoa(client_addr_), client_port_);
}

void ExampleClient::ServerClose()
{
close(fd_);

printf("[-] connection %s:%d closed by server\n", inet_ntoa(client_addr_), client_port_);
}

参考资料