MP 又 修改定义了 im_msg_data 数据包

同步库添加了一些代码
This commit is contained in:
dongl 2023-07-01 20:22:27 +08:00
parent 66da42c0f8
commit 7079d31006
16 changed files with 119 additions and 58 deletions

View File

@ -19,14 +19,14 @@ using bsoncxx::builder::basic::make_document;
class MsgTemplate {
public:
static bsoncxx::document::view session_msg(mp::MP_SUB_TYPE msg_type, mp::MP_SUB_TYPE session_type,
int64_t message_id, time_t date, int64_t account,
int64_t message_id, time_t time, int64_t account,
const std::string& msg_data) {
// 要插入的视图
auto doc_value = make_document(
kvp("msg_type", msg_type), // 200 text消息
kvp("session_type", session_type), // 300 单体会话
kvp("message_id", message_id), // 0 会话级id
kvp("date", date), // 时间
kvp("date", time), // 时间
kvp("account", account), // 目标/来源
kvp("msg_data", msg_data.c_str()) // 消息内容
);

View File

@ -12,9 +12,9 @@ IMDataPacket::IMDataPacket(mp::MP_TYPE type, mp::im::msg_data* data) :
IMDataPacket::IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
mp::MP_SUB_TYPE sessionType, uint64_t messageId,
time_t time, uint64_t account, std::string &imMsgData) :
time_t time, uint64_t account, const std::string &imMsgData) :
Mph(type),
MsgData(subType, sessionType,messageId, time, account,imMsgData) {
MsgData(subType, sessionType,messageId, time, account, imMsgData) {
}
@ -29,7 +29,7 @@ std::string IMDataPacket::packet() {
// 包体长度
mph->set_mpb_size(data->ByteSizeLong());
// im_data_packet 的 L 为 16bit
// im_data_packet 的 L 为 8 bit
std::string temp;
// 判断是否超过数据包限制大小 一个包 大概0.06MB
@ -37,16 +37,13 @@ std::string IMDataPacket::packet() {
return "超过数据包限制大小无法构造长度请低于65535 或者分包";
}
// L
// temp.push_back(mph->ByteSizeLong());
if (mph->ByteSizeLong() + data->ByteSizeLong() <= 255) {
temp.push_back(0);
/// 顺序 LTV
// L 设置 T 的大小
if (mph->ByteSizeLong() <= 255) {
// 添加 L
temp.push_back(mph->ByteSizeLong());
} else {
short len = mph->ByteSizeLong() + data->ByteSizeLong();
char LTV_L[2];
memcpy(LTV_L, &len, sizeof(LTV_L));
temp.append(LTV_L);
// T 设置 V 的大小
mph->set_mpb_size(data->ByteSizeLong());
}
// T

View File

@ -16,7 +16,7 @@ public:
IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
mp::MP_SUB_TYPE sessionType, uint64_t messageId,
time_t time, uint64_t account, std::string &imMsgData);
time_t time, uint64_t account, const std::string &imMsgData);
IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
uint64_t messageId, time_t time);

View File

@ -12,14 +12,14 @@ public:
explicit MsgData(mp::im::msg_data *data) : data(data) {}
MsgData(mp::MP_SUB_TYPE sub_type, mp::MP_SUB_TYPE session_type,
uint64_t message_id, time_t time, uint64_t account, std::string& im_msg_data) {
uint64_t message_id, time_t time, uint64_t account, const std::string& im_msg_data) {
data = new mp::im::msg_data();
data->set_msg_type(sub_type);
data->set_session_type(session_type);
data->set_message_id(message_id);
data->set_time(time);
data->set_account(account);
data->set_allocated_im_msg_data(&im_msg_data);
data->set_im_msg_data(im_msg_data);
}
MsgData(mp::MP_SUB_TYPE sub_type, uint64_t message_id, time_t time) {

View File

@ -15,12 +15,12 @@ message notice { // pull push
// / [] //
message msg_data {
MP_SUB_TYPE msg_type = 1; // [] //
MP_SUB_TYPE session_type = 2; // [] // /
uint64 message_id = 3; // [] // id uint64
MP_SUB_TYPE msg_type = 1; //
MP_SUB_TYPE session_type = 2; // /
uint64 message_id = 3; // id uint64
uint64 time = 4; //
uint64 account = 5; // [] // /
string im_msg_data = 6; // [] //
uint64 account = 5; // /
string im_msg_data = 6; //
}

View File

@ -15,7 +15,7 @@ void agreement_request::set (std::shared_ptr<mp::mph> &mph, std::shared_ptr<mp::
m_addr = addr;
}
void agreement_request::set(std::shared_ptr<mp::mph> &mph, std::shared_ptr<mp::im::data> &data, bufferevent *bev,
void agreement_request::set(std::shared_ptr<mp::mph> &mph, std::shared_ptr<mp::im::msg_data> &data, bufferevent *bev,
sockaddr_in *addr) {
m_mph = mph;
m_data = data;

View File

@ -30,13 +30,13 @@ public:
public:
void set (std::shared_ptr<mp::mph> &mph, std::shared_ptr<mp::request>& request, bufferevent* bev, sockaddr_in* addr);
void set (std::shared_ptr<mp::mph> &mph, std::shared_ptr<mp::im::data>& data, bufferevent* bev, sockaddr_in* addr);
void set (std::shared_ptr<mp::mph> &mph, std::shared_ptr<mp::im::msg_data>& data, bufferevent* bev, sockaddr_in* addr);
public:
// 包头
std::shared_ptr<mp::mph> m_mph;
// im 包体 data
std::shared_ptr<mp::im::data> m_data;
std::shared_ptr<mp::im::msg_data> m_data;
// *********** 逻辑包体
mp::body m_body;
mp::cqi m_cqi;

View File

@ -15,7 +15,7 @@ public:
analysis(std::shared_ptr<mp::mph>& mph, std::shared_ptr<mp::request>& request) : m_mph(mph), m_request(request) {
}
analysis(std::shared_ptr<mp::mph>& mph, std::shared_ptr<mp::im::data>& data) : m_mph(mph), m_data(data) {
analysis(std::shared_ptr<mp::mph>& mph, std::shared_ptr<mp::im::msg_data>& data) : m_mph(mph), m_data(data) {
}
@ -35,7 +35,7 @@ public:
private:
std::shared_ptr<mp::mph> m_mph;
std::shared_ptr<mp::request> m_request;
std::shared_ptr<mp::im::data> m_data;
std::shared_ptr<mp::im::msg_data> m_data;
};

View File

@ -12,7 +12,7 @@ add_library(MessageSystem
${STORAGE}
${SYN}
TimeLine.cpp
)
)
target_link_libraries(MessageSystem
imm_mongodb

View File

@ -13,23 +13,37 @@
struct SynMsg {
uint64_t message_id;
time_t msg_time;
time_t time;
};
struct StorageMsg {
mp::MP_SUB_TYPE msg_type;
mp::MP_SUB_TYPE session_type;
int64_t message_id;
time_t time;
int64_t account;
std::string im_msg_data;
};
template<class T>
class TimeLine {
public:
void push(T ele) {
mutex.lock();
queue.push(ele);
mutex.unlock();
}
T pull() {
std::lock_guard lockGuard(mutex);
auto ele = queue.front();
queue.back();
return ele;
}
std::queue<T>& value() {
return queue;
}
private:
std::mutex mutex;
std::queue<T> queue;

View File

@ -3,28 +3,51 @@
//
#include "Storage.h"
#include "template/MsgTemplate.h"
MSG::Storage::Storage(TimeLine<MSGFormat::Storage *> *timeLine, db_base* db) : m_timeLine(timeLine), m_db(db) {}
MSG::Storage::Storage() {
m_timeLine = new TimeLine<MSGFormat::Storage*>();
#include <utility>
#include <thread>
MSG::Storage::Storage(TimeLine<StorageMsg *> *timeLine, db_base* db)
: m_timeLine(timeLine), m_db(db), m_db_name(""), m_table("") {}
MSG::Storage::Storage(std::string&& db_name, std::string&& table) : m_db_name(db_name), m_table(table) {
m_timeLine = new TimeLine<StorageMsg *>();
m_db = new db_base();
}
// 储存库 push
void MSG::Storage::push(MSGFormat::Storage* msg) {
// 添加至信箱
void MSG::Storage::push(StorageMsg* msg) {
// 添加至信箱 同步库
m_timeLine->push(msg);
auto coll = m_db->hit_db_coll("im_session", "chat");
coll.find_one(make_document(kvp("count", 1)));
}
void MSG::Storage::pull() {
// m_timeLine->pull();
auto coll = m_db->hit_db_coll("im_session", "chat");
auto temp = coll.find_one(make_document(kvp("count", 1)));
// if (temp) {
// std::cout << temp->find("_id")->get_oid().value.to_string() << std::endl;
// }
auto coll = m_db->hit_db_coll(m_db_name, m_table);
}
void MSG::Storage::storage_push_queue() {
// 取mongo链接
auto coll = m_db->hit_db_coll(m_db_name, m_table);
std::function<void()> fun = [&] {
while (true) {
while (!m_timeLine->value().empty()) {
// 弹出msg队列 此cpp只负责储存库 不负责同步库
auto msg = m_timeLine->pull();
// 执行插入
coll.insert_one(MsgTemplate::session_msg(msg->msg_type, msg->session_type,
msg->message_id, msg->time, msg->account, msg->im_msg_data));
}
}
};
for (int i = 0; i < 2; ++i) {
std::thread t(fun);
printf("%ld", t.get_id());
t.detach();
}
}

View File

@ -9,30 +9,23 @@
#include "db_base.h"
#include "../TimeLine.h"
namespace MSGFormat {
struct Storage {
mp::MP_SUB_TYPE msg_type;
mp::MP_SUB_TYPE session_type;
uint64_t message_id;
time_t msg_time;
uint64_t account;
std::string msg_data;
};
}
namespace MSG {
class Storage {
public:
explicit Storage(TimeLine<MSGFormat::Storage *> *timeLine, db_base* db);
Storage();
Storage(TimeLine<StorageMsg *> *timeLine, db_base* db);
Storage(std::string&& db_name, std::string&& table);
public:
void push(MSGFormat::Storage* msg);
void push(StorageMsg* msg);
void pull();
void storage_push_queue();
private:
db_base* m_db = nullptr;
TimeLine<MSGFormat::Storage*> * m_timeLine = nullptr;
TimeLine<StorageMsg *> * m_timeLine = nullptr;
std::string m_db_name, m_table;
};
}

View File

@ -0,0 +1,5 @@
//
// Created by dongl on 23-7-1.
//
#include "Syn.h"

View File

@ -0,0 +1,14 @@
//
// Created by dongl on 23-7-1.
//
#ifndef IM2_SYN_H
#define IM2_SYN_H
class Syn {
};
#endif //IM2_SYN_H

View File

@ -20,6 +20,7 @@ add_executable(TEST
main.cpp)
target_link_libraries(TEST
MP
works
MessageSystem
imm_mysqldb

View File

@ -10,6 +10,7 @@
#include "works/db/UserDB.h"
#include "smtp/send_email.h"
#include "storage/Storage.h"
#include "IMDataPacket.h"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
@ -125,3 +126,16 @@ TEST(selsct_is_friend, fecth_frinds_Test) {
printf("2725096176, %b\n", user);
}
TEST (MP_IM_PUSH_MSG, MP_IM_PUSH_MSG_TEST) {
auto temp = new IMDataPacket(mp::MP_TYPE::MP_IM_PUSH_MSG, // 推消息数据包头
mp::MP_SUB_TYPE::MP_IM_TEXT, // 数据包体 消息类型
mp::MP_SUB_TYPE::MP_SESSION_FRIEND, // 数据包体 会话类型
1,
time(nullptr),
783556037,
"hello test msg!"
);
std::cout << temp->packet() << std::endl;
}