libevent是一个轻量级的基于事件驱动的高性能的开源网络库,并且支持多个平台,对多个平台的I/O复用技术进行了封装,当我们编译库的代码时,编译的脚本将会根据OS支持的处理事件机制,来编译相应的代码,从而在libevent接口上保持一致。
在当前的服务器上,面对的主要问题就是要能处理大量的连接。而通过libevent这个网络库,我们就可以调用它的API来很好的解决上面的问题。首先,可以来回顾一下,对这个问题的传统解决方法。
问题: 如何处理多个客户端连接
解决方案1:I/O复用技术
这几种方式都是同步I/O,即当读写事件就绪,他们自己需要负责进行读写,这个读写过程是阻塞的,而异步I/O则不需要自己负责读写,只需要通知负责读写的程序就可以了。
解决方案2:多线程技术或多进程技术
多线程技术和多进程技术也可以处理高并发的数据连接,因为在服务器中可以产生大量的进程和线程和处理我们需要监视的连接。但是,这两种方式也是有很大的局限性的,比如多进程模型就不适合大量的短连接,因为进程的产生和关闭需要消耗较大的系统性能,同样,还要进程进程间的通信,在CPU性能不足的情况下不太适合。而多线程技术则不太适合处理长连接,因为当我们建立一个进程时,linux中会消耗8G的栈空间,如果我们的每个连接都杵着不断开,那么大量连接长连接后,导致的结果就是内存的大量消耗。
解决方案3:常用的上述二者复合使用
上述的两种方法各具有优缺点,因此,我们可以将上述的方法结合起来,这也是目前使用较多的处理高并发的方法。多进程+I/O复用或者多线程+I/O复用。而在具体的实现上,又可以分为很多的方式。比如多线程+I/O复用技术,我们使用使用一个主线程负责监听一个端口和接受的描述符是否有读写事件产生,如果有,则将事件分发给其他的工作进程去完成,这也是进程池的理念。
在说完上述的高并发的处理方法之后,我们可以来介绍一个libevent的主要特色了。
同样,lievent也是采用的上述系统提供的select,poll和epoll方法来进行I/O复用,但是针对于多个系统平台上的不同的I/O复用实现方式,libevent进行了重新的封装,并提供了统一的API接口。libevent在实现上使用了事件驱动这种机制,其本质上是一种Reactor模式。
Reactor模式,是一种事件驱动机制。应用程序需要提供相应的接口并注册到Reactor上,如果相应的事件发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。
在Libevent中也是一样,向Libevent框架注册相应的事件和回调函数;当这些事件发生时,Libevent会调用这些回调函数处理相应的事件。
lbevent的事件支持三种,分别是网络IO、定时器和信号。定时器的数据结构使用最小堆(Min Heap),以提高效率。网络IO和信号的数据结构采用了双向链表(TAILQ)。
更多linux内核视频教程文本资料免费获取后台私信【内核】。
libevent的安装很简单,我是直接从github上clone下一个源码,然后进行编译安装的。
具体的命令是(假设你已经安装了git):
# git clone https://github.com/nmathewson/Libevent.git
# cd Libevent
# sh autogen.sh
# ./configure && make
# make install
# make verify //验证安装
现在的libevent版本已经到达libevent2了,其增加了多线程的支持,API函数也发生了一些微小的变化。
如果你想知道更多的API使用情况,请点击这里。
下面,就基于libevent2编写一个聊天室服务器。
设计思想:首先创建一个套接字,进而创建一个事件对此端口进行监听,将所请求的用户组成一个队列,并监听所有的用户事件,当某个用户说话了,产生了读事件,就将该用户的发言发送给队列中的其他用户。
程序分析
需要包含的libevent函数头:
#include
#include
#include
#include
创建一个client结构体,接受连接后存放数据:
struct client {
/* The clients socket. */
int fd;
/* The bufferedevent for this client. */
struct bufferevent *buf_ev;
struct bufferevent *buf_ev;
/*
* This holds the pointers to the next and previous entries in
* the tail queue.
*/
TAILQ_ENTRY(client) entries;
};
先来看下mian函数的处理:
int
main(int argc, char **argv)
{
int listen_fd;
struct sockaddr_in listen_addr;
struct event ev_accept;
int reuseaddr_on;
/* Initialize libevent. */
evbase = event_base_new();
/* Initialize the tailq. */
TAILQ_INIT(&client_tailq_head);
/* Create our listening socket. */
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
err(1, "listen failed");
memset(&listen_addr, 0, sizeof(listen_addr));
listen_addr.sin_family = AF_INET;
listen_addr.sin_addr.s_addr = INADDR_ANY;
listen_addr.sin_port = htons(SERVER_PORT);
if (bind(listen_fd, (struct sockaddr *)&listen_addr,
sizeof(listen_addr)) < 0)
err(1, "bind failed");
if (listen(listen_fd, 5) < 0)
err(1, "listen failed");
reuseaddr_on = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on,
sizeof(reuseaddr_on));
/* Set the socket to non-blocking, this is essential in event
* based programming with libevent. */
if (setnonblock(listen_fd) < 0)
err(1, "failed to set server socket to non-blocking");
/* We now have a listening socket, we create a read event to
* be notified when a client connects. */
event_assign(&ev_accept, evbase, listen_fd, EV_READ|EV_PERSIST,
on_accept, NULL);
event_add(&ev_accept, NULL);
/* Start the event loop. */
event_base_dispatch(evbase);
return 0;
}
首先,函数初始化了一个用户队列tailq,接着创建了一个socket套接字,并将套接字设定为非阻塞模式,接着对一个全局的evbase事件集合,注册了事件,事件源是listen_fd,回调函数是on_accept,事件发生的情况是EV_READ,而且标志EV_PESIST表明该事件一直存在,而后开启事件扫描循环event_base_dispatch(evbase)。
再看一下回调函数on_accpet实现:
void
on_accept(int fd, short ev, void *arg)
{
int client_fd;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
struct client *client;
client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
warn("accept failed");
return;
}
/* Set the client socket to non-blocking mode. */
if (setnonblock(client_fd) < 0)
warn("failed to set client socket non-blocking");
/* We've accepted a new client, create a client object. */
client = calloc(1, sizeof(*client));
if (client == NULL)
err(1, "malloc failed");
client->fd = client_fd;
client->buf_ev = bufferevent_socket_new(evbase, client_fd, 0);
bufferevent_setcb(client->buf_ev, buffered_on_read, NULL,
buffered_on_error, client);
/* We have to enable it before our callbacks will be
* called. */
bufferevent_enable(client->buf_ev, EV_READ);
/* Add the new client to the tailq. */
TAILQ_INSERT_TAIL(&client_tailq_head, client, entries);
printf("Accepted connection from %s
",
inet_ntoa(client_addr.sin_addr));
}
这个回调函数的作用很显然,就是接受了一个客户端的请求,并申请好了一个client信息,将需要的内容填写好,在填写中需要注意的是,又向上述的事件集evbase中注册了一个bufferevent事件client->buf_ev,并注册了回调函数buffered_on_read,buffered_on_error,这三个函数分别是当接受后的连接发生了读或者错误事件后的执行函数。接着,将用户的client结构放入了用户的队列tailq中去。
用户的buffer可读后的执行函数:
void
buffered_on_read(struct bufferevent *bev, void *arg)
{
struct client *this_client = arg;
struct client *client;
uint8_t data[8192];
size_t n;
/* Read 8k at a time and send it to all connected clients. */
for (;;) {
n = bufferevent_read(bev, data, sizeof(data));
if (n <= 0) {
/* Done. */
break;
}
/* Send data to all connected clients except for the
* client that sent the data. */
TAILQ_FOREACH(client, &client_tailq_head, entries) {
if (client != this_client) {
bufferevent_write(client->buf_ev, data, n);
}
}
}
}
执行函数的作用很明显,将libevent管理中的buffer数据读取出,存入本地的data数组内,然后对队列中的client进行检索,如果不是发数据的client,则将数据写入该client的buffer中,发送给该用户。这里注意的是需要反复读取buffer中的数据,防止一个读取并没有读取干净,直到读取不到数据为止。
buffer出错处理函数和上述函数差不多,功能就是出错后,结束掉保存的client结构,详细就不说了。
编译的时候记得修改Makefile中Libevent文件夹的位置
设计思想:所谓回显服务器就是将客户端发过来的数据再发回去,这里主要也就是说明libevent的纯IO复用实现。实现方法和上面的差不多,甚至可以说更加简单。
程序和上面的聊天服务器差不多,只是在buffer可读的事件函数中,不是将用户的数据发送给其他用户,而是直接发送给用户本身。
设计思想:上面的方法单纯使用libevent的简单函数来实现服务,但是这里,我们假设我们需要处理的客户端很少,于是我们可以使用对于每个连接我们分配一个线程这样的方式来实现对用户的服务。这种方式简单有效,一对一服务,就算业务逻辑出现阻塞也不怕。
程序分析
首先定义了一些数据结构,worker数据结构定义的是一个工作者,它包含有一个工作线程,和结束标志,需要获取的工作队列,和建立链表需要的指针。job数据结构定义的是操作一个job的方法和对象,这回到程序中,实际上就是指的是事件发生后,封装好的client结构体和处理这个结构体的方法。workqueue数据结构指的是当前的工作队列中的工作者,以及工作队列中的待完成的工作,以及互斥锁和条件变量(因为多个工作进程需要访问这些资源)。
具体的流程就是,用一个主线程监听一个套接字,并将套接字接受到的连接accept,并创建一个client数据结构保存该连接的信息,在这个client结构中注册一个bufferevent事件,注册到client->evbase上(这时候这是向client中的evbase注册了一个事件还没有进行循环这个事件集)。
接着,当监听到某个client有bufferevent事件发生,主线程就把该client结构体和需要进行的工作方法包装成一个job结构,然后把这个job扔到workqueue上去,并通知各个工作者。而后,各个工作者开着的线程就被激活了,疯狂地去workqueue上去抢工作做,某个worker拿到工作后,就可以解包job,根据job的工作说明书(job_function)操作工作对象(client)了。这里,job的工作说明有是循环client中的client->evbase,于是这样线程就会一直去监视这个连接的状态,如果有数据就这会调用回调函数进行处理。同时,这个线程也就是阻塞在这里,这对这一个连接负责。
建立workqueue需要的结构体和函数有:
typedef struct worker {
pthread_t thread;
int terminate;
struct workqueue *workqueue;
struct worker *prev;
struct worker *next;
} worker_t;
typedef struct job {
void (*job_function)(struct job *job);
void *user_data;
struct job *prev;
struct job *next;
} job_t;
typedef struct workqueue {
struct worker *workers;
struct job *waiting_jobs;
pthread_mutex_t jobs_mutex;
pthread_cond_t jobs_cond;
} workqueue_t;
int workqueue_init(workqueue_t *workqueue, int numWorkers);
void workqueue_shutdown(workqueue_t *workqueue);
void workqueue_add_job(workqueue_t *workqueue, job_t *job);
主线程的on_accept函数为:
void on_accept(evutil_socket_t fd, short ev, void *arg) {
int client_fd;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
workqueue_t *workqueue = (workqueue_t *)arg;
client_t *client;
job_t *job;
client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
warn("accept failed");
return;
}
/* Set the client socket to non-blocking mode. */
if (evutil_make_socket_nonblocking(client_fd) < 0)
{
warn("failed to set client socket to non-blocking");
close(client_fd);
return;
}
/* Create a client object. */
if ((client = malloc(sizeof(*client))) == NULL)
{
warn("failed to allocate memory for client state");
close(client_fd);
return;
}
memset(client, 0, sizeof(*client));
client->fd = client_fd;
/* Add any custom code anywhere from here to the end of this function
* to initialize your application-specific attributes in the client struct.
*/
if ((client->output_buffer = evbuffer_new()) == NULL)
{
warn("client output buffer allocation failed");
closeAndFreeClient(client);
return;
}
if ((client->evbase = event_base_new()) == NULL)
{
warn("client event_base creation failed");
closeAndFreeClient(client);
return;
}
client->buf_ev = bufferevent_socket_new(client->evbase, client_fd, BEV_OPT_CLOSE_ON_FREE);
if ((client->buf_ev) == NULL) {
warn("client bufferevent creation failed");
closeAndFreeClient(client);
return;
}
bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write,
buffered_on_error, client);
/* We have to enable it before our callbacks will be
* called. */
bufferevent_enable(client->buf_ev, EV_READ);
/* Create a job object and add it to the work queue. */
if ((job = malloc(sizeof(*job))) == NULL) {
warn("failed to allocate memory for job state");
closeAndFreeClient(client);
return;
}
job->job_function = server_job_function;
job->user_data = client;
workqueue_add_job(workqueue, job);
}
job中的工作指南为:
static void server_job_function(struct job *job) {
client_t *client = (client_t *)job->user_data;
//do my job
event_base_dispatch(client->evbase);
closeAndFreeClient(client);
free(job);
}
设计思想:假设我们的用户很多,高并发,长连接,那么我们还是来用I/O复用和线程池实现吧,用一个控制线程通过I/O复用负责监听和分发事件,用一组线程池来进行处理事件,这样就可以灵活地将控制逻辑和业务逻辑分开了,见下述讲解。
程序分析
具体的流程和上面的差不多,用一个主线程监听一个套接字,并将套接字接受到的连接accept,并创建一个client数据结构保存该连接的信息,在这个client结构中注册一个bufferevent事件,但是这里,将事件注册到accept_evbase中,仍然用主线程进行监听。
而面对监听后出现的事件,将client和操作client的方法打包成一个job,放到上述的workqueue中去,让工作进程来完成。这样的操作和上述的差别在于上述方法将bufferevent注册到client中的evbase中,用工作线程监听,而本方法用主线程监听,工作线程负责处理监听产生的事件。
这要的差别在于两个函数 on_accept函数:
void on_accept(evutil_socket_t fd, short ev, void *arg) {
int client_fd;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
client_t *client;
client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd < 0) {
warn("accept failed");
return;
}
/* Set the client socket to non-blocking mode. */
if (evutil_make_socket_nonblocking(client_fd) < 0) {
warn("failed to set client socket to non-blocking");
close(client_fd);
return;
}
/* Create a client object. */
if ((client = malloc(sizeof(*client))) == NULL) {
warn("failed to allocate memory for client state");
close(client_fd);
return;
}
memset(client, 0, sizeof(*client));
client->fd = client_fd;
/* Add any custom code anywhere from here to the end of this function
* to initialize your application-specific attributes in the client struct.
*/
if ((client->output_buffer = evbuffer_new()) == NULL) {
warn("client output buffer allocation failed");
closeAndFreeClient(client);
return;
}
//需要注意的是,这里注册到evbase_accept
client->buf_ev = bufferevent_socket_new(evbase_accept, client_fd,BEV_OPT_CLOSE_ON_FREE);
if ((client->buf_ev) == NULL) {
warn("client bufferevent creation failed");
closeAndFreeClient(client);
return;
}
bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write,
buffered_on_error, client);
/* We have to enable it before our callbacks will be
* called. */
bufferevent_enable(client->buf_ev, EV_READ);
}
在buffered_on_read中,提交job。
void buffered_on_read(struct bufferevent *bev, void *arg)
{
client_t *client = (client_t *)arg;
job_t *job;
/* Create a job object and add it to the work queue. */
if ((job = malloc(sizeof(*job))) == NULL) {
warn("failed to allocate memory for job state");
closeAndFreeClient(client);
return;
}
job->job_function = server_job_function;
job->user_data = client;
workqueue_add_job(&workqueue, job);
}
在job工作指南server_job_function中就可以做你工作该做的事儿了,根据发来的信息进行数据库处理,http返回等等。
页面更新:2024-04-27
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号