Compare commits
2 Commits
7079d31006
...
5b12450a56
Author | SHA1 | Date | |
---|---|---|---|
5b12450a56 | |||
a764f9fcab |
@ -12,5 +12,7 @@ add_library(imm_mongodb
|
||||
|
||||
target_link_libraries(imm_mongodb
|
||||
mongocxx
|
||||
# mongoc-static-1.0
|
||||
bsoncxx
|
||||
bson-static-1.0
|
||||
)
|
BIN
MDB/imm_mongodb/lib/libbson-static-1.0.a
Normal file
BIN
MDB/imm_mongodb/lib/libbson-static-1.0.a
Normal file
Binary file not shown.
BIN
MDB/imm_mongodb/lib/libmongoc-static-1.0.a
Normal file
BIN
MDB/imm_mongodb/lib/libmongoc-static-1.0.a
Normal file
Binary file not shown.
@ -18,20 +18,31 @@ using bsoncxx::builder::basic::make_document;
|
||||
|
||||
class MsgTemplate {
|
||||
public:
|
||||
static bsoncxx::document::view session_msg(mp::MP_SUB_TYPE msg_type, mp::MP_SUB_TYPE session_type,
|
||||
|
||||
/**
|
||||
* 对于 mongo 的 通用消息 插入模板
|
||||
* @param msg_type 消息类型
|
||||
* @param session_type 会话类型
|
||||
* @param message_id 消息id
|
||||
* @param time 消息时间
|
||||
* @param account 消息来源
|
||||
* @param msg_data 消息数据
|
||||
* @return make_document -》 mongocxx
|
||||
*/
|
||||
static bsoncxx::document::value session_msg(mp::MP_SUB_TYPE msg_type, mp::MP_SUB_TYPE session_type,
|
||||
int64_t message_id, time_t time, int64_t account,
|
||||
const std::string& msg_data) {
|
||||
// 要插入的视图
|
||||
auto doc_value = make_document(
|
||||
kvp("msg_type", msg_type), // 200 text消息
|
||||
kvp("session_type", session_type), // 300 单体会话
|
||||
kvp("message_id", message_id), // 0 会话级id
|
||||
kvp("date", time), // 时间
|
||||
kvp("msg_id", message_id), // 0 会话级id
|
||||
kvp("time", time), // 时间
|
||||
kvp("account", account), // 目标/来源
|
||||
kvp("msg_data", msg_data.c_str()) // 消息内容
|
||||
);
|
||||
|
||||
return doc_value.view();
|
||||
return doc_value;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -18,11 +18,10 @@ IMDataPacket::IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
|
||||
|
||||
}
|
||||
|
||||
IMDataPacket::IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
|
||||
IMDataPacket::IMDataPacket(mp::MP_TYPE type, /*mp::MP_SUB_TYPE subType,*/
|
||||
uint64_t messageId, time_t time) :
|
||||
Mph(type),
|
||||
MsgData(subType, messageId, time) {
|
||||
|
||||
MsgData(messageId, time) {
|
||||
}
|
||||
|
||||
std::string IMDataPacket::packet() {
|
||||
|
@ -12,13 +12,35 @@
|
||||
|
||||
class IMDataPacket : public Mph, MsgData {
|
||||
public:
|
||||
|
||||
/**
|
||||
* 消息包
|
||||
* @param type 包类型
|
||||
* @param data protobuf 定义的 消息包类型
|
||||
*/
|
||||
IMDataPacket(mp::MP_TYPE type, mp::im::msg_data* data);
|
||||
|
||||
/**
|
||||
* 消息包
|
||||
* @param type 包类型
|
||||
* @param subType 消息类型
|
||||
* @param sessionType 会话类型
|
||||
* @param messageId 消息id
|
||||
* @param time 时间
|
||||
* @param account 来源
|
||||
* @param imMsgData 数据
|
||||
*/
|
||||
IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
|
||||
mp::MP_SUB_TYPE sessionType, uint64_t messageId,
|
||||
time_t time, uint64_t account, const std::string &imMsgData);
|
||||
|
||||
IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
|
||||
/**
|
||||
* 通知包
|
||||
* @param type 包类型
|
||||
* @param messageId 消息id
|
||||
* @param time 消息时间
|
||||
*/
|
||||
IMDataPacket(mp::MP_TYPE type, /*mp::MP_SUB_TYPE subType,*/
|
||||
uint64_t messageId, time_t time);
|
||||
|
||||
public:
|
||||
|
@ -29,6 +29,12 @@ public:
|
||||
data->set_time(time);
|
||||
}
|
||||
|
||||
MsgData(uint64_t message_id, time_t time) {
|
||||
data = new mp::im::msg_data();
|
||||
data->set_message_id(message_id);
|
||||
data->set_time(time);
|
||||
}
|
||||
|
||||
virtual ~MsgData() {
|
||||
delete data;
|
||||
}
|
||||
|
@ -86,6 +86,8 @@ enum MP_SUB_TYPE {
|
||||
|
||||
MP_ADD_CHECK = 80; // 需要回答问题的好友/群组 验证问题
|
||||
|
||||
MP_TYPE_NULL = 100; // 类型无意义 用于填充
|
||||
|
||||
///***********************************************************************************///
|
||||
|
||||
/// 200+ IM ***********************************************************************************///
|
||||
|
@ -16,6 +16,23 @@ session::session() {
|
||||
printf("timing end\n");
|
||||
}
|
||||
|
||||
session::session(session_build_type type) {
|
||||
if (type == SESSION_SUPPORT_ALL) {
|
||||
printf("timing begin\n");
|
||||
timing();
|
||||
printf("timing end\n");
|
||||
|
||||
} else if (type == SESSION_SUPPORT_SESSION) {
|
||||
printf("timing begin\n");
|
||||
timing();
|
||||
printf("timing end\n");
|
||||
}
|
||||
|
||||
else if (type == SESSION_SUPPORT_USER) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/// curr mem user curd user session
|
||||
void session::add_user(mp::sri* sri, std::shared_ptr<agreement_request>& request) {
|
||||
if (sri->subcommand() == mp::MP_LOGIN_SUCCESS) {
|
||||
@ -75,6 +92,7 @@ void session::init_session(bufferevent *bev) {
|
||||
}
|
||||
|
||||
|
||||
|
||||
// 给用户 添加会话信息
|
||||
void session::set_session(bufferevent* bev, const std::string &session_key, const std::string &session_value) {
|
||||
printf("code: %s\n", session_value.c_str());
|
||||
@ -198,3 +216,5 @@ std::optional<userinfo*> session::find_user(uint64_t account) {
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -25,10 +25,23 @@ struct bev_key {
|
||||
std::string value;
|
||||
};
|
||||
|
||||
enum session_build_type {
|
||||
SESSION_SUPPORT_USER = 0,
|
||||
SESSION_SUPPORT_SESSION = 1,
|
||||
SESSION_SUPPORT_ALL = 2,
|
||||
};
|
||||
|
||||
|
||||
class session {
|
||||
public:
|
||||
/**
|
||||
* 空构造是 全部支持
|
||||
*/
|
||||
session();
|
||||
/**
|
||||
* 选择支持
|
||||
*/
|
||||
session(session_build_type type);
|
||||
|
||||
public:
|
||||
// 添加用户
|
||||
@ -45,6 +58,7 @@ public:
|
||||
bool is_user(const std::string& account);
|
||||
// 是否有这个用户
|
||||
bool is_user(uint64_t account);
|
||||
|
||||
// 初始化 session
|
||||
void init_session(bufferevent* bev);
|
||||
// 设置 session
|
||||
|
@ -7,6 +7,7 @@ aux_source_directory(db/po DIR_WORKS_DB_PO)
|
||||
|
||||
include_directories(../mmm)
|
||||
include_directories(${CMAKE_SOURCE_DIR}/MDB/imm_mysqldb)
|
||||
include_directories(${CMAKE_SOURCE_DIR}/MessageSystem)
|
||||
|
||||
add_library(works
|
||||
${DIR_WORKS_CONTROLLER}
|
||||
|
@ -3,14 +3,8 @@
|
||||
//
|
||||
|
||||
#include "IMController.h"
|
||||
#include "message_push/MessageMgr.h"
|
||||
|
||||
void IMController::run(std::shared_ptr<agreement_request> request, std::shared_ptr<agreement_response> response) {
|
||||
// msg push 储存库内 正常会话 要同步库 储存库
|
||||
if (request->m_mph->mp_type() == mp::MP_IM_PUSH_MSG) {
|
||||
|
||||
}
|
||||
// 传来 im msg 临时会话 消息包本体 要同步库
|
||||
else if (request->m_mph->mp_type() == mp::MP_IM_MSG) {
|
||||
|
||||
}
|
||||
MessageMgr();
|
||||
}
|
||||
|
@ -21,9 +21,15 @@ mp::sri* PEVerifCodeService::send_email(const std::string &target_email, const s
|
||||
sri_clear();
|
||||
|
||||
emailcode = code == "0" ? emailcode : code;
|
||||
// 检查发送验证码
|
||||
bool state = send_email_def(target_email, emailcode);
|
||||
// 检查邮箱
|
||||
auto [is_exist, user] = userDb.select_user(target_email, "email");
|
||||
|
||||
if (state) {
|
||||
if (is_exist) {
|
||||
sri->set_subcommand(mp::MP_SUB_TYPE::MP_REGISTER_FAIL);
|
||||
sri->set_msg("邮箱已使用!");
|
||||
} else if (state) {
|
||||
sri->set_subcommand(mp::MP_SUB_TYPE::MP_CODE_SUCCESS);
|
||||
sri->set_msg("验证码已发送");
|
||||
} else {
|
||||
|
@ -15,6 +15,7 @@ public:
|
||||
|
||||
private:
|
||||
std::string emailcode;
|
||||
UserDB userDb;
|
||||
};
|
||||
|
||||
|
||||
|
@ -1,17 +1,28 @@
|
||||
project(MessageSystem)
|
||||
|
||||
aux_source_directory(message_push PUSH)
|
||||
aux_source_directory(storage STORAGE)
|
||||
aux_source_directory(synchronization SYN)
|
||||
aux_source_directory(message_push PUSH)
|
||||
aux_source_directory(message_push/online ONLINE)
|
||||
aux_source_directory(message_push/offline OFFLINE)
|
||||
aux_source_directory(message_storage STORAGE)
|
||||
aux_source_directory(message_pull PULL)
|
||||
aux_source_directory(message_safe SAFE_QUEUE)
|
||||
aux_source_directory(message_base BASE)
|
||||
|
||||
include_directories(${CMAKE_SOURCE_DIR}/include/libevent)
|
||||
include_directories(${CMAKE_SOURCE_DIR}/MP)
|
||||
include_directories(${CMAKE_SOURCE_DIR}/MDB/imm_mongodb)
|
||||
|
||||
include_directories(${CMAKE_SOURCE_DIR}/MessageSystem)
|
||||
include_directories(${CMAKE_SOURCE_DIR}/MS/mmm)
|
||||
|
||||
add_library(MessageSystem
|
||||
${PUSH}
|
||||
${ONLINE}
|
||||
${OFFLINE}
|
||||
${STORAGE}
|
||||
${SYN}
|
||||
TimeLine.cpp
|
||||
${PULL}
|
||||
${SAFE_QUEUE}
|
||||
${BASE}
|
||||
)
|
||||
|
||||
target_link_libraries(MessageSystem
|
||||
|
@ -1,5 +0,0 @@
|
||||
//
|
||||
// Created by dongl on 23-5-17.
|
||||
//
|
||||
|
||||
#include "TimeLine.h"
|
@ -1,55 +0,0 @@
|
||||
//
|
||||
// Created by dongl on 23-5-17.
|
||||
//
|
||||
|
||||
#ifndef IM2_TIMELINE_H
|
||||
#define IM2_TIMELINE_H
|
||||
|
||||
#include <string>
|
||||
#include <queue>
|
||||
#include "proto/mp.mp.pb.h"
|
||||
#include "storage/db_base.h"
|
||||
|
||||
|
||||
struct SynMsg {
|
||||
uint64_t message_id;
|
||||
time_t time;
|
||||
};
|
||||
|
||||
struct StorageMsg {
|
||||
mp::MP_SUB_TYPE msg_type;
|
||||
mp::MP_SUB_TYPE session_type;
|
||||
int64_t message_id;
|
||||
time_t time;
|
||||
int64_t account;
|
||||
std::string im_msg_data;
|
||||
};
|
||||
|
||||
template<class T>
|
||||
class TimeLine {
|
||||
public:
|
||||
void push(T ele) {
|
||||
mutex.lock();
|
||||
queue.push(ele);
|
||||
mutex.unlock();
|
||||
}
|
||||
|
||||
T pull() {
|
||||
std::lock_guard lockGuard(mutex);
|
||||
auto ele = queue.front();
|
||||
queue.back();
|
||||
return ele;
|
||||
}
|
||||
|
||||
std::queue<T>& value() {
|
||||
return queue;
|
||||
}
|
||||
private:
|
||||
std::mutex mutex;
|
||||
std::queue<T> queue;
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
#endif //IM2_TIMELINE_H
|
@ -3,8 +3,8 @@
|
||||
//
|
||||
|
||||
#include "db_base.h"
|
||||
#include <mongocxx/instance.hpp>
|
||||
#include <mongocxx/uri.hpp>
|
||||
#include "mongocxx/instance.hpp"
|
||||
#include "mongocxx/uri.hpp"
|
||||
|
||||
|
||||
db_base::db_base() {
|
||||
@ -12,6 +12,10 @@ db_base::db_base() {
|
||||
mongocxx::uri uri("mongodb://user_session:Aa316216@124.221.152.192:27017/?authSource=im_session");
|
||||
pool = new mongocxx::pool(uri);
|
||||
}
|
||||
db_base::~db_base() {
|
||||
delete pool;
|
||||
}
|
||||
|
||||
|
||||
mongocxx::pool::entry db_base::acquire() {
|
||||
return pool->acquire();
|
||||
@ -33,3 +37,4 @@ mongocxx::collection db_base::hit_db_coll(const std::string& db_name, const std:
|
||||
|
||||
|
||||
|
||||
|
@ -5,10 +5,10 @@
|
||||
#ifndef IM2_DB_BASE_H
|
||||
#define IM2_DB_BASE_H
|
||||
|
||||
#include <mongocxx/pool.hpp>
|
||||
#include <mongocxx/stdx.hpp>
|
||||
#include <bsoncxx/stdx/optional.hpp>
|
||||
#include <mongocxx/client.hpp>
|
||||
#include "mongocxx/pool.hpp"
|
||||
#include "mongocxx/stdx.hpp"
|
||||
#include "bsoncxx/stdx/optional.hpp"
|
||||
#include "mongocxx/client.hpp"
|
||||
|
||||
using bsoncxx::builder::basic::kvp;
|
||||
using bsoncxx::builder::basic::make_array;
|
||||
@ -17,6 +17,9 @@ using bsoncxx::builder::basic::make_document;
|
||||
class db_base {
|
||||
public:
|
||||
db_base();
|
||||
|
||||
virtual ~db_base();
|
||||
|
||||
mongocxx::pool::entry acquire();
|
||||
bsoncxx::stdx::optional<mongocxx::pool::entry> try_acquire();
|
||||
|
||||
@ -24,7 +27,7 @@ public:
|
||||
// 命中 库 表
|
||||
mongocxx::collection hit_db_coll(const std::string& db_name, const std::string& coll);
|
||||
|
||||
private:
|
||||
protected:
|
||||
mongocxx::pool* pool = nullptr;
|
||||
};
|
||||
|
43
MessageSystem/message_push/MessageMgr.cpp
Normal file
43
MessageSystem/message_push/MessageMgr.cpp
Normal file
@ -0,0 +1,43 @@
|
||||
//
|
||||
// Created by dongl on 23-7-4.
|
||||
//
|
||||
|
||||
#include "MessageMgr.h"
|
||||
#include "message_push/online/OnlineMessageMgr.h"
|
||||
#include "message_push/offline/OfflineMessageMgr.h"
|
||||
#include "IMDataPacket.h"
|
||||
|
||||
session* MessageMgr::session = new class session(SESSION_SUPPORT_USER);
|
||||
|
||||
MessageMgr::~MessageMgr() {
|
||||
delete session;
|
||||
}
|
||||
|
||||
|
||||
void MessageMgr::receive(const std::string &packet) {
|
||||
|
||||
}
|
||||
|
||||
void MessageMgr::send(const std::string &packet) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
// 接收
|
||||
void MessageMgr::receive(const agreement_request& request) {
|
||||
// 查看用户是否在线
|
||||
bool is_exist = session->is_user(request.m_body.account());
|
||||
if (is_exist) {
|
||||
OnlineMessageMgr();
|
||||
} else {
|
||||
OfflineMessageMgr();
|
||||
}
|
||||
}
|
||||
|
||||
// 发送
|
||||
void MessageMgr::send(const agreement_response& response) {
|
||||
IMDataPacket imDataPacket(mp::MP_TYPE::MP_IM_NOTICE, response.m_notice->time(), (time_t)response.m_notice->message_id());
|
||||
std::string packet = imDataPacket.packet();
|
||||
bufferevent_write(response.m_bev, packet.c_str(), packet.size());
|
||||
}
|
||||
|
45
MessageSystem/message_push/MessageMgr.h
Normal file
45
MessageSystem/message_push/MessageMgr.h
Normal file
@ -0,0 +1,45 @@
|
||||
//
|
||||
// Created by dongl on 23-7-4.
|
||||
//
|
||||
|
||||
#ifndef IM2_MESSAGEMGR_H
|
||||
#define IM2_MESSAGEMGR_H
|
||||
|
||||
|
||||
#include <string>
|
||||
#include "agreement.h"
|
||||
#include "session.h"
|
||||
|
||||
class MessageMgr {
|
||||
public:
|
||||
virtual ~MessageMgr();
|
||||
|
||||
public:
|
||||
/**
|
||||
* 接受消息
|
||||
* @param request const std::string 消息包原始二进制数据
|
||||
*/
|
||||
static void receive(const std::string& packet);
|
||||
/**
|
||||
* 已服务器为主体 的 发送消息
|
||||
* @param response const std::string 消息包原始二进制数据
|
||||
*/
|
||||
static void send(const std::string& packet);
|
||||
|
||||
/**
|
||||
* 接受消息
|
||||
* @param request agreement_request 解析好的消息协议
|
||||
*/
|
||||
static void receive(const agreement_request& request);
|
||||
/**
|
||||
* 已服务器为主体 的 发送消息
|
||||
* @param response agreement_response 解析好的消息协议
|
||||
*/
|
||||
static void send(const agreement_response& response);
|
||||
|
||||
private:
|
||||
static class session* session;
|
||||
};
|
||||
|
||||
|
||||
#endif //IM2_MESSAGEMGR_H
|
5
MessageSystem/message_push/offline/OfflineMessageMgr.cpp
Normal file
5
MessageSystem/message_push/offline/OfflineMessageMgr.cpp
Normal file
@ -0,0 +1,5 @@
|
||||
//
|
||||
// Created by dongl on 23-7-4.
|
||||
//
|
||||
|
||||
#include "OfflineMessageMgr.h"
|
14
MessageSystem/message_push/offline/OfflineMessageMgr.h
Normal file
14
MessageSystem/message_push/offline/OfflineMessageMgr.h
Normal file
@ -0,0 +1,14 @@
|
||||
//
|
||||
// Created by dongl on 23-7-4.
|
||||
//
|
||||
|
||||
#ifndef IM2_OFFLINEMESSAGEMGR_H
|
||||
#define IM2_OFFLINEMESSAGEMGR_H
|
||||
|
||||
|
||||
class OfflineMessageMgr {
|
||||
|
||||
};
|
||||
|
||||
|
||||
#endif //IM2_OFFLINEMESSAGEMGR_H
|
5
MessageSystem/message_push/online/OnlineMessageMgr.cpp
Normal file
5
MessageSystem/message_push/online/OnlineMessageMgr.cpp
Normal file
@ -0,0 +1,5 @@
|
||||
//
|
||||
// Created by dongl on 23-7-4.
|
||||
//
|
||||
|
||||
#include "OnlineMessageMgr.h"
|
17
MessageSystem/message_push/online/OnlineMessageMgr.h
Normal file
17
MessageSystem/message_push/online/OnlineMessageMgr.h
Normal file
@ -0,0 +1,17 @@
|
||||
//
|
||||
// Created by dongl on 23-7-4.
|
||||
//
|
||||
|
||||
#ifndef IM2_ONLINEMESSAGEMGR_H
|
||||
#define IM2_ONLINEMESSAGEMGR_H
|
||||
|
||||
|
||||
class OnlineMessageMgr {
|
||||
public:
|
||||
|
||||
private:
|
||||
|
||||
};
|
||||
|
||||
|
||||
#endif //IM2_ONLINEMESSAGEMGR_H
|
6
MessageSystem/message_safe/SafeQueue.cpp
Normal file
6
MessageSystem/message_safe/SafeQueue.cpp
Normal file
@ -0,0 +1,6 @@
|
||||
//
|
||||
// Created by dongl on 23-5-17.
|
||||
//
|
||||
|
||||
#include "SafeQueue.h"
|
||||
#include <utility>
|
103
MessageSystem/message_safe/SafeQueue.h
Normal file
103
MessageSystem/message_safe/SafeQueue.h
Normal file
@ -0,0 +1,103 @@
|
||||
//
|
||||
// Created by dongl on 23-5-17.
|
||||
//
|
||||
|
||||
#ifndef IM2_TIMELINE_H
|
||||
#define IM2_SAFEQUEUE_H
|
||||
|
||||
#include <string>
|
||||
#include <queue>
|
||||
#include "proto/mp.mp.pb.h"
|
||||
#include "message_base/db_base.h"
|
||||
|
||||
|
||||
struct SynMsg {
|
||||
uint64_t message_id;
|
||||
time_t time;
|
||||
};
|
||||
|
||||
struct StorageMsg {
|
||||
mp::MP_SUB_TYPE msg_type;
|
||||
mp::MP_SUB_TYPE session_type;
|
||||
int64_t message_id;
|
||||
time_t time;
|
||||
int64_t account;
|
||||
std::string im_msg_data;
|
||||
};
|
||||
|
||||
template<class T>
|
||||
class SafeQueue {
|
||||
public:
|
||||
/**
|
||||
* 压入元素
|
||||
* @param ele T 类型的
|
||||
*/
|
||||
void push(T ele) {
|
||||
mutex.lock();
|
||||
queue.push(ele);
|
||||
mutex.unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* 弹出元素 并 删除
|
||||
* @return T
|
||||
*/
|
||||
T pull() {
|
||||
std::lock_guard lockGuard(mutex);
|
||||
auto ele = queue.front();
|
||||
queue.pop();
|
||||
return ele;
|
||||
}
|
||||
|
||||
/**
|
||||
* 头部元素
|
||||
* @return T
|
||||
*/
|
||||
T front() {
|
||||
std::lock_guard lockGuard(mutex);
|
||||
return queue.front();
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询第二个元素
|
||||
* @return T
|
||||
*/
|
||||
T index_second() {
|
||||
if (1 >= queue.size()) {
|
||||
perror("查询越界\n");
|
||||
return nullptr;
|
||||
}
|
||||
std::lock_guard lockGuard(mutex);
|
||||
return *((&queue.front()) + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询 距离 front 向右的第i个位置
|
||||
* @param i 距离 front 向右的第i个位置
|
||||
* @return T
|
||||
*/
|
||||
T index_second_n(uint32_t i) {
|
||||
if (i >= queue.size()) {
|
||||
perror("查询越界\n");
|
||||
return nullptr;
|
||||
}
|
||||
std::lock_guard lockGuard(mutex);
|
||||
return *((&queue.front()) + i);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取队列本身 not safe 的
|
||||
* @return std::queue<T>
|
||||
*/
|
||||
std::queue<T>& value() {
|
||||
return queue;
|
||||
}
|
||||
private:
|
||||
std::mutex mutex;
|
||||
std::queue<T> queue;
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
#endif //IM2_TIMELINE_H
|
64
MessageSystem/message_storage/Storage.cpp
Normal file
64
MessageSystem/message_storage/Storage.cpp
Normal file
@ -0,0 +1,64 @@
|
||||
//
|
||||
// Created by dongl on 23-5-29.
|
||||
//
|
||||
|
||||
#include "Storage.h"
|
||||
#include "template/MsgTemplate.h"
|
||||
#include "IMDataPacket.h"
|
||||
|
||||
#include <utility>
|
||||
#include <thread>
|
||||
|
||||
MSG::Storage::Storage(SafeQueue<StorageMsg *> *timeLine, db_base* db)
|
||||
: m_timeLine(timeLine), m_db_name(""), m_table("") {
|
||||
storage_push_lister_queue();
|
||||
}
|
||||
|
||||
MSG::Storage::Storage(std::string&& db_name, std::string&& table) : m_db_name(db_name), m_table(table) {
|
||||
m_timeLine = new SafeQueue<StorageMsg *>();
|
||||
storage_push_lister_queue();
|
||||
}
|
||||
|
||||
MSG::Storage::~Storage() {
|
||||
delete m_timeLine;
|
||||
}
|
||||
|
||||
// 储存库 push
|
||||
void MSG::Storage::push(StorageMsg* msg) {
|
||||
// 添加至队列
|
||||
m_timeLine->push(msg);
|
||||
}
|
||||
|
||||
|
||||
void MSG::Storage::storage_push_lister_queue() {
|
||||
std::function<void()> fun = [&] {
|
||||
// 取mongo链接
|
||||
auto coll = hit_db_coll(m_db_name, m_table);
|
||||
|
||||
while (true) {
|
||||
while (!m_timeLine->value().empty()) {
|
||||
/// 插入单个
|
||||
// 弹出msg队列 此cpp只负责储存库 不负责同步库
|
||||
auto msg = m_timeLine->pull();
|
||||
// 执行插入
|
||||
auto insert_one_result = coll.insert_one(MsgTemplate::session_msg(msg->msg_type, msg->session_type,
|
||||
msg->message_id, msg->time, msg->account, msg->im_msg_data));
|
||||
|
||||
auto doc_id = insert_one_result->inserted_id();
|
||||
printf("[msg insert mongo] %s\n", doc_id.type() == bsoncxx::type::k_oid ? "msg insert" : "not msg insert");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
std::thread t(fun);
|
||||
printf("%ld", t.get_id());
|
||||
t.detach();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -6,24 +6,24 @@
|
||||
#define IM2_STORAGE_H
|
||||
|
||||
|
||||
#include "db_base.h"
|
||||
#include "../TimeLine.h"
|
||||
#include "message_base/db_base.h"
|
||||
#include "message_safe/SafeQueue.h"
|
||||
|
||||
namespace MSG {
|
||||
class Storage {
|
||||
class Storage : db_base {
|
||||
public:
|
||||
Storage(TimeLine<StorageMsg *> *timeLine, db_base* db);
|
||||
Storage(SafeQueue<StorageMsg *> *timeLine, db_base* db);
|
||||
Storage(std::string&& db_name, std::string&& table);
|
||||
|
||||
virtual ~Storage();
|
||||
|
||||
public:
|
||||
void push(StorageMsg* msg);
|
||||
void pull();
|
||||
|
||||
void storage_push_queue();
|
||||
void storage_push_lister_queue();
|
||||
|
||||
private:
|
||||
db_base* m_db = nullptr;
|
||||
TimeLine<StorageMsg *> * m_timeLine = nullptr;
|
||||
SafeQueue<StorageMsg *> * m_timeLine = nullptr;
|
||||
|
||||
std::string m_db_name, m_table;
|
||||
};
|
43
MessageSystem/message_storage/Syn.cpp
Normal file
43
MessageSystem/message_storage/Syn.cpp
Normal file
@ -0,0 +1,43 @@
|
||||
//
|
||||
// Created by dongl on 23-7-1.
|
||||
//
|
||||
|
||||
#include <thread>
|
||||
#include "Syn.h"
|
||||
#include "IMDataPacket.h"
|
||||
|
||||
MSG::Syn::Syn(SafeQueue<SynMsg *> *mTimeLine) : m_timeLine(mTimeLine) {}
|
||||
|
||||
MSG::Syn::Syn() : m_timeLine(new SafeQueue<SynMsg* >) {
|
||||
|
||||
}
|
||||
|
||||
MSG::Syn::~Syn() {
|
||||
delete m_timeLine;
|
||||
}
|
||||
|
||||
void MSG::Syn::push(SynMsg* msg) {
|
||||
m_timeLine->push(msg);
|
||||
}
|
||||
|
||||
void MSG::Syn::syn_push_lister_queue(const std::function<void(std::string&& packet)>& cb) {
|
||||
std::function<void()> fun = [&] {
|
||||
while (true) {
|
||||
while (!m_timeLine->value().empty()) {
|
||||
SynMsg* msg = m_timeLine->pull();
|
||||
IMDataPacket imDataPacket(mp::MP_IM_NOTICE, msg->message_id, msg->time);
|
||||
|
||||
// 执行回调
|
||||
cb(imDataPacket.packet());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
std::thread t(fun);
|
||||
printf("[thread id] %ld\n", t.get_id());
|
||||
t.detach();
|
||||
}
|
||||
}
|
||||
|
||||
|
26
MessageSystem/message_storage/Syn.h
Normal file
26
MessageSystem/message_storage/Syn.h
Normal file
@ -0,0 +1,26 @@
|
||||
//
|
||||
// Created by dongl on 23-7-1.
|
||||
//
|
||||
|
||||
#ifndef IM2_SYN_H
|
||||
#define IM2_SYN_H
|
||||
|
||||
#include "message_safe/SafeQueue.h"
|
||||
|
||||
namespace MSG {
|
||||
class Syn : public db_base {
|
||||
public:
|
||||
explicit Syn(SafeQueue<SynMsg *> *mTimeLine);
|
||||
explicit Syn();
|
||||
virtual ~Syn();
|
||||
|
||||
public:
|
||||
void push(SynMsg* msg);
|
||||
void syn_push_lister_queue(const std::function<void(std::string&& packet)>&);
|
||||
|
||||
private:
|
||||
SafeQueue<SynMsg *>* m_timeLine = nullptr;
|
||||
};
|
||||
}
|
||||
|
||||
#endif //IM2_SYN_H
|
@ -1,54 +0,0 @@
|
||||
//
|
||||
// Created by dongl on 23-5-29.
|
||||
//
|
||||
|
||||
#include "Storage.h"
|
||||
#include "template/MsgTemplate.h"
|
||||
|
||||
#include <utility>
|
||||
#include <thread>
|
||||
|
||||
MSG::Storage::Storage(TimeLine<StorageMsg *> *timeLine, db_base* db)
|
||||
: m_timeLine(timeLine), m_db(db), m_db_name(""), m_table("") {}
|
||||
|
||||
MSG::Storage::Storage(std::string&& db_name, std::string&& table) : m_db_name(db_name), m_table(table) {
|
||||
m_timeLine = new TimeLine<StorageMsg *>();
|
||||
m_db = new db_base();
|
||||
}
|
||||
|
||||
// 储存库 push
|
||||
void MSG::Storage::push(StorageMsg* msg) {
|
||||
// 添加至信箱 同步库
|
||||
m_timeLine->push(msg);
|
||||
}
|
||||
|
||||
void MSG::Storage::pull() {
|
||||
auto coll = m_db->hit_db_coll(m_db_name, m_table);
|
||||
}
|
||||
|
||||
|
||||
void MSG::Storage::storage_push_queue() {
|
||||
// 取mongo链接
|
||||
auto coll = m_db->hit_db_coll(m_db_name, m_table);
|
||||
|
||||
std::function<void()> fun = [&] {
|
||||
while (true) {
|
||||
while (!m_timeLine->value().empty()) {
|
||||
// 弹出msg队列 此cpp只负责储存库 不负责同步库
|
||||
auto msg = m_timeLine->pull();
|
||||
// 执行插入
|
||||
coll.insert_one(MsgTemplate::session_msg(msg->msg_type, msg->session_type,
|
||||
msg->message_id, msg->time, msg->account, msg->im_msg_data));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
std::thread t(fun);
|
||||
printf("%ld", t.get_id());
|
||||
t.detach();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -1,5 +0,0 @@
|
||||
//
|
||||
// Created by dongl on 23-7-1.
|
||||
//
|
||||
|
||||
#include "Syn.h"
|
@ -1,14 +0,0 @@
|
||||
//
|
||||
// Created by dongl on 23-7-1.
|
||||
//
|
||||
|
||||
#ifndef IM2_SYN_H
|
||||
#define IM2_SYN_H
|
||||
|
||||
|
||||
class Syn {
|
||||
|
||||
};
|
||||
|
||||
|
||||
#endif //IM2_SYN_H
|
@ -9,8 +9,8 @@
|
||||
#include "works/db/UserFriendsDB.h"
|
||||
#include "works/db/UserDB.h"
|
||||
#include "smtp/send_email.h"
|
||||
#include "storage/Storage.h"
|
||||
#include "IMDataPacket.h"
|
||||
#include "message_storage/Storage.h"
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
@ -112,11 +112,6 @@ TEST(select_friends_fdb, fecth_frinds_Test) {
|
||||
}
|
||||
|
||||
|
||||
TEST(MSG_PULL, MSG_PULL__Test) {
|
||||
MSG::Storage* storage = new MSG::Storage();
|
||||
storage->pull();
|
||||
}
|
||||
|
||||
TEST(selsct_is_friend, fecth_frinds_Test) {
|
||||
auto i = UserFriendsDB();
|
||||
auto user = i.select_friend(2725096176, 783556037);
|
||||
@ -138,4 +133,40 @@ TEST (MP_IM_PUSH_MSG, MP_IM_PUSH_MSG_TEST) {
|
||||
);
|
||||
|
||||
std::cout << temp->packet() << std::endl;
|
||||
|
||||
MSG::Storage* storage = new MSG::Storage("im_session", "chat");
|
||||
|
||||
StorageMsg* msg = new StorageMsg();
|
||||
msg->msg_type = mp::MP_SUB_TYPE::MP_IM_TEXT;
|
||||
msg->session_type = mp::MP_SUB_TYPE::MP_SESSION_FRIEND;
|
||||
msg->message_id = 1;
|
||||
msg->time = time(nullptr);
|
||||
msg->account = 783556037;
|
||||
msg->im_msg_data = "hello test msg!";
|
||||
storage->push(msg);
|
||||
|
||||
sleep(10);
|
||||
}
|
||||
|
||||
TEST(queue, queue_test) {
|
||||
std::queue<int> queue;
|
||||
queue.push(1);
|
||||
queue.push(2);
|
||||
|
||||
printf("%d", *((&queue.front()) + 1));
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user