libevent 学习笔记
一、参考
libevent
Libevent深入浅出 - 《Libevent 深入浅出》 - 书栈网 · BookStack
libevent 之 event config的相关函数介绍_event_config_new_yldfree的博客-CSDN博客
Libevent之evbuffer详解_有时需要偏执狂的博客-CSDN博客
二、libevent概述
libevent 就是将网络、IO多路复用等进行封装。核心:监听事件。
event_base:表示事件集合。
event:表示事件(对应一个fd或者一个signal)。
bufferevent:带缓存的事件。
三、libevent 笔记
1、重要结构体
1、struct event:事件;
2、struct event_base:事件的集合;
3、struct event_config:event_base 配置结构体;
4、struct bufferevent:带缓存区事件;
5、struct evbuffer:缓存区抽象;
6、struct evconnlistener:网络封装;
2、事件种类
#define EV_TIMEOUT 0x01 // 超时事件
#define EV_READ 0x02 // fd 可读事件
#define EV_WRITE 0x04 // fd 可写事件
#define EV_SIGNAL 0x08 // POSIX信号事件
#define EV_PERSIST 0x10 // 持久事件(激活时不会被自动移除,也就是可以连续监听)
#define EV_ET 0x20 // 边缘触发
#define EV_FINALIZE 0x40
#define EV_CLOSED 0x80 // 连接关闭事件
/**@}*/
3、事件集接口
1、初始化
struct event_base *
event_init(void)
{struct event_base *base = event_base_new_with_config(NULL);if (base == NULL) {event_errx(1, "%s: Unable to construct event_base", __func__);return NULL;}current_base = base;return (base);
}
struct event_base *
event_base_new(void)
{struct event_base *base = NULL;struct event_config *cfg = event_config_new();if (cfg) {base = event_base_new_with_config(cfg);event_config_free(cfg);}return base;
}
2、释放资源
void
event_base_free(struct event_base *base)
{event_base_free_(base, 1);
}
3、设置配置结构体
// 避免IO机制
int
event_config_avoid_method(struct event_config *cfg, const char *method)
{struct event_config_entry *entry = mm_malloc(sizeof(*entry));if (entry == NULL)return (-1);if ((entry->avoid_method = mm_strdup(method)) == NULL) {mm_free(entry);return (-1);}TAILQ_INSERT_TAIL(&cfg->entries, entry, next);return (0);
}// 设置特性
int
event_config_require_features(struct event_config *cfg,int features)
{if (!cfg)return (-1);cfg->require_features = features;return (0);
}// 设置标志位
int
event_config_set_flag(struct event_config *cfg, int flag)
{if (!cfg)return -1;cfg->flags |= flag;return 0;
}
4、检查event_base后端
// libevent 支持的方法名字数组
const char **
event_get_supported_methods(void)
{static const char **methods = NULL;const struct eventop **method;const char **tmp;int i = 0, k;/* count all methods */for (method = &eventops[0]; *method != NULL; ++method) {++i;}/* allocate one more than we need for the NULL pointer */tmp = mm_calloc((i + 1), sizeof(char *));if (tmp == NULL)return (NULL);/* populate the array with the supported methods */for (k = 0, i = 0; eventops[k] != NULL; ++k) {tmp[i++] = eventops[k]->name;}tmp[i] = NULL;if (methods != NULL)mm_free((char**)methods);methods = tmp;return (methods);
}// 返回正在使用的方法
const char *
event_base_get_method(const struct event_base *base)
{EVUTIL_ASSERT(base);return (base->evsel->name);
}// 获取特征的比特掩码
int
event_base_get_features(const struct event_base *base)
{return base->evsel->features;
}
5、优先级配置
int
event_base_priority_init(struct event_base *base, int npriorities)
{int i, r;r = -1;EVBASE_ACQUIRE_LOCK(base, th_base_lock);if (N_ACTIVE_CALLBACKS(base) || npriorities < 1|| npriorities >= EVENT_MAX_PRIORITIES)goto err;if (npriorities == base->nactivequeues)goto ok;if (base->nactivequeues) {mm_free(base->activequeues);base->nactivequeues = 0;}/* Allocate our priority queues */base->activequeues = (struct evcallback_list *)mm_calloc(npriorities, sizeof(struct evcallback_list));if (base->activequeues == NULL) {event_warn("%s: calloc", __func__);goto err;}base->nactivequeues = npriorities;for (i = 0; i < base->nactivequeues; ++i) {TAILQ_INIT(&base->activequeues[i]);}ok:r = 0;
err:EVBASE_RELEASE_LOCK(base, th_base_lock);return (r);
}
6、event_base和fork
/* reinitialize the event base after a fork */
int
event_reinit(struct event_base *base)
{const struct eventop *evsel;int res = 0;int was_notifiable = 0;int had_signal_added = 0;EVBASE_ACQUIRE_LOCK(base, th_base_lock);evsel = base->evsel;/* check if this event mechanism requires reinit on the backend */if (evsel->need_reinit) {/* We're going to call event_del() on our notify events (the* ones that tell about signals and wakeup events). But we* don't actually want to tell the backend to change its* state, since it might still share some resource (a kqueue,* an epoll fd) with the parent process, and we don't want to* delete the fds from _that_ backend, we temporarily stub out* the evsel with a replacement.*/base->evsel = &nil_eventop;}/* We need to re-create a new signal-notification fd and a new* thread-notification fd. Otherwise, we'll still share those with* the parent process, which would make any notification sent to them* get received by one or both of the event loops, more or less at* random.*/if (base->sig.ev_signal_added) {event_del_nolock_(&base->sig.ev_signal, EVENT_DEL_AUTOBLOCK);event_debug_unassign(&base->sig.ev_signal);memset(&base->sig.ev_signal, 0, sizeof(base->sig.ev_signal));had_signal_added = 1;base->sig.ev_signal_added = 0;}if (base->sig.ev_signal_pair[0] != -1)EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[0]);if (base->sig.ev_signal_pair[1] != -1)EVUTIL_CLOSESOCKET(base->sig.ev_signal_pair[1]);if (base->th_notify_fn != NULL) {was_notifiable = 1;base->th_notify_fn = NULL;}if (base->th_notify_fd[0] != -1) {event_del_nolock_(&base->th_notify, EVENT_DEL_AUTOBLOCK);EVUTIL_CLOSESOCKET(base->th_notify_fd[0]);if (base->th_notify_fd[1] != -1)EVUTIL_CLOSESOCKET(base->th_notify_fd[1]);base->th_notify_fd[0] = -1;base->th_notify_fd[1] = -1;event_debug_unassign(&base->th_notify);}/* Replace the original evsel. */base->evsel = evsel;if (evsel->need_reinit) {/* Reconstruct the backend through brute-force, so that we do* not share any structures with the parent process. For some* backends, this is necessary: epoll and kqueue, for* instance, have events associated with a kernel* structure. If didn't reinitialize, we'd share that* structure with the parent process, and any changes made by* the parent would affect our backend's behavior (and vice* versa).*/if (base->evsel->dealloc != NULL)base->evsel->dealloc(base);base->evbase = evsel->init(base);if (base->evbase == NULL) {event_errx(1,"%s: could not reinitialize event mechanism",__func__);res = -1;goto done;}/* Empty out the changelist (if any): we are starting from a* blank slate. */event_changelist_freemem_(&base->changelist);/* Tell the event maps to re-inform the backend about all* pending events. This will make the signal notification* event get re-created if necessary. */if (evmap_reinit_(base) < 0)res = -1;} else {res = evsig_init_(base);if (res == 0 && had_signal_added) {res = event_add_nolock_(&base->sig.ev_signal, NULL, 0);if (res == 0)base->sig.ev_signal_added = 1;}}/* If we were notifiable before, and nothing just exploded, become* notifiable again. */if (was_notifiable && res == 0)res = evthread_make_base_notifiable_nolock_(base);done:EVBASE_RELEASE_LOCK(base, th_base_lock);return (res);
}
4、事件接口
1、初始化
// 分配和构造一个用于 base 的新的事件
struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
{struct event *ev;ev = mm_malloc(sizeof(struct event));if (ev == NULL)return (NULL);if (event_assign(ev, base, fd, events, cb, arg) < 0) {mm_free(ev);return (NULL);}return (ev);
}// 信号事件
#define evsignal_new(b, x, cb, arg) event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg))
2、释放事件
void
event_free(struct event *ev)
{/* This is disabled, so that events which have been finalized be a* valid target for event_free(). That's */// event_debug_assert_is_setup_(ev);/* make sure that this event won't be coming back to haunt us. */event_del(ev);event_debug_note_teardown_(ev);mm_free(ev);}
3、将事件添加到挂起事件集
int
event_add(struct event *ev, const struct timeval *tv)
{int res;if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {event_warnx("%s: event has no event_base set.", __func__);return -1;}EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);res = event_add_nolock_(ev, tv, 0);EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);return (res);
}
4、从监控事件集中删除事件
int
event_del(struct event *ev)
{return event_del_(ev, EVENT_DEL_AUTOBLOCK);
}
5、事件优先级
int
event_priority_set(struct event *ev, int pri)
{event_debug_assert_is_setup_(ev);if (ev->ev_flags & EVLIST_ACTIVE)return (-1);if (pri < 0 || pri >= ev->ev_base->nactivequeues)return (-1);ev->ev_pri = pri;return (0);
}
6、事件状态检查
// 确定给定的事件是否是未决的或者激活的
int
event_pending(const struct event *ev, short event, struct timeval *tv)
{int flags = 0;if (EVUTIL_FAILURE_CHECK(ev->ev_base == NULL)) {event_warnx("%s: event has no event_base set.", __func__);return 0;}EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);event_debug_assert_is_setup_(ev);if (ev->ev_flags & EVLIST_INSERTED)flags |= (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL));if (ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))flags |= ev->ev_res;if (ev->ev_flags & EVLIST_TIMEOUT)flags |= EV_TIMEOUT;event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL);/* See if there is a timeout that we should report */if (tv != NULL && (flags & event & EV_TIMEOUT)) {struct timeval tmp = ev->ev_timeout;tmp.tv_usec &= MICROSECONDS_MASK;/* correctly remamp to real time */evutil_timeradd(&ev->ev_base->tv_clock_diff, &tmp, tv);}EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);return (flags & event);
}// 返回信号值
#define event_get_signal(ev) ((int)event_get_fd(ev))// 返回文件描述符
evutil_socket_t
event_get_fd(const struct event *ev)
{event_debug_assert_is_setup_(ev);return ev->ev_fd;
}// 返回事件集
struct event_base *
event_get_base(const struct event *ev)
{event_debug_assert_is_setup_(ev);return ev->ev_base;
}// 返回事件标志
short
event_get_events(const struct event *ev)
{event_debug_assert_is_setup_(ev);return ev->ev_events;
}// 获取回调函数
event_callback_fn
event_get_callback(const struct event *ev)
{event_debug_assert_is_setup_(ev);return ev->ev_callback;
}// 获取回调函数参数
void *
event_get_callback_arg(const struct event *ev)
{event_debug_assert_is_setup_(ev);return ev->ev_arg;
}// 返回优先级
int
event_get_priority(const struct event *ev)
{event_debug_assert_is_setup_(ev);return ev->ev_pri;
}
7、一次触发事件
int
event_base_once(struct event_base *base, evutil_socket_t fd, short events,void (*callback)(evutil_socket_t, short, void *),void *arg, const struct timeval *tv)
{struct event_once *eonce;int res = 0;int activate = 0;/* We cannot support signals that just fire once, or persistent* events. */if (events & (EV_SIGNAL|EV_PERSIST))return (-1);if ((eonce = mm_calloc(1, sizeof(struct event_once))) == NULL)return (-1);eonce->cb = callback;eonce->arg = arg;if ((events & (EV_TIMEOUT|EV_SIGNAL|EV_READ|EV_WRITE|EV_CLOSED)) == EV_TIMEOUT) {evtimer_assign(&eonce->ev, base, event_once_cb, eonce);if (tv == NULL || ! evutil_timerisset(tv)) {/* If the event is going to become active immediately,* don't put it on the timeout queue. This is one* idiom for scheduling a callback, so let's make* it fast (and order-preserving). */activate = 1;}} else if (events & (EV_READ|EV_WRITE|EV_CLOSED)) {events &= EV_READ|EV_WRITE|EV_CLOSED;event_assign(&eonce->ev, base, fd, events, event_once_cb, eonce);} else {/* Bad event combination */mm_free(eonce);return (-1);}if (res == 0) {EVBASE_ACQUIRE_LOCK(base, th_base_lock);if (activate)event_active_nolock_(&eonce->ev, EV_TIMEOUT, 1);elseres = event_add_nolock_(&eonce->ev, tv, 0);if (res != 0) {mm_free(eonce);return (res);} else {LIST_INSERT_HEAD(&base->once_events, eonce, next_once);}EVBASE_RELEASE_LOCK(base, th_base_lock);}return (0);
}
8、手动激活事件
void
event_active(struct event *ev, int res, short ncalls)
{if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {event_warnx("%s: event has no event_base set.", __func__);return;}EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);event_debug_assert_is_setup_(ev);event_active_nolock_(ev, res, ncalls);EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}
8、事件状态之间的转换
5、bufferevent接口
1、创建套接字
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,int options)
{struct bufferevent_private *bufev_p;struct bufferevent *bufev;#ifdef _WIN32if (base && event_base_get_iocp_(base))return bufferevent_async_new_(base, fd, options);
#endifif ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)return NULL;if (bufferevent_init_common_(bufev_p, base, &bufferevent_ops_socket,options) < 0) {mm_free(bufev_p);return NULL;}bufev = &bufev_p->bev;evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);event_assign(&bufev->ev_read, bufev->ev_base, fd,EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);event_assign(&bufev->ev_write, bufev->ev_base, fd,EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);evbuffer_freeze(bufev->input, 0);evbuffer_freeze(bufev->output, 1);return bufev;
}
2、启动连接
int
bufferevent_socket_connect(struct bufferevent *bev,const struct sockaddr *sa, int socklen)
{struct bufferevent_private *bufev_p =EVUTIL_UPCAST(bev, struct bufferevent_private, bev);evutil_socket_t fd;int r = 0;int result=-1;int ownfd = 0;bufferevent_incref_and_lock_(bev);if (!bufev_p)goto done;fd = bufferevent_getfd(bev);if (fd < 0) {if (!sa)goto done;fd = evutil_socket_(sa->sa_family,SOCK_STREAM|EVUTIL_SOCK_NONBLOCK, 0);if (fd < 0)goto done;ownfd = 1;}if (sa) {
#ifdef _WIN32if (bufferevent_async_can_connect_(bev)) {bufferevent_setfd(bev, fd);r = bufferevent_async_connect_(bev, fd, sa, socklen);if (r < 0)goto freesock;bufev_p->connecting = 1;result = 0;goto done;} else
#endifr = evutil_socket_connect_(&fd, sa, socklen);if (r < 0)goto freesock;}
#ifdef _WIN32/* ConnectEx() isn't always around, even when IOCP is enabled.* Here, we borrow the socket object's write handler to fall back* on a non-blocking connect() when ConnectEx() is unavailable. */if (BEV_IS_ASYNC(bev)) {event_assign(&bev->ev_write, bev->ev_base, fd,EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bev);}
#endifbufferevent_setfd(bev, fd);if (r == 0) {if (! be_socket_enable(bev, EV_WRITE)) {bufev_p->connecting = 1;result = 0;goto done;}} else if (r == 1) {/* The connect succeeded already. How very BSD of it. */result = 0;bufev_p->connecting = 1;bufferevent_trigger_nolock_(bev, EV_WRITE, BEV_OPT_DEFER_CALLBACKS);} else {/* The connect failed already. How very BSD of it. */result = 0;bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, BEV_OPT_DEFER_CALLBACKS);bufferevent_disable(bev, EV_WRITE|EV_READ);}goto done;freesock:bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);if (ownfd)evutil_closesocket(fd);/* do something about the error? */
done:bufferevent_decref_and_unlock_(bev);return result;
}
3、释放资源
void
bufferevent_free(struct bufferevent *bufev)
{BEV_LOCK(bufev);bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);bufferevent_cancel_all_(bufev);bufferevent_decref_and_unlock_(bufev);
}
4、回调函数操作
// 回调设置
void
bufferevent_setcb(struct bufferevent *bufev,bufferevent_data_cb readcb, bufferevent_data_cb writecb,bufferevent_event_cb eventcb, void *cbarg)
{BEV_LOCK(bufev);bufev->readcb = readcb;bufev->writecb = writecb;bufev->errorcb = eventcb;bufev->cbarg = cbarg;BEV_UNLOCK(bufev);
}// 获取回调函数
void
bufferevent_getcb(struct bufferevent *bufev,bufferevent_data_cb *readcb_ptr,bufferevent_data_cb *writecb_ptr,bufferevent_event_cb *eventcb_ptr,void **cbarg_ptr)
{BEV_LOCK(bufev);if (readcb_ptr)*readcb_ptr = bufev->readcb;if (writecb_ptr)*writecb_ptr = bufev->writecb;if (eventcb_ptr)*eventcb_ptr = bufev->errorcb;if (cbarg_ptr)*cbarg_ptr = bufev->cbarg;BEV_UNLOCK(bufev);
}
5、启用
int
bufferevent_enable(struct bufferevent *bufev, short event)
{struct bufferevent_private *bufev_private =EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);short impl_events = event;int r = 0;bufferevent_incref_and_lock_(bufev);if (bufev_private->read_suspended)impl_events &= ~EV_READ;if (bufev_private->write_suspended)impl_events &= ~EV_WRITE;bufev->enabled |= event;if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)r = -1;bufferevent_decref_and_unlock_(bufev);return r;
}
6、禁用
int
bufferevent_disable(struct bufferevent *bufev, short event)
{int r = 0;BEV_LOCK(bufev);bufev->enabled &= ~event;if (bufev->be_ops->disable(bufev, event) < 0)r = -1;BEV_UNLOCK(bufev);return r;
}
7、设置水位线
void
bufferevent_setwatermark(struct bufferevent *bufev, short events,size_t lowmark, size_t highmark)
{struct bufferevent_private *bufev_private =EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);BEV_LOCK(bufev);if (events & EV_WRITE) {bufev->wm_write.low = lowmark;bufev->wm_write.high = highmark;}if (events & EV_READ) {bufev->wm_read.low = lowmark;bufev->wm_read.high = highmark;if (highmark) {/* There is now a new high-water mark for read.enable the callback if needed, and see if we shouldsuspend/bufferevent_wm_unsuspend. */if (bufev_private->read_watermarks_cb == NULL) {bufev_private->read_watermarks_cb =evbuffer_add_cb(bufev->input,bufferevent_inbuf_wm_cb,bufev);}evbuffer_cb_set_flags(bufev->input,bufev_private->read_watermarks_cb,EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);if (evbuffer_get_length(bufev->input) >= highmark)bufferevent_wm_suspend_read(bufev);else if (evbuffer_get_length(bufev->input) < highmark)bufferevent_wm_unsuspend_read(bufev);} else {/* There is now no high-water mark for read. */if (bufev_private->read_watermarks_cb)evbuffer_cb_clear_flags(bufev->input,bufev_private->read_watermarks_cb,EVBUFFER_CB_ENABLED);bufferevent_wm_unsuspend_read(bufev);}}BEV_UNLOCK(bufev);
}
7、获取输入缓存区
struct evbuffer *
bufferevent_get_input(struct bufferevent *bufev)
{return bufev->input;
}
8、获取输出缓存区
struct evbuffer *
bufferevent_get_output(struct bufferevent *bufev)
{return bufev->output;
}
9、向写缓存区写入数据
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{if (evbuffer_add(bufev->output, data, size) == -1)return (-1);return 0;
}int
bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{if (evbuffer_add_buffer(bufev->output, buf) == -1)return (-1);return 0;
}
10、从读缓存区获取数据
size_t
bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
{return (evbuffer_remove(bufev->input, data, size));
}int
bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{return (evbuffer_add_buffer(buf, bufev->input));
}
11、清空缓存区
int
bufferevent_flush(struct bufferevent *bufev,short iotype,enum bufferevent_flush_mode mode)
{int r = -1;BEV_LOCK(bufev);if (bufev->be_ops->flush)r = bufev->be_ops->flush(bufev, iotype, mode);BEV_UNLOCK(bufev);return r;
}
6、evBuffer接口
1、创建
struct evbuffer *
evbuffer_new(void)
{struct evbuffer *buffer;buffer = mm_calloc(1, sizeof(struct evbuffer));if (buffer == NULL)return (NULL);LIST_INIT(&buffer->callbacks);buffer->refcnt = 1;buffer->last_with_datap = &buffer->first;return (buffer);
}
2、释放
void
evbuffer_free(struct evbuffer *buffer)
{EVBUFFER_LOCK(buffer);evbuffer_decref_and_unlock_(buffer);
}
3、锁操作
int
evbuffer_enable_locking(struct evbuffer *buf, void *lock)
{
#ifdef EVENT__DISABLE_THREAD_SUPPORTreturn -1;
#elseif (buf->lock)return -1;if (!lock) {EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);if (!lock)return -1;buf->lock = lock;buf->own_lock = 1;} else {buf->lock = lock;buf->own_lock = 0;}return 0;
#endif
}void
evbuffer_lock(struct evbuffer *buf)
{EVBUFFER_LOCK(buf);
}void
evbuffer_unlock(struct evbuffer *buf)
{EVBUFFER_UNLOCK(buf);
}
4、获取缓存区数据长度
size_t
evbuffer_get_length(const struct evbuffer *buffer)
{size_t result;EVBUFFER_LOCK(buffer);result = (buffer->total_len);EVBUFFER_UNLOCK(buffer);return result;
}
5、向缓存区添加数据
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{struct evbuffer_chain *chain, *tmp;const unsigned char *data = data_in;size_t remain, to_alloc;int result = -1;EVBUFFER_LOCK(buf);if (buf->freeze_end) {goto done;}/* Prevent buf->total_len overflow */if (datlen > EV_SIZE_MAX - buf->total_len) {goto done;}if (*buf->last_with_datap == NULL) {chain = buf->last;} else {chain = *buf->last_with_datap;}/* If there are no chains allocated for this buffer, allocate one* big enough to hold all the data. */if (chain == NULL) {chain = evbuffer_chain_new(datlen);if (!chain)goto done;evbuffer_chain_insert(buf, chain);}if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {/* Always true for mutable buffers */EVUTIL_ASSERT(chain->misalign >= 0 &&(ev_uint64_t)chain->misalign <= EVBUFFER_CHAIN_MAX);remain = chain->buffer_len - (size_t)chain->misalign - chain->off;if (remain >= datlen) {/* there's enough space to hold all the data in the* current last chain */memcpy(chain->buffer + chain->misalign + chain->off,data, datlen);chain->off += datlen;buf->total_len += datlen;buf->n_add_for_cb += datlen;goto out;} else if (!CHAIN_PINNED(chain) &&evbuffer_chain_should_realign(chain, datlen)) {/* we can fit the data into the misalignment */evbuffer_chain_align(chain);memcpy(chain->buffer + chain->off, data, datlen);chain->off += datlen;buf->total_len += datlen;buf->n_add_for_cb += datlen;goto out;}} else {/* we cannot write any data to the last chain */remain = 0;}/* we need to add another chain */to_alloc = chain->buffer_len;if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE/2)to_alloc <<= 1;if (datlen > to_alloc)to_alloc = datlen;tmp = evbuffer_chain_new(to_alloc);if (tmp == NULL)goto done;if (remain) {memcpy(chain->buffer + chain->misalign + chain->off,data, remain);chain->off += remain;buf->total_len += remain;buf->n_add_for_cb += remain;}data += remain;datlen -= remain;memcpy(tmp->buffer, data, datlen);tmp->off = datlen;evbuffer_chain_insert(buf, tmp);buf->n_add_for_cb += datlen;out:evbuffer_invoke_callbacks_(buf);result = 0;
done:EVBUFFER_UNLOCK(buf);return result;
}int
evbuffer_add_printf(struct evbuffer *buf, const char *fmt, ...)
{int res = -1;va_list ap;va_start(ap, fmt);res = evbuffer_add_vprintf(buf, fmt, ap);va_end(ap);return (res);
}int
evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap)
{char *buffer;size_t space;int sz, result = -1;va_list aq;struct evbuffer_chain *chain;EVBUFFER_LOCK(buf);if (buf->freeze_end) {goto done;}/* make sure that at least some space is available */if ((chain = evbuffer_expand_singlechain(buf, 64)) == NULL)goto done;for (;;) {
#if 0size_t used = chain->misalign + chain->off;buffer = (char *)chain->buffer + chain->misalign + chain->off;EVUTIL_ASSERT(chain->buffer_len >= used);space = chain->buffer_len - used;
#endifbuffer = (char*) CHAIN_SPACE_PTR(chain);space = (size_t) CHAIN_SPACE_LEN(chain);#ifndef va_copy
#define va_copy(dst, src) memcpy(&(dst), &(src), sizeof(va_list))
#endifva_copy(aq, ap);sz = evutil_vsnprintf(buffer, space, fmt, aq);va_end(aq);if (sz < 0)goto done;if (INT_MAX >= EVBUFFER_CHAIN_MAX &&(size_t)sz >= EVBUFFER_CHAIN_MAX)goto done;if ((size_t)sz < space) {chain->off += sz;buf->total_len += sz;buf->n_add_for_cb += sz;advance_last_with_data(buf);evbuffer_invoke_callbacks_(buf);result = sz;goto done;}if ((chain = evbuffer_expand_singlechain(buf, sz + 1)) == NULL)goto done;}/* NOTREACHED */done:EVBUFFER_UNLOCK(buf);return result;
}int
evbuffer_expand(struct evbuffer *buf, size_t datlen)
{struct evbuffer_chain *chain;EVBUFFER_LOCK(buf);chain = evbuffer_expand_singlechain(buf, datlen);EVBUFFER_UNLOCK(buf);return chain ? 0 : -1;
}
6、移动数据
int
evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
{struct evbuffer_chain *pinned, *last;size_t in_total_len, out_total_len;int result = 0;EVBUFFER_LOCK2(inbuf, outbuf);in_total_len = inbuf->total_len;out_total_len = outbuf->total_len;if (in_total_len == 0 || outbuf == inbuf)goto done;if (outbuf->freeze_end || inbuf->freeze_start) {result = -1;goto done;}if (PRESERVE_PINNED(inbuf, &pinned, &last) < 0) {result = -1;goto done;}if (out_total_len == 0) {/* There might be an empty chain at the start of outbuf; free* it. */evbuffer_free_all_chains(outbuf->first);COPY_CHAIN(outbuf, inbuf);} else {APPEND_CHAIN(outbuf, inbuf);}RESTORE_PINNED(inbuf, pinned, last);inbuf->n_del_for_cb += in_total_len;outbuf->n_add_for_cb += in_total_len;evbuffer_invoke_callbacks_(inbuf);evbuffer_invoke_callbacks_(outbuf);done:EVBUFFER_UNLOCK2(inbuf, outbuf);return result;
}int
evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst,size_t datlen)
{/*XXX We should have an option to force this to be zero-copy.*//*XXX can fail badly on sendfile case. */struct evbuffer_chain *chain, *previous;size_t nread = 0;int result;EVBUFFER_LOCK2(src, dst);chain = previous = src->first;if (datlen == 0 || dst == src) {result = 0;goto done;}if (dst->freeze_end || src->freeze_start) {result = -1;goto done;}/* short-cut if there is no more data buffered */if (datlen >= src->total_len) {datlen = src->total_len;evbuffer_add_buffer(dst, src);result = (int)datlen; /*XXXX should return ev_ssize_t*/goto done;}/* removes chains if possible */while (chain->off <= datlen) {/* We can't remove the last with data from src unless we* remove all chains, in which case we would have done the if* block above */EVUTIL_ASSERT(chain != *src->last_with_datap);nread += chain->off;datlen -= chain->off;previous = chain;if (src->last_with_datap == &chain->next)src->last_with_datap = &src->first;chain = chain->next;}if (nread) {/* we can remove the chain */struct evbuffer_chain **chp;chp = evbuffer_free_trailing_empty_chains(dst);if (dst->first == NULL) {dst->first = src->first;} else {*chp = src->first;}dst->last = previous;previous->next = NULL;src->first = chain;advance_last_with_data(dst);dst->total_len += nread;dst->n_add_for_cb += nread;}/* we know that there is more data in the src buffer than* we want to read, so we manually drain the chain */evbuffer_add(dst, chain->buffer + chain->misalign, datlen);chain->misalign += datlen;chain->off -= datlen;nread += datlen;/* You might think we would want to increment dst->n_add_for_cb* here too. But evbuffer_add above already took care of that.*/src->total_len -= nread;src->n_del_for_cb += nread;if (nread) {evbuffer_invoke_callbacks_(dst);evbuffer_invoke_callbacks_(src);}result = (int)nread;/*XXXX should change return type */done:EVBUFFER_UNLOCK2(src, dst);return result;
}
7、evconnlistener接口
1、初始化
struct evconnlistener *
evconnlistener_new(struct event_base *base,evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,evutil_socket_t fd)
{struct evconnlistener_event *lev;#ifdef _WIN32if (base && event_base_get_iocp_(base)) {const struct win32_extension_fns *ext =event_get_win32_extension_fns_();if (ext->AcceptEx && ext->GetAcceptExSockaddrs)return evconnlistener_new_async(base, cb, ptr, flags,backlog, fd);}
#endifif (backlog > 0) {if (listen(fd, backlog) < 0)return NULL;} else if (backlog < 0) {if (listen(fd, 128) < 0)return NULL;}lev = mm_calloc(1, sizeof(struct evconnlistener_event));if (!lev)return NULL;lev->base.ops = &evconnlistener_event_ops;lev->base.cb = cb;lev->base.user_data = ptr;lev->base.flags = flags;lev->base.refcnt = 1;lev->base.accept4_flags = 0;if (!(flags & LEV_OPT_LEAVE_SOCKETS_BLOCKING))lev->base.accept4_flags |= EVUTIL_SOCK_NONBLOCK;if (flags & LEV_OPT_CLOSE_ON_EXEC)lev->base.accept4_flags |= EVUTIL_SOCK_CLOEXEC;if (flags & LEV_OPT_THREADSAFE) {EVTHREAD_ALLOC_LOCK(lev->base.lock, EVTHREAD_LOCKTYPE_RECURSIVE);}event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,listener_read_cb, lev);if (!(flags & LEV_OPT_DISABLED))evconnlistener_enable(&lev->base);return &lev->base;
}
2、释放
void
evconnlistener_free(struct evconnlistener *lev)
{LOCK(lev);lev->cb = NULL;lev->errorcb = NULL;if (lev->ops->shutdown)lev->ops->shutdown(lev);listener_decref_and_unlock(lev);
}
3、监听
struct evconnlistener *
evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb,void *ptr, unsigned flags, int backlog, const struct sockaddr *sa,int socklen)
{struct evconnlistener *listener;evutil_socket_t fd;int on = 1;int family = sa ? sa->sa_family : AF_UNSPEC;int socktype = SOCK_STREAM | EVUTIL_SOCK_NONBLOCK;if (backlog == 0)return NULL;if (flags & LEV_OPT_CLOSE_ON_EXEC)socktype |= EVUTIL_SOCK_CLOEXEC;fd = evutil_socket_(family, socktype, 0);if (fd == -1)return NULL;if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on, sizeof(on))<0)goto err;if (flags & LEV_OPT_REUSEABLE) {if (evutil_make_listen_socket_reuseable(fd) < 0)goto err;}if (flags & LEV_OPT_REUSEABLE_PORT) {if (evutil_make_listen_socket_reuseable_port(fd) < 0)goto err;}if (flags & LEV_OPT_DEFERRED_ACCEPT) {if (evutil_make_tcp_listen_socket_deferred(fd) < 0)goto err;}if (sa) {if (bind(fd, sa, socklen)<0)goto err;}listener = evconnlistener_new(base, cb, ptr, flags, backlog, fd);if (!listener)goto err;return listener;
err:evutil_closesocket(fd);return NULL;
}
4、启用
int
evconnlistener_enable(struct evconnlistener *lev)
{int r;LOCK(lev);lev->enabled = 1;if (lev->cb)r = lev->ops->enable(lev);elser = 0;UNLOCK(lev);return r;
}
5、禁用
int
evconnlistener_disable(struct evconnlistener *lev)
{int r;LOCK(lev);lev->enabled = 0;r = lev->ops->disable(lev);UNLOCK(lev);return r;
}
6、调整回调函数
void
evconnlistener_set_cb(struct evconnlistener *lev,evconnlistener_cb cb, void *arg)
{int enable = 0;LOCK(lev);if (lev->enabled && !lev->cb)enable = 1;lev->cb = cb;lev->user_data = arg;if (enable)evconnlistener_enable(lev);UNLOCK(lev);
}
8、常用设置
1、日志消息回调
void
event_set_log_callback(event_log_cb cb)
{log_fn = cb;
}
2、致命错误回调
void
event_set_fatal_callback(event_fatal_cb cb)
{fatal_fn = cb;
}
3、内存管理回调
void
event_set_mem_functions(void *(*malloc_fn)(size_t sz),void *(*realloc_fn)(void *ptr, size_t sz),void (*free_fn)(void *ptr))
{mm_malloc_fn_ = malloc_fn;mm_realloc_fn_ = realloc_fn;mm_free_fn_ = free_fn;
}
4、调试事件
void
event_enable_debug_mode(void)
{
#ifndef EVENT__DISABLE_DEBUG_MODEif (event_debug_mode_on_)event_errx(1, "%s was called twice!", __func__);if (event_debug_mode_too_late)event_errx(1, "%s must be called *before* creating any events ""or event_bases",__func__);event_debug_mode_on_ = 1;HT_INIT(event_debug_map, &global_debug_map);
#endif
}void
event_disable_debug_mode(void)
{
#ifndef EVENT__DISABLE_DEBUG_MODEstruct event_debug_entry **ent, *victim;EVLOCK_LOCK(event_debug_map_lock_, 0);for (ent = HT_START(event_debug_map, &global_debug_map); ent; ) {victim = *ent;ent = HT_NEXT_RMV(event_debug_map, &global_debug_map, ent);mm_free(victim);}HT_CLEAR(event_debug_map, &global_debug_map);EVLOCK_UNLOCK(event_debug_map_lock_ , 0);event_debug_mode_on_ = 0;
#endif
}void
event_debug_unassign(struct event *ev)
{event_debug_assert_not_added_(ev);event_debug_note_teardown_(ev);ev->ev_flags &= ~EVLIST_INIT;
}
四、libevent使用思路
1、参考
libevent 源码中提供hello-world.c例程。
2、服务器源码
/*This example program provides a trivial server program that listens for TCPconnections on port 9995. When they arrive, it writes a short message toeach client connection, and closes each connection once it is flushed.Where possible, it exits cleanly in response to a SIGINT (ctrl-c).
*/#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifndef _WIN32
#include <netinet/in.h>
# ifdef _XOPEN_SOURCE_EXTENDED
# include <arpa/inet.h>
# endif
#include <sys/socket.h>
#endif#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>static const char MESSAGE[] = "Hello, World!\n";static const int PORT = 9995;static void listener_cb(struct evconnlistener *, evutil_socket_t,struct sockaddr *, int socklen, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *);int
main(int argc, char **argv)
{struct event_base *base;struct evconnlistener *listener;struct event *signal_event;struct sockaddr_in sin;
#ifdef _WIN32WSADATA wsa_data;WSAStartup(0x0201, &wsa_data);
#endif// 分配事件base = event_base_new();if (!base) {fprintf(stderr, "Could not initialize libevent!\n");return 1;}memset(&sin, 0, sizeof(sin));sin.sin_family = AF_INET;sin.sin_port = htons(PORT);// 监听客户端连接listener = evconnlistener_new_bind(base, listener_cb, (void *)base,LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,(struct sockaddr*)&sin,sizeof(sin));if (!listener) {fprintf(stderr, "Could not create a listener!\n");return 1;}// 设置信号事件(ctrl+c触发SIGINT事件)signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);if (!signal_event || event_add(signal_event, NULL)<0) {fprintf(stderr, "Could not create/add a signal event!\n");return 1;}// 循环监听事件event_base_dispatch(base);// 释放资源evconnlistener_free(listener);event_free(signal_event);event_base_free(base);printf("done\n");return 0;
}// 客户端监听回调函数
static void
listener_cb(struct evconnlistener *listener, evutil_socket_t fd,struct sockaddr *sa, int socklen, void *user_data)
{struct event_base *base = user_data;struct bufferevent *bev;// 分配一个带缓存的事件bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);if (!bev) {fprintf(stderr, "Error constructing bufferevent!");event_base_loopbreak(base);return;}// 设置回调函数写事件回调函数和错误事件回调函数bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL);// 开启写事件bufferevent_enable(bev, EV_WRITE);// 关闭读事件bufferevent_disable(bev, EV_READ);// 写数据bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
}// 写事件回调函数
static void
conn_writecb(struct bufferevent *bev, void *user_data)
{// 获取输出缓存struct evbuffer *output = bufferevent_get_output(bev);// 获取缓存中长度if (evbuffer_get_length(output) == 0) {printf("flushed answer\n");// 释放事件bufferevent_free(bev);}
}// 错误事件回调函数
static void
conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{if (events & BEV_EVENT_EOF) {// 客户端关闭printf("Connection closed.\n");} else if (events & BEV_EVENT_ERROR) {// 不可恢复异常printf("Got an error on the connection: %s\n",strerror(errno));/*XXX win32*/}/* None of the other events can happen here, since we haven't enabled* timeouts */// 释放事件bufferevent_free(bev);
}static void
signal_cb(evutil_socket_t sig, short events, void *user_data)
{struct event_base *base = user_data;struct timeval delay = { 2, 0 };printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");// 退出事件循环event_base_loopexit(base, &delay);
}
3、makefile
lqServer: lqServer.cgcc lqServer.c -o lqServer -levent.PHONY : clean
clean:rm -rf lqServer