2023-07-04 14:10:13 +08:00

65 lines
1.7 KiB
C++

//
// Created by dongl on 23-5-29.
//
#include "Storage.h"
#include "template/MsgTemplate.h"
#include "IMDataPacket.h"
#include <utility>
#include <thread>
MSG::Storage::Storage(SafeQueue<StorageMsg *> *timeLine, db_base* db)
: m_timeLine(timeLine), m_db_name(""), m_table("") {
storage_push_lister_queue();
}
MSG::Storage::Storage(std::string&& db_name, std::string&& table) : m_db_name(db_name), m_table(table) {
m_timeLine = new SafeQueue<StorageMsg *>();
storage_push_lister_queue();
}
MSG::Storage::~Storage() {
delete m_timeLine;
}
// 储存库 push
void MSG::Storage::push(StorageMsg* msg) {
// 添加至队列
m_timeLine->push(msg);
}
void MSG::Storage::storage_push_lister_queue() {
std::function<void()> fun = [&] {
// 取mongo链接
auto coll = hit_db_coll(m_db_name, m_table);
while (true) {
while (!m_timeLine->value().empty()) {
/// 插入单个
// 弹出msg队列 此cpp只负责储存库 不负责同步库
auto msg = m_timeLine->pull();
// 执行插入
auto insert_one_result = coll.insert_one(MsgTemplate::session_msg(msg->msg_type, msg->session_type,
msg->message_id, msg->time, msg->account, msg->im_msg_data));
auto doc_id = insert_one_result->inserted_id();
printf("[msg insert mongo] %s\n", doc_id.type() == bsoncxx::type::k_oid ? "msg insert" : "not msg insert");
}
}
};
for (int i = 0; i < 2; ++i) {
std::thread t(fun);
printf("%ld", t.get_id());
t.detach();
}
}