Compare commits

...

2 Commits

Author SHA1 Message Date
5b12450a56 修改了 message system 2023-07-04 14:10:13 +08:00
a764f9fcab 暂时添加了 邮箱不可重复申请帐号
后续改成 邮箱可以申请多个账号, 用邮箱登陆时选择帐号登陆
修改了 MessageSystem 结构
2023-07-04 10:31:14 +08:00
35 changed files with 543 additions and 177 deletions

View File

@ -12,5 +12,7 @@ add_library(imm_mongodb
target_link_libraries(imm_mongodb
mongocxx
# mongoc-static-1.0
bsoncxx
bson-static-1.0
)

Binary file not shown.

Binary file not shown.

View File

@ -18,20 +18,31 @@ using bsoncxx::builder::basic::make_document;
class MsgTemplate {
public:
static bsoncxx::document::view session_msg(mp::MP_SUB_TYPE msg_type, mp::MP_SUB_TYPE session_type,
/**
* mongo
* @param msg_type
* @param session_type
* @param message_id id
* @param time
* @param account
* @param msg_data
* @return make_document - mongocxx
*/
static bsoncxx::document::value session_msg(mp::MP_SUB_TYPE msg_type, mp::MP_SUB_TYPE session_type,
int64_t message_id, time_t time, int64_t account,
const std::string& msg_data) {
// 要插入的视图
auto doc_value = make_document(
kvp("msg_type", msg_type), // 200 text消息
kvp("session_type", session_type), // 300 单体会话
kvp("message_id", message_id), // 0 会话级id
kvp("date", time), // 时间
kvp("msg_id", message_id), // 0 会话级id
kvp("time", time), // 时间
kvp("account", account), // 目标/来源
kvp("msg_data", msg_data.c_str()) // 消息内容
);
return doc_value.view();
return doc_value;
}
};

View File

@ -18,11 +18,10 @@ IMDataPacket::IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
}
IMDataPacket::IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
IMDataPacket::IMDataPacket(mp::MP_TYPE type, /*mp::MP_SUB_TYPE subType,*/
uint64_t messageId, time_t time) :
Mph(type),
MsgData(subType, messageId, time) {
MsgData(messageId, time) {
}
std::string IMDataPacket::packet() {

View File

@ -12,13 +12,35 @@
class IMDataPacket : public Mph, MsgData {
public:
/**
*
* @param type
* @param data protobuf
*/
IMDataPacket(mp::MP_TYPE type, mp::im::msg_data* data);
/**
*
* @param type
* @param subType
* @param sessionType
* @param messageId id
* @param time
* @param account
* @param imMsgData
*/
IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
mp::MP_SUB_TYPE sessionType, uint64_t messageId,
time_t time, uint64_t account, const std::string &imMsgData);
IMDataPacket(mp::MP_TYPE type, mp::MP_SUB_TYPE subType,
/**
*
* @param type
* @param messageId id
* @param time
*/
IMDataPacket(mp::MP_TYPE type, /*mp::MP_SUB_TYPE subType,*/
uint64_t messageId, time_t time);
public:

View File

@ -29,6 +29,12 @@ public:
data->set_time(time);
}
MsgData(uint64_t message_id, time_t time) {
data = new mp::im::msg_data();
data->set_message_id(message_id);
data->set_time(time);
}
virtual ~MsgData() {
delete data;
}

View File

@ -86,6 +86,8 @@ enum MP_SUB_TYPE {
MP_ADD_CHECK = 80; // /
MP_TYPE_NULL = 100; //
///***********************************************************************************///
/// 200+ IM ***********************************************************************************///

View File

@ -16,6 +16,23 @@ session::session() {
printf("timing end\n");
}
session::session(session_build_type type) {
if (type == SESSION_SUPPORT_ALL) {
printf("timing begin\n");
timing();
printf("timing end\n");
} else if (type == SESSION_SUPPORT_SESSION) {
printf("timing begin\n");
timing();
printf("timing end\n");
}
else if (type == SESSION_SUPPORT_USER) {
}
}
/// curr mem user curd user session
void session::add_user(mp::sri* sri, std::shared_ptr<agreement_request>& request) {
if (sri->subcommand() == mp::MP_LOGIN_SUCCESS) {
@ -75,6 +92,7 @@ void session::init_session(bufferevent *bev) {
}
// 给用户 添加会话信息
void session::set_session(bufferevent* bev, const std::string &session_key, const std::string &session_value) {
printf("code: %s\n", session_value.c_str());
@ -198,3 +216,5 @@ std::optional<userinfo*> session::find_user(uint64_t account) {

View File

@ -25,10 +25,23 @@ struct bev_key {
std::string value;
};
enum session_build_type {
SESSION_SUPPORT_USER = 0,
SESSION_SUPPORT_SESSION = 1,
SESSION_SUPPORT_ALL = 2,
};
class session {
public:
/**
*
*/
session();
/**
*
*/
session(session_build_type type);
public:
// 添加用户
@ -45,6 +58,7 @@ public:
bool is_user(const std::string& account);
// 是否有这个用户
bool is_user(uint64_t account);
// 初始化 session
void init_session(bufferevent* bev);
// 设置 session

View File

@ -7,6 +7,7 @@ aux_source_directory(db/po DIR_WORKS_DB_PO)
include_directories(../mmm)
include_directories(${CMAKE_SOURCE_DIR}/MDB/imm_mysqldb)
include_directories(${CMAKE_SOURCE_DIR}/MessageSystem)
add_library(works
${DIR_WORKS_CONTROLLER}

View File

@ -3,14 +3,8 @@
//
#include "IMController.h"
#include "message_push/MessageMgr.h"
void IMController::run(std::shared_ptr<agreement_request> request, std::shared_ptr<agreement_response> response) {
// msg push 储存库内 正常会话 要同步库 储存库
if (request->m_mph->mp_type() == mp::MP_IM_PUSH_MSG) {
}
// 传来 im msg 临时会话 消息包本体 要同步库
else if (request->m_mph->mp_type() == mp::MP_IM_MSG) {
}
MessageMgr();
}

View File

@ -21,9 +21,15 @@ mp::sri* PEVerifCodeService::send_email(const std::string &target_email, const s
sri_clear();
emailcode = code == "0" ? emailcode : code;
// 检查发送验证码
bool state = send_email_def(target_email, emailcode);
// 检查邮箱
auto [is_exist, user] = userDb.select_user(target_email, "email");
if (state) {
if (is_exist) {
sri->set_subcommand(mp::MP_SUB_TYPE::MP_REGISTER_FAIL);
sri->set_msg("邮箱已使用!");
} else if (state) {
sri->set_subcommand(mp::MP_SUB_TYPE::MP_CODE_SUCCESS);
sri->set_msg("验证码已发送");
} else {

View File

@ -15,6 +15,7 @@ public:
private:
std::string emailcode;
UserDB userDb;
};

View File

@ -1,17 +1,28 @@
project(MessageSystem)
aux_source_directory(message_push PUSH)
aux_source_directory(storage STORAGE)
aux_source_directory(synchronization SYN)
aux_source_directory(message_push PUSH)
aux_source_directory(message_push/online ONLINE)
aux_source_directory(message_push/offline OFFLINE)
aux_source_directory(message_storage STORAGE)
aux_source_directory(message_pull PULL)
aux_source_directory(message_safe SAFE_QUEUE)
aux_source_directory(message_base BASE)
include_directories(${CMAKE_SOURCE_DIR}/include/libevent)
include_directories(${CMAKE_SOURCE_DIR}/MP)
include_directories(${CMAKE_SOURCE_DIR}/MDB/imm_mongodb)
include_directories(${CMAKE_SOURCE_DIR}/MessageSystem)
include_directories(${CMAKE_SOURCE_DIR}/MS/mmm)
add_library(MessageSystem
${PUSH}
${ONLINE}
${OFFLINE}
${STORAGE}
${SYN}
TimeLine.cpp
${PULL}
${SAFE_QUEUE}
${BASE}
)
target_link_libraries(MessageSystem

View File

@ -1,5 +0,0 @@
//
// Created by dongl on 23-5-17.
//
#include "TimeLine.h"

View File

@ -1,55 +0,0 @@
//
// Created by dongl on 23-5-17.
//
#ifndef IM2_TIMELINE_H
#define IM2_TIMELINE_H
#include <string>
#include <queue>
#include "proto/mp.mp.pb.h"
#include "storage/db_base.h"
struct SynMsg {
uint64_t message_id;
time_t time;
};
struct StorageMsg {
mp::MP_SUB_TYPE msg_type;
mp::MP_SUB_TYPE session_type;
int64_t message_id;
time_t time;
int64_t account;
std::string im_msg_data;
};
template<class T>
class TimeLine {
public:
void push(T ele) {
mutex.lock();
queue.push(ele);
mutex.unlock();
}
T pull() {
std::lock_guard lockGuard(mutex);
auto ele = queue.front();
queue.back();
return ele;
}
std::queue<T>& value() {
return queue;
}
private:
std::mutex mutex;
std::queue<T> queue;
};
#endif //IM2_TIMELINE_H

View File

@ -3,8 +3,8 @@
//
#include "db_base.h"
#include <mongocxx/instance.hpp>
#include <mongocxx/uri.hpp>
#include "mongocxx/instance.hpp"
#include "mongocxx/uri.hpp"
db_base::db_base() {
@ -12,6 +12,10 @@ db_base::db_base() {
mongocxx::uri uri("mongodb://user_session:Aa316216@124.221.152.192:27017/?authSource=im_session");
pool = new mongocxx::pool(uri);
}
db_base::~db_base() {
delete pool;
}
mongocxx::pool::entry db_base::acquire() {
return pool->acquire();
@ -33,3 +37,4 @@ mongocxx::collection db_base::hit_db_coll(const std::string& db_name, const std:

View File

@ -5,10 +5,10 @@
#ifndef IM2_DB_BASE_H
#define IM2_DB_BASE_H
#include <mongocxx/pool.hpp>
#include <mongocxx/stdx.hpp>
#include <bsoncxx/stdx/optional.hpp>
#include <mongocxx/client.hpp>
#include "mongocxx/pool.hpp"
#include "mongocxx/stdx.hpp"
#include "bsoncxx/stdx/optional.hpp"
#include "mongocxx/client.hpp"
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_array;
@ -17,6 +17,9 @@ using bsoncxx::builder::basic::make_document;
class db_base {
public:
db_base();
virtual ~db_base();
mongocxx::pool::entry acquire();
bsoncxx::stdx::optional<mongocxx::pool::entry> try_acquire();
@ -24,7 +27,7 @@ public:
// 命中 库 表
mongocxx::collection hit_db_coll(const std::string& db_name, const std::string& coll);
private:
protected:
mongocxx::pool* pool = nullptr;
};

View File

@ -0,0 +1,43 @@
//
// Created by dongl on 23-7-4.
//
#include "MessageMgr.h"
#include "message_push/online/OnlineMessageMgr.h"
#include "message_push/offline/OfflineMessageMgr.h"
#include "IMDataPacket.h"
session* MessageMgr::session = new class session(SESSION_SUPPORT_USER);
MessageMgr::~MessageMgr() {
delete session;
}
void MessageMgr::receive(const std::string &packet) {
}
void MessageMgr::send(const std::string &packet) {
}
// 接收
void MessageMgr::receive(const agreement_request& request) {
// 查看用户是否在线
bool is_exist = session->is_user(request.m_body.account());
if (is_exist) {
OnlineMessageMgr();
} else {
OfflineMessageMgr();
}
}
// 发送
void MessageMgr::send(const agreement_response& response) {
IMDataPacket imDataPacket(mp::MP_TYPE::MP_IM_NOTICE, response.m_notice->time(), (time_t)response.m_notice->message_id());
std::string packet = imDataPacket.packet();
bufferevent_write(response.m_bev, packet.c_str(), packet.size());
}

View File

@ -0,0 +1,45 @@
//
// Created by dongl on 23-7-4.
//
#ifndef IM2_MESSAGEMGR_H
#define IM2_MESSAGEMGR_H
#include <string>
#include "agreement.h"
#include "session.h"
class MessageMgr {
public:
virtual ~MessageMgr();
public:
/**
*
* @param request const std::string
*/
static void receive(const std::string& packet);
/**
*
* @param response const std::string
*/
static void send(const std::string& packet);
/**
*
* @param request agreement_request
*/
static void receive(const agreement_request& request);
/**
*
* @param response agreement_response
*/
static void send(const agreement_response& response);
private:
static class session* session;
};
#endif //IM2_MESSAGEMGR_H

View File

@ -0,0 +1,5 @@
//
// Created by dongl on 23-7-4.
//
#include "OfflineMessageMgr.h"

View File

@ -0,0 +1,14 @@
//
// Created by dongl on 23-7-4.
//
#ifndef IM2_OFFLINEMESSAGEMGR_H
#define IM2_OFFLINEMESSAGEMGR_H
class OfflineMessageMgr {
};
#endif //IM2_OFFLINEMESSAGEMGR_H

View File

@ -0,0 +1,5 @@
//
// Created by dongl on 23-7-4.
//
#include "OnlineMessageMgr.h"

View File

@ -0,0 +1,17 @@
//
// Created by dongl on 23-7-4.
//
#ifndef IM2_ONLINEMESSAGEMGR_H
#define IM2_ONLINEMESSAGEMGR_H
class OnlineMessageMgr {
public:
private:
};
#endif //IM2_ONLINEMESSAGEMGR_H

View File

@ -0,0 +1,6 @@
//
// Created by dongl on 23-5-17.
//
#include "SafeQueue.h"
#include <utility>

View File

@ -0,0 +1,103 @@
//
// Created by dongl on 23-5-17.
//
#ifndef IM2_TIMELINE_H
#define IM2_SAFEQUEUE_H
#include <string>
#include <queue>
#include "proto/mp.mp.pb.h"
#include "message_base/db_base.h"
struct SynMsg {
uint64_t message_id;
time_t time;
};
struct StorageMsg {
mp::MP_SUB_TYPE msg_type;
mp::MP_SUB_TYPE session_type;
int64_t message_id;
time_t time;
int64_t account;
std::string im_msg_data;
};
template<class T>
class SafeQueue {
public:
/**
*
* @param ele T
*/
void push(T ele) {
mutex.lock();
queue.push(ele);
mutex.unlock();
}
/**
*
* @return T
*/
T pull() {
std::lock_guard lockGuard(mutex);
auto ele = queue.front();
queue.pop();
return ele;
}
/**
*
* @return T
*/
T front() {
std::lock_guard lockGuard(mutex);
return queue.front();
}
/**
*
* @return T
*/
T index_second() {
if (1 >= queue.size()) {
perror("查询越界\n");
return nullptr;
}
std::lock_guard lockGuard(mutex);
return *((&queue.front()) + 1);
}
/**
* front i个位置
* @param i front i个位置
* @return T
*/
T index_second_n(uint32_t i) {
if (i >= queue.size()) {
perror("查询越界\n");
return nullptr;
}
std::lock_guard lockGuard(mutex);
return *((&queue.front()) + i);
}
/**
* not safe
* @return std::queue<T>
*/
std::queue<T>& value() {
return queue;
}
private:
std::mutex mutex;
std::queue<T> queue;
};
#endif //IM2_TIMELINE_H

View File

@ -0,0 +1,64 @@
//
// Created by dongl on 23-5-29.
//
#include "Storage.h"
#include "template/MsgTemplate.h"
#include "IMDataPacket.h"
#include <utility>
#include <thread>
MSG::Storage::Storage(SafeQueue<StorageMsg *> *timeLine, db_base* db)
: m_timeLine(timeLine), m_db_name(""), m_table("") {
storage_push_lister_queue();
}
MSG::Storage::Storage(std::string&& db_name, std::string&& table) : m_db_name(db_name), m_table(table) {
m_timeLine = new SafeQueue<StorageMsg *>();
storage_push_lister_queue();
}
MSG::Storage::~Storage() {
delete m_timeLine;
}
// 储存库 push
void MSG::Storage::push(StorageMsg* msg) {
// 添加至队列
m_timeLine->push(msg);
}
void MSG::Storage::storage_push_lister_queue() {
std::function<void()> fun = [&] {
// 取mongo链接
auto coll = hit_db_coll(m_db_name, m_table);
while (true) {
while (!m_timeLine->value().empty()) {
/// 插入单个
// 弹出msg队列 此cpp只负责储存库 不负责同步库
auto msg = m_timeLine->pull();
// 执行插入
auto insert_one_result = coll.insert_one(MsgTemplate::session_msg(msg->msg_type, msg->session_type,
msg->message_id, msg->time, msg->account, msg->im_msg_data));
auto doc_id = insert_one_result->inserted_id();
printf("[msg insert mongo] %s\n", doc_id.type() == bsoncxx::type::k_oid ? "msg insert" : "not msg insert");
}
}
};
for (int i = 0; i < 2; ++i) {
std::thread t(fun);
printf("%ld", t.get_id());
t.detach();
}
}

View File

@ -6,24 +6,24 @@
#define IM2_STORAGE_H
#include "db_base.h"
#include "../TimeLine.h"
#include "message_base/db_base.h"
#include "message_safe/SafeQueue.h"
namespace MSG {
class Storage {
class Storage : db_base {
public:
Storage(TimeLine<StorageMsg *> *timeLine, db_base* db);
Storage(SafeQueue<StorageMsg *> *timeLine, db_base* db);
Storage(std::string&& db_name, std::string&& table);
virtual ~Storage();
public:
void push(StorageMsg* msg);
void pull();
void storage_push_queue();
void storage_push_lister_queue();
private:
db_base* m_db = nullptr;
TimeLine<StorageMsg *> * m_timeLine = nullptr;
SafeQueue<StorageMsg *> * m_timeLine = nullptr;
std::string m_db_name, m_table;
};

View File

@ -0,0 +1,43 @@
//
// Created by dongl on 23-7-1.
//
#include <thread>
#include "Syn.h"
#include "IMDataPacket.h"
MSG::Syn::Syn(SafeQueue<SynMsg *> *mTimeLine) : m_timeLine(mTimeLine) {}
MSG::Syn::Syn() : m_timeLine(new SafeQueue<SynMsg* >) {
}
MSG::Syn::~Syn() {
delete m_timeLine;
}
void MSG::Syn::push(SynMsg* msg) {
m_timeLine->push(msg);
}
void MSG::Syn::syn_push_lister_queue(const std::function<void(std::string&& packet)>& cb) {
std::function<void()> fun = [&] {
while (true) {
while (!m_timeLine->value().empty()) {
SynMsg* msg = m_timeLine->pull();
IMDataPacket imDataPacket(mp::MP_IM_NOTICE, msg->message_id, msg->time);
// 执行回调
cb(imDataPacket.packet());
}
}
};
for (int i = 0; i < 2; ++i) {
std::thread t(fun);
printf("[thread id] %ld\n", t.get_id());
t.detach();
}
}

View File

@ -0,0 +1,26 @@
//
// Created by dongl on 23-7-1.
//
#ifndef IM2_SYN_H
#define IM2_SYN_H
#include "message_safe/SafeQueue.h"
namespace MSG {
class Syn : public db_base {
public:
explicit Syn(SafeQueue<SynMsg *> *mTimeLine);
explicit Syn();
virtual ~Syn();
public:
void push(SynMsg* msg);
void syn_push_lister_queue(const std::function<void(std::string&& packet)>&);
private:
SafeQueue<SynMsg *>* m_timeLine = nullptr;
};
}
#endif //IM2_SYN_H

View File

@ -1,54 +0,0 @@
//
// Created by dongl on 23-5-29.
//
#include "Storage.h"
#include "template/MsgTemplate.h"
#include <utility>
#include <thread>
MSG::Storage::Storage(TimeLine<StorageMsg *> *timeLine, db_base* db)
: m_timeLine(timeLine), m_db(db), m_db_name(""), m_table("") {}
MSG::Storage::Storage(std::string&& db_name, std::string&& table) : m_db_name(db_name), m_table(table) {
m_timeLine = new TimeLine<StorageMsg *>();
m_db = new db_base();
}
// 储存库 push
void MSG::Storage::push(StorageMsg* msg) {
// 添加至信箱 同步库
m_timeLine->push(msg);
}
void MSG::Storage::pull() {
auto coll = m_db->hit_db_coll(m_db_name, m_table);
}
void MSG::Storage::storage_push_queue() {
// 取mongo链接
auto coll = m_db->hit_db_coll(m_db_name, m_table);
std::function<void()> fun = [&] {
while (true) {
while (!m_timeLine->value().empty()) {
// 弹出msg队列 此cpp只负责储存库 不负责同步库
auto msg = m_timeLine->pull();
// 执行插入
coll.insert_one(MsgTemplate::session_msg(msg->msg_type, msg->session_type,
msg->message_id, msg->time, msg->account, msg->im_msg_data));
}
}
};
for (int i = 0; i < 2; ++i) {
std::thread t(fun);
printf("%ld", t.get_id());
t.detach();
}
}

View File

@ -1,5 +0,0 @@
//
// Created by dongl on 23-7-1.
//
#include "Syn.h"

View File

@ -1,14 +0,0 @@
//
// Created by dongl on 23-7-1.
//
#ifndef IM2_SYN_H
#define IM2_SYN_H
class Syn {
};
#endif //IM2_SYN_H

View File

@ -9,8 +9,8 @@
#include "works/db/UserFriendsDB.h"
#include "works/db/UserDB.h"
#include "smtp/send_email.h"
#include "storage/Storage.h"
#include "IMDataPacket.h"
#include "message_storage/Storage.h"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
@ -112,11 +112,6 @@ TEST(select_friends_fdb, fecth_frinds_Test) {
}
TEST(MSG_PULL, MSG_PULL__Test) {
MSG::Storage* storage = new MSG::Storage();
storage->pull();
}
TEST(selsct_is_friend, fecth_frinds_Test) {
auto i = UserFriendsDB();
auto user = i.select_friend(2725096176, 783556037);
@ -138,4 +133,40 @@ TEST (MP_IM_PUSH_MSG, MP_IM_PUSH_MSG_TEST) {
);
std::cout << temp->packet() << std::endl;
}
MSG::Storage* storage = new MSG::Storage("im_session", "chat");
StorageMsg* msg = new StorageMsg();
msg->msg_type = mp::MP_SUB_TYPE::MP_IM_TEXT;
msg->session_type = mp::MP_SUB_TYPE::MP_SESSION_FRIEND;
msg->message_id = 1;
msg->time = time(nullptr);
msg->account = 783556037;
msg->im_msg_data = "hello test msg!";
storage->push(msg);
sleep(10);
}
TEST(queue, queue_test) {
std::queue<int> queue;
queue.push(1);
queue.push(2);
printf("%d", *((&queue.front()) + 1));
}