// // Created by dongl on 23-5-12. // #include #include #include #include "management.h" #include "MP/Response.h" std::queue management::resp_msg; void management::read_packet(bufferevent *bev) { char buff[2048] = {0}; std::atomic len_index = bufferevent_read(bev, buff, 2048); // 处理粘包 char packet_h[50] = {0}; char packet_b[256] = {0}; while (len_index > 0) { memset(packet_h, 0, sizeof(packet_h)); memset(packet_b, 0, sizeof(packet_b)); /// read L 读包长度 uint8_t packetLen; // 取包长度 memcpy(&packetLen, buff, 1); // 更新buffer memcpy(buff, buff + 1, strlen(buff) - 1); // 更新buffer长度 len_index -= 1; /// read V 读包头 // 取包头 memcpy(packet_h, buff, packetLen); // 更新buffer memcpy(buff, buff + packetLen, strlen(buff) - packetLen); // 更新buffer长度 len_index -= packetLen; // 解包头 auto mph = new mp::mph(); mph->ParseFromString(packet_h); /// read V 读包体 包头内含有包体长度 // 取包体 memcpy(packet_b, buff, mph->mpb_size()); // 更新buffer memcpy(buff, buff + mph->mpb_size(), strlen(buff) - mph->mpb_size()); // 更新buffer长度 len_index -= mph->mpb_size(); // 解包体 auto response = new mp::response(); response->ParseFromString(packet_b); auto resp = new agreement_response(); resp->set(response); resp_msg.push(resp); } } agreement_response management::fetch_packet() { agreement_response ret; auto beforeTime = std::chrono::steady_clock::now(); // double duration = 0; while (resp_msg.empty()) { // auto afterTime = std::chrono::steady_clock::now(); // duration += std::chrono::duration(afterTime - beforeTime).count(); // // if (duration > 5000) { // // } std::this_thread::sleep_for(std::chrono::microseconds(10)); } auto afterTime = std::chrono::steady_clock::now(); ret = *resp_msg.front(); resp_msg.pop(); //毫秒级 double duration_millsecond = std::chrono::duration(afterTime - beforeTime).count(); std::cout << "" << duration_millsecond << "毫秒" << std::endl; return ret; }