添加注释

This commit is contained in:
dongl 2024-02-21 23:09:39 +08:00
parent 2861cdee5f
commit dcc9180221
14 changed files with 1422 additions and 1258 deletions

View File

@ -48,14 +48,15 @@ include_directories(/opt/homebrew/include/pango-1.0)
include_directories(/opt/homebrew/include/glib-2.0) include_directories(/opt/homebrew/include/glib-2.0)
include_directories(/opt/homebrew/include/glib-2.0/glib) include_directories(/opt/homebrew/include/glib-2.0/glib)
include_directories(/opt/homebrew/include/atk-1.0) include_directories(/opt/homebrew/include/atk-1.0)
include_directories(${JSONCPP_INCLUDE_DIRS}) #include_directories(/opt/homebrew/Cellar/ffmpeg/6.1.1_3/include)
#include_directories(${JSONCPP_INCLUDE_DIRS})
include_directories(${GTK3_INCLUDE_DIRS}) include_directories(${GTK3_INCLUDE_DIRS})
# #
include_directories(${CMAKE_SOURCE_DIR}/include) include_directories(${CMAKE_SOURCE_DIR}/include)
include_directories(${CMAKE_SOURCE_DIR}/include/absl) include_directories(${CMAKE_SOURCE_DIR}/include/absl)
include_directories(${CMAKE_SOURCE_DIR}/include/webrtc_cxx_sdk) include_directories(${CMAKE_SOURCE_DIR}/include/webrtc_cxx_sdk)
include_directories(${CMAKE_SOURCE_DIR}/include/webrtc_cxx_sdk/third_party/ffmpeg) #include_directories(${CMAKE_SOURCE_DIR}/include/webrtc_cxx_sdk/third_party/ffmpeg)
#include_directories(/Users/dongl/tools/ffmpeg/ffmpeg_build/include) #include_directories(/Users/dongl/tools/ffmpeg/ffmpeg_build/include)
include_directories(${CMAKE_SOURCE_DIR}/include/webrtc_cxx_sdk/third_party/harfbuzz-ng/src/src) include_directories(${CMAKE_SOURCE_DIR}/include/webrtc_cxx_sdk/third_party/harfbuzz-ng/src/src)
include_directories(${CMAKE_SOURCE_DIR}/include/webrtc_cxx_sdk/third_party/libyuv/include) include_directories(${CMAKE_SOURCE_DIR}/include/webrtc_cxx_sdk/third_party/libyuv/include)
@ -73,7 +74,7 @@ message( ${AVFOUNDATION_LIBRARY})
# #
link_directories(${CMAKE_SOURCE_DIR}/lib/arm_macos) link_directories(${CMAKE_SOURCE_DIR}/lib/arm_macos)
link_directories(/opt/homebrew/lib) link_directories(/opt/homebrew/lib)
link_directories(/opt/homebrew/Cellar/ffmpeg/6.0_1/lib) #link_directories(/opt/homebrew/Cellar/ffmpeg/6.1.1_3/lib)
#link_directories(/Users/dongl/tools/ffmpeg/ffmpeg_build/lib) #link_directories(/Users/dongl/tools/ffmpeg/ffmpeg_build/lib)
add_subdirectory(src/client) add_subdirectory(src/client)

View File

@ -24,7 +24,7 @@ target_link_libraries(client
PkgConfig::FFMPEG PkgConfig::FFMPEG
gtk-3 gdk-3 pangocairo-1.0 pango-1.0 harfbuzz atk-1.0 cairo-gobject cairo gdk_pixbuf-2.0 gio-2.0 gobject-2.0 glib-2.0 gtk-3 gdk-3 cairo gobject-2.0 glib-2.0
${COCOA_LIBRARY} ${COCOA_LIBRARY}
${CORE_AUDIO_LIBRARY} ${CORE_AUDIO_LIBRARY}

View File

@ -74,7 +74,9 @@ namespace {
static rtc::scoped_refptr<DummySetSessionDescriptionObserver> Create() { static rtc::scoped_refptr<DummySetSessionDescriptionObserver> Create() {
return rtc::make_ref_counted<DummySetSessionDescriptionObserver>(); return rtc::make_ref_counted<DummySetSessionDescriptionObserver>();
} }
virtual void OnSuccess() { RTC_LOG(LS_INFO) << __FUNCTION__; } virtual void OnSuccess() { RTC_LOG(LS_INFO) << __FUNCTION__; }
virtual void OnFailure(webrtc::RTCError error) { virtual void OnFailure(webrtc::RTCError error) {
RTC_LOG(LS_INFO) << __FUNCTION__ << " " << ToString(error.type()) << ": " RTC_LOG(LS_INFO) << __FUNCTION__ << " " << ToString(error.type()) << ": "
<< error.message(); << error.message();
@ -95,11 +97,11 @@ namespace {
// } // }
// int num_devices = info->NumberOfDevices(); // int num_devices = info->NumberOfDevices();
// for (int i = 0; i < num_devices; ++i) { // for (int i = 0; i < num_devices; ++i) {
capturer = absl::WrapUnique( capturer = absl::WrapUnique(
webrtc::VcmCapturer::Create(kWidth, kHeight, kFps , 0)); webrtc::VcmCapturer::Create(kWidth, kHeight, kFps, 0));
if (capturer) { if (capturer) {
return rtc::make_ref_counted<CapturerTrackSource>(std::move(capturer)); return rtc::make_ref_counted<CapturerTrackSource>(std::move(capturer));
} }
// } // }
return nullptr; return nullptr;
@ -111,9 +113,10 @@ namespace {
: VideoTrackSource(/*remote=*/false), capturer_(std::move(capturer)) {} : VideoTrackSource(/*remote=*/false), capturer_(std::move(capturer)) {}
private: private:
rtc::VideoSourceInterface<webrtc::VideoFrame>* source() override { rtc::VideoSourceInterface<webrtc::VideoFrame> *source() override {
return capturer_.get(); return capturer_.get();
} }
std::unique_ptr<webrtc::VcmCapturer> capturer_; std::unique_ptr<webrtc::VcmCapturer> capturer_;
}; };
@ -158,13 +161,13 @@ bool Conductor::InitializePeerConnection() {
webrtc::LibvpxVp8EncoderTemplateAdapter, webrtc::LibvpxVp8EncoderTemplateAdapter,
webrtc::LibvpxVp9EncoderTemplateAdapter, webrtc::LibvpxVp9EncoderTemplateAdapter,
webrtc::LibaomAv1EncoderTemplateAdapter webrtc::LibaomAv1EncoderTemplateAdapter
>>(), >>(),
std::make_unique<webrtc::VideoDecoderFactoryTemplate< std::make_unique<webrtc::VideoDecoderFactoryTemplate<
webrtc::OpenH264DecoderTemplateAdapter, webrtc::OpenH264DecoderTemplateAdapter,
webrtc::LibvpxVp8DecoderTemplateAdapter, webrtc::LibvpxVp8DecoderTemplateAdapter,
webrtc::LibvpxVp9DecoderTemplateAdapter, webrtc::LibvpxVp9DecoderTemplateAdapter,
webrtc::Dav1dDecoderTemplateAdapter webrtc::Dav1dDecoderTemplateAdapter
>>(), >>(),
nullptr /* audio_mixer */, nullptr /* audio_processing */); nullptr /* audio_mixer */, nullptr /* audio_processing */);
if (!peer_connection_factory_) { if (!peer_connection_factory_) {

View File

@ -25,6 +25,7 @@
#include "rtc_base/win32.h" #include "rtc_base/win32.h"
#endif // WEBRTC_WIN #endif // WEBRTC_WIN
// 抽象回调
class MainWndCallback { class MainWndCallback {
public: public:
// 通知登陆信令服务器 // 通知登陆信令服务器
@ -37,7 +38,7 @@ public:
virtual void DisconnectFromCurrentPeer() = 0; virtual void DisconnectFromCurrentPeer() = 0;
// 自定义消息处理函数 // 自定义消息处理函数
virtual void UIThreadCallback(int msg_id, void *data) = 0; virtual void UIThreadCallback(int msg_id, void *data) = 0;
// 关闭conductor // 关闭
virtual void Close() = 0; virtual void Close() = 0;
protected: protected:
@ -45,41 +46,32 @@ protected:
}; };
// Pure virtual interface for the main window. // Pure virtual interface for the main window.
// 主窗口基类
class MainWindow { class MainWindow {
public: public:
virtual ~MainWindow() {} virtual ~MainWindow() {}
// 当前的UI界面
enum UI { enum UI {
CONNECT_TO_SERVER, CONNECT_TO_SERVER, // 链接至服务器页面
LIST_PEERS, LIST_PEERS, // PEER双端通讯列表的页面
STREAMING, STREAMING, // 流媒体对话的页面
}; };
// 注册抽象回调函数集结构体
virtual void RegisterObserver(MainWndCallback *callback) = 0; virtual void RegisterObserver(MainWndCallback *callback) = 0;
// 窗口是否有效?
virtual bool IsWindow() = 0; virtual bool IsWindow() = 0;
virtual void MessageBox(const char *caption, virtual void MessageBox(const char *caption,const char *text,bool is_error) = 0;
const char *text,
bool is_error) = 0;
virtual UI current_ui() = 0; virtual UI current_ui() = 0;
virtual void SwitchToConnectUI() = 0; virtual void SwitchToConnectUI() = 0;
virtual void SwitchToPeerList(const Peers &peers) = 0; virtual void SwitchToPeerList(const Peers &peers) = 0;
virtual void SwitchToStreamingUI() = 0; virtual void SwitchToStreamingUI() = 0;
virtual void StartLocalRenderer(webrtc::VideoTrackInterface *local_video) = 0; virtual void StartLocalRenderer(webrtc::VideoTrackInterface *local_video) = 0;
virtual void StopLocalRenderer() = 0; virtual void StopLocalRenderer() = 0;
virtual void StartRemoteRenderer(webrtc::VideoTrackInterface *remote_video) = 0;
virtual void StartRemoteRenderer(
webrtc::VideoTrackInterface *remote_video) = 0;
virtual void StopRemoteRenderer() = 0; virtual void StopRemoteRenderer() = 0;
virtual void QueueUIThreadCallback(int msg_id, void *data) = 0; virtual void QueueUIThreadCallback(int msg_id, void *data) = 0;
}; };

View File

@ -19,471 +19,481 @@
namespace { namespace {
// This is our magical hangup signal. // This is our magical hangup signal.
constexpr char kByeMessage[] = "BYE"; constexpr char kByeMessage[] = "BYE";
// Delay between server connection retries, in milliseconds // Delay between server connection retries, in milliseconds
constexpr webrtc::TimeDelta kReconnectDelay = webrtc::TimeDelta::Seconds(2); constexpr webrtc::TimeDelta kReconnectDelay = webrtc::TimeDelta::Seconds(2);
rtc::Socket* CreateClientSocket(int family) { rtc::Socket *CreateClientSocket(int family) {
rtc::Thread* thread = rtc::Thread::Current(); rtc::Thread *thread = rtc::Thread::Current();
RTC_DCHECK(thread != NULL); RTC_DCHECK(thread != NULL);
return thread->socketserver()->CreateSocket(family, SOCK_STREAM); return thread->socketserver()->CreateSocket(family, SOCK_STREAM);
} }
} // namespace } // namespace
PeerConnectionClient::PeerConnectionClient() PeerConnectionClient::PeerConnectionClient()
: callback_(NULL), resolver_(NULL), state_(NOT_CONNECTED), my_id_(-1) {} : callback_(NULL), resolver_(NULL), state_(NOT_CONNECTED), my_id_(-1) {}
PeerConnectionClient::~PeerConnectionClient() = default; PeerConnectionClient::~PeerConnectionClient() = default;
void PeerConnectionClient::InitSocketSignals() { void PeerConnectionClient::InitSocketSignals() {
RTC_DCHECK(control_socket_.get() != NULL); RTC_DCHECK(control_socket_.get() != NULL);
RTC_DCHECK(hanging_get_.get() != NULL); RTC_DCHECK(hanging_get_.get() != NULL);
control_socket_->SignalCloseEvent.connect(this, control_socket_->SignalCloseEvent.connect(this,
&PeerConnectionClient::OnClose); &PeerConnectionClient::OnClose);
hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose); hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose);
control_socket_->SignalConnectEvent.connect(this, control_socket_->SignalConnectEvent.connect(this,
&PeerConnectionClient::OnConnect); &PeerConnectionClient::OnConnect);
hanging_get_->SignalConnectEvent.connect( hanging_get_->SignalConnectEvent.connect(
this, &PeerConnectionClient::OnHangingGetConnect); this, &PeerConnectionClient::OnHangingGetConnect);
control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead); control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead);
hanging_get_->SignalReadEvent.connect( hanging_get_->SignalReadEvent.connect(
this, &PeerConnectionClient::OnHangingGetRead); this, &PeerConnectionClient::OnHangingGetRead);
} }
int PeerConnectionClient::id() const { int PeerConnectionClient::id() const {
return my_id_; return my_id_;
} }
bool PeerConnectionClient::is_connected() const { bool PeerConnectionClient::is_connected() const {
return my_id_ != -1; return my_id_ != -1;
} }
const Peers& PeerConnectionClient::peers() const { const Peers &PeerConnectionClient::peers() const {
return peers_; return peers_;
} }
void PeerConnectionClient::RegisterObserver( void PeerConnectionClient::RegisterObserver(
PeerConnectionClientObserver* callback) { PeerConnectionClientObserver *callback) {
RTC_DCHECK(!callback_); RTC_DCHECK(!callback_);
callback_ = callback; callback_ = callback;
} }
void PeerConnectionClient::Connect(const std::string& server, // 链接服务器
void PeerConnectionClient::Connect(const std::string &server,
int port, int port,
const std::string& client_name) { const std::string &client_name) {
RTC_DCHECK(!server.empty()); RTC_DCHECK(!server.empty());
RTC_DCHECK(!client_name.empty()); RTC_DCHECK(!client_name.empty());
if (state_ != NOT_CONNECTED) { if (state_ != NOT_CONNECTED) {
RTC_LOG(LS_WARNING) RTC_LOG(LS_WARNING)
// 在调用 Connect() 之前,客户端不得连接
<< "The client must not be connected before you can call Connect()"; << "The client must not be connected before you can call Connect()";
callback_->OnServerConnectionFailure(); callback_->OnServerConnectionFailure();
return; return;
} }
if (server.empty() || client_name.empty()) { if (server.empty() || client_name.empty()) {
callback_->OnServerConnectionFailure(); callback_->OnServerConnectionFailure(); // 链接失败
return; return;
} }
if (port <= 0) if (port <= 0)
port = kDefaultServerPort; port = kDefaultServerPort; // 默认值8888
server_address_.SetIP(server); // 设置地址端口客户端名称
server_address_.SetPort(port); server_address_.SetIP(server);
client_name_ = client_name; server_address_.SetPort(port);
client_name_ = client_name;
if (server_address_.IsUnresolvedIP()) { // ip 是否正确 未解析ip
state_ = RESOLVING; if (server_address_.IsUnresolvedIP()) {
resolver_ = new rtc::AsyncResolver(); state_ = RESOLVING; // 状态解决?
resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult); resolver_ = new rtc::AsyncResolver();
resolver_->Start(server_address_); // SignalDone 当地址解析过程完成时会触发此信号。
} else { // 解析结果 类指针 成员函数指针
DoConnect(); resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
} // AsyncResolver 将执行异步 DNS 解析,并在上发送结果信号
resolver_->Start(server_address_);
} else {
DoConnect();
}
} }
void PeerConnectionClient::OnResolveResult( void PeerConnectionClient::OnResolveResult(rtc::AsyncResolverInterface *resolver) {
rtc::AsyncResolverInterface* resolver) { if (resolver_->GetError() != 0) {
if (resolver_->GetError() != 0) { callback_->OnServerConnectionFailure(); // 链接失败
callback_->OnServerConnectionFailure(); resolver_->Destroy(false);
resolver_->Destroy(false); resolver_ = nullptr;
resolver_ = NULL; state_ = NOT_CONNECTED; // 状态未链接
state_ = NOT_CONNECTED; } else {
} else { // 这里有重复检验了一下 因为进入此函数的先决条件是为解析ip
server_address_ = resolver_->address(); server_address_ = resolver_->address();
DoConnect(); DoConnect();
} }
} }
// 执行链接 链接时 做链接 进行链接 真正执行链接的函数
void PeerConnectionClient::DoConnect() { void PeerConnectionClient::DoConnect() {
control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family())); control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family())); hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
InitSocketSignals(); // 初始化套接字信号
char buffer[1024]; InitSocketSignals();
snprintf(buffer, sizeof(buffer), "GET /sign_in?%s HTTP/1.0\r\n\r\n",
client_name_.c_str());
onconnect_data_ = buffer;
bool ret = ConnectControlSocket(); char buffer[1024];
if (ret) snprintf(buffer, sizeof(buffer), "GET /sign_in?%s HTTP/1.0\r\n\r\n",
state_ = SIGNING_IN; client_name_.c_str());
if (!ret) { onconnect_data_ = buffer;
callback_->OnServerConnectionFailure();
} bool ret = ConnectControlSocket();
if (ret)
state_ = SIGNING_IN;
if (!ret) {
callback_->OnServerConnectionFailure();
}
} }
bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) { bool PeerConnectionClient::SendToPeer(int peer_id, const std::string &message) {
if (state_ != CONNECTED) if (state_ != CONNECTED)
return false; return false;
RTC_DCHECK(is_connected()); RTC_DCHECK(is_connected());
RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED); RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
if (!is_connected() || peer_id == -1) if (!is_connected() || peer_id == -1)
return false; return false;
char headers[1024]; char headers[1024];
snprintf(headers, sizeof(headers), snprintf(headers, sizeof(headers),
"POST /message?peer_id=%i&to=%i HTTP/1.0\r\n" "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
"Content-Length: %zu\r\n" "Content-Length: %zu\r\n"
"Content-Type: text/plain\r\n" "Content-Type: text/plain\r\n"
"\r\n", "\r\n",
my_id_, peer_id, message.length()); my_id_, peer_id, message.length());
onconnect_data_ = headers; onconnect_data_ = headers;
onconnect_data_ += message; onconnect_data_ += message;
return ConnectControlSocket(); return ConnectControlSocket();
} }
bool PeerConnectionClient::SendHangUp(int peer_id) { bool PeerConnectionClient::SendHangUp(int peer_id) {
return SendToPeer(peer_id, kByeMessage); return SendToPeer(peer_id, kByeMessage);
} }
bool PeerConnectionClient::IsSendingMessage() { bool PeerConnectionClient::IsSendingMessage() {
return state_ == CONNECTED && return state_ == CONNECTED &&
control_socket_->GetState() != rtc::Socket::CS_CLOSED; control_socket_->GetState() != rtc::Socket::CS_CLOSED;
} }
bool PeerConnectionClient::SignOut() { bool PeerConnectionClient::SignOut() {
if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT) if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
return true; return true;
if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED) if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
hanging_get_->Close(); hanging_get_->Close();
if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) { if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
state_ = SIGNING_OUT; state_ = SIGNING_OUT;
if (my_id_ != -1) { if (my_id_ != -1) {
char buffer[1024]; char buffer[1024];
snprintf(buffer, sizeof(buffer), snprintf(buffer, sizeof(buffer),
"GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_); "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
onconnect_data_ = buffer; onconnect_data_ = buffer;
return ConnectControlSocket(); return ConnectControlSocket();
} else {
// Can occur if the app is closed before we finish connecting.
return true;
}
} else { } else {
// Can occur if the app is closed before we finish connecting. state_ = SIGNING_OUT_WAITING;
return true;
} }
} else {
state_ = SIGNING_OUT_WAITING;
}
return true; return true;
} }
void PeerConnectionClient::Close() { void PeerConnectionClient::Close() {
control_socket_->Close(); control_socket_->Close();
hanging_get_->Close(); hanging_get_->Close();
onconnect_data_.clear(); onconnect_data_.clear();
peers_.clear(); peers_.clear();
if (resolver_ != NULL) { if (resolver_ != NULL) {
resolver_->Destroy(false); resolver_->Destroy(false);
resolver_ = NULL; resolver_ = NULL;
} }
my_id_ = -1; my_id_ = -1;
state_ = NOT_CONNECTED; state_ = NOT_CONNECTED;
} }
bool PeerConnectionClient::ConnectControlSocket() { bool PeerConnectionClient::ConnectControlSocket() {
RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED); RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
int err = control_socket_->Connect(server_address_); int err = control_socket_->Connect(server_address_);
if (err == SOCKET_ERROR) { if (err == SOCKET_ERROR) {
Close(); Close();
return false; return false;
} }
return true; return true;
} }
void PeerConnectionClient::OnConnect(rtc::Socket* socket) { void PeerConnectionClient::OnConnect(rtc::Socket *socket) {
RTC_DCHECK(!onconnect_data_.empty()); RTC_DCHECK(!onconnect_data_.empty());
size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length()); size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
RTC_DCHECK(sent == onconnect_data_.length()); RTC_DCHECK(sent == onconnect_data_.length());
onconnect_data_.clear(); onconnect_data_.clear();
} }
void PeerConnectionClient::OnHangingGetConnect(rtc::Socket* socket) { void PeerConnectionClient::OnHangingGetConnect(rtc::Socket *socket) {
char buffer[1024]; char buffer[1024];
snprintf(buffer, sizeof(buffer), "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", snprintf(buffer, sizeof(buffer), "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n",
my_id_); my_id_);
int len = static_cast<int>(strlen(buffer)); int len = static_cast<int>(strlen(buffer));
int sent = socket->Send(buffer, len); int sent = socket->Send(buffer, len);
RTC_DCHECK(sent == len); RTC_DCHECK(sent == len);
} }
void PeerConnectionClient::OnMessageFromPeer(int peer_id, void PeerConnectionClient::OnMessageFromPeer(int peer_id,
const std::string& message) { const std::string &message) {
if (message.length() == (sizeof(kByeMessage) - 1) && if (message.length() == (sizeof(kByeMessage) - 1) &&
message.compare(kByeMessage) == 0) { message.compare(kByeMessage) == 0) {
callback_->OnPeerDisconnected(peer_id); callback_->OnPeerDisconnected(peer_id);
} else {
callback_->OnMessageFromPeer(peer_id, message);
}
}
bool PeerConnectionClient::GetHeaderValue(const std::string& data,
size_t eoh,
const char* header_pattern,
size_t* value) {
RTC_DCHECK(value != NULL);
size_t found = data.find(header_pattern);
if (found != std::string::npos && found < eoh) {
*value = atoi(&data[found + strlen(header_pattern)]);
return true;
}
return false;
}
bool PeerConnectionClient::GetHeaderValue(const std::string& data,
size_t eoh,
const char* header_pattern,
std::string* value) {
RTC_DCHECK(value != NULL);
size_t found = data.find(header_pattern);
if (found != std::string::npos && found < eoh) {
size_t begin = found + strlen(header_pattern);
size_t end = data.find("\r\n", begin);
if (end == std::string::npos)
end = eoh;
value->assign(data.substr(begin, end - begin));
return true;
}
return false;
}
bool PeerConnectionClient::ReadIntoBuffer(rtc::Socket* socket,
std::string* data,
size_t* content_length) {
char buffer[0xffff];
do {
int bytes = socket->Recv(buffer, sizeof(buffer), nullptr);
if (bytes <= 0)
break;
data->append(buffer, bytes);
} while (true);
bool ret = false;
size_t i = data->find("\r\n\r\n");
if (i != std::string::npos) {
RTC_LOG(LS_INFO) << "Headers received";
if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
size_t total_response_size = (i + 4) + *content_length;
if (data->length() >= total_response_size) {
ret = true;
std::string should_close;
const char kConnection[] = "\r\nConnection: ";
if (GetHeaderValue(*data, i, kConnection, &should_close) &&
should_close.compare("close") == 0) {
socket->Close();
// Since we closed the socket, there was no notification delivered
// to us. Compensate by letting ourselves know.
OnClose(socket, 0);
}
} else {
// We haven't received everything. Just continue to accept data.
}
} else { } else {
RTC_LOG(LS_ERROR) << "No content length field specified by the server."; callback_->OnMessageFromPeer(peer_id, message);
} }
}
return ret;
} }
void PeerConnectionClient::OnRead(rtc::Socket* socket) { bool PeerConnectionClient::GetHeaderValue(const std::string &data,
size_t content_length = 0; size_t eoh,
if (ReadIntoBuffer(socket, &control_data_, &content_length)) { const char *header_pattern,
size_t peer_id = 0, eoh = 0; size_t *value) {
bool ok = RTC_DCHECK(value != NULL);
ParseServerResponse(control_data_, content_length, &peer_id, &eoh); size_t found = data.find(header_pattern);
if (ok) { if (found != std::string::npos && found < eoh) {
if (my_id_ == -1) { *value = atoi(&data[found + strlen(header_pattern)]);
// First response. Let's store our server assigned ID. return true;
RTC_DCHECK(state_ == SIGNING_IN); }
my_id_ = static_cast<int>(peer_id); return false;
RTC_DCHECK(my_id_ != -1); }
// The body of the response will be a list of already connected peers. bool PeerConnectionClient::GetHeaderValue(const std::string &data,
if (content_length) { size_t eoh,
size_t pos = eoh + 4; const char *header_pattern,
while (pos < control_data_.size()) { std::string *value) {
size_t eol = control_data_.find('\n', pos); RTC_DCHECK(value != NULL);
if (eol == std::string::npos) size_t found = data.find(header_pattern);
break; if (found != std::string::npos && found < eoh) {
int id = 0; size_t begin = found + strlen(header_pattern);
std::string name; size_t end = data.find("\r\n", begin);
bool connected; if (end == std::string::npos)
if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id, end = eoh;
&connected) && value->assign(data.substr(begin, end - begin));
id != my_id_) { return true;
peers_[id] = name; }
callback_->OnPeerConnected(id, name); return false;
}
bool PeerConnectionClient::ReadIntoBuffer(rtc::Socket *socket,
std::string *data,
size_t *content_length) {
char buffer[0xffff];
do {
int bytes = socket->Recv(buffer, sizeof(buffer), nullptr);
if (bytes <= 0)
break;
data->append(buffer, bytes);
} while (true);
bool ret = false;
size_t i = data->find("\r\n\r\n");
if (i != std::string::npos) {
RTC_LOG(LS_INFO) << "Headers received";
if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
size_t total_response_size = (i + 4) + *content_length;
if (data->length() >= total_response_size) {
ret = true;
std::string should_close;
const char kConnection[] = "\r\nConnection: ";
if (GetHeaderValue(*data, i, kConnection, &should_close) &&
should_close.compare("close") == 0) {
socket->Close();
// Since we closed the socket, there was no notification delivered
// to us. Compensate by letting ourselves know.
OnClose(socket, 0);
}
} else {
// We haven't received everything. Just continue to accept data.
} }
pos = eol + 1; } else {
} RTC_LOG(LS_ERROR) << "No content length field specified by the server.";
} }
RTC_DCHECK(is_connected()); }
callback_->OnSignedIn(); return ret;
} else if (state_ == SIGNING_OUT) { }
void PeerConnectionClient::OnRead(rtc::Socket *socket) {
size_t content_length = 0;
if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
size_t peer_id = 0, eoh = 0;
bool ok =
ParseServerResponse(control_data_, content_length, &peer_id, &eoh);
if (ok) {
if (my_id_ == -1) {
// First response. Let's store our server assigned ID.
RTC_DCHECK(state_ == SIGNING_IN);
my_id_ = static_cast<int>(peer_id);
RTC_DCHECK(my_id_ != -1);
// The body of the response will be a list of already connected peers.
if (content_length) {
size_t pos = eoh + 4;
while (pos < control_data_.size()) {
size_t eol = control_data_.find('\n', pos);
if (eol == std::string::npos)
break;
int id = 0;
std::string name;
bool connected;
if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
&connected) &&
id != my_id_) {
peers_[id] = name;
callback_->OnPeerConnected(id, name);
}
pos = eol + 1;
}
}
RTC_DCHECK(is_connected());
callback_->OnSignedIn();
} else if (state_ == SIGNING_OUT) {
Close();
callback_->OnDisconnected();
} else if (state_ == SIGNING_OUT_WAITING) {
SignOut();
}
}
control_data_.clear();
if (state_ == SIGNING_IN) {
RTC_DCHECK(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
state_ = CONNECTED;
hanging_get_->Connect(server_address_);
}
}
}
void PeerConnectionClient::OnHangingGetRead(rtc::Socket *socket) {
RTC_LOG(LS_INFO) << __FUNCTION__;
size_t content_length = 0;
if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
size_t peer_id = 0, eoh = 0;
bool ok =
ParseServerResponse(notification_data_, content_length, &peer_id, &eoh);
if (ok) {
// Store the position where the body begins.
size_t pos = eoh + 4;
if (my_id_ == static_cast<int>(peer_id)) {
// A notification about a new member or a member that just
// disconnected.
int id = 0;
std::string name;
bool connected = false;
if (ParseEntry(notification_data_.substr(pos), &name, &id,
&connected)) {
if (connected) {
peers_[id] = name;
callback_->OnPeerConnected(id, name);
} else {
peers_.erase(id);
callback_->OnPeerDisconnected(id);
}
}
} else {
OnMessageFromPeer(static_cast<int>(peer_id),
notification_data_.substr(pos));
}
}
notification_data_.clear();
}
if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
state_ == CONNECTED) {
hanging_get_->Connect(server_address_);
}
}
bool PeerConnectionClient::ParseEntry(const std::string &entry,
std::string *name,
int *id,
bool *connected) {
RTC_DCHECK(name != NULL);
RTC_DCHECK(id != NULL);
RTC_DCHECK(connected != NULL);
RTC_DCHECK(!entry.empty());
*connected = false;
size_t separator = entry.find(',');
if (separator != std::string::npos) {
*id = atoi(&entry[separator + 1]);
name->assign(entry.substr(0, separator));
separator = entry.find(',', separator + 1);
if (separator != std::string::npos) {
*connected = atoi(&entry[separator + 1]) ? true : false;
}
}
return !name->empty();
}
int PeerConnectionClient::GetResponseStatus(const std::string &response) {
int status = -1;
size_t pos = response.find(' ');
if (pos != std::string::npos)
status = atoi(&response[pos + 1]);
return status;
}
bool PeerConnectionClient::ParseServerResponse(const std::string &response,
size_t content_length,
size_t *peer_id,
size_t *eoh) {
int status = GetResponseStatus(response.c_str());
if (status != 200) {
RTC_LOG(LS_ERROR) << "Received error from server";
Close(); Close();
callback_->OnDisconnected(); callback_->OnDisconnected();
} else if (state_ == SIGNING_OUT_WAITING) { return false;
SignOut();
}
} }
control_data_.clear(); *eoh = response.find("\r\n\r\n");
RTC_DCHECK(*eoh != std::string::npos);
if (*eoh == std::string::npos)
return false;
if (state_ == SIGNING_IN) { *peer_id = -1;
RTC_DCHECK(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
state_ = CONNECTED; // See comment in peer_channel.cc for why we use the Pragma header.
hanging_get_->Connect(server_address_); GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
}
} return true;
} }
void PeerConnectionClient::OnHangingGetRead(rtc::Socket* socket) { void PeerConnectionClient::OnClose(rtc::Socket *socket, int err) {
RTC_LOG(LS_INFO) << __FUNCTION__; RTC_LOG(LS_INFO) << __FUNCTION__;
size_t content_length = 0;
if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
size_t peer_id = 0, eoh = 0;
bool ok =
ParseServerResponse(notification_data_, content_length, &peer_id, &eoh);
if (ok) { socket->Close();
// Store the position where the body begins.
size_t pos = eoh + 4;
if (my_id_ == static_cast<int>(peer_id)) {
// A notification about a new member or a member that just
// disconnected.
int id = 0;
std::string name;
bool connected = false;
if (ParseEntry(notification_data_.substr(pos), &name, &id,
&connected)) {
if (connected) {
peers_[id] = name;
callback_->OnPeerConnected(id, name);
} else {
peers_.erase(id);
callback_->OnPeerDisconnected(id);
}
}
} else {
OnMessageFromPeer(static_cast<int>(peer_id),
notification_data_.substr(pos));
}
}
notification_data_.clear();
}
if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
state_ == CONNECTED) {
hanging_get_->Connect(server_address_);
}
}
bool PeerConnectionClient::ParseEntry(const std::string& entry,
std::string* name,
int* id,
bool* connected) {
RTC_DCHECK(name != NULL);
RTC_DCHECK(id != NULL);
RTC_DCHECK(connected != NULL);
RTC_DCHECK(!entry.empty());
*connected = false;
size_t separator = entry.find(',');
if (separator != std::string::npos) {
*id = atoi(&entry[separator + 1]);
name->assign(entry.substr(0, separator));
separator = entry.find(',', separator + 1);
if (separator != std::string::npos) {
*connected = atoi(&entry[separator + 1]) ? true : false;
}
}
return !name->empty();
}
int PeerConnectionClient::GetResponseStatus(const std::string& response) {
int status = -1;
size_t pos = response.find(' ');
if (pos != std::string::npos)
status = atoi(&response[pos + 1]);
return status;
}
bool PeerConnectionClient::ParseServerResponse(const std::string& response,
size_t content_length,
size_t* peer_id,
size_t* eoh) {
int status = GetResponseStatus(response.c_str());
if (status != 200) {
RTC_LOG(LS_ERROR) << "Received error from server";
Close();
callback_->OnDisconnected();
return false;
}
*eoh = response.find("\r\n\r\n");
RTC_DCHECK(*eoh != std::string::npos);
if (*eoh == std::string::npos)
return false;
*peer_id = -1;
// See comment in peer_channel.cc for why we use the Pragma header.
GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
return true;
}
void PeerConnectionClient::OnClose(rtc::Socket* socket, int err) {
RTC_LOG(LS_INFO) << __FUNCTION__;
socket->Close();
#ifdef WIN32 #ifdef WIN32
if (err != WSAECONNREFUSED) { if (err != WSAECONNREFUSED) {
#else #else
if (err != ECONNREFUSED) { if (err != ECONNREFUSED) {
#endif #endif
if (socket == hanging_get_.get()) { if (socket == hanging_get_.get()) {
if (state_ == CONNECTED) { if (state_ == CONNECTED) {
hanging_get_->Close(); hanging_get_->Close();
hanging_get_->Connect(server_address_); hanging_get_->Connect(server_address_);
} }
} else {
callback_->OnMessageSent(err);
}
} else { } else {
callback_->OnMessageSent(err); if (socket == control_socket_.get()) {
RTC_LOG(LS_WARNING) << "Connection refused; retrying in 2 seconds";
rtc::Thread::Current()->PostDelayedTask(
SafeTask(safety_.flag(), [this] { DoConnect(); }), kReconnectDelay);
} else {
Close();
callback_->OnDisconnected();
}
} }
} else {
if (socket == control_socket_.get()) {
RTC_LOG(LS_WARNING) << "Connection refused; retrying in 2 seconds";
rtc::Thread::Current()->PostDelayedTask(
SafeTask(safety_.flag(), [this] { DoConnect(); }), kReconnectDelay);
} else {
Close();
callback_->OnDisconnected();
}
}
} }

View File

@ -38,12 +38,12 @@ struct PeerConnectionClientObserver {
class PeerConnectionClient : public sigslot::has_slots<> { class PeerConnectionClient : public sigslot::has_slots<> {
public: public:
enum State { enum State {
NOT_CONNECTED, NOT_CONNECTED, // 未连接
RESOLVING, RESOLVING, // 解决
SIGNING_IN, SIGNING_IN, // 登录中
CONNECTED, CONNECTED, // 连接的
SIGNING_OUT_WAITING, SIGNING_OUT_WAITING, // 退出等待
SIGNING_OUT, SIGNING_OUT, // 退出
}; };
PeerConnectionClient(); PeerConnectionClient();

View File

@ -17,10 +17,10 @@
#include <glib.h> #include <glib.h>
#include <gobject/gclosure.h> #include <gobject/gclosure.h>
#include <gtk/gtk.h> #include <gtk/gtk.h>
#include <stddef.h> #include <cstddef>
#include <stdio.h> #include <cstdio>
#include <stdlib.h> #include <cstdlib>
#include <string.h> #include <cstring>
#include <cstdint> #include <cstdint>
#include <map> #include <map>
@ -42,9 +42,7 @@ namespace {
// GtkMainWnd instance. // GtkMainWnd instance.
// //
gboolean OnDestroyedCallback(GtkWidget *widget, gboolean OnDestroyedCallback(GtkWidget *widget, GdkEvent *event, gpointer data) {
GdkEvent *event,
gpointer data) {
reinterpret_cast<GtkMainWnd *>(data)->OnDestroyed(widget, event); reinterpret_cast<GtkMainWnd *>(data)->OnDestroyed(widget, event);
return FALSE; return FALSE;
} }
@ -54,6 +52,9 @@ namespace {
} }
gboolean SimulateButtonClick(gpointer button) { gboolean SimulateButtonClick(gpointer button) {
// g_signal_emit_by_name 是 GLib 库中用于通过信号名称发出信号的函数。
// 在 GTK 编程中,通过 g_signal_emit_by_name 可以模拟发出特定信号,触发与之关联的信号处理函数。
// 模拟触发模拟按钮点击
g_signal_emit_by_name(button, "clicked"); g_signal_emit_by_name(button, "clicked");
return false; return false;
} }
@ -77,7 +78,7 @@ namespace {
GtkTreeModel *model = gtk_tree_view_get_model(tree_view); GtkTreeModel *model = gtk_tree_view_get_model(tree_view);
// "if iter is NULL, then the number of toplevel nodes is returned." // "if iter is NULL, then the number of toplevel nodes is returned."
int rows = gtk_tree_model_iter_n_children(model, NULL); int rows = gtk_tree_model_iter_n_children(model, nullptr);
GtkTreePath *lastpath = gtk_tree_path_new_from_indices(rows - 1, -1); GtkTreePath *lastpath = gtk_tree_path_new_from_indices(rows - 1, -1);
// Select the last item in the list // Select the last item in the list
@ -93,7 +94,8 @@ namespace {
return false; return false;
} }
// Creates a tree view, that we use to display the list of peers. // Creates a tree view, that we use to display the list of peers.
// 创建一个树视图,我们用它来显示对等点列表。
void InitializeList(GtkWidget *list) { void InitializeList(GtkWidget *list) {
GtkCellRenderer *renderer = gtk_cell_renderer_text_new(); GtkCellRenderer *renderer = gtk_cell_renderer_text_new();
GtkTreeViewColumn *column = gtk_tree_view_column_new_with_attributes( GtkTreeViewColumn *column = gtk_tree_view_column_new_with_attributes(
@ -104,7 +106,7 @@ namespace {
g_object_unref(store); g_object_unref(store);
} }
// Adds an entry to a tree view. // Adds an entry to a tree view.
void AddToList(GtkWidget *list, const gchar *str, int value) { void AddToList(GtkWidget *list, const gchar *str, int value) {
GtkListStore *store = GtkListStore *store =
GTK_LIST_STORE(gtk_tree_view_get_model(GTK_TREE_VIEW(list))); GTK_LIST_STORE(gtk_tree_view_get_model(GTK_TREE_VIEW(list)));
@ -125,17 +127,20 @@ namespace {
gboolean HandleUIThreadCallback(gpointer data) { gboolean HandleUIThreadCallback(gpointer data) {
UIThreadCallbackData *cb_data = reinterpret_cast<UIThreadCallbackData *>(data); UIThreadCallbackData *cb_data = reinterpret_cast<UIThreadCallbackData *>(data);
// 自定义的消息处理纯虚函数
cb_data->callback->UIThreadCallback(cb_data->msg_id, cb_data->data); cb_data->callback->UIThreadCallback(cb_data->msg_id, cb_data->data);
delete cb_data; delete cb_data;
return false; return false;
} }
// 重新绘制
gboolean Redraw(gpointer data) { gboolean Redraw(gpointer data) {
GtkMainWnd *wnd = reinterpret_cast<GtkMainWnd *>(data); GtkMainWnd *wnd = reinterpret_cast<GtkMainWnd *>(data);
wnd->OnRedraw(); wnd->OnRedraw();
return false; return false;
} }
// 绘制
gboolean Draw(GtkWidget *widget, cairo_t *cr, gpointer data) { gboolean Draw(GtkWidget *widget, cairo_t *cr, gpointer data) {
GtkMainWnd *wnd = reinterpret_cast<GtkMainWnd *>(data); GtkMainWnd *wnd = reinterpret_cast<GtkMainWnd *>(data);
wnd->Draw(widget, cr); wnd->Draw(widget, cr);
@ -152,16 +157,17 @@ GtkMainWnd::GtkMainWnd(const char *server,
int port, int port,
bool autoconnect, bool autoconnect,
bool autocall) bool autocall)
: window_(NULL), : window_(nullptr),
draw_area_(NULL), draw_area_(nullptr),
vbox_(NULL), vbox_(nullptr),
server_edit_(NULL), server_edit_(nullptr),
port_edit_(NULL), port_edit_(nullptr),
peer_list_(NULL), peer_list_(nullptr),
callback_(NULL), callback_(nullptr),
server_(server), server_(server),
autoconnect_(autoconnect), autoconnect_(autoconnect),
autocall_(autocall) { autocall_(autocall) {
char buffer[10]; char buffer[10];
snprintf(buffer, sizeof(buffer), "%i", port); snprintf(buffer, sizeof(buffer), "%i", port);
port_ = buffer; port_ = buffer;
@ -176,70 +182,92 @@ void GtkMainWnd::RegisterObserver(MainWndCallback *callback) {
} }
bool GtkMainWnd::IsWindow() { bool GtkMainWnd::IsWindow() {
return window_ != NULL && GTK_IS_WINDOW(window_); return window_ != nullptr && GTK_IS_WINDOW(window_);
} }
void GtkMainWnd::MessageBox(const char *caption, void GtkMainWnd::MessageBox(const char *caption, const char *text, bool is_error) {
const char *text,
bool is_error) {
GtkWidget *dialog = gtk_message_dialog_new( GtkWidget *dialog = gtk_message_dialog_new(
GTK_WINDOW(window_), GTK_DIALOG_DESTROY_WITH_PARENT, GTK_WINDOW(window_), GTK_DIALOG_DESTROY_WITH_PARENT,
is_error ? GTK_MESSAGE_ERROR : GTK_MESSAGE_INFO, GTK_BUTTONS_CLOSE, "%s", is_error ? GTK_MESSAGE_ERROR : GTK_MESSAGE_INFO, GTK_BUTTONS_CLOSE, "%s",
text); text);
// "hhhhhhhh");
gtk_window_set_title(GTK_WINDOW(dialog), caption); gtk_window_set_title(GTK_WINDOW(dialog), caption);
gtk_dialog_run(GTK_DIALOG(dialog)); gtk_dialog_run(GTK_DIALOG(dialog));
gtk_widget_destroy(dialog); gtk_widget_destroy(dialog);
} }
// 查看当前的界面
MainWindow::UI GtkMainWnd::current_ui() { MainWindow::UI GtkMainWnd::current_ui() {
if (vbox_) if (vbox_)
return CONNECT_TO_SERVER; return UI::CONNECT_TO_SERVER;
if (peer_list_) if (peer_list_)
return LIST_PEERS; return UI::LIST_PEERS;
return STREAMING; return UI::STREAMING;
} }
// 开始本地绘制
// VideoTrackInterface 是 WebRTC 中的一个接口,它表示一个视频轨道。在实时通信中,可以使用该接口来传输视频数据。
void GtkMainWnd::StartLocalRenderer(webrtc::VideoTrackInterface *local_video) { void GtkMainWnd::StartLocalRenderer(webrtc::VideoTrackInterface *local_video) {
// 重新初始化 或者是赋值 一个渲染绘制类
local_renderer_.reset(new VideoRenderer(this, local_video)); local_renderer_.reset(new VideoRenderer(this, local_video));
} }
// 通知本地绘制
void GtkMainWnd::StopLocalRenderer() { void GtkMainWnd::StopLocalRenderer() {
local_renderer_.reset(); local_renderer_.reset();
} }
// 开始远程绘制
void GtkMainWnd::StartRemoteRenderer( void GtkMainWnd::StartRemoteRenderer(
webrtc::VideoTrackInterface *remote_video) { webrtc::VideoTrackInterface *remote_video) {
remote_renderer_.reset(new VideoRenderer(this, remote_video)); remote_renderer_.reset(new VideoRenderer(this, remote_video));
} }
// 停止远程绘制
void GtkMainWnd::StopRemoteRenderer() { void GtkMainWnd::StopRemoteRenderer() {
remote_renderer_.reset(); remote_renderer_.reset();
} }
// 队列形式的UI线程内的回调
void GtkMainWnd::QueueUIThreadCallback(int msg_id, void *data) { void GtkMainWnd::QueueUIThreadCallback(int msg_id, void *data) {
g_idle_add(HandleUIThreadCallback, // g_idle_add 是 GLib 库中的一个函数用于在主事件循环main loop中添加一个空闲回调函数。
new UIThreadCallbackData(callback_, msg_id, data)); // 当调用 g_idle_add 函数时它会创建一个空闲源idle source并将空闲回调函数注册到主事件循环中。
// 当主事件循环处于空闲状态时,空闲源会触发并调用注册的回调函数。
// 使用 g_idle_add 函数可以方便地在主事件循环的空闲时执行任务或更新界面,以提高程序的响应性。
// 只声明 定义不在这里
g_idle_add( // 回调处理函数
HandleUIThreadCallback,
// 回调类 封装了一层 添加了消息id 与 数据
new UIThreadCallbackData(callback_, msg_id, data)
);
} }
// 创建GUI窗口/窗体/GTK部件
bool GtkMainWnd::Create() { bool GtkMainWnd::Create() {
RTC_DCHECK(window_ == NULL); RTC_DCHECK(window_ == nullptr);
// GTK_WINDOW_TOPLEVEL 类型,表示创建一个顶层窗口。顶层窗口是一个独立的窗口框架,通常作为应用程序的主窗口。
window_ = gtk_window_new(GTK_WINDOW_TOPLEVEL); window_ = gtk_window_new(GTK_WINDOW_TOPLEVEL);
if (window_) { if (window_) {
// 设置窗口位置
gtk_window_set_position(GTK_WINDOW(window_), GTK_WIN_POS_CENTER); gtk_window_set_position(GTK_WINDOW(window_), GTK_WIN_POS_CENTER);
// 默认大小
gtk_window_set_default_size(GTK_WINDOW(window_), 640, 480); gtk_window_set_default_size(GTK_WINDOW(window_), 640, 480);
// 窗口标题
gtk_window_set_title(GTK_WINDOW(window_), "PeerConnection client"); gtk_window_set_title(GTK_WINDOW(window_), "PeerConnection client");
g_signal_connect(G_OBJECT(window_), "delete-event", // 可以将特定信号与回调函数连接起来,从而实现事件驱动的编程模型
G_CALLBACK(&OnDestroyedCallback), this); g_signal_connect(G_OBJECT(window_), "delete-event", // 要连接信号的对象实例、信号名称
G_CALLBACK(&OnDestroyedCallback), this); // 回调函数,即信号发出时要执行的函数、传递给回调函数的数据
g_signal_connect(window_, "key-press-event", G_CALLBACK(OnKeyPressCallback), g_signal_connect(window_, "key-press-event", G_CALLBACK(OnKeyPressCallback),
this); this);
// 切换到连接用户界面
SwitchToConnectUI(); SwitchToConnectUI();
} }
return window_ != NULL; return window_ != nullptr;
} }
bool GtkMainWnd::Destroy() { bool GtkMainWnd::Destroy() {
@ -248,83 +276,118 @@ bool GtkMainWnd::Destroy() {
// 销毁释放 一个 GTK+ 窗口部件widget及其所有子部件。 // 销毁释放 一个 GTK+ 窗口部件widget及其所有子部件。
gtk_widget_destroy(window_); gtk_widget_destroy(window_);
window_ = NULL; window_ = nullptr;
return true; return true;
} }
// 切换到连接用户界面
void GtkMainWnd::SwitchToConnectUI() { void GtkMainWnd::SwitchToConnectUI() {
RTC_LOG(LS_INFO) << __FUNCTION__; RTC_LOG(LS_INFO) << __FUNCTION__;
RTC_DCHECK(IsWindow()); RTC_DCHECK(IsWindow());
RTC_DCHECK(vbox_ == NULL); RTC_DCHECK(vbox_ == nullptr);
// 设置 GTK 容器(如窗口、框架等)的边框宽度
gtk_container_set_border_width(GTK_CONTAINER(window_), 10); gtk_container_set_border_width(GTK_CONTAINER(window_), 10);
// peer_list 待通话页面
if (peer_list_) { if (peer_list_) {
// 销毁一个小部件。当一个小部件被销毁时,它在其他对象上持有的所有引用都将被释放:
gtk_widget_destroy(peer_list_); gtk_widget_destroy(peer_list_);
peer_list_ = NULL; peer_list_ = nullptr;
} }
// 这行代码创建了一个垂直方向的 GTK 容器box并设置了 5 像素的间距。
vbox_ = gtk_box_new(GTK_ORIENTATION_VERTICAL, 5); vbox_ = gtk_box_new(GTK_ORIENTATION_VERTICAL, 5);
// 这个和上一行好像都是控件容器 最终依次添加到window_窗口中
// 应该是布局管理容器 在布局中的位置的管理
GtkWidget *valign = gtk_alignment_new(0, 1, 0, 0); GtkWidget *valign = gtk_alignment_new(0, 1, 0, 0);
gtk_container_add(GTK_CONTAINER(vbox_), valign); gtk_container_add(GTK_CONTAINER(vbox_), valign);
gtk_container_add(GTK_CONTAINER(window_), vbox_); gtk_container_add(GTK_CONTAINER(window_), vbox_);
// 创建了一个水平容器
GtkWidget *hbox = gtk_box_new(GTK_ORIENTATION_HORIZONTAL, 5); GtkWidget *hbox = gtk_box_new(GTK_ORIENTATION_HORIZONTAL, 5);
// 这回才是独立的子控件 一个label文字标签
GtkWidget *label = gtk_label_new("Server"); GtkWidget *label = gtk_label_new("Server");
gtk_container_add(GTK_CONTAINER(hbox), label); gtk_container_add(GTK_CONTAINER(hbox), label);
// 文本输入控件 服务器地址
server_edit_ = gtk_entry_new(); server_edit_ = gtk_entry_new();
// 设置输入框的初始值 这个初始值在构造本类GtkMainWnd时就初始化了
gtk_entry_set_text(GTK_ENTRY(server_edit_), server_.c_str()); gtk_entry_set_text(GTK_ENTRY(server_edit_), server_.c_str());
// 设置控件最小尺寸的函数
gtk_widget_set_size_request(server_edit_, 400, 30); gtk_widget_set_size_request(server_edit_, 400, 30);
gtk_container_add(GTK_CONTAINER(hbox), server_edit_); gtk_container_add(GTK_CONTAINER(hbox), server_edit_);
// 文本输入控件 端口号
port_edit_ = gtk_entry_new(); port_edit_ = gtk_entry_new();
gtk_entry_set_text(GTK_ENTRY(port_edit_), port_.c_str()); gtk_entry_set_text(GTK_ENTRY(port_edit_), port_.c_str());
gtk_widget_set_size_request(port_edit_, 70, 30); gtk_widget_set_size_request(port_edit_, 70, 30);
gtk_container_add(GTK_CONTAINER(hbox), port_edit_); gtk_container_add(GTK_CONTAINER(hbox), port_edit_);
// 创建一个带有包含给定文本的子部件的小GtkButton部件 按钮部件
GtkWidget *button = gtk_button_new_with_label("Connect"); GtkWidget *button = gtk_button_new_with_label("Connect");
gtk_widget_set_size_request(button, 70, 30); gtk_widget_set_size_request(button, 70, 30);
// 设置绑定事件
// GLib 库中用于连接信号和信号处理函数的函数。在 GTK 编程中,
// 通过 g_signal_connect 可以将特定的信号与相应的信号处理函数关联起来,以实现用户交互和事件处理。
g_signal_connect(button, "clicked", G_CALLBACK(OnClickedCallback), this); g_signal_connect(button, "clicked", G_CALLBACK(OnClickedCallback), this);
gtk_container_add(GTK_CONTAINER(hbox), button); gtk_container_add(GTK_CONTAINER(hbox), button);
// end 至此水平容器子控件全部添加完毕了
//
GtkWidget *halign = gtk_alignment_new(1, 0, 0, 0); GtkWidget *halign = gtk_alignment_new(1, 0, 0, 0);
gtk_container_add(GTK_CONTAINER(halign), hbox); gtk_container_add(GTK_CONTAINER(halign), hbox);
// 最终添加到这个垂直容器
gtk_box_pack_start(GTK_BOX(vbox_), halign, FALSE, FALSE, 0); gtk_box_pack_start(GTK_BOX(vbox_), halign, FALSE, FALSE, 0);
// 显示窗口及其所有子控件
gtk_widget_show_all(window_); gtk_widget_show_all(window_);
// autoconnect_ 构造时初始化的字段
if (autoconnect_) if (autoconnect_)
// g_idle_add 是 GLib 库中用于注册空闲处理函数的函数。在 GTK 编程中,通过 g_idle_add 可以注册一个空闲处理函数,
// 当主循环处于空闲状态时该函数将被调用。g_idle_add 是 GLib 库中用于注册空闲处理函数的函数。
// 在 GTK 编程中,通过 g_idle_add 可以注册一个空闲处理函数,当主循环处于空闲状态时,该函数将被调用。
// 待触发的函数 模拟按钮点击
g_idle_add(SimulateButtonClick, button); g_idle_add(SimulateButtonClick, button);
} }
// 切换到对等列表界面 待通话界面
void GtkMainWnd::SwitchToPeerList(const Peers &peers) { void GtkMainWnd::SwitchToPeerList(const Peers &peers) {
RTC_LOG(LS_INFO) << __FUNCTION__; RTC_LOG(LS_INFO) << __FUNCTION__;
// peer_list gtk 界面是否有效
if (!peer_list_) { if (!peer_list_) {
// 设置边框宽度0
gtk_container_set_border_width(GTK_CONTAINER(window_), 0); gtk_container_set_border_width(GTK_CONTAINER(window_), 0);
if (vbox_) { if (vbox_) {
// 消逝了垂直容器还有其内的所有子控件
gtk_widget_destroy(vbox_); gtk_widget_destroy(vbox_);
vbox_ = NULL; vbox_ = nullptr;
server_edit_ = NULL; server_edit_ = nullptr;
port_edit_ = NULL; port_edit_ = nullptr;
} else if (draw_area_) { } else if (draw_area_) {
// 用于渲染视频流的绘图表面界面
gtk_widget_destroy(draw_area_); gtk_widget_destroy(draw_area_);
draw_area_ = NULL; draw_area_ = nullptr;
draw_buffer_.reset(); draw_buffer_.reset();
} }
// 初始化待通话界面控件 树形视图控件
peer_list_ = gtk_tree_view_new(); peer_list_ = gtk_tree_view_new();
g_signal_connect(peer_list_, "row-activated", // 绑定信号事件 行激活?事件
G_CALLBACK(OnRowActivatedCallback), this); g_signal_connect(peer_list_, "row-activated", G_CALLBACK(OnRowActivatedCallback), this);
// 用于设置树形视图控件 GtkTreeView 是否显示列标题
gtk_tree_view_set_headers_visible(GTK_TREE_VIEW(peer_list_), FALSE); gtk_tree_view_set_headers_visible(GTK_TREE_VIEW(peer_list_), FALSE);
// 创建一个树视图,我们用它来显示对等点列表。
InitializeList(peer_list_); InitializeList(peer_list_);
gtk_container_add(GTK_CONTAINER(window_), peer_list_); gtk_container_add(GTK_CONTAINER(window_), peer_list_);
gtk_widget_show_all(window_); gtk_widget_show_all(window_);
} else { } else {
//
GtkListStore *store = GtkListStore *store =
GTK_LIST_STORE(gtk_tree_view_get_model(GTK_TREE_VIEW(peer_list_))); GTK_LIST_STORE(gtk_tree_view_get_model(GTK_TREE_VIEW(peer_list_)));
gtk_list_store_clear(store); gtk_list_store_clear(store);
@ -338,42 +401,54 @@ void GtkMainWnd::SwitchToPeerList(const Peers &peers) {
g_idle_add(SimulateLastRowActivated, peer_list_); g_idle_add(SimulateLastRowActivated, peer_list_);
} }
// 切换到流媒体用户通话界面
void GtkMainWnd::SwitchToStreamingUI() { void GtkMainWnd::SwitchToStreamingUI() {
RTC_LOG(LS_INFO) << __FUNCTION__; RTC_LOG(LS_INFO) << __FUNCTION__;
RTC_DCHECK(draw_area_ == NULL); RTC_DCHECK(draw_area_ == nullptr);
// 设置边框
gtk_container_set_border_width(GTK_CONTAINER(window_), 0); gtk_container_set_border_width(GTK_CONTAINER(window_), 0);
// 消逝待通话界面
if (peer_list_) { if (peer_list_) {
gtk_widget_destroy(peer_list_); gtk_widget_destroy(peer_list_);
peer_list_ = NULL; peer_list_ = nullptr;
} }
// 用于创建一个空白的绘图区域控件 GtkDrawingArea。
// GtkDrawingArea 控件通常用于显示自定义绘图,比如绘制图形、图表、图像等。
draw_area_ = gtk_drawing_area_new(); draw_area_ = gtk_drawing_area_new();
gtk_container_add(GTK_CONTAINER(window_), draw_area_); gtk_container_add(GTK_CONTAINER(window_), draw_area_);
// 绑定事件开始绘制
g_signal_connect(G_OBJECT(draw_area_), "draw", G_CALLBACK(&::Draw), this); g_signal_connect(G_OBJECT(draw_area_), "draw", G_CALLBACK(&::Draw), this);
gtk_widget_show_all(window_); gtk_widget_show_all(window_);
} }
// 窗口整体消逝
void GtkMainWnd::OnDestroyed(GtkWidget *widget, GdkEvent *event) { void GtkMainWnd::OnDestroyed(GtkWidget *widget, GdkEvent *event) {
callback_->Close(); callback_->Close();
window_ = NULL; window_ = nullptr;
draw_area_ = NULL; draw_area_ = nullptr;
vbox_ = NULL; vbox_ = nullptr;
server_edit_ = NULL; server_edit_ = nullptr;
port_edit_ = NULL; port_edit_ = nullptr;
peer_list_ = NULL; peer_list_ = nullptr;
} }
//
void GtkMainWnd::OnClicked(GtkWidget *widget) { void GtkMainWnd::OnClicked(GtkWidget *widget) {
// Make the connect button insensitive, so that it cannot be clicked more than // Make the connect button insensitive, so that it cannot be clicked more than
// once. Now that the connection includes auto-retry, it should not be // once. Now that the connection includes auto-retry, it should not be
// necessary to click it more than once. // necessary to click it more than once.
// 使连接按钮不敏感,使其不能被点击超过一次。 现在连接包括自动重试,不应该需要多次点击它。
// 用于设置一个控件是否处于可用状态(即是否响应用户输入)。当控件处于可用状态时,用户可以与其交互;当控件不可用时,用户无法与其交互。
gtk_widget_set_sensitive(widget, false); gtk_widget_set_sensitive(widget, false);
server_ = gtk_entry_get_text(GTK_ENTRY(server_edit_)); server_ = gtk_entry_get_text(GTK_ENTRY(server_edit_));
port_ = gtk_entry_get_text(GTK_ENTRY(port_edit_)); port_ = gtk_entry_get_text(GTK_ENTRY(port_edit_));
int port = port_.length() ? atoi(port_.c_str()) : 0; int port = port_.length() ? atoi(port_.c_str()) : 0;
// 回调 登录
callback_->StartLogin(server_, port); callback_->StartLogin(server_, port);
} }

View File

@ -11,7 +11,7 @@
#ifndef EXAMPLES_PEERCONNECTION_CLIENT_LINUX_MAIN_WND_H_ #ifndef EXAMPLES_PEERCONNECTION_CLIENT_LINUX_MAIN_WND_H_
#define EXAMPLES_PEERCONNECTION_CLIENT_LINUX_MAIN_WND_H_ #define EXAMPLES_PEERCONNECTION_CLIENT_LINUX_MAIN_WND_H_
#include <stdint.h> #include <cstdint>
#include <memory> #include <memory>
#include <string> #include <string>
@ -23,6 +23,8 @@
#include "../main_wnd.h" #include "../main_wnd.h"
#include "../peer_connection_client.h" #include "../peer_connection_client.h"
// 主窗口的GTK实现
// Forward declarations. // Forward declarations.
typedef struct _GtkWidget GtkWidget; typedef struct _GtkWidget GtkWidget;
typedef union _GdkEvent GdkEvent; typedef union _GdkEvent GdkEvent;
@ -36,92 +38,122 @@ typedef struct _cairo cairo_t;
// This is functionally equivalent to the MainWnd class in the Windows // This is functionally equivalent to the MainWnd class in the Windows
// implementation. // implementation.
class GtkMainWnd : public MainWindow { class GtkMainWnd : public MainWindow {
public: public:
GtkMainWnd(const char* server, int port, bool autoconnect, bool autocall); GtkMainWnd(const char *server, int port, bool autoconnect, bool autocall);
~GtkMainWnd();
virtual void RegisterObserver(MainWndCallback* callback); ~GtkMainWnd() override;
virtual bool IsWindow();
virtual void SwitchToConnectUI();
virtual void SwitchToPeerList(const Peers& peers);
virtual void SwitchToStreamingUI();
virtual void MessageBox(const char* caption, const char* text, bool is_error);
virtual MainWindow::UI current_ui();
virtual void StartLocalRenderer(webrtc::VideoTrackInterface* local_video);
virtual void StopLocalRenderer();
virtual void StartRemoteRenderer(webrtc::VideoTrackInterface* remote_video);
virtual void StopRemoteRenderer();
virtual void QueueUIThreadCallback(int msg_id, void* data); // 继承的纯虚接口
/*virtual */void RegisterObserver(MainWndCallback *callback) override;
// Creates and shows the main window with the |Connect UI| enabled. /*virtual */bool IsWindow() override;
bool Create();
// Destroys the window. When the window is destroyed, it ends the /*virtual */void SwitchToConnectUI() override;
// main message loop.
bool Destroy();
// Callback for when the main window is destroyed. /*virtual */void SwitchToPeerList(const Peers &peers) override;
void OnDestroyed(GtkWidget* widget, GdkEvent* event);
// Callback for when the user clicks the "Connect" button. /*virtual */void SwitchToStreamingUI() override;
void OnClicked(GtkWidget* widget);
// Callback for keystrokes. Used to capture Esc and Return. /*virtual */void MessageBox(const char *caption, const char *text, bool is_error) override;
void OnKeyPress(GtkWidget* widget, GdkEventKey* key);
// Callback when the user double clicks a peer in order to initiate a /*virtual */MainWindow::UI current_ui() override;
// connection.
void OnRowActivated(GtkTreeView* tree_view,
GtkTreePath* path,
GtkTreeViewColumn* column);
void OnRedraw(); /*virtual */void StartLocalRenderer(webrtc::VideoTrackInterface *local_video) override;
void Draw(GtkWidget* widget, cairo_t* cr); /*virtual */void StopLocalRenderer() override;
protected: /*virtual */void StartRemoteRenderer(webrtc::VideoTrackInterface *remote_video) override;
class VideoRenderer : public rtc::VideoSinkInterface<webrtc::VideoFrame> {
public:
VideoRenderer(GtkMainWnd* main_wnd, webrtc::VideoTrackInterface* track_to_render);
virtual ~VideoRenderer();
// VideoSinkInterface implementation /*virtual */void StopRemoteRenderer() override;
void OnFrame(const webrtc::VideoFrame& frame) override;
const uint8_t* image() const { return image_.get(); } /*virtual*/ void QueueUIThreadCallback(int msg_id, void *data) override;
int width() const { return width_; }
int height() const { return height_; }
protected: /********************************************************************************************/
void SetSize(int width, int height); /** 接下来的UI操作的函数 **/
std::unique_ptr<uint8_t[]> image_; /********************************************************************************************/
int width_;
int height_;
GtkMainWnd* main_wnd_;
rtc::scoped_refptr<webrtc::VideoTrackInterface> rendered_track_;
};
protected: // Creates and shows the main window with the |Connect UI| enabled.
GtkWidget* window_; // Our main window. // 创建主窗口 (链接窗口)
GtkWidget* draw_area_; // The drawing surface for rendering video streams. bool Create();
GtkWidget* vbox_; // Container for the Connect UI.
GtkWidget* server_edit_; // Destroys the window. When the window is destroyed, it ends the main message loop.
GtkWidget* port_edit_; // 销毁窗口
GtkWidget* peer_list_; // The list of peers. bool Destroy();
MainWndCallback* callback_;
std::string server_; // Callback for when the main window is destroyed.
std::string port_; // 主窗口被销毁时的回调。
bool autoconnect_; void OnDestroyed(GtkWidget *widget, GdkEvent *event);
bool autocall_;
std::unique_ptr<VideoRenderer> local_renderer_; // Callback for when the user clicks the "Connect" button.
std::unique_ptr<VideoRenderer> remote_renderer_; // 当用户点击“连接”按钮时的回调。 链接信令服务器的回调
int width_; void OnClicked(GtkWidget *widget);
int height_;
std::unique_ptr<uint8_t[]> draw_buffer_; // Callback for keystrokes. Used to capture Esc and Return.
int draw_buffer_size_; // 按键事件回调。 用于捕获 Esc 和 Return。
void OnKeyPress(GtkWidget *widget, GdkEventKey *key);
// Callback when the user double clicks a peer in order to initiate a connection.
// 当用户双击对等点以发起连接时的回调。 点击待通信列表后webrtc对话的回调
void OnRowActivated(GtkTreeView *tree_view, GtkTreePath *path, GtkTreeViewColumn *column);
// 重画时?
void OnRedraw();
// 绘制
void Draw(GtkWidget *widget, cairo_t *cr);
/// 内部类
protected:
// 视频渲染绘制 继承webrtc的sink接收组件
// 一般来说VideoSinkInterface 接口定义了一组方法或函数,用于接收、处理和显示视频数据。
class VideoRenderer : public rtc::VideoSinkInterface<webrtc::VideoFrame> {
public:
VideoRenderer(GtkMainWnd *main_wnd, webrtc::VideoTrackInterface *track_to_render);
virtual ~VideoRenderer();
// VideoSinkInterface implementation
// 基类 VideoSinkInterface 视频接收器接口实现
void OnFrame(const webrtc::VideoFrame &frame) override;
const uint8_t *image() const { return image_.get(); }
int width() const { return width_; }
int height() const { return height_; }
protected:
// 设置渲染大小
void SetSize(int width, int height);
std::unique_ptr<uint8_t[]> image_; // 某一帧?
int width_; // 渲染的宽
int height_; // 渲染的高
GtkMainWnd *main_wnd_; // 自定义的主窗口抽象类
rtc::scoped_refptr<webrtc::VideoTrackInterface> rendered_track_; // 渲染轨道
};
protected:
/// GtkWidget 可能是部件/窗口/控件
GtkWidget *window_; // Our main window. // GTK的 主窗口部件
GtkWidget *draw_area_; // The drawing surface for rendering video streams. // 用于渲染视频流的绘图表面。
GtkWidget *vbox_; // Container for the Connect UI. // Connect UI 的容器。
GtkWidget *server_edit_; // 链接信令服务器地址的输入框控件
GtkWidget *port_edit_; // 链接信令服务器端口的输入框控件
GtkWidget *peer_list_; // The list of peers. // Peer 链接的列表 待通话的列表 此时双方已经互相交换了SDP了 协商完了已经
MainWndCallback *callback_; // 回调函数集抽象类 自定义的各种操作的回调
std::string server_; // 链接的地址
std::string port_; // 链接的端口
bool autoconnect_; // 是否链接?
bool autocall_; // 是否通话?
std::unique_ptr<GtkMainWnd::VideoRenderer> local_renderer_; // 本地渲染
std::unique_ptr<GtkMainWnd::VideoRenderer> remote_renderer_; // 远程渲染
int width_; //
int height_; //
std::unique_ptr<uint8_t[]> draw_buffer_; // 绘制缓存
int draw_buffer_size_; // 绘制缓存大小
}; };
#endif // EXAMPLES_PEERCONNECTION_CLIENT_LINUX_MAIN_WND_H_ #endif // EXAMPLES_PEERCONNECTION_CLIENT_LINUX_MAIN_WND_H_

View File

@ -13,24 +13,29 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#if defined(WEBRTC_POSIX) #if defined(WEBRTC_POSIX)
#include <unistd.h> #include <unistd.h>
#endif #endif
#include "utils.h" #include "utils.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
// 表头终止符
static const char kHeaderTerminator[] = "\r\n\r\n"; static const char kHeaderTerminator[] = "\r\n\r\n";
// 表头终止符 数组的长度
static const int kHeaderTerminatorLength = sizeof(kHeaderTerminator) - 1; static const int kHeaderTerminatorLength = sizeof(kHeaderTerminator) - 1;
// static // static 什么头 不知道???? 像是http头的格式
const char DataSocket::kCrossOriginAllowHeaders[] = const char DataSocket::kCrossOriginAllowHeaders[] =
"Access-Control-Allow-Origin: *\r\n" "Access-Control-Allow-Origin: *\r\n"
"Access-Control-Allow-Credentials: true\r\n" "Access-Control-Allow-Credentials: true\r\n"
"Access-Control-Allow-Methods: POST, GET, OPTIONS\r\n" "Access-Control-Allow-Methods: POST, GET, OPTIONS\r\n"
"Access-Control-Allow-Headers: Content-Type, " "Access-Control-Allow-Headers: Content-Type, "
"Content-Length, Connection, Cache-Control\r\n" "Content-Length, Connection, Cache-Control\r\n"
"Access-Control-Expose-Headers: Content-Length\r\n"; "Access-Control-Expose-Headers: Content-Length\r\n";
#if defined(WIN32) #if defined(WIN32)
class WinsockInitializer { class WinsockInitializer {
@ -52,16 +57,16 @@ WinsockInitializer WinsockInitializer::singleton;
// //
bool SocketBase::Create() { bool SocketBase::Create() {
RTC_DCHECK(!valid()); RTC_DCHECK(!valid()); // 检验本地监听socket是否未有效
socket_ = ::socket(AF_INET, SOCK_STREAM, 0); socket_ = ::socket(AF_INET, SOCK_STREAM, 0); // 初始化后有效了
return valid(); return valid();
} }
void SocketBase::Close() { void SocketBase::Close() {
if (socket_ != INVALID_SOCKET) { if (socket_ != INVALID_SOCKET) {
closesocket(socket_); closesocket(socket_);
socket_ = INVALID_SOCKET; socket_ = INVALID_SOCKET;
} }
} }
// //
@ -69,196 +74,196 @@ void SocketBase::Close() {
// //
std::string DataSocket::request_arguments() const { std::string DataSocket::request_arguments() const {
size_t args = request_path_.find('?'); size_t args = request_path_.find('?');
if (args != std::string::npos) if (args != std::string::npos)
return request_path_.substr(args + 1); return request_path_.substr(args + 1);
return ""; return "";
} }
bool DataSocket::PathEquals(const char* path) const { bool DataSocket::PathEquals(const char *path) const {
RTC_DCHECK(path); RTC_DCHECK(path);
size_t args = request_path_.find('?'); size_t args = request_path_.find('?');
if (args != std::string::npos) if (args != std::string::npos)
return request_path_.substr(0, args).compare(path) == 0; return request_path_.substr(0, args).compare(path) == 0;
return request_path_.compare(path) == 0; return request_path_.compare(path) == 0;
} }
bool DataSocket::OnDataAvailable(bool* close_socket) { bool DataSocket::OnDataAvailable(bool *close_socket) {
RTC_DCHECK(valid()); RTC_DCHECK(valid());
char buffer[0xfff] = {0}; char buffer[0xfff] = {0};
int bytes = recv(socket_, buffer, sizeof(buffer), 0); int bytes = recv(socket_, buffer, sizeof(buffer), 0);
if (bytes == SOCKET_ERROR || bytes == 0) { if (bytes == SOCKET_ERROR || bytes == 0) {
*close_socket = true; *close_socket = true;
return false; return false;
} }
*close_socket = false; *close_socket = false;
bool ret = true; bool ret = true;
if (headers_received()) { if (headers_received()) {
if (method_ != POST) { if (method_ != POST) {
// unexpectedly received data. // unexpectedly received data.
ret = false; ret = false;
} else {
data_.append(buffer, bytes);
}
} else { } else {
data_.append(buffer, bytes); request_headers_.append(buffer, bytes);
size_t found = request_headers_.find(kHeaderTerminator);
if (found != std::string::npos) {
data_ = request_headers_.substr(found + kHeaderTerminatorLength);
request_headers_.resize(found + kHeaderTerminatorLength);
ret = ParseHeaders();
}
} }
} else { return ret;
request_headers_.append(buffer, bytes);
size_t found = request_headers_.find(kHeaderTerminator);
if (found != std::string::npos) {
data_ = request_headers_.substr(found + kHeaderTerminatorLength);
request_headers_.resize(found + kHeaderTerminatorLength);
ret = ParseHeaders();
}
}
return ret;
} }
bool DataSocket::Send(const std::string& data) const { bool DataSocket::Send(const std::string &data) const {
return send(socket_, data.data(), static_cast<int>(data.length()), 0) != return send(socket_, data.data(), static_cast<int>(data.length()), 0) !=
SOCKET_ERROR; SOCKET_ERROR;
} }
bool DataSocket::Send(const std::string& status, bool DataSocket::Send(const std::string &status,
bool connection_close, bool connection_close,
const std::string& content_type, const std::string &content_type,
const std::string& extra_headers, const std::string &extra_headers,
const std::string& data) const { const std::string &data) const {
RTC_DCHECK(valid()); RTC_DCHECK(valid());
RTC_DCHECK(!status.empty()); RTC_DCHECK(!status.empty());
std::string buffer("HTTP/1.1 " + status + "\r\n"); std::string buffer("HTTP/1.1 " + status + "\r\n");
buffer += buffer +=
"Server: PeerConnectionTestServer/0.1\r\n" "Server: PeerConnectionTestServer/0.1\r\n"
"Cache-Control: no-cache\r\n"; "Cache-Control: no-cache\r\n";
if (connection_close) if (connection_close)
buffer += "Connection: close\r\n"; buffer += "Connection: close\r\n";
if (!content_type.empty()) if (!content_type.empty())
buffer += "Content-Type: " + content_type + "\r\n"; buffer += "Content-Type: " + content_type + "\r\n";
buffer += buffer +=
"Content-Length: " + int2str(static_cast<int>(data.size())) + "\r\n"; "Content-Length: " + int2str(static_cast<int>(data.size())) + "\r\n";
if (!extra_headers.empty()) { if (!extra_headers.empty()) {
buffer += extra_headers; buffer += extra_headers;
// Extra headers are assumed to have a separator per header. // Extra headers are assumed to have a separator per header.
} }
buffer += kCrossOriginAllowHeaders; buffer += kCrossOriginAllowHeaders;
buffer += "\r\n"; buffer += "\r\n";
buffer += data; buffer += data;
return Send(buffer); return Send(buffer);
} }
void DataSocket::Clear() { void DataSocket::Clear() {
method_ = INVALID; method_ = INVALID;
content_length_ = 0; content_length_ = 0;
content_type_.clear(); content_type_.clear();
request_path_.clear(); request_path_.clear();
request_headers_.clear(); request_headers_.clear();
data_.clear(); data_.clear();
} }
bool DataSocket::ParseHeaders() { bool DataSocket::ParseHeaders() {
RTC_DCHECK(!request_headers_.empty()); RTC_DCHECK(!request_headers_.empty());
RTC_DCHECK_EQ(method_, INVALID); RTC_DCHECK_EQ(method_, INVALID);
size_t i = request_headers_.find("\r\n"); size_t i = request_headers_.find("\r\n");
if (i == std::string::npos) if (i == std::string::npos)
return false; return false;
if (!ParseMethodAndPath(request_headers_.data(), i)) if (!ParseMethodAndPath(request_headers_.data(), i))
return false; return false;
RTC_DCHECK_NE(method_, INVALID); RTC_DCHECK_NE(method_, INVALID);
RTC_DCHECK(!request_path_.empty()); RTC_DCHECK(!request_path_.empty());
if (method_ == POST) { if (method_ == POST) {
const char* headers = request_headers_.data() + i + 2; const char *headers = request_headers_.data() + i + 2;
size_t len = request_headers_.length() - i - 2; size_t len = request_headers_.length() - i - 2;
if (!ParseContentLengthAndType(headers, len)) if (!ParseContentLengthAndType(headers, len))
return false; return false;
} }
return true; return true;
} }
bool DataSocket::ParseMethodAndPath(const char* begin, size_t len) { bool DataSocket::ParseMethodAndPath(const char *begin, size_t len) {
struct { struct {
const char* method_name; const char *method_name;
size_t method_name_len; size_t method_name_len;
RequestMethod id; RequestMethod id;
} supported_methods[] = { } supported_methods[] = {
{"GET", 3, GET}, {"GET", 3, GET},
{"POST", 4, POST}, {"POST", 4, POST},
{"OPTIONS", 7, OPTIONS}, {"OPTIONS", 7, OPTIONS},
}; };
const char* path = NULL; const char *path = NULL;
for (size_t i = 0; i < ARRAYSIZE(supported_methods); ++i) { for (size_t i = 0; i < ARRAYSIZE(supported_methods); ++i) {
if (len > supported_methods[i].method_name_len && if (len > supported_methods[i].method_name_len &&
isspace(begin[supported_methods[i].method_name_len]) && isspace(begin[supported_methods[i].method_name_len]) &&
strncmp(begin, supported_methods[i].method_name, strncmp(begin, supported_methods[i].method_name,
supported_methods[i].method_name_len) == 0) { supported_methods[i].method_name_len) == 0) {
method_ = supported_methods[i].id; method_ = supported_methods[i].id;
path = begin + supported_methods[i].method_name_len; path = begin + supported_methods[i].method_name_len;
break; break;
}
} }
}
const char* end = begin + len; const char *end = begin + len;
if (!path || path >= end) if (!path || path >= end)
return false; return false;
++path;
begin = path;
while (!isspace(*path) && path < end)
++path; ++path;
begin = path;
while (!isspace(*path) && path < end)
++path;
request_path_.assign(begin, path - begin); request_path_.assign(begin, path - begin);
return true; return true;
} }
bool DataSocket::ParseContentLengthAndType(const char* headers, size_t length) { bool DataSocket::ParseContentLengthAndType(const char *headers, size_t length) {
RTC_DCHECK_EQ(content_length_, 0); RTC_DCHECK_EQ(content_length_, 0);
RTC_DCHECK(content_type_.empty()); RTC_DCHECK(content_type_.empty());
const char* end = headers + length; const char *end = headers + length;
while (headers && headers < end) { while (headers && headers < end) {
if (!isspace(headers[0])) { if (!isspace(headers[0])) {
static const char kContentLength[] = "Content-Length:"; static const char kContentLength[] = "Content-Length:";
static const char kContentType[] = "Content-Type:"; static const char kContentType[] = "Content-Type:";
if ((headers + ARRAYSIZE(kContentLength)) < end && if ((headers + ARRAYSIZE(kContentLength)) < end &&
strncmp(headers, kContentLength, ARRAYSIZE(kContentLength) - 1) == strncmp(headers, kContentLength, ARRAYSIZE(kContentLength) - 1) ==
0) { 0) {
headers += ARRAYSIZE(kContentLength) - 1; headers += ARRAYSIZE(kContentLength) - 1;
while (headers[0] == ' ') while (headers[0] == ' ')
++headers; ++headers;
content_length_ = atoi(headers); content_length_ = atoi(headers);
} else if ((headers + ARRAYSIZE(kContentType)) < end && } else if ((headers + ARRAYSIZE(kContentType)) < end &&
strncmp(headers, kContentType, ARRAYSIZE(kContentType) - 1) == strncmp(headers, kContentType, ARRAYSIZE(kContentType) - 1) ==
0) { 0) {
headers += ARRAYSIZE(kContentType) - 1; headers += ARRAYSIZE(kContentType) - 1;
while (headers[0] == ' ') while (headers[0] == ' ')
++headers; ++headers;
const char* type_end = strstr(headers, "\r\n"); const char *type_end = strstr(headers, "\r\n");
if (type_end == NULL) if (type_end == NULL)
type_end = end; type_end = end;
content_type_.assign(headers, type_end); content_type_.assign(headers, type_end);
} }
} else { } else {
++headers; ++headers;
}
headers = strstr(headers, "\r\n");
if (headers)
headers += 2;
} }
headers = strstr(headers, "\r\n");
if (headers)
headers += 2;
}
return !content_type_.empty() && content_length_ != 0; return !content_type_.empty() && content_length_ != 0;
} }
// //
@ -266,34 +271,33 @@ bool DataSocket::ParseContentLengthAndType(const char* headers, size_t length) {
// //
bool ListeningSocket::Listen(unsigned short port) { bool ListeningSocket::Listen(unsigned short port) {
RTC_DCHECK(valid()); RTC_DCHECK(valid());
int enabled = 1; int enabled = 1;
if (setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, // SO_REUSEADDR 端口复用
reinterpret_cast<const char*>(&enabled), if (setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char *>(&enabled), sizeof(enabled)) != 0) {
sizeof(enabled)) != 0) { printf("setsockopt failed\n");
printf("setsockopt failed\n"); return false;
return false; }
} struct sockaddr_in addr = {0};
struct sockaddr_in addr = {0}; addr.sin_family = AF_INET;
addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(port);
addr.sin_port = htons(port); if (bind(socket_, reinterpret_cast<const sockaddr *>(&addr), sizeof(addr)) ==
if (bind(socket_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
SOCKET_ERROR) { printf("bind failed\n");
printf("bind failed\n"); return false;
return false; }
} return listen(socket_, 5) != SOCKET_ERROR;
return listen(socket_, 5) != SOCKET_ERROR;
} }
DataSocket* ListeningSocket::Accept() const { DataSocket *ListeningSocket::Accept() const {
RTC_DCHECK(valid()); RTC_DCHECK(valid());
struct sockaddr_in addr = {0}; struct sockaddr_in addr = {0};
socklen_t size = sizeof(addr); socklen_t size = sizeof(addr);
NativeSocket client = NativeSocket client =
accept(socket_, reinterpret_cast<sockaddr*>(&addr), &size); accept(socket_, reinterpret_cast<sockaddr *>(&addr), &size);
if (client == INVALID_SOCKET) if (client == INVALID_SOCKET)
return NULL; return NULL;
return new DataSocket(client); return new DataSocket(client);
} }

View File

@ -16,9 +16,11 @@
typedef int socklen_t; typedef int socklen_t;
typedef SOCKET NativeSocket; typedef SOCKET NativeSocket;
#else #else
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/select.h> #include <sys/select.h>
#include <sys/socket.h> #include <sys/socket.h>
#define closesocket close #define closesocket close
typedef int NativeSocket; typedef int NativeSocket;
@ -34,119 +36,143 @@ typedef int NativeSocket;
#include <string> #include <string>
class SocketBase { class SocketBase {
public: public:
SocketBase() : socket_(INVALID_SOCKET) {} SocketBase() : socket_(INVALID_SOCKET) {}
explicit SocketBase(NativeSocket socket) : socket_(socket) {}
SocketBase(SocketBase& other) = delete;
SocketBase& operator=(const SocketBase& other) = delete;
~SocketBase() { Close(); }
NativeSocket socket() const { return socket_; } explicit SocketBase(NativeSocket socket) : socket_(socket) {}
bool valid() const { return socket_ != INVALID_SOCKET; }
bool Create(); SocketBase(SocketBase &other) = delete;
void Close();
protected: SocketBase &operator=(const SocketBase &other) = delete;
NativeSocket socket_;
~SocketBase() { Close(); }
NativeSocket socket() const { return socket_; }
bool valid() const { return socket_ != INVALID_SOCKET; }
bool Create();
void Close();
protected:
NativeSocket socket_;
}; };
// Represents an HTTP server socket. // Represents an HTTP server socket.
// 表示 HTTP 服务器套接字。
class DataSocket : public SocketBase { class DataSocket : public SocketBase {
public: public:
enum RequestMethod { enum RequestMethod {
INVALID, INVALID,
GET, GET,
POST, POST,
OPTIONS, OPTIONS,
}; };
explicit DataSocket(NativeSocket socket) explicit DataSocket(NativeSocket socket)
: SocketBase(socket), method_(INVALID), content_length_(0) {} : SocketBase(socket), method_(INVALID), content_length_(0) {}
~DataSocket() {} ~DataSocket() {}
static const char kCrossOriginAllowHeaders[]; static const char kCrossOriginAllowHeaders[];
bool headers_received() const { return method_ != INVALID; } // 标头是否有效 收到的标头
bool headers_received() const { return method_ != INVALID; }
RequestMethod method() const { return method_; } RequestMethod method() const { return method_; }
const std::string& request_path() const { return request_path_; } // 获取请求uri /命令?参数=值
std::string request_arguments() const; const std::string &request_path() const { return request_path_; }
const std::string& data() const { return data_; } // 请求参数 ?后面拼接的参数
std::string request_arguments() const;
const std::string& content_type() const { return content_type_; } const std::string &data() const { return data_; }
size_t content_length() const { return content_length_; } const std::string &content_type() const { return content_type_; }
bool request_received() const { size_t content_length() const { return content_length_; }
return headers_received() && (method_ != POST || data_received());
}
bool data_received() const { // 已收到请求?
return method_ != POST || data_.length() >= content_length_; bool request_received() const {
} return headers_received() && (method_ != POST || data_received());
}
// 收到的数据
bool data_received() const {
return method_ != POST || data_.length() >= content_length_;
}
// Checks if the request path (minus arguments) matches a given path. // 检查请求路径 的 命令 /xxx? ?前的uri
bool PathEquals(const char* path) const; // Checks if the request path (minus arguments) matches a given path.
bool PathEquals(const char *path) const;
// Called when we have received some data from clients. // Called when we have received some data from clients.
// Returns false if an error occurred. // Returns false if an error occurred.
bool OnDataAvailable(bool* close_socket); // 当我们从客户端收到一些数据时调用。
// 如果发生错误则返回 false。
bool OnDataAvailable(bool *close_socket);
// Send a raw buffer of bytes. // Send a raw buffer of bytes.
bool Send(const std::string& data) const; bool Send(const std::string &data) const;
// Send an HTTP response. The `status` should start with a valid HTTP // Send an HTTP response. The `status` should start with a valid HTTP
// response code, followed by a string. E.g. "200 OK". // response code, followed by a string. E.g. "200 OK".
// If `connection_close` is set to true, an extra "Connection: close" HTTP // If `connection_close` is set to true, an extra "Connection: close" HTTP
// header will be included. `content_type` is the mime content type, not // header will be included. `content_type` is the mime content type, not
// including the "Content-Type: " string. // including the "Content-Type: " string.
// `extra_headers` should be either empty or a list of headers where each // `extra_headers` should be either empty or a list of headers where each
// header terminates with "\r\n". // header terminates with "\r\n".
// `data` is the body of the message. It's length will be specified via // `data` is the body of the message. It's length will be specified via
// a "Content-Length" header. // a "Content-Length" header.
bool Send(const std::string& status, bool Send(const std::string &status,
bool connection_close, bool connection_close,
const std::string& content_type, const std::string &content_type,
const std::string& extra_headers, const std::string &extra_headers,
const std::string& data) const; const std::string &data) const;
// Clears all held state and prepares the socket for receiving a new request. // Clears all held state and prepares the socket for receiving a new request.
void Clear(); void Clear();
protected: protected:
// A fairly relaxed HTTP header parser. Parses the method, path and // A fairly relaxed HTTP header parser. Parses the method, path and
// content length (POST only) of a request. // content length (POST only) of a request.
// Returns true if a valid request was received and no errors occurred. // Returns true if a valid request was received and no errors occurred.
bool ParseHeaders(); // 一个相当宽松的 HTTP 标头解析器。解析方法、路径和请求的内容长度(仅限 POST。如果收到有效请求并且没有发生错误则返回 true。
bool ParseHeaders();
// Figures out whether the request is a GET or POST and what path is // Figures out whether the request is a GET or POST and what path is
// being requested. // being requested.
bool ParseMethodAndPath(const char* begin, size_t len); // 判断请求是 GET 还是 POST 以及路径是什么,正在被请求。
bool ParseMethodAndPath(const char *begin, size_t len);
// Determines the length of the body and it's mime type. // Determines the length of the body and it's mime type.
bool ParseContentLengthAndType(const char* headers, size_t length); // 确定正文的长度及其 mime 类型。
// MIME (Multipurpose Internet Mail Extensions) 是一种在互联网上标识文件类型的标准。
// 在 WebRTC 相关的开发中MIME 类型用于标识传输的数据的类型。
bool ParseContentLengthAndType(const char *headers, size_t length);
protected: protected:
RequestMethod method_; RequestMethod method_;
size_t content_length_; size_t content_length_;
std::string content_type_; std::string content_type_;
std::string request_path_; std::string request_path_;
std::string request_headers_; std::string request_headers_;
std::string data_; std::string data_;
}; };
// The server socket. Accepts connections and generates DataSocket instances // The server socket. Accepts connections and generates DataSocket instances
// for each new connection. // for each new connection.
//
// 服务器套接字。接受连接并生成 DataSocket 实例
// 对于每个新连接。
class ListeningSocket : public SocketBase { class ListeningSocket : public SocketBase {
public: public:
ListeningSocket() {} ListeningSocket() {}
bool Listen(unsigned short port); bool Listen(unsigned short port);
DataSocket* Accept() const;
DataSocket *Accept() const;
}; };
#endif // EXAMPLES_PEERCONNECTION_SERVER_DATA_SOCKET_H_ #endif // EXAMPLES_PEERCONNECTION_SERVER_DATA_SOCKET_H_

View File

@ -10,9 +10,13 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#if defined(WEBRTC_POSIX) #if defined(WEBRTC_POSIX)
#include <sys/select.h> #include <sys/select.h>
#endif #endif
#include <time.h> #include <time.h>
#include <string> #include <string>
@ -28,166 +32,174 @@
#include "test/field_trial.h" #include "test/field_trial.h"
ABSL_FLAG( ABSL_FLAG(
std::string, std::string,
force_fieldtrials, force_fieldtrials,
"", "",
"Field trials control experimental features. This flag specifies the field " "Field trials control experimental features. This flag specifies the field "
"trials in effect. E.g. running with " "trials in effect. E.g. running with "
"--force_fieldtrials=WebRTC-FooFeature/Enabled/ " "--force_fieldtrials=WebRTC-FooFeature/Enabled/ "
"will assign the group Enabled to field trial WebRTC-FooFeature. Multiple " "will assign the group Enabled to field trial WebRTC-FooFeature. Multiple "
"trials are separated by \"/\""); "trials are separated by \"/\"");
ABSL_FLAG(int, port, 8888, "default: 8888"); ABSL_FLAG(int, port, 8888, "default: 8888");
static const size_t kMaxConnections = (FD_SETSIZE - 2); static const size_t kMaxConnections = (FD_SETSIZE - 2);
void HandleBrowserRequest(DataSocket* ds, bool* quit) { void HandleBrowserRequest(DataSocket *ds, bool *quit) {
RTC_DCHECK(ds && ds->valid()); RTC_DCHECK(ds && ds->valid());
RTC_DCHECK(quit); RTC_DCHECK(quit);
const std::string& path = ds->request_path(); const std::string &path = ds->request_path();
*quit = (path.compare("/quit") == 0); *quit = (path.compare("/quit") == 0);
if (*quit) { if (*quit) {
ds->Send("200 OK", true, "text/html", "", ds->Send("200 OK", true, "text/html", "",
"<html><body>Quitting...</body></html>"); "<html><body>Quitting...</body></html>");
} else if (ds->method() == DataSocket::OPTIONS) { } else if (ds->method() == DataSocket::OPTIONS) {
// We'll get this when a browsers do cross-resource-sharing requests. // We'll get this when a browsers do cross-resource-sharing requests.
// The headers to allow cross-origin script support will be set inside // The headers to allow cross-origin script support will be set inside
// Send. // Send.
ds->Send("200 OK", true, "", "", ""); ds->Send("200 OK", true, "", "", "");
} else { } else {
// Here we could write some useful output back to the browser depending on // Here we could write some useful output back to the browser depending on
// the path. // the path.
printf("Received an invalid request: %s\n", ds->request_path().c_str()); printf("Received an invalid request: %s\n", ds->request_path().c_str());
ds->Send("500 Sorry", true, "text/html", "", ds->Send("500 Sorry", true, "text/html", "",
"<html><body>Sorry, not yet implemented</body></html>"); "<html><body>Sorry, not yet implemented</body></html>");
} }
} }
int main(int argc, char* argv[]) { int main(int argc, char *argv[]) {
absl::SetProgramUsageMessage( absl::SetProgramUsageMessage(
"Example usage: ./peerconnection_server --port=8888\n"); "Example usage: ./peerconnection_server --port=8888\n");
absl::ParseCommandLine(argc, argv); absl::ParseCommandLine(argc, argv);
// InitFieldTrialsFromString stores the char*, so the char array must outlive // InitFieldTrialsFromString stores the char*, so the char array must outlive
// the application. // the application.
const std::string force_field_trials = absl::GetFlag(FLAGS_force_fieldtrials); const std::string force_field_trials = absl::GetFlag(FLAGS_force_fieldtrials);
webrtc::field_trial::InitFieldTrialsFromString(force_field_trials.c_str()); webrtc::field_trial::InitFieldTrialsFromString(force_field_trials.c_str());
int port = absl::GetFlag(FLAGS_port); int port = absl::GetFlag(FLAGS_port);
// Abort if the user specifies a port that is outside the allowed // Abort if the user specifies a port that is outside the allowed
// range [1, 65535]. // range [1, 65535].
if ((port < 1) || (port > 65535)) { if ((port < 1) || (port > 65535)) {
printf("Error: %i is not a valid port.\n", port); printf("Error: %i is not a valid port.\n", port);
return -1; return -1;
} }
ListeningSocket listener; ListeningSocket listener;
if (!listener.Create()) { if (!listener.Create()) {
printf("Failed to create server socket\n"); printf("Failed to create server socket\n");
return -1; return -1;
} else if (!listener.Listen(port)) { } else if (!listener.Listen(port)) {
printf("Failed to listen on server socket\n"); printf("Failed to listen on server socket\n");
return -1; return -1;
} }
printf("Server listening on port %i\n", port); printf("Server listening on port %i\n", port);
PeerChannel clients; PeerChannel clients;
typedef std::vector<DataSocket*> SocketArray; typedef std::vector<DataSocket *> SocketArray;
SocketArray sockets; SocketArray sockets;
bool quit = false; bool quit = false;
while (!quit) { while (!quit) {
fd_set socket_set; fd_set socket_set;
FD_ZERO(&socket_set); FD_ZERO(&socket_set);
if (listener.valid()) // 监听socket放入socket集合
FD_SET(listener.socket(), &socket_set); if (listener.valid())
FD_SET(listener.socket(), &socket_set);
// 后面建立的socket链接 select 设置 fd 到 fd_array select 维护
for (SocketArray::iterator i = sockets.begin(); i != sockets.end(); ++i)
FD_SET((*i)->socket(), &socket_set);
// 超时时间
struct timeval timeout = {10, 0};
if (select(FD_SETSIZE, &socket_set, NULL, NULL, &timeout) == SOCKET_ERROR) {
printf("select failed\n");
break;
}
// 有sokcet链接就执行 客户端socket发送的信令
for (SocketArray::iterator i = sockets.begin(); i != sockets.end(); ++i) {
DataSocket *s = *i;
bool socket_done = true; // 套接字完成状态
// 查看是否维护了某个fd描述符
if (FD_ISSET(s->socket(), &socket_set)) {
// 数据可用 与 已接收到请求
if (s->OnDataAvailable(&socket_done) && s->request_received()) {
ChannelMember *member = clients.Lookup(s); // 查找是否已经包含当前DataSocket 则该用户是第一次发送信令
if (member || PeerChannel::IsPeerConnection(s)) {
if (!member) {
if (s->PathEquals("/sign_in")) {
clients.AddMember(s);
} else {
printf("No member found for: %s\n", s->request_path().c_str());
s->Send("500 Error", true, "text/plain", "",
"Peer most likely gone.");
}
} else if (member->is_wait_request(s)) { // 等待
// no need to do anything.
socket_done = false;
} else {
// 对端的 ChannelMember 连接到服务器的单个对等点。
ChannelMember *target = clients.IsTargetedRequest(s); // to= 后的目标客户端target并转发数据到对端。
if (target) {
// 将请求转发到对等点
member->ForwardRequestToPeer(s, target);
} else if (s->PathEquals("/sign_out")) {
s->Send("200 OK", true, "text/plain", "", "");
} else {
printf("Couldn't find target for request: %s\n",
s->request_path().c_str());
s->Send("500 Error", true, "text/plain", "",
"Peer most likely gone.");
}
}
} else {
HandleBrowserRequest(s, &quit);
if (quit) {
printf("Quitting...\n");
FD_CLR(listener.socket(), &socket_set);
listener.Close();
clients.CloseAll();
}
}
}
} else {
socket_done = false;
}
// 响应完消息 断开 socket 从监听的队列中删除
if (socket_done) {
printf("Disconnecting socket\n");
clients.OnClosing(s);
RTC_DCHECK(s->valid()); // Close must not have been called yet.
FD_CLR(s->socket(), &socket_set);
delete (*i);
i = sockets.erase(i);
if (i == sockets.end())
break;
}
}
clients.CheckForTimeout();
// 将新连接的DataSocket加入到sockets数组中
if (FD_ISSET(listener.socket(), &socket_set)) {
DataSocket *s = listener.Accept();
if (sockets.size() >= kMaxConnections) {
delete s; // sorry, that's all we can take. 超过链接最大限制了 拒绝掉了
printf("Connection limit reached\n");
} else {
sockets.push_back(s);
printf("New connection...\n");
}
}
}
for (SocketArray::iterator i = sockets.begin(); i != sockets.end(); ++i) for (SocketArray::iterator i = sockets.begin(); i != sockets.end(); ++i)
FD_SET((*i)->socket(), &socket_set);
struct timeval timeout = {10, 0};
if (select(FD_SETSIZE, &socket_set, NULL, NULL, &timeout) == SOCKET_ERROR) {
printf("select failed\n");
break;
}
for (SocketArray::iterator i = sockets.begin(); i != sockets.end(); ++i) {
DataSocket* s = *i;
bool socket_done = true;
if (FD_ISSET(s->socket(), &socket_set)) {
if (s->OnDataAvailable(&socket_done) && s->request_received()) {
ChannelMember* member = clients.Lookup(s);
if (member || PeerChannel::IsPeerConnection(s)) {
if (!member) {
if (s->PathEquals("/sign_in")) {
clients.AddMember(s);
} else {
printf("No member found for: %s\n", s->request_path().c_str());
s->Send("500 Error", true, "text/plain", "",
"Peer most likely gone.");
}
} else if (member->is_wait_request(s)) {
// no need to do anything.
socket_done = false;
} else {
ChannelMember* target = clients.IsTargetedRequest(s);
if (target) {
member->ForwardRequestToPeer(s, target);
} else if (s->PathEquals("/sign_out")) {
s->Send("200 OK", true, "text/plain", "", "");
} else {
printf("Couldn't find target for request: %s\n",
s->request_path().c_str());
s->Send("500 Error", true, "text/plain", "",
"Peer most likely gone.");
}
}
} else {
HandleBrowserRequest(s, &quit);
if (quit) {
printf("Quitting...\n");
FD_CLR(listener.socket(), &socket_set);
listener.Close();
clients.CloseAll();
}
}
}
} else {
socket_done = false;
}
if (socket_done) {
printf("Disconnecting socket\n");
clients.OnClosing(s);
RTC_DCHECK(s->valid()); // Close must not have been called yet.
FD_CLR(s->socket(), &socket_set);
delete (*i); delete (*i);
i = sockets.erase(i); sockets.clear();
if (i == sockets.end())
break;
}
}
clients.CheckForTimeout(); return 0;
if (FD_ISSET(listener.socket(), &socket_set)) {
DataSocket* s = listener.Accept();
if (sockets.size() >= kMaxConnections) {
delete s; // sorry, that's all we can take.
printf("Connection limit reached\n");
} else {
sockets.push_back(s);
printf("New connection...\n");
}
}
}
for (SocketArray::iterator i = sockets.begin(); i != sockets.end(); ++i)
delete (*i);
sockets.clear();
return 0;
} }

View File

@ -32,16 +32,16 @@
// at this point it is not working correctly in some popular browsers. // at this point it is not working correctly in some popular browsers.
static const char kPeerIdHeader[] = "Pragma: "; static const char kPeerIdHeader[] = "Pragma: ";
static const char* kRequestPaths[] = { static const char *kRequestPaths[] = {
"/wait", "/wait",
"/sign_out", "/sign_out",
"/message", "/message",
}; };
enum RequestPathIndex { enum RequestPathIndex {
kWait, kWait,
kSignOut, kSignOut,
kMessage, kMessage,
}; };
const size_t kMaxNameLength = 512; const size_t kMaxNameLength = 512;
@ -52,113 +52,113 @@ const size_t kMaxNameLength = 512;
int ChannelMember::s_member_id_ = 0; int ChannelMember::s_member_id_ = 0;
ChannelMember::ChannelMember(DataSocket* socket) ChannelMember::ChannelMember(DataSocket *socket)
: waiting_socket_(NULL), : waiting_socket_(NULL),
id_(++s_member_id_), id_(++s_member_id_),
connected_(true), connected_(true),
timestamp_(time(NULL)) { timestamp_(time(NULL)) {
RTC_DCHECK(socket); RTC_DCHECK(socket);
RTC_DCHECK_EQ(socket->method(), DataSocket::GET); RTC_DCHECK_EQ(socket->method(), DataSocket::GET);
RTC_DCHECK(socket->PathEquals("/sign_in")); RTC_DCHECK(socket->PathEquals("/sign_in"));
name_ = socket->request_arguments(); name_ = socket->request_arguments();
if (name_.empty()) if (name_.empty())
name_ = "peer_" + int2str(id_); name_ = "peer_" + int2str(id_);
else if (name_.length() > kMaxNameLength) else if (name_.length() > kMaxNameLength)
name_.resize(kMaxNameLength); name_.resize(kMaxNameLength);
std::replace(name_.begin(), name_.end(), ',', '_'); std::replace(name_.begin(), name_.end(), ',', '_');
} }
ChannelMember::~ChannelMember() {} ChannelMember::~ChannelMember() {}
bool ChannelMember::is_wait_request(DataSocket* ds) const { bool ChannelMember::is_wait_request(DataSocket *ds) const {
return ds && ds->PathEquals(kRequestPaths[kWait]); return ds && ds->PathEquals(kRequestPaths[kWait]);
} }
bool ChannelMember::TimedOut() { bool ChannelMember::TimedOut() {
return waiting_socket_ == NULL && (time(NULL) - timestamp_) > 30; return waiting_socket_ == NULL && (time(NULL) - timestamp_) > 30;
} }
std::string ChannelMember::GetPeerIdHeader() const { std::string ChannelMember::GetPeerIdHeader() const {
std::string ret(kPeerIdHeader + int2str(id_) + "\r\n"); std::string ret(kPeerIdHeader + int2str(id_) + "\r\n");
return ret; return ret;
} }
bool ChannelMember::NotifyOfOtherMember(const ChannelMember& other) { bool ChannelMember::NotifyOfOtherMember(const ChannelMember &other) {
RTC_DCHECK_NE(&other, this); RTC_DCHECK_NE(&other, this);
QueueResponse("200 OK", "text/plain", GetPeerIdHeader(), other.GetEntry()); QueueResponse("200 OK", "text/plain", GetPeerIdHeader(), other.GetEntry());
return true; return true;
} }
// Returns a string in the form "name,id,connected\n". // Returns a string in the form "name,id,connected\n".
std::string ChannelMember::GetEntry() const { std::string ChannelMember::GetEntry() const {
RTC_DCHECK(name_.length() <= kMaxNameLength); RTC_DCHECK(name_.length() <= kMaxNameLength);
// name, 11-digit int, 1-digit bool, newline, null // name, 11-digit int, 1-digit bool, newline, null
char entry[kMaxNameLength + 15]; char entry[kMaxNameLength + 15];
snprintf(entry, sizeof(entry), "%s,%d,%d\n", snprintf(entry, sizeof(entry), "%s,%d,%d\n",
name_.substr(0, kMaxNameLength).c_str(), id_, connected_); name_.substr(0, kMaxNameLength).c_str(), id_, connected_);
return entry; return entry;
} }
void ChannelMember::ForwardRequestToPeer(DataSocket* ds, ChannelMember* peer) { void ChannelMember::ForwardRequestToPeer(DataSocket *ds, ChannelMember *peer) {
RTC_DCHECK(peer); RTC_DCHECK(peer);
RTC_DCHECK(ds); RTC_DCHECK(ds);
std::string extra_headers(GetPeerIdHeader()); std::string extra_headers(GetPeerIdHeader());
if (peer == this) { if (peer == this) {
ds->Send("200 OK", true, ds->content_type(), extra_headers, ds->data()); ds->Send("200 OK", true, ds->content_type(), extra_headers, ds->data());
} else { } else {
printf("Client %s sending to %s\n", name_.c_str(), peer->name().c_str()); printf("Client %s sending to %s\n", name_.c_str(), peer->name().c_str());
peer->QueueResponse("200 OK", ds->content_type(), extra_headers, peer->QueueResponse("200 OK", ds->content_type(), extra_headers,
ds->data()); ds->data());
ds->Send("200 OK", true, "text/plain", "", ""); ds->Send("200 OK", true, "text/plain", "", "");
}
}
void ChannelMember::OnClosing(DataSocket* ds) {
if (ds == waiting_socket_) {
waiting_socket_ = NULL;
timestamp_ = time(NULL);
}
}
void ChannelMember::QueueResponse(const std::string& status,
const std::string& content_type,
const std::string& extra_headers,
const std::string& data) {
if (waiting_socket_) {
RTC_DCHECK(queue_.empty());
RTC_DCHECK_EQ(waiting_socket_->method(), DataSocket::GET);
bool ok =
waiting_socket_->Send(status, true, content_type, extra_headers, data);
if (!ok) {
printf("Failed to deliver data to waiting socket\n");
} }
waiting_socket_ = NULL;
timestamp_ = time(NULL);
} else {
QueuedResponse qr;
qr.status = status;
qr.content_type = content_type;
qr.extra_headers = extra_headers;
qr.data = data;
queue_.push(qr);
}
} }
void ChannelMember::SetWaitingSocket(DataSocket* ds) { void ChannelMember::OnClosing(DataSocket *ds) {
RTC_DCHECK_EQ(ds->method(), DataSocket::GET); if (ds == waiting_socket_) {
if (ds && !queue_.empty()) { waiting_socket_ = NULL;
RTC_DCHECK(!waiting_socket_); timestamp_ = time(NULL);
const QueuedResponse& response = queue_.front(); }
ds->Send(response.status, true, response.content_type, }
response.extra_headers, response.data);
queue_.pop(); void ChannelMember::QueueResponse(const std::string &status,
} else { const std::string &content_type,
waiting_socket_ = ds; const std::string &extra_headers,
} const std::string &data) {
if (waiting_socket_) {
RTC_DCHECK(queue_.empty());
RTC_DCHECK_EQ(waiting_socket_->method(), DataSocket::GET);
bool ok =
waiting_socket_->Send(status, true, content_type, extra_headers, data);
if (!ok) {
printf("Failed to deliver data to waiting socket\n");
}
waiting_socket_ = NULL;
timestamp_ = time(NULL);
} else {
QueuedResponse qr;
qr.status = status;
qr.content_type = content_type;
qr.extra_headers = extra_headers;
qr.data = data;
queue_.push(qr);
}
}
void ChannelMember::SetWaitingSocket(DataSocket *ds) {
RTC_DCHECK_EQ(ds->method(), DataSocket::GET);
if (ds && !queue_.empty()) {
RTC_DCHECK(!waiting_socket_);
const QueuedResponse &response = queue_.front();
ds->Send(response.status, true, response.content_type,
response.extra_headers, response.data);
queue_.pop();
} else {
waiting_socket_ = ds;
}
} }
// //
@ -166,195 +166,195 @@ void ChannelMember::SetWaitingSocket(DataSocket* ds) {
// //
// static // static
bool PeerChannel::IsPeerConnection(const DataSocket* ds) { bool PeerChannel::IsPeerConnection(const DataSocket *ds) {
RTC_DCHECK(ds); RTC_DCHECK(ds);
return (ds->method() == DataSocket::POST && ds->content_length() > 0) || return (ds->method() == DataSocket::POST && ds->content_length() > 0) ||
(ds->method() == DataSocket::GET && ds->PathEquals("/sign_in")); (ds->method() == DataSocket::GET && ds->PathEquals("/sign_in"));
} }
ChannelMember* PeerChannel::Lookup(DataSocket* ds) const { ChannelMember *PeerChannel::Lookup(DataSocket *ds) const {
RTC_DCHECK(ds); RTC_DCHECK(ds);
if (ds->method() != DataSocket::GET && ds->method() != DataSocket::POST) if (ds->method() != DataSocket::GET && ds->method() != DataSocket::POST)
return NULL; return NULL;
size_t i = 0; size_t i = 0;
for (; i < ARRAYSIZE(kRequestPaths); ++i) { for (; i < ARRAYSIZE(kRequestPaths); ++i) {
if (ds->PathEquals(kRequestPaths[i])) if (ds->PathEquals(kRequestPaths[i]))
break; break;
}
if (i == ARRAYSIZE(kRequestPaths))
return NULL;
std::string args(ds->request_arguments());
static const char kPeerId[] = "peer_id=";
size_t found = args.find(kPeerId);
if (found == std::string::npos)
return NULL;
int id = atoi(&args[found + ARRAYSIZE(kPeerId) - 1]);
Members::const_iterator iter = members_.begin();
for (; iter != members_.end(); ++iter) {
if (id == (*iter)->id()) {
if (i == kWait)
(*iter)->SetWaitingSocket(ds);
if (i == kSignOut)
(*iter)->set_disconnected();
return *iter;
} }
}
return NULL; if (i == ARRAYSIZE(kRequestPaths))
} return NULL;
ChannelMember* PeerChannel::IsTargetedRequest(const DataSocket* ds) const { std::string args(ds->request_arguments());
RTC_DCHECK(ds); static const char kPeerId[] = "peer_id=";
// Regardless of GET or POST, we look for the peer_id parameter size_t found = args.find(kPeerId);
// only in the request_path.
const std::string& path = ds->request_path();
size_t args = path.find('?');
if (args == std::string::npos)
return NULL;
size_t found;
const char kTargetPeerIdParam[] = "to=";
do {
found = path.find(kTargetPeerIdParam, args);
if (found == std::string::npos) if (found == std::string::npos)
return NULL; return NULL;
if (found == (args + 1) || path[found - 1] == '&') {
found += ARRAYSIZE(kTargetPeerIdParam) - 1; int id = atoi(&args[found + ARRAYSIZE(kPeerId) - 1]);
break; Members::const_iterator iter = members_.begin();
for (; iter != members_.end(); ++iter) {
if (id == (*iter)->id()) {
if (i == kWait)
(*iter)->SetWaitingSocket(ds);
if (i == kSignOut)
(*iter)->set_disconnected();
return *iter;
}
} }
args = found + ARRAYSIZE(kTargetPeerIdParam) - 1;
} while (true); return NULL;
int id = atoi(&path[found]);
Members::const_iterator i = members_.begin();
for (; i != members_.end(); ++i) {
if ((*i)->id() == id) {
return *i;
}
}
return NULL;
} }
bool PeerChannel::AddMember(DataSocket* ds) { ChannelMember *PeerChannel::IsTargetedRequest(const DataSocket *ds) const {
RTC_DCHECK(IsPeerConnection(ds)); RTC_DCHECK(ds);
ChannelMember* new_guy = new ChannelMember(ds); // Regardless of GET or POST, we look for the peer_id parameter
Members failures; // only in the request_path.
BroadcastChangedState(*new_guy, &failures); const std::string &path = ds->request_path();
HandleDeliveryFailures(&failures); size_t args = path.find('?');
members_.push_back(new_guy); if (args == std::string::npos)
return NULL;
size_t found;
const char kTargetPeerIdParam[] = "to=";
do {
found = path.find(kTargetPeerIdParam, args);
if (found == std::string::npos)
return NULL;
if (found == (args + 1) || path[found - 1] == '&') {
found += ARRAYSIZE(kTargetPeerIdParam) - 1;
break;
}
args = found + ARRAYSIZE(kTargetPeerIdParam) - 1;
} while (true);
int id = atoi(&path[found]);
Members::const_iterator i = members_.begin();
for (; i != members_.end(); ++i) {
if ((*i)->id() == id) {
return *i;
}
}
return NULL;
}
printf("New member added (total=%s): %s\n", bool PeerChannel::AddMember(DataSocket *ds) {
size_t2str(members_.size()).c_str(), new_guy->name().c_str()); RTC_DCHECK(IsPeerConnection(ds));
ChannelMember *new_guy = new ChannelMember(ds);
Members failures;
BroadcastChangedState(*new_guy, &failures);
HandleDeliveryFailures(&failures);
members_.push_back(new_guy);
// Let the newly connected peer know about other members of the channel. printf("New member added (total=%s): %s\n",
std::string content_type; size_t2str(members_.size()).c_str(), new_guy->name().c_str());
std::string response = BuildResponseForNewMember(*new_guy, &content_type);
ds->Send("200 Added", true, content_type, new_guy->GetPeerIdHeader(), // Let the newly connected peer know about other members of the channel.
response); std::string content_type;
return true; std::string response = BuildResponseForNewMember(*new_guy, &content_type);
ds->Send("200 Added", true, content_type, new_guy->GetPeerIdHeader(),
response);
return true;
} }
void PeerChannel::CloseAll() { void PeerChannel::CloseAll() {
Members::const_iterator i = members_.begin(); Members::const_iterator i = members_.begin();
for (; i != members_.end(); ++i) { for (; i != members_.end(); ++i) {
(*i)->QueueResponse("200 OK", "text/plain", "", "Server shutting down"); (*i)->QueueResponse("200 OK", "text/plain", "", "Server shutting down");
} }
DeleteAll(); DeleteAll();
} }
void PeerChannel::OnClosing(DataSocket* ds) { void PeerChannel::OnClosing(DataSocket *ds) {
for (Members::iterator i = members_.begin(); i != members_.end(); ++i) { for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
ChannelMember* m = (*i); ChannelMember *m = (*i);
m->OnClosing(ds); m->OnClosing(ds);
if (!m->connected()) { if (!m->connected()) {
i = members_.erase(i); i = members_.erase(i);
Members failures; Members failures;
BroadcastChangedState(*m, &failures); BroadcastChangedState(*m, &failures);
HandleDeliveryFailures(&failures); HandleDeliveryFailures(&failures);
delete m; delete m;
if (i == members_.end()) if (i == members_.end())
break; break;
}
} }
} printf("Total connected: %s\n", size_t2str(members_.size()).c_str());
printf("Total connected: %s\n", size_t2str(members_.size()).c_str());
} }
void PeerChannel::CheckForTimeout() { void PeerChannel::CheckForTimeout() {
for (Members::iterator i = members_.begin(); i != members_.end(); ++i) { for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
ChannelMember* m = (*i); ChannelMember *m = (*i);
if (m->TimedOut()) { if (m->TimedOut()) {
printf("Timeout: %s\n", m->name().c_str()); printf("Timeout: %s\n", m->name().c_str());
m->set_disconnected(); m->set_disconnected();
i = members_.erase(i); i = members_.erase(i);
Members failures; Members failures;
BroadcastChangedState(*m, &failures); BroadcastChangedState(*m, &failures);
HandleDeliveryFailures(&failures); HandleDeliveryFailures(&failures);
delete m; delete m;
if (i == members_.end()) if (i == members_.end())
break; break;
}
} }
}
} }
void PeerChannel::DeleteAll() { void PeerChannel::DeleteAll() {
for (Members::iterator i = members_.begin(); i != members_.end(); ++i) for (Members::iterator i = members_.begin(); i != members_.end(); ++i)
delete (*i); delete (*i);
members_.clear(); members_.clear();
} }
void PeerChannel::BroadcastChangedState(const ChannelMember& member, void PeerChannel::BroadcastChangedState(const ChannelMember &member,
Members* delivery_failures) { Members *delivery_failures) {
// This function should be called prior to DataSocket::Close(). // This function should be called prior to DataSocket::Close().
RTC_DCHECK(delivery_failures); RTC_DCHECK(delivery_failures);
if (!member.connected()) { if (!member.connected()) {
printf("Member disconnected: %s\n", member.name().c_str()); printf("Member disconnected: %s\n", member.name().c_str());
} }
Members::iterator i = members_.begin(); Members::iterator i = members_.begin();
for (; i != members_.end(); ++i) { for (; i != members_.end(); ++i) {
if (&member != (*i)) { if (&member != (*i)) {
if (!(*i)->NotifyOfOtherMember(member)) { if (!(*i)->NotifyOfOtherMember(member)) {
(*i)->set_disconnected(); (*i)->set_disconnected();
delivery_failures->push_back(*i); delivery_failures->push_back(*i);
i = members_.erase(i); i = members_.erase(i);
if (i == members_.end()) if (i == members_.end())
break; break;
} }
}
} }
}
} }
void PeerChannel::HandleDeliveryFailures(Members* failures) { void PeerChannel::HandleDeliveryFailures(Members *failures) {
RTC_DCHECK(failures); RTC_DCHECK(failures);
while (!failures->empty()) { while (!failures->empty()) {
Members::iterator i = failures->begin(); Members::iterator i = failures->begin();
ChannelMember* member = *i; ChannelMember *member = *i;
RTC_DCHECK(!member->connected()); RTC_DCHECK(!member->connected());
failures->erase(i); failures->erase(i);
BroadcastChangedState(*member, failures); BroadcastChangedState(*member, failures);
delete member; delete member;
} }
} }
// Builds a simple list of "name,id\n" entries for each member. // Builds a simple list of "name,id\n" entries for each member.
std::string PeerChannel::BuildResponseForNewMember(const ChannelMember& member, std::string PeerChannel::BuildResponseForNewMember(const ChannelMember &member,
std::string* content_type) { std::string *content_type) {
RTC_DCHECK(content_type); RTC_DCHECK(content_type);
*content_type = "text/plain"; *content_type = "text/plain";
// The peer itself will always be the first entry. // The peer itself will always be the first entry.
std::string response(member.GetEntry()); std::string response(member.GetEntry());
for (Members::iterator i = members_.begin(); i != members_.end(); ++i) { for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
if (member.id() != (*i)->id()) { if (member.id() != (*i)->id()) {
RTC_DCHECK((*i)->connected()); RTC_DCHECK((*i)->connected());
response += (*i)->GetEntry(); response += (*i)->GetEntry();
}
} }
}
return response; return response;
} }

View File

@ -20,99 +20,108 @@
class DataSocket; class DataSocket;
// Represents a single peer connected to the server. // Represents a single peer connected to the server.
// 表示连接到服务器的单个对等点。
class ChannelMember { class ChannelMember {
public: public:
explicit ChannelMember(DataSocket* socket); explicit ChannelMember(DataSocket *socket);
~ChannelMember();
bool connected() const { return connected_; } ~ChannelMember();
int id() const { return id_; }
void set_disconnected() { connected_ = false; }
bool is_wait_request(DataSocket* ds) const;
const std::string& name() const { return name_; }
bool TimedOut(); bool connected() const { return connected_; }
std::string GetPeerIdHeader() const; int id() const { return id_; }
bool NotifyOfOtherMember(const ChannelMember& other); void set_disconnected() { connected_ = false; }
// Returns a string in the form "name,id\n". bool is_wait_request(DataSocket *ds) const;
std::string GetEntry() const;
void ForwardRequestToPeer(DataSocket* ds, ChannelMember* peer); const std::string &name() const { return name_; }
void OnClosing(DataSocket* ds); bool TimedOut();
void QueueResponse(const std::string& status, std::string GetPeerIdHeader() const;
const std::string& content_type,
const std::string& extra_headers,
const std::string& data);
void SetWaitingSocket(DataSocket* ds); bool NotifyOfOtherMember(const ChannelMember &other);
protected: // Returns a string in the form "name,id\n".
struct QueuedResponse { std::string GetEntry() const;
std::string status, content_type, extra_headers, data;
};
DataSocket* waiting_socket_; void ForwardRequestToPeer(DataSocket *ds, ChannelMember *peer);
int id_;
bool connected_; void OnClosing(DataSocket *ds);
time_t timestamp_;
std::string name_; void QueueResponse(const std::string &status,
std::queue<QueuedResponse> queue_; const std::string &content_type,
static int s_member_id_; const std::string &extra_headers,
const std::string &data);
void SetWaitingSocket(DataSocket *ds);
protected:
struct QueuedResponse {
std::string status, content_type, extra_headers, data;
};
DataSocket *waiting_socket_;
int id_;
bool connected_;
time_t timestamp_;
std::string name_;
std::queue<QueuedResponse> queue_;
static int s_member_id_;
}; };
// Manages all currently connected peers. // Manages all currently connected peers.
// 管理所有当前连接的对等点。
class PeerChannel { class PeerChannel {
public: public:
typedef std::vector<ChannelMember*> Members; typedef std::vector<ChannelMember *> Members;
PeerChannel() {} PeerChannel() {}
~PeerChannel() { DeleteAll(); } ~PeerChannel() { DeleteAll(); }
const Members& members() const { return members_; } const Members &members() const { return members_; }
// Returns true if the request should be treated as a new ChannelMember // Returns true if the request should be treated as a new ChannelMember
// request. Otherwise the request is not peerconnection related. // request. Otherwise the request is not peerconnection related.
static bool IsPeerConnection(const DataSocket* ds); static bool IsPeerConnection(const DataSocket *ds);
// Finds a connected peer that's associated with the `ds` socket. // Finds a connected peer that's associated with the `ds` socket.
ChannelMember* Lookup(DataSocket* ds) const; ChannelMember *Lookup(DataSocket *ds) const;
// Checks if the request has a "peer_id" parameter and if so, looks up the // Checks if the request has a "peer_id" parameter and if so, looks up the
// peer for which the request is targeted at. // peer for which the request is targeted at.
ChannelMember* IsTargetedRequest(const DataSocket* ds) const; ChannelMember *IsTargetedRequest(const DataSocket *ds) const;
// Adds a new ChannelMember instance to the list of connected peers and // Adds a new ChannelMember instance to the list of connected peers and
// associates it with the socket. // associates it with the socket.
bool AddMember(DataSocket* ds); bool AddMember(DataSocket *ds);
// Closes all connections and sends a "shutting down" message to all // Closes all connections and sends a "shutting down" message to all
// connected peers. // connected peers.
void CloseAll(); void CloseAll();
// Called when a socket was determined to be closing by the peer (or if the // Called when a socket was determined to be closing by the peer (or if the
// connection went dead). // connection went dead).
void OnClosing(DataSocket* ds); void OnClosing(DataSocket *ds);
void CheckForTimeout(); void CheckForTimeout();
protected: protected:
void DeleteAll(); void DeleteAll();
void BroadcastChangedState(const ChannelMember& member,
Members* delivery_failures);
void HandleDeliveryFailures(Members* failures);
// Builds a simple list of "name,id\n" entries for each member. void BroadcastChangedState(const ChannelMember &member,
std::string BuildResponseForNewMember(const ChannelMember& member, Members *delivery_failures);
std::string* content_type);
protected: void HandleDeliveryFailures(Members *failures);
Members members_;
// Builds a simple list of "name,id\n" entries for each member.
std::string BuildResponseForNewMember(const ChannelMember &member,
std::string *content_type);
protected:
Members members_;
}; };
#endif // EXAMPLES_PEERCONNECTION_SERVER_PEER_CHANNEL_H_ #endif // EXAMPLES_PEERCONNECTION_SERVER_PEER_CHANNEL_H_

View File

@ -1,16 +1,16 @@
project(test) project(test)
include_directories(/opt/homebrew/Cellar/sdl2_image/2.8.1/include) #include_directories(/opt/homebrew/Cellar/sdl2_image/2.8.1/include)
find_library(SDL2Image SDL2_image REQUIRED) #find_library(SDL2Image SDL2_image REQUIRED)
find_library(SDL2 SDL2 REQUIRED) #find_library(SDL2 SDL2 REQUIRED)
#
message(${SDL2Image}) #message(${SDL2Image})
message(${SDL2}) #message(${SDL2})
add_executable(test test_main.cc) add_executable(test test_main.cc)
target_link_libraries(test PUBLIC #target_link_libraries(test PUBLIC
${SDL2Image} # ${SDL2Image}
${SDL2} # ${SDL2}
PkgConfig::FFMPEG # PkgConfig::FFMPEG
) #)