C++基于Reactor的服务器百万并发实现与讲解
简介
该攻略将介绍基于Reactor模式实现高并发服务器的过程。Reactor模式是一种常见的多路复用I/O技术,用于实现高并发环境下的网络服务器。Reactor模式基于IO多路复用,通过事件驱动的方式,将网络I/O事件分发给对应的处理函数,从而实现高效的I/O操作。
本攻略将着重介绍基于C++实现Reactor模式的服务器。在本攻略中,我们将使用 epoll 实现基于 Reactor 模式的服务器程序,具体实现细节将在下文中展开。
实现步骤
- 创建服务器和客户端
在本示例中,我们将以 Linux 环境下的多线程服务器和客户端为基础,创建服务器和客户端,用于展示基于 Reactor 模式的高并发服务器实现。
- 编写基于 Reactor 模式的服务器类
我们将在服务端类中,创建一个函数用于获取文件描述符,并将其注册到 epoll 中。针对每一个已注册的文件描述符,我们将创建一个处理函数,用于处理读取请求,同时也可以增加相应的处理函数,以便更好地支持服务器端和客户端的通信。
```c++
class Reactor_Server
{
public:
Reactor_Server();
void run();
private:
int create_epoll();
void epoll_loop();
int add_fd_to_epoll(int fd, unsigned int events);
int remove_fd_from_epoll(int fd);
int modify_fd_of_epoll(int fd, unsigned int events);
void handle_accept(int listen_fd);
void handle_read(int client_fd);
int listen_fd_;
int epoll_fd_;
bool is_running_;
};
```
- 完成服务端初始化设置
在处理函数中,我们将完成服务端的初始化设置。首先,我们将创建一个监听描述符,用于侦听客户端的请求。为了提高处理效率,我们还需要将非阻塞模式打开。然后,我们将让该监听描述符被注册到 epoll 中,用于监听客户端的事件。
```c++
Reactor_Server::Reactor_Server()
{
listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);
// 将listen_fd设置为非阻塞模式
fcntl(listen_fd_, F_SETFL, O_NONBLOCK);
// 绑定本地地址和端口号
...
// 监听描述符已经准备好了,加入epoll
add_fd_to_epoll(listen_fd_, EPOLLIN);
is_running_ = false;
std::cout << "Server created: " << inet_ntoa(server_addr.sin_addr) << ":" << ntohs(server_addr.sin_port) << std::endl;
}
```
- 实现事件循环
在 Reactor 模式中,事件驱动是实现高效I/O操作的核心。在该节中,我们将展示如何在服务端中实现事件循环以及如何处理不同类型的事件。
```c++
void Reactor_Server::epoll_loop()
{
epoll_event events[MAX_EVENTS];
while (is_running_)
{
int nfds = epoll_wait(epoll_fd_, events, MAX_EVENTS, -1);
if (nfds == -1 && errno != EINTR)
{
perror("Error in epoll_wait");
break;
}
for (int i = 0; i < nfds; i++)
{
int fd = events[i].data.fd;
if (fd == listen_fd_)
{
handle_accept(fd);
}
else if (events[i].events & EPOLLIN)
{
handle_read(fd);
}
}
}
}
```
上述代码中,我们使用 epoll_wait 函数等待事件的发生,然后依次处理这些事件。
- 完成客户端读取处理函数handle_read
在客户端的连接请求被接受后,该函数被调用,用于处理客户端的读事件。在这里,我们将从客户端读取数据,然后处理这些数据。当需要关闭连接时,需要调用 remove_fd_from_epoll 函数,从epoll中删除该描述符。
c++
void Reactor_Server::handle_read(int client_fd)
{
char buf[MAXLEN];
memset(buf, 0, sizeof buf);
ssize_t n = read(client_fd, buf, MAXLEN);
if (n <= 0)
{
remove_fd_from_epoll(client_fd);
close(client_fd);
return;
}
write(client_fd, buf, n);
}
- 完成客户端连接请求处理函数handle_accept
如果监听到客户端的连接请求,则应该处理该请求,创建一个新的描述符,并向epoll中添加该描述符,以便后续处理。在这里,我们使用了accept函数,然后将连接描述符注册到epoll。
```c++
void Reactor_Server::handle_accept(int listen_fd)
{
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd == -1)
{
perror("Error in accept");
return;
}
fcntl(client_fd, F_SETFL, O_NONBLOCK); // 设置当前描述符为非阻塞方式
add_fd_to_epoll(client_fd, EPOLLIN|EPOLLOUT);
std::cout << "New client from " << inet_ntoa(client_addr.sin_addr) << ":" << ntohs(client_addr.sin_port) << std::endl;
}
```
- 完成与epoll相关的核心函数
函数add_fd_to_epoll、remove_fd_from_epoll、modify_fd_of_epoll分别用于添加、删除或修改已添加的文件描述符。下面是这些函数的实现:
```c++
int Reactor_Server::add_fd_to_epoll(int fd, unsigned int events)
{
epoll_event event;
event.data.fd = fd;
event.events = events;
return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
}
int Reactor_Server::remove_fd_from_epoll(int fd)
{
return epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
}
int Reactor_Server::modify_fd_of_epoll(int fd, unsigned int events)
{
epoll_event event;
event.data.fd = fd;
event.events = events;
return epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
}
```
- 实现服务端run函数
在服务端run函数中,我们将完成服务端的初始化和事件循环。在事件循环开始前,服务端将被标记为正在运行状态(is_running_ = true),随后将进入事件循环。在事件循环结束后,服务端的运行状态将设置为false,并退出。
c++
void Reactor_Server::run()
{
if (!is_running_)
{
is_running_ = true;
epoll_loop();
is_running_ = false;
}
}
示例说明
示例1:简单Echo服务器
该示例是一个简单的Echo服务器。当客户端向服务器发送消息时,服务器将读取该消息,并返回相同的消息。下面给出服务器的示例代码:
#include "reactor_server.h"
#include <iostream>
int main(int argc, char **argv)
{
Reactor_Server server;
server.run();
return 0;
}
示例2:多线程Echo服务器
该示例是一个基于多线程的Echo服务器。当客户端向服务器发送消息时,服务器将创建一个线程以处理该消息。
首先,我们将实现一个请求队列,用于存储来自客户端的请求。在下面例子中,我们使用一个线程池来管理线程,从队列中读取请求数,然后使用线程池中的线程来处理这些请求。
#include "threadpool.h"
#include "reactor_server.h"
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
typedef std::function<void()> Task;
class TaskQueue
{
public:
TaskQueue() : shutdown_(false) {}
void shut_down()
{
std::unique_lock<std::mutex> lock(mtx_);
shutdown_ = true;
cv_.notify_all();
}
void add_task(Task task)
{
std::unique_lock<std::mutex> lock(mtx_);
if (shutdown_)
{
throw std::runtime_error("TaskQueue is shutdown.");
}
tasks_.push(task);
cv_.notify_one();
}
bool get_task(Task& task)
{
std::unique_lock<std::mutex> lock(mtx_);
while (tasks_.empty() && !shutdown_)
{
cv_.wait(lock);
}
if (tasks_.empty())
{
return false;
}
task = tasks_.front();
tasks_.pop();
return true;
}
private:
std::queue<Task> tasks_;
bool shutdown_;
std::mutex mtx_;
std::condition_variable cv_;
};
TaskQueue task_queue;
ThreadPool thread_pool(4);
void handle_read(int client_fd)
{
char buf[MAXLEN];
memset(buf, 0, sizeof buf);
ssize_t n = read(client_fd, buf, MAXLEN);
if (n <= 0)
{
close(client_fd);
return;
}
auto task = [client_fd, buf]()
{
write(client_fd, buf, strlen(buf));
close(client_fd);
};
task_queue.add_task(task);
thread_pool.add_task([&task_queue]()
{
while (true)
{
Task task;
if (!task_queue.get_task(task))
{
break;
}
task();
}
});
}
int main(int argc, char **argv)
{
Reactor_Server server;
server.run();
return 0;
}
在该示例中,我们首先通过TaskQueue和ThreadPool实现了一个多线程请求处理系统。当客户端向服务器发送请求时,服务端会将该请求添加到TaskQueue中,然后从ThreadPool中获取一个线程来处理该请求。线程从TaskQueue中读取任务,然后处理它。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:C++基于reactor的服务器百万并发实现与讲解 - Python技术站