IM/MS/pool/thread/ev_pool.cpp

134 lines
2.9 KiB
C++

//
// Created by dongl on 23-4-14.
//
#include "ev_pool.h"
std::function<void()> ev_base(event_base* base) {
return [=](){
printf("ev_base ptr: %p, pthread: %ld\n",base, pthread_self());
event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
};
}
ev_pool::ev_pool(int size) : m_pool_max_size(size), m_pool_curr_size(0), m_poll(false) {
}
void ev_pool::add_event_base() {
if (m_pool_curr_size >= m_pool_max_size && m_poll) {
perror("add_event_base failed");
return;
}
event_base* base = event_base_new();
m_ev_bases.push_back(base);
m_pool.run(ev_base(base));
m_pool_curr_size += 1;
}
void ev_pool::add_event_base(const std::function<void()>& function) {
m_pool.run(function);
}
void ev_pool::add_event_bases(int num) {
if (num > m_pool_max_size && m_poll) {
perror("add_event_bases failed");
fflush(stdout);
return;
}
for (int i = 0; i < num; ++i) {
add_event_base();
}
}
bool ev_pool::polling(bool poll) {
m_poll = poll;
return m_poll;
}
void read(evutil_socket_t, short, void *) {
printf("read\n");
fflush(stdout);
}
void ev_pool::add_buffer_event(evutil_socket_t fd, bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, short events, sockaddr_in* addr) {
// 调度一个base集合;
event_base* base = dispatching();
// 创建socket链接监听
bufferevent* bev = bufferevent_socket_new(base, fd, events);
BBCA* bbca = new BBCA();
bbca->base = base;
bbca->addr = addr;
// 设置
bufferevent_setcb(bev, readcb, writecb, eventcb, bbca);
// 启用
bufferevent_enable(bev, EV_READ | EV_WRITE);
m_bevs.insert(std::pair<event_base*, bufferevent*>(base, bev));
printf("event_base: %p, fd: %d\n", base, fd);
}
void ev_pool::add_event(evutil_socket_t fd, short events, event_callback_fn callback, void *callback_arg) {
event* ev = event_new(dispatching(), fd, events, callback, callback_arg);
event_add(ev, nullptr);
}
event_base *ev_pool::dispatching() {
static auto base = m_ev_bases.begin();
if (base != m_ev_bases.end())
base;
else
base = m_ev_bases.begin();
return *base++;
}
void ev_pool::ev_loop_exit(event_base* base) {
free();
}
void ev_pool::ev_base_exit(event_base* base) {
auto b = std::remove(m_ev_bases.begin(), m_ev_bases.end(), base);
bufferevent_free(m_bevs.find(*b)->second);
event_base_loopexit(base, nullptr);
event_base_free(base);
m_bevs.erase(m_bevs.find(*b));
m_ev_bases.erase(b);
}
void ev_pool::free() {
for (const auto &item: m_bevs) {
bufferevent_free(item.second);
}
for (const auto &item: m_ev_bases) {
event_base_free(item);
}
m_bevs.clear();
m_ev_bases.clear();
}
void ev_pool::run() {
m_pool.wait();
}