// // Created by dongl on 23-5-29. // #include "Storage.h" #include "template/MsgTemplate.h" #include "IMDataPacket.h" #include #include MSG::Storage::Storage(SafeQueue *timeLine, db_base* db) : m_timeLine(timeLine), m_db_name(""), m_table("") { storage_push_lister_queue(); } MSG::Storage::Storage(std::string&& db_name, std::string&& table) : m_db_name(db_name), m_table(table) { m_timeLine = new SafeQueue(); storage_push_lister_queue(); } MSG::Storage::~Storage() { delete m_timeLine; } // 储存库 push void MSG::Storage::push(StorageMsg* msg) { // 添加至队列 m_timeLine->push(msg); } void MSG::Storage::storage_push_lister_queue() { std::function fun = [&] { // 取mongo链接 auto coll = hit_db_coll(m_db_name, m_table); while (true) { while (!m_timeLine->value().empty()) { /// 插入单个 // 弹出msg队列 此cpp只负责储存库 不负责同步库 auto msg = m_timeLine->pull(); // 执行插入 auto insert_one_result = coll.insert_one(MsgTemplate::session_msg(msg->msg_type, msg->session_type, msg->message_id, msg->time, msg->account, msg->im_msg_data)); auto doc_id = insert_one_result->inserted_id(); printf("[msg insert mongo] %s\n", doc_id.type() == bsoncxx::type::k_oid ? "msg insert" : "not msg insert"); } } }; for (int i = 0; i < 2; ++i) { std::thread t(fun); printf("%ld", t.get_id()); t.detach(); } }