From 7079d3100646281428bda46e291827d18ff85f15 Mon Sep 17 00:00:00 2001 From: dongl <2725096176@qq.com> Date: Sat, 1 Jul 2023 20:22:27 +0800 Subject: [PATCH] =?UTF-8?q?MP=20=E5=8F=88=20=E4=BF=AE=E6=94=B9=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=E4=BA=86=20im=5Fmsg=5Fdata=20=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=8C=85=20=E5=90=8C=E6=AD=A5=E5=BA=93=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E4=BA=86=E4=B8=80=E4=BA=9B=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- MDB/imm_mongodb/template/MsgTemplate.h | 4 +-- MP/IMDataPacket.cpp | 21 +++++------ MP/IMDataPacket.h | 2 +- MP/MsgData.h | 4 +-- MP/protohuf/mp.im.proto | 10 +++--- MS/mmm/agreement.cpp | 2 +- MS/mmm/agreement.h | 4 +-- MS/mmm/analysis.h | 4 +-- MessageSystem/CMakeLists.txt | 2 +- MessageSystem/TimeLine.h | 18 ++++++++-- MessageSystem/storage/Storage.cpp | 49 +++++++++++++++++++------- MessageSystem/storage/Storage.h | 23 +++++------- MessageSystem/storage/Syn.cpp | 5 +++ MessageSystem/storage/Syn.h | 14 ++++++++ TEST/CMakeLists.txt | 1 + TEST/main.cpp | 14 ++++++++ 16 files changed, 119 insertions(+), 58 deletions(-) create mode 100644 MessageSystem/storage/Syn.cpp create mode 100644 MessageSystem/storage/Syn.h diff --git a/MDB/imm_mongodb/template/MsgTemplate.h b/MDB/imm_mongodb/template/MsgTemplate.h index 0549b0c..f039d96 100644 --- a/MDB/imm_mongodb/template/MsgTemplate.h +++ b/MDB/imm_mongodb/template/MsgTemplate.h @@ -19,14 +19,14 @@ using bsoncxx::builder::basic::make_document; class MsgTemplate { public: static bsoncxx::document::view session_msg(mp::MP_SUB_TYPE msg_type, mp::MP_SUB_TYPE session_type, - int64_t message_id, time_t date, int64_t account, + int64_t message_id, time_t time, int64_t account, const std::string& msg_data) { // 要插入的视图 auto doc_value = make_document( kvp("msg_type", msg_type), // 200 text消息 kvp("session_type", session_type), // 300 单体会话 kvp("message_id", message_id), // 0 会话级id - kvp("date", date), // 时间 + kvp("date", time), // 时间 kvp("account", account), // 目标/来源 kvp("msg_data", msg_data.c_str()) // 消息内容 ); diff --git a/MP/IMDataPacket.cpp b/MP/IMDataPacket.cpp index df89f96..79c50fc 100644 --- a/MP/IMDataPacket.cpp +++ b/MP/IMDataPacket.cpp @@ -12,9 +12,9 @@ IMDataPacket::IMDataPacket(mp::MP_TYPE type, mp::im::msg_data* data) : IMDataPacket::IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType, mp::MP_SUB_TYPE sessionType, uint64_t messageId, - time_t time, uint64_t account, std::string &imMsgData) : + time_t time, uint64_t account, const std::string &imMsgData) : Mph(type), - MsgData(subType, sessionType,messageId, time, account,imMsgData) { + MsgData(subType, sessionType,messageId, time, account, imMsgData) { } @@ -29,7 +29,7 @@ std::string IMDataPacket::packet() { // 包体长度 mph->set_mpb_size(data->ByteSizeLong()); - // im_data_packet 的 L 为 16bit + // im_data_packet 的 L 为 8 bit std::string temp; // 判断是否超过数据包限制大小 一个包 大概0.06MB @@ -37,16 +37,13 @@ std::string IMDataPacket::packet() { return "超过数据包限制大小,无法构造,长度请低于65535, 或者分包"; } - // L -// temp.push_back(mph->ByteSizeLong()); - if (mph->ByteSizeLong() + data->ByteSizeLong() <= 255) { - temp.push_back(0); + /// 顺序 LTV + // L 设置 T 的大小 + if (mph->ByteSizeLong() <= 255) { + // 添加 L temp.push_back(mph->ByteSizeLong()); - } else { - short len = mph->ByteSizeLong() + data->ByteSizeLong(); - char LTV_L[2]; - memcpy(LTV_L, &len, sizeof(LTV_L)); - temp.append(LTV_L); + // T 设置 V 的大小 + mph->set_mpb_size(data->ByteSizeLong()); } // T diff --git a/MP/IMDataPacket.h b/MP/IMDataPacket.h index f03c3e1..1514d16 100644 --- a/MP/IMDataPacket.h +++ b/MP/IMDataPacket.h @@ -16,7 +16,7 @@ public: IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType, mp::MP_SUB_TYPE sessionType, uint64_t messageId, - time_t time, uint64_t account, std::string &imMsgData); + time_t time, uint64_t account, const std::string &imMsgData); IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType, uint64_t messageId, time_t time); diff --git a/MP/MsgData.h b/MP/MsgData.h index da1e914..5f6989b 100644 --- a/MP/MsgData.h +++ b/MP/MsgData.h @@ -12,14 +12,14 @@ public: explicit MsgData(mp::im::msg_data *data) : data(data) {} MsgData(mp::MP_SUB_TYPE sub_type, mp::MP_SUB_TYPE session_type, - uint64_t message_id, time_t time, uint64_t account, std::string& im_msg_data) { + uint64_t message_id, time_t time, uint64_t account, const std::string& im_msg_data) { data = new mp::im::msg_data(); data->set_msg_type(sub_type); data->set_session_type(session_type); data->set_message_id(message_id); data->set_time(time); data->set_account(account); - data->set_allocated_im_msg_data(&im_msg_data); + data->set_im_msg_data(im_msg_data); } MsgData(mp::MP_SUB_TYPE sub_type, uint64_t message_id, time_t time) { diff --git a/MP/protohuf/mp.im.proto b/MP/protohuf/mp.im.proto index 4b4d488..ce780a6 100644 --- a/MP/protohuf/mp.im.proto +++ b/MP/protohuf/mp.im.proto @@ -15,12 +15,12 @@ message notice { // pull push // 获取/推送 [消息包] // 通知数据包 拉推包 实际数据包 请求 响应都有 message msg_data { - MP_SUB_TYPE msg_type = 1; // [] 数组 // 消息类型 子命令 分辨什么类型的消息 文本 视频 音频? - MP_SUB_TYPE session_type = 2; // [] 数组 // 会话 目标/来源 类型 分辨是群消息还是 单体消息 - uint64 message_id = 3; // [] 数组 // 消息id 字节序 转成 uint64 + MP_SUB_TYPE msg_type = 1; // 消息类型 子命令 分辨什么类型的消息 文本 视频 音频? + MP_SUB_TYPE session_type = 2; // 会话 目标/来源 类型 分辨是群消息还是 单体消息 + uint64 message_id = 3; // 消息id 字节序 转成 uint64 uint64 time = 4; // 时间 - uint64 account = 5; // [] 数组 // 消息 目标/来源 - string im_msg_data = 6; // [] 数组 // 消息数据字节序 + uint64 account = 5; // 消息 目标/来源 + string im_msg_data = 6; // 消息数据字节序 } diff --git a/MS/mmm/agreement.cpp b/MS/mmm/agreement.cpp index 9fd03d5..f7d5684 100644 --- a/MS/mmm/agreement.cpp +++ b/MS/mmm/agreement.cpp @@ -15,7 +15,7 @@ void agreement_request::set (std::shared_ptr &mph, std::shared_ptr &mph, std::shared_ptr &data, bufferevent *bev, +void agreement_request::set(std::shared_ptr &mph, std::shared_ptr &data, bufferevent *bev, sockaddr_in *addr) { m_mph = mph; m_data = data; diff --git a/MS/mmm/agreement.h b/MS/mmm/agreement.h index f4b0e7f..969b37c 100644 --- a/MS/mmm/agreement.h +++ b/MS/mmm/agreement.h @@ -30,13 +30,13 @@ public: public: void set (std::shared_ptr &mph, std::shared_ptr& request, bufferevent* bev, sockaddr_in* addr); - void set (std::shared_ptr &mph, std::shared_ptr& data, bufferevent* bev, sockaddr_in* addr); + void set (std::shared_ptr &mph, std::shared_ptr& data, bufferevent* bev, sockaddr_in* addr); public: // 包头 std::shared_ptr m_mph; // im 包体 data - std::shared_ptr m_data; + std::shared_ptr m_data; // *********** 逻辑包体 mp::body m_body; mp::cqi m_cqi; diff --git a/MS/mmm/analysis.h b/MS/mmm/analysis.h index 8af77c0..6645f08 100644 --- a/MS/mmm/analysis.h +++ b/MS/mmm/analysis.h @@ -15,7 +15,7 @@ public: analysis(std::shared_ptr& mph, std::shared_ptr& request) : m_mph(mph), m_request(request) { } - analysis(std::shared_ptr& mph, std::shared_ptr& data) : m_mph(mph), m_data(data) { + analysis(std::shared_ptr& mph, std::shared_ptr& data) : m_mph(mph), m_data(data) { } @@ -35,7 +35,7 @@ public: private: std::shared_ptr m_mph; std::shared_ptr m_request; - std::shared_ptr m_data; + std::shared_ptr m_data; }; diff --git a/MessageSystem/CMakeLists.txt b/MessageSystem/CMakeLists.txt index 73b13ac..c2439e1 100644 --- a/MessageSystem/CMakeLists.txt +++ b/MessageSystem/CMakeLists.txt @@ -12,7 +12,7 @@ add_library(MessageSystem ${STORAGE} ${SYN} TimeLine.cpp -) + ) target_link_libraries(MessageSystem imm_mongodb diff --git a/MessageSystem/TimeLine.h b/MessageSystem/TimeLine.h index 7757e9e..f7a705b 100644 --- a/MessageSystem/TimeLine.h +++ b/MessageSystem/TimeLine.h @@ -13,23 +13,37 @@ struct SynMsg { uint64_t message_id; - time_t msg_time; + time_t time; +}; + +struct StorageMsg { + mp::MP_SUB_TYPE msg_type; + mp::MP_SUB_TYPE session_type; + int64_t message_id; + time_t time; + int64_t account; + std::string im_msg_data; }; template class TimeLine { - public: void push(T ele) { + mutex.lock(); queue.push(ele); + mutex.unlock(); } T pull() { + std::lock_guard lockGuard(mutex); auto ele = queue.front(); queue.back(); return ele; } + std::queue& value() { + return queue; + } private: std::mutex mutex; std::queue queue; diff --git a/MessageSystem/storage/Storage.cpp b/MessageSystem/storage/Storage.cpp index 3c38a1e..ceee36a 100644 --- a/MessageSystem/storage/Storage.cpp +++ b/MessageSystem/storage/Storage.cpp @@ -3,28 +3,51 @@ // #include "Storage.h" +#include "template/MsgTemplate.h" -MSG::Storage::Storage(TimeLine *timeLine, db_base* db) : m_timeLine(timeLine), m_db(db) {} -MSG::Storage::Storage() { - m_timeLine = new TimeLine(); +#include +#include + +MSG::Storage::Storage(TimeLine *timeLine, db_base* db) + : m_timeLine(timeLine), m_db(db), m_db_name(""), m_table("") {} + +MSG::Storage::Storage(std::string&& db_name, std::string&& table) : m_db_name(db_name), m_table(table) { + m_timeLine = new TimeLine(); m_db = new db_base(); } // 储存库 push -void MSG::Storage::push(MSGFormat::Storage* msg) { - // 添加至信箱 +void MSG::Storage::push(StorageMsg* msg) { + // 添加至信箱 同步库 m_timeLine->push(msg); - auto coll = m_db->hit_db_coll("im_session", "chat"); - coll.find_one(make_document(kvp("count", 1))); } void MSG::Storage::pull() { -// m_timeLine->pull(); - auto coll = m_db->hit_db_coll("im_session", "chat"); - auto temp = coll.find_one(make_document(kvp("count", 1))); -// if (temp) { -// std::cout << temp->find("_id")->get_oid().value.to_string() << std::endl; -// } + auto coll = m_db->hit_db_coll(m_db_name, m_table); +} + + +void MSG::Storage::storage_push_queue() { + // 取mongo链接 + auto coll = m_db->hit_db_coll(m_db_name, m_table); + + std::function fun = [&] { + while (true) { + while (!m_timeLine->value().empty()) { + // 弹出msg队列 此cpp只负责储存库 不负责同步库 + auto msg = m_timeLine->pull(); + // 执行插入 + coll.insert_one(MsgTemplate::session_msg(msg->msg_type, msg->session_type, + msg->message_id, msg->time, msg->account, msg->im_msg_data)); + } + } + }; + + for (int i = 0; i < 2; ++i) { + std::thread t(fun); + printf("%ld", t.get_id()); + t.detach(); + } } diff --git a/MessageSystem/storage/Storage.h b/MessageSystem/storage/Storage.h index 0380e54..3f24a56 100644 --- a/MessageSystem/storage/Storage.h +++ b/MessageSystem/storage/Storage.h @@ -9,30 +9,23 @@ #include "db_base.h" #include "../TimeLine.h" -namespace MSGFormat { - struct Storage { - mp::MP_SUB_TYPE msg_type; - mp::MP_SUB_TYPE session_type; - uint64_t message_id; - time_t msg_time; - uint64_t account; - std::string msg_data; - }; -} - namespace MSG { class Storage { public: - explicit Storage(TimeLine *timeLine, db_base* db); - Storage(); + Storage(TimeLine *timeLine, db_base* db); + Storage(std::string&& db_name, std::string&& table); public: - void push(MSGFormat::Storage* msg); + void push(StorageMsg* msg); void pull(); + void storage_push_queue(); + private: db_base* m_db = nullptr; - TimeLine * m_timeLine = nullptr; + TimeLine * m_timeLine = nullptr; + + std::string m_db_name, m_table; }; } diff --git a/MessageSystem/storage/Syn.cpp b/MessageSystem/storage/Syn.cpp new file mode 100644 index 0000000..4d4a0d7 --- /dev/null +++ b/MessageSystem/storage/Syn.cpp @@ -0,0 +1,5 @@ +// +// Created by dongl on 23-7-1. +// + +#include "Syn.h" diff --git a/MessageSystem/storage/Syn.h b/MessageSystem/storage/Syn.h new file mode 100644 index 0000000..2630151 --- /dev/null +++ b/MessageSystem/storage/Syn.h @@ -0,0 +1,14 @@ +// +// Created by dongl on 23-7-1. +// + +#ifndef IM2_SYN_H +#define IM2_SYN_H + + +class Syn { + +}; + + +#endif //IM2_SYN_H diff --git a/TEST/CMakeLists.txt b/TEST/CMakeLists.txt index 0676552..04f2f2e 100644 --- a/TEST/CMakeLists.txt +++ b/TEST/CMakeLists.txt @@ -20,6 +20,7 @@ add_executable(TEST main.cpp) target_link_libraries(TEST + MP works MessageSystem imm_mysqldb diff --git a/TEST/main.cpp b/TEST/main.cpp index f9f7c73..e30d67a 100644 --- a/TEST/main.cpp +++ b/TEST/main.cpp @@ -10,6 +10,7 @@ #include "works/db/UserDB.h" #include "smtp/send_email.h" #include "storage/Storage.h" +#include "IMDataPacket.h" int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); @@ -124,4 +125,17 @@ TEST(selsct_is_friend, fecth_frinds_Test) { printf("1111111, %c\n", kk); printf("2725096176, %b\n", user); +} + +TEST (MP_IM_PUSH_MSG, MP_IM_PUSH_MSG_TEST) { + auto temp = new IMDataPacket(mp::MP_TYPE::MP_IM_PUSH_MSG, // 推消息数据包头 + mp::MP_SUB_TYPE::MP_IM_TEXT, // 数据包体 消息类型 + mp::MP_SUB_TYPE::MP_SESSION_FRIEND, // 数据包体 会话类型 + 1, + time(nullptr), + 783556037, + "hello test msg!" + ); + + std::cout << temp->packet() << std::endl; } \ No newline at end of file