一、基本原理
WebRTC的线程功能由Thread
类提供。Thread
继承于消息队列MessageQueue
,这样WebRTC中的每个线程都有了自己的消息循环,外部可以向该线程的消息循环Post消息Message
,然后该线程轮询从消息循环Get
到消息后处理消息。
UML如下:
二、Message和MessageQueue
Message
是单个消息的定义:
* posted_from
标记改条消息发送自哪个函数,一般都是直接用RTC_FROM_HERE
宏来赋值;
* message_id
32位整数,消息ID;
* pdata
消息携带的数据指针,虽然定义MessageData*
类型,但也可以等同于void*
。
* ts_sensitive
消息的敏感时间点。在使用MessageQueue::Post
等方法发送消息时,若time_sensitive == true
则设置ts_sensitive = TimeMillis() + kMaxMsgLatency
即当前时间 + 最大消息延迟时间。当消息被从队列中取出的时间大于该时间,则会打印警告日志。
* phandler
消息处理器,MessageHandler接口的指针。用户需要继承该类,并重写其OnMessage
虚函数。
MessageQueue
实现了WebRTC中消息队列的功能,如Post
方法用于添加消息;Get
方法用于从队列取出消息,若队列没有消息则一直等待,具体的等待和唤醒的方式通过SocketServer
的Wait
和WakeUp
来实现。
SocketServer
是一个纯抽象类,NullSocketServer
和PhysicalSocketServer
都派生自该类。NullSocketServer
比较简单,没有创建SOCKET,只处理本地事件(CreateEvent系列函数);PhysicalSocketServer
会创建SOCKET,并可以处理网络事件(WSACreateEvent系列函数)。在单纯的线程模型中,SocketServer
只用于处理等待和唤醒操作。
三、 Thread
Thread类提供了2个static函数(Create
, CreateWithSocketServer
)来构造一个Thread实例。不建议使用new Thread()
调用默认构造函数的方式,代码注释中已经给出了解释:
// DEPRECATED.
// The default constructor should not be used because it hides whether or
// not a socket server will be associated with the thread. Most instances
// of Thread do actually not need one, so please use either of the Create*
// methods to construct an instance of Thread.
Thread();
下面是Thread类构造的大致过程:
+-----------------------------+
| |
| new Thread(SocketServer*) |
| |
+---+-------------------------+
|
|
| +-----------------------------------+
| | |
+---------> new MessageQueue(SocketServer*) |
| |
+-----------------------------------+
构造完Thread实例之后,调用Start
来启动线程,框架会根据Start函数的Runnable参数是否为NULL来判断是需要运行用户自定义的Runnable->Run()
, 还是运行默认ProcessMessage
去循环从消息队列中获取消息来处理。
- 若运行用户自定义的
Runnable->Run()
,则用户需要继承rtc::Runnable
去重载Run()
; - 若运行默认的
ProcessMessage
,则用户则需要定义Message
,向线程中Post
该Message
,让线程来执行。
流程如下:
+---------------------------+
| |
| Thread->Start(Runnable*) |
| |
+-+-------------------------+
|
| +-------------------------+
| | |
+-------> CreateThrad(PreRun) |
| |
+-------------------------+
|Runnable* is NULL?
|
| +------------------+
| | |
+-----------> Runnable->Run() |
| | |
| +------------------+
|
|
| +------------------+
| | |
+-----------> Thread->Run() |
| |
+-+----------------+
|
|
| +--------------------------+
| | |
+----> Thread->ProcessMessage() |
| |
+--+-----------------------+
|
| +--------------------------------+
| | |
+-----> Loop call MessageQueue->Get() |
| |
+--------------------------------+
四、 示例
4.1 实现Runnbale
#include "rtc_base/thread.h"
class MyTask : public rtc::Runnable {
public:
MyTask(const std::string &name) : name_(name) {
}
protected:
void Run(Thread* thread) {
std::cout << "task name: " << name_ << std::endl;
std::cout << "my thread id: " << thread->GetId() << std::endl;
}
private:
std::string name_;
};
int main()
{
std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
thread->Start(new MyTask("task1"));
getchar();
return 0;
}
4.2 发送Message
class MyTaskHandler : public rtc::MessageHandler {
public:
MyTaskHandler(const std::string &name) : name_(name) {
}
protected:
void OnMessage(Message* msg) {
std::cout << "task name: " << name_ << std::endl;
std::cout << "my thread id: " << Thread::Current()->GetId() << std::endl;
}
private:
std::string name_;
};
int main()
{
std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
thread->Start(nullptr);
thread->Post(RTC_FROM_HERE, new MyTaskHandler("task2"), 0, nullptr, false);
getchar();
return 0;
}