licode 学习总结
参考:
licode编译以及源码分析:https://www.cnblogs.com/limedia/category/1350712.html
Licode—基于webrtc的SFU/MCU实现:https://www.jianshu.com/p/dcc5ba06b49f
Licode架构分析:https://blog.csdn.net/haitian403/article/details/89136984#6__42
licode 的singlepc 模式是怎么回事?:https://blog.csdn.net/ddr77/article/details/90065302
1.系统组成
抽取的代码没有webrtc自带的音视频处理框架(voice engine/video engine/media engine),这是由于SFC只在RTP层进行转发并不解码,只需要check一下媒体流属性(如I帧头)即可。
抽取代码的亮点在于将webrtc强大的抵抗网络时延和丢包的网络适应性算法和协议都提炼出来了,可以供广大研究视频传输的网络适应方向的开发者们单独学习、实验并快速集成到自己非webrtc-based的产品中来。
libav是从ffmpeg中分离出来的开发者发布的开源工程,代码量更小、第三方依赖库更少、编译简单。目前FFmpeg的使用更广泛。
libnice基于ICE协议实现的网络层库,Licode使用libnice库来实现端到端的ICE连接和数据流发送接收,以及candidates(候选地址)和SDP(媒体描述文件)的相互交换。(但是配置文件中默认使用nicer,此配置文件仅是example的配置)
libsrtp库主要是是用来加密rtp/rtcp的。
2.licode抽取webrtc组件
3.C++ Source
4.WebrtcConnection
WebrtcConnection是erizo进行Webrtc交互的基础类。
交互之主动发起流程:
被动呼叫:
std::string connection_id_; //唯一的ID bool audio_enabled_; //如果主动发起请求,被createOffer函数赋值,否则被processRemote赋值,表示是否有音频交互 bool video_enabled_; //表示是否有视频交互 bool trickle_enabled_; //启用ice的trickle模式 bool slide_show_mode_; //没有用,应该是留作扩展或者是以前版本的残留 bool sending_;//发送数据开关 int bundle_;//ice 使用,是否使用合并端口收发数据(音频和视频) WebRtcConnectionEventListener* conn_event_listener_; //webrtc连接事件监听 IceConfig ice_config_;//ice配置 std::vector<RtpMap> rtp_mappings_; //支持的RtpMap,生成sdp使用 RtpExtensionProcessor extension_processor_; //支持的extension_processor,生成sdp使用 boost::condition_variable cond_; std::shared_ptr<Transport> video_transport_, audio_transport_; //视频数据链路,音频数据链路 std::shared_ptr<Stats> stats_; //输出状态 WebRTCEvent global_state_; //webrtc事件状态枚举 boost::mutex update_state_mutex_; // boost::mutex event_listener_mutex_; std::shared_ptr<Worker> worker_; std::shared_ptr<IOWorker> io_worker_; std::vector<std::shared_ptr<MediaStream>> media_streams_; //这个connection使用的流 std::shared_ptr<SdpInfo> remote_sdp_; //对端的sdp std::shared_ptr<SdpInfo> local_sdp_; //本地的sdp bool audio_muted_; //扩展,在源文件中未看到使用,仅初始化 bool video_muted_; //扩展,在源文件中未看到使用,仅初始化 bool first_remote_sdp_processed_; //是否第一次处理sdp标识 std::unique_ptr<BandwidthDistributionAlgorithm> distributor_; //remb码率估计处理
/** * WebRTC Events */ enum WebRTCEvent { CONN_INITIAL = 101, CONN_STARTED = 102, CONN_GATHERED = 103, CONN_READY = 104, CONN_FINISHED = 105, CONN_CANDIDATE = 201, CONN_SDP = 202, CONN_SDP_PROCESSED = 203, CONN_FAILED = 500 }; class WebRtcConnectionEventListener { public: virtual ~WebRtcConnectionEventListener() { } virtual void notifyEvent(WebRTCEvent newEvent, const std::string& message) = 0; };
从成员可以看出,webrtcconnection,主要控制的有链路transport,交互local_sdp remote_sdp, ice控制,事件监听回调,数据流media_streams。主要依托于webrtc的标准交互:offer,candidate,answer来进行。整体流程需要根据ice的相关事件进行驱动,实际上是程序控制与ice状态的双重驱动控制。
5.Worker
erizo使用Worker来管理Task,每个Task是一个函数片段,其执行完全由Worker来接管。
class Worker : public std::enable_shared_from_this<Worker> { public: typedef std::unique_ptr<boost::asio::io_service::work> asio_worker; typedef std::function<void()> Task; //返回值为空的函数为Task typedef std::function<bool()> ScheduledTask; //返回值为bool的函数为scheduletask explicit Worker(std::weak_ptr<Scheduler> scheduler, std::shared_ptr<Clock> the_clock = std::make_shared<SteadyClock>()); ~Worker(); virtual void task(Task f); //设置运行Task virtual void start(); //开启线程 virtual void start(std::shared_ptr<std::promise<void>> start_promise); //同步方式开启线程,即确定线程启动后,调用者才会返回 virtual void close(); //停止线程 virtual std::shared_ptr<ScheduledTaskReference> scheduleFromNow(Task f, duration delta); //定时器,可以取消的定时器 virtual void unschedule(std::shared_ptr<ScheduledTaskReference> id); //取消定时器 virtual void scheduleEvery(ScheduledTask f, duration period); //循环定时器,f返回false时停止执行 private: void scheduleEvery(ScheduledTask f, duration period, duration next_delay); std::function<void()> safeTask(std::function<void(std::shared_ptr<Worker>)> f); protected: int next_scheduled_ = 0; private: std::weak_ptr<Scheduler> scheduler_; std::shared_ptr<Clock> clock_; boost::asio::io_service service_; asio_worker service_worker_; boost::thread_group group_; std::atomic<bool> closed_; };
在构造函数中,使用boost::asio::io_service,构建了基本的线程架构。
Worker::Worker(std::weak_ptr<Scheduler> scheduler, std::shared_ptr<Clock> the_clock) : scheduler_{scheduler}, //构造定时器变量 clock_{the_clock}, //构造自己的时钟变量 service_{}, //构造io_service对象 service_worker_{new asio_worker::element_type(service_)}, //为io_service注入service_worker,避免直接退出 closed_{false} { //线程控制变量,为true时,退出 }
Worker提供了两个start函数,无参的直接创建一个promise,调用有参数的,并且并未使用get_future.wait进行流程控制。这里就可以理解为:无参数start,不关心线程是否创建成功,如果在线程没有创建成功时,调用了task函数,则可能出现异常错误。有参数的start为开发控制线程存在,优化处理流程提供了可能。
void Worker::start() { auto promise = std::make_shared<std::promise<void>>(); start(promise); } void Worker::start(std::shared_ptr<std::promise<void>> start_promise) { auto this_ptr = shared_from_this(); auto worker = [this_ptr, start_promise] { //创建一个代理worker,准备好执行过程 start_promise->set_value(); //通知promise,线程已经就绪 if (!this_ptr->closed_) { //如果不是close状态 return this_ptr->service_.run(); //调用io service的run函数,开启线程过程 } return size_t(0); }; group_.add_thread(new boost::thread(worker)); //实际创建线程,并将之添加到group里面 }
void Worker::close() { closed_ = true; service_worker_.reset(); group_.join_all(); service_.stop(); }
在close函数中,将变量设为true,并调用各种析构。从start和close的控制可以看到,Worker的start和close只能成功调用一次,如果close以后,再start,线程就会直接退出了。这应该也是一个小弊端。
task调用io service的dispatch,直接执行任务。也就是说task实际上就是一个基础的处理,让任务进行执行。
void Worker::task(Task f) { service_.dispatch(f);//函数在当前线程立即执行,且同步(post在另外线程执行,异步) }
Worker也设计了定时执行.在scheduleFromNow里面,调用了scheduler的scheduleFromNow方法,在scheduler里面,放入任务队列,进入定时线程,到达时间后,执行Worker的task方法,投递一个Task,进而激活Worker,运行Task内容,完成定时执行操作。
std::shared_ptr<ScheduledTaskReference> Worker::scheduleFromNow(Task f, duration delta) { auto delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta); auto id = std::make_shared<ScheduledTaskReference>(); if (auto scheduler = scheduler_.lock()) { scheduler->scheduleFromNow(safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一个task,给scheduler的scheduleFromNow做参数传递 this_ptr->task(this_ptr->safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一个task,生成的task功能是如果id->isCancelled为true,直接返回,否则执行f,并将这个task传递给自己的任务投递方法 { if (id->isCancelled()) { return; } } f(); })); }), delta_ms); } return id; }
循环定时器,使用递归调用,来实现循环定时器,其停止依托于ScheduledTask的返回值为false,停止循环。
void Worker::scheduleEvery(ScheduledTask f, duration period) { scheduleEvery(f, period, period); } void Worker::scheduleEvery(ScheduledTask f, duration period, duration next_delay) { time_point start = clock_->now(); std::shared_ptr<Clock> clock = clock_; scheduleFromNow(safeTask([start, period, next_delay, f, clock](std::shared_ptr<Worker> this_ptr) { if (f()) { duration clock_skew = clock->now() - start - next_delay; duration delay = period - clock_skew; this_ptr->scheduleEvery(f, period, delay); //循环递归调用 } }), std::max(next_delay, duration{0})); }
Worker提供了基本的线程管理,提供了Task执行机制以及定时器控制机制,但是没有提供资源重复使用的机制,即多次调用close,start的机制
6.IOWorker
erizo使用IOWorker进行ICE,DTLS的状态交互处理。
namespace erizo { class IOWorker : public std::enable_shared_from_this<IOWorker> { public: typedef std::function<void()> Task; IOWorker(); virtual ~IOWorker(); virtual void start(); virtual void start(std::shared_ptr<std::promise<void>> start_promise); virtual void close(); virtual void task(Task f); private: std::atomic<bool> started_; std::atomic<bool> closed_; std::unique_ptr<std::thread> thread_; std::vector<Task> tasks_; mutable std::mutex task_mutex_; }; } // namespace erizo
接口定义与Worker基本没有区别,但是内部使用了atomic变量,而没有使用boost的io service,说明线程的执行是自己控制.
void IOWorker::start() { auto promise = std::make_shared<std::promise<void>>(); start(promise); } void IOWorker::start(std::shared_ptr<std::promise<void>> start_promise) { if (started_.exchange(true)) { return; } thread_ = std::unique_ptr<std::thread>(new std::thread([this, start_promise] { start_promise->set_value(); while (!closed_) { int events; struct timeval towait = {0, 100000}; struct timeval tv; int r = NR_async_event_wait2(&events, &towait); if (r == R_EOD) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } gettimeofday(&tv, 0); NR_async_timer_update_time(&tv); std::vector<Task> tasks; { std::unique_lock<std::mutex> lock(task_mutex_); tasks.swap(tasks_); } for (Task &task : tasks) { task(); } } })); } void IOWorker::task(Task f) { std::unique_lock<std::mutex> lock(task_mutex_); tasks_.push_back(f); }
在start里面做了重入检测判断,如果重入,直接返回。
在线程函数体内部,进行了定时处理,即sleep一段时间,执行所有的task。
在task函数中,将Task放入vector中,从总体实现上,与worker有很大的区别,但是从使用角度,基本是无差别的。
在IOWorker里面,使用效率可能不如Worker的效率高,而且人为的将任务集中执行,可能造成瞬时cpu过高。
7.Transport
erizo的transport部分负责网络链路处理,其包含ice处理,数据包packet处理传递。
transport存在,主要是为Dtls-srtp数据处理提供封装,其关联着ice与外部接口webrtcconnection。其关系图如下:
erizo提供了两套ICE的方案,分别使用不同的ice库,可以再iceconfig参数里进行设置。
erizo的DtlsTransport,提供了dtls交互,srtp加密和解密。
erizo的链路数据接收模型,是定义一个listener,下一级继承listener,同级聚合listener,实现异步数据的逐级传递。
在 DtlsTransport类的构造函数中根据配置决定初始化哪个库:
if (iceConfig_.use_nicer) { ice_ = NicerConnection::create(io_worker_, iceConfig_); } else { ice_.reset(LibNiceConnection::create(iceConfig_)); }
DtlsTransport会将上层来的数据传给ICE,负责进行加解密,listener会被设置成dtls实例
void DtlsTransport::start() { ice_->setIceListener(shared_from_this()); ice_->copyLogContextFrom(*this); ELOG_DEBUG("%s message: starting ice", toLog()); ice_->start(); }
void DtlsTransport::onDtlsPacket(DtlsSocketContext *ctx, const unsigned char* data, unsigned int len) { bool is_rtcp = ctx == dtlsRtcp.get(); int component_id = is_rtcp ? 2 : 1; packetPtr packet = std::make_shared<DataPacket>(component_id, data, len); if (is_rtcp) { writeDtlsPacket(dtlsRtcp.get(), packet); } else { writeDtlsPacket(dtlsRtp.get(), packet); } ELOG_DEBUG("%s message: Sending DTLS message, transportName: %s, componentId: %d", toLog(), transport_name.c_str(), packet->comp); } void DtlsTransport::writeDtlsPacket(DtlsSocketContext *ctx, packetPtr packet) { char data[1500]; unsigned int len = packet->length; memcpy(data, packet->data, len); writeOnIce(packet->comp, data, len); } //Transport.h void writeOnIce(int comp, void* buf, int len) { if (!running_) { return; } ice_->sendData(comp, buf, len);
}
void DtlsTransport::onIceData(packetPtr packet) {
if (!running_) {
return;
}
int len = packet->length;
char *data = packet->data;
unsigned int component_id = packet->comp;
int length = len;
SrtpChannel *srtp = srtp_.get();
//判断是否是dtls包,rtp, dtls, stun, unknown
if (DtlsTransport::isDtlsPacket(data, len)) { ELOG_DEBUG("%s message: Received DTLS message, transportName: %s, componentId: %u", toLog(), transport_name.c_str(), component_id); if (component_id == 1) { std::lock_guard<std::mutex> guard(dtls_mutex); dtlsRtp->read(reinterpret_cast<unsigned char*>(data), len); } else { std::lock_guard<std::mutex> guard(dtls_mutex); dtlsRtcp->read(reinterpret_cast<unsigned char*>(data), len); } return; } else if (this->getTransportState() == TRANSPORT_READY) { std::shared_ptr<DataPacket> unprotect_packet = std::make_shared<DataPacket>(component_id, data, len, VIDEO_PACKET, packet->received_time_ms); if (dtlsRtcp != NULL && component_id == 2) { srtp = srtcp_.get(); }
//如果不是dtls包会进行解密 if (srtp != NULL) { RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(unprotect_packet->data); if (chead->isRtcp()) { if (srtp->unprotectRtcp(unprotect_packet->data, &unprotect_packet->length) < 0) { return; } } else { if (srtp->unprotectRtp(unprotect_packet->data, &unprotect_packet->length) < 0) { return; } } } else { return; } if (length <= 0) { return; } if (auto listener = getTransportListener().lock()) { listener->onTransportData(unprotect_packet, this); } } }
解析出来的数据会传给listener,而listener是webrtcconnection的实例,通过pipe调用read传入media_stream中,UDP---ICE---DTLS---SRTP---RTP
void WebRtcConnection::onTransportData(std::shared_ptr<DataPacket> packet, Transport *transport) { if (getCurrentState() != CONN_READY) { return; } if (transport->mediaType == AUDIO_TYPE) { packet->type = AUDIO_PACKET; } else if (transport->mediaType == VIDEO_TYPE) { packet->type = VIDEO_PACKET; } asyncTask([packet] (std::shared_ptr<WebRtcConnection> connection) { if (!connection->pipeline_initialized_) { ELOG_DEBUG("%s message: Pipeline not initialized yet.", connection->toLog()); return; } if (connection->pipeline_) { connection->pipeline_->read(std::move(packet)); } }); } void WebRtcConnection::read(std::shared_ptr<DataPacket> packet) { Transport *transport = (bundle_ || packet->type == VIDEO_PACKET) ? video_transport_.get() : audio_transport_.get(); if (transport == nullptr) { return; } char* buf = packet->data; RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf); if (chead->isRtcp()) { onRtcpFromTransport(packet, transport); return; } else { RtpHeader *head = reinterpret_cast<RtpHeader*> (buf); uint32_t ssrc = head->getSSRC(); forEachMediaStream([packet, transport, ssrc] (const std::shared_ptr<MediaStream> &media_stream) { if (media_stream->isSourceSSRC(ssrc) || media_stream->isSinkSSRC(ssrc)) { media_stream->onTransportData(packet, transport); } }); } }
8.MediaStream
MediaStream是erizo进行流数据处理的核心模块。当网络数据,经过DtlsTransport进行srtp解密后,得到的rtp裸数据与rtcp裸数据,都要进入MediaStream进行处理;需要发送给对方的rtp数据与rtcp裸数据也要经过MediaStream处理后,才会给DtlsTransport进行加密并发送。
/** * A MediaStream. This class represents a Media Stream that can be established with other peers via a SDP negotiation */ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink, public FeedbackSource, public LogContext, public HandlerManagerListener, public std::enable_shared_from_this<MediaStream>, public Service {
整个继承体系里面,涉及处理的基类有:MediaSink,MediaSource,FeedbackSink,FeedbackSource, HandlerManagerListener,Service
MediaStream同时承载着收数据处理,和发送数据处理两部分内容。其中和丢包重传等结合起来,就变为:接收rtp数据,发送rtcp重传信息;发送rtp数据,接收rtcp重传信息。
MediaStream还继承了HandlerManagerListener,Service这两部分是媒体处理的核心模块
在接口上面,需要对外提供发送和接收到的裸数据回调。
这里面erizo的实现,是将MediaSink与MediaSource纠缠到一起的。
MediaSink:负责发送数据(write to client)
FeedbackSink:负责发送数据(write to client)
MediaSource:负责read出来rtp数据 (read from client)
FeedbackSource:负责read出来数据(read from client)
MediaStream继承MediaSink和FeedbackSink,所以直接调用MediaStream对象的deliverVideoData,deliverAudioData,deliverFeedback即可直接向对端发送数据。
要想接收对方的数据,需要MediaSource,FeedbackSource进行数据回调:
/** * A MediaSource is any class that produces audio or video data. */ class MediaSource: public virtual Monitor { protected: // SSRCs coming from the source uint32_t audio_source_ssrc_; std::vector<uint32_t> video_source_ssrc_list_; MediaSink* video_sink_; MediaSink* audio_sink_; MediaSink* event_sink_; // can it accept feedback FeedbackSink* source_fb_sink_; public: void setAudioSink(MediaSink* audio_sink) { boost::mutex::scoped_lock lock(monitor_mutex_); this->audio_sink_ = audio_sink; } void setVideoSink(MediaSink* video_sink) { boost::mutex::scoped_lock lock(monitor_mutex_); this->video_sink_ = video_sink; } void setEventSink(MediaSink* event_sink) { boost::mutex::scoped_lock lock(monitor_mutex_); this->event_sink_ = event_sink; } FeedbackSink* getFeedbackSink() { boost::mutex::scoped_lock lock(monitor_mutex_); return source_fb_sink_; } virtual int sendPLI() = 0; uint32_t getVideoSourceSSRC() { boost::mutex::scoped_lock lock(monitor_mutex_); if (video_source_ssrc_list_.empty()) { return 0; } return video_source_ssrc_list_[0]; } void setVideoSourceSSRC(uint32_t ssrc) { boost::mutex::scoped_lock lock(monitor_mutex_); if (video_source_ssrc_list_.empty()) { video_source_ssrc_list_.push_back(ssrc); return; } video_source_ssrc_list_[0] = ssrc; } std::vector<uint32_t> getVideoSourceSSRCList() { boost::mutex::scoped_lock lock(monitor_mutex_); return video_source_ssrc_list_; // return by copy to avoid concurrent access } void setVideoSourceSSRCList(const std::vector<uint32_t>& new_ssrc_list) { boost::mutex::scoped_lock lock(monitor_mutex_); video_source_ssrc_list_ = new_ssrc_list; } uint32_t getAudioSourceSSRC() { boost::mutex::scoped_lock lock(monitor_mutex_); return audio_source_ssrc_; } void setAudioSourceSSRC(uint32_t ssrc) { boost::mutex::scoped_lock lock(monitor_mutex_); audio_source_ssrc_ = ssrc; } bool isVideoSourceSSRC(uint32_t ssrc) { auto found_ssrc = std::find_if(video_source_ssrc_list_.begin(), video_source_ssrc_list_.end(), [ssrc](uint32_t known_ssrc) { return known_ssrc == ssrc; }); return (found_ssrc != video_source_ssrc_list_.end()); } bool isAudioSourceSSRC(uint32_t ssrc) { return audio_source_ssrc_ == ssrc; } MediaSource() : audio_source_ssrc_{0}, video_source_ssrc_list_{std::vector<uint32_t>(1, 0)}, video_sink_{nullptr}, audio_sink_{nullptr}, event_sink_{nullptr}, source_fb_sink_{nullptr} {} virtual ~MediaSource() {} virtual boost::future<void> close() = 0; };
MediaSource定义了4个MediaSink对象,分别对应video,audio,event,feedback。
MediaStream继承了MediaSource同样的4个set接口,让调用者可以设置MediaSource的这4个对象。当发生读取事件时,MediaStream会调用这4个设置的MediaSink对象的相应方法,来向外传递数据。FeedbackSource也是同样的道理。
void MediaStream::read(std::shared_ptr<DataPacket> packet) { char* buf = packet->data; int len = packet->length; // PROCESS RTCP RtpHeader *head = reinterpret_cast<RtpHeader*> (buf); RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf); uint32_t recvSSRC = 0; if (!chead->isRtcp()) { recvSSRC = head->getSSRC(); } else if (chead->packettype == RTCP_Sender_PT || chead->packettype == RTCP_SDES_PT) { // Sender Report recvSSRC = chead->getSSRC(); } // DELIVER FEEDBACK (RR, FEEDBACK PACKETS) if (chead->isFeedback()) { if (fb_sink_ != nullptr && should_send_feedback_) { fb_sink_->deliverFeedback(std::move(packet)); } } else { // RTP or RTCP Sender Report if (bundle_) { // Check incoming SSRC // Deliver data if (isVideoSourceSSRC(recvSSRC) && video_sink_) { parseIncomingPayloadType(buf, len, VIDEO_PACKET); parseIncomingExtensionId(buf, len, VIDEO_PACKET); video_sink_->deliverVideoData(std::move(packet)); } else if (isAudioSourceSSRC(recvSSRC) && audio_sink_) { parseIncomingPayloadType(buf, len, AUDIO_PACKET); parseIncomingExtensionId(buf, len, AUDIO_PACKET); audio_sink_->deliverAudioData(std::move(packet)); } else { ELOG_DEBUG("%s read video unknownSSRC: %u, localVideoSSRC: %u, localAudioSSRC: %u", toLog(), recvSSRC, this->getVideoSourceSSRC(), this->getAudioSourceSSRC()); } } else { if (packet->type == AUDIO_PACKET && audio_sink_) { parseIncomingPayloadType(buf, len, AUDIO_PACKET); parseIncomingExtensionId(buf, len, AUDIO_PACKET); // Firefox does not send SSRC in SDP if (getAudioSourceSSRC() == 0) { ELOG_DEBUG("%s discoveredAudioSourceSSRC:%u", toLog(), recvSSRC); this->setAudioSourceSSRC(recvSSRC); } audio_sink_->deliverAudioData(std::move(packet)); } else if (packet->type == VIDEO_PACKET && video_sink_) { parseIncomingPayloadType(buf, len, VIDEO_PACKET); parseIncomingExtensionId(buf, len, VIDEO_PACKET); // Firefox does not send SSRC in SDP if (getVideoSourceSSRC() == 0) { ELOG_DEBUG("%s discoveredVideoSourceSSRC:%u", toLog(), recvSSRC); this->setVideoSourceSSRC(recvSSRC); } // change ssrc for RTP packets, don't touch here if RTCP video_sink_->deliverVideoData(std::move(packet)); } } // if not bundle } // if not Feedback }
这里,MediaStream的读写就清楚了,如果我们需要使用MediaStream,则需要做:
1、定义一个MediaSink的子类,将之设置给MediaStream,用于接收MediaStream的数据
2、直接调用MediaStream的deliver方法,让其向外发送数据。
9.WebRTC中的SDP
SDP描述由许多文本行组成,文本行的格式为<类型>=<值>,<类型>是一个字母,<值>是结构化的文本串,其格式依<类型>而定。
SDP的 文本信息包括:
- 会话名称和意图
- 会话持续时间
- 构成会话的媒体
- 有关接收媒体的信息
会话名称和意图描述
v = (协议版本)
o = (所有者/创建者和会话标识符)
s = (会话名称)
i = * (会话信息)
u = * (URI 描述)
e = * (Email 地址)
p = * (电话号码)
c = * (连接信息 ― 如果包含在所有媒体中,则不需要该字段)
b = * (带宽信息)
时间描述
t = (会话活动时间)
r = * (0或多次重复次数)
媒体描述
m = (媒体名称和传输地址)
i = * (媒体标题)
c = * (连接信息 — 如果包含在会话层则该字段可选)
b = * (带宽信息)
k = * (加密密钥)
a = * (0 个或多个会话属性行)
v=0 //sdp版本号,一直为0,rfc4566规定 o=- 7017624586836067756 2 IN IP4 127.0.0.1 // o=<username> <sess-id> <sess-version> <nettype> <addrtype> <unicast-address> //username如何没有使用-代替,7017624586836067756是整个会话的编号,2代表会话版本,如果在会话 //过程中有改变编码之类的操作,重新生成sdp时,sess-id不变,sess-version加1 s=- //会话名,没有的话使用-代替 t=0 0 //两个值分别是会话的起始时间和结束时间,这里都是0代表没有限制 a=group:BUNDLE audio video data //需要共用一个传输通道传输的媒体,如果没有这一行,音视频,数据就会分别单独用一个udp端口来发送 a=msid-semantic: WMS h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C //WMS是WebRTC Media Stream简称,这一行定义了本客户端支持同时传输多个流,一个流可以包括多个track, //一般定义了这个,后面a=ssrc这一行就会有msid,mslabel等属性 m=audio 9 UDP/TLS/RTP/SAVPF 111 103 104 9 0 8 106 105 13 126 //m=audio说明本会话包含音频,9代表音频使用端口9来传输,但是在webrtc中一现在一般不使用,如果设置为0,代表不 //传输音频,UDP/TLS/RTP/SAVPF是表示用户来传输音频支持的协议,udp,tls,rtp代表使用udp来传输rtp包,并使用tls加密 //SAVPF代表使用srtcp的反馈机制来控制通信过程,后台111 103 104 9 0 8 106 105 13 126表示本会话音频支持的编码,后台几行会有详细补充说明 c=IN IP4 0.0.0.0 //这一行表示你要用来接收或者发送音频使用的IP地址,webrtc使用ice传输,不使用这个地址 a=rtcp:9 IN IP4 0.0.0.0 //用来传输rtcp地地址和端口,webrtc中不使用 a=ice-ufrag:khLS a=ice-pwd:cxLzteJaJBou3DspNaPsJhlQ //以上两行是ice协商过程中的安全验证信息 a=fingerprint:sha-256 FA:14:42:3B:C7:97:1B:E8:AE:0C2:71:03:05:05:16:8F:B9:C7:98:E9:60:43:4B:5B:2C:28:EE:5C:8F3:17 //以上这行是dtls协商过程中需要的认证信息 a=setup:actpass //以上这行代表本客户端在dtls协商过程中,可以做客户端也可以做服务端,参考rfc4145 rfc4572 a=mid:audio //在前面BUNDLE这一行中用到的媒体标识 a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level //上一行指出我要在rtp头部中加入音量信息,参考 rfc6464 a=sendrecv //上一行指出我是双向通信,另外几种类型是recvonly,sendonly,inactive a=rtcp-mux //上一行指出rtp,rtcp包使用同一个端口来传输 //下面几行都是对m=audio这一行的媒体编码补充说明,指出了编码采用的编号,采样率,声道等 a=rtpmap:111 opus/48000/2 a=rtcp-fb:111 transport-cc //以上这行说明opus编码支持使用rtcp来控制拥塞,参考https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01 a=fmtp:111 minptime=10;useinbandfec=1 //对opus编码可选的补充说明,minptime代表最小打包时长是10ms,useinbandfec=1代表使用opus编码内置fec特性 a=rtpmap:103 ISAC/16000 a=rtpmap:104 ISAC/32000 a=rtpmap:9 G722/8000 a=rtpmap:0 PCMU/8000 a=rtpmap:8 PCMA/8000 a=rtpmap:106 CN/32000 a=rtpmap:105 CN/16000 a=rtpmap:13 CN/8000 a=rtpmap:126 telephone-event/8000 a=ssrc:18509423 cname:sTjtznXLCNH7nbRw //cname用来标识一个数据源,ssrc当发生冲突时可能会发生变化,但是cname不会发生变化,也会出现在rtcp包中SDEC中, //用于音视频同步 a=ssrc:18509423 msid:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C 15598a91-caf9-4fff-a28f-3082310b2b7a //以上这一行定义了ssrc和WebRTC中的MediaStream,AudioTrack之间的关系,msid后面第一个属性是stream-d,第二个是track-id a=ssrc:18509423 mslabel:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C a=ssrc:18509423 label:15598a91-caf9-4fff-a28f-3082310b2b7a m=video 9 UDP/TLS/RTP/SAVPF 100 101 107 116 117 96 97 99 98 //参考上面m=audio,含义类似 c=IN IP4 0.0.0.0 a=rtcp:9 IN IP4 0.0.0.0 a=ice-ufrag:khLS a=ice-pwd:cxLzteJaJBou3DspNaPsJhlQ a=fingerprint:sha-256 FA:14:42:3B:C7:97:1B:E8:AE:0C2:71:03:05:05:16:8F:B9:C7:98:E9:60:43:4B:5B:2C:28:EE:5C:8F3:17 a=setup:actpass a=mid:video a=extmap:2 urn:ietf:params:rtp-hdrext:toffset a=extmap:3 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time a=extmap:4 urn:3gpp:video-orientation a=extmap:5 http://www.ietf.org/id/draft-hol ... de-cc-extensions-01 a=extmap:6 http://www.webrtc.org/experiments/rtp-hdrext/playout-delay a=sendrecv a=rtcp-mux a=rtcp-rsize a=rtpmap:100 VP8/90000 a=rtcp-fb:100 ccm fir //ccm是codec control using RTCP feedback message简称,意思是支持使用rtcp反馈机制来实现编码控制,fir是Full Intra Request //简称,意思是接收方通知发送方发送幅完全帧过来 a=rtcp-fb:100 nack //支持丢包重传,参考rfc4585 a=rtcp-fb:100 nack pli //支持关键帧丢包重传,参考rfc4585 a=rtcp-fb:100 goog-remb //支持使用rtcp包来控制发送方的码流 a=rtcp-fb:100 transport-cc //参考上面opus a=rtpmap:101 VP9/90000 a=rtcp-fb:101 ccm fir a=rtcp-fb:101 nack a=rtcp-fb:101 nack pli a=rtcp-fb:101 goog-remb a=rtcp-fb:101 transport-cc a=rtpmap:107 H264/90000 a=rtcp-fb:107 ccm fir a=rtcp-fb:107 nack a=rtcp-fb:107 nack pli a=rtcp-fb:107 goog-remb a=rtcp-fb:107 transport-cc a=fmtp:107 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f //h264编码可选的附加说明 a=rtpmap:116 red/90000 //fec冗余编码,一般如果sdp中有这一行的话,rtp头部负载类型就是116,否则就是各编码原生负责类型 a=rtpmap:117 ulpfec/90000 //支持ULP FEC,参考rfc5109 a=rtpmap:96 rtx/90000 a=fmtp:96 apt=100 //以上两行是VP8编码的重传包rtp类型 a=rtpmap:97 rtx/90000 a=fmtp:97 apt=101 a=rtpmap:99 rtx/90000 a=fmtp:99 apt=107 a=rtpmap:98 rtx/90000 a=fmtp:98 apt=116 a=ssrc-group:FID 3463951252 1461041037 //在webrtc中,重传包和正常包ssrc是不同的,上一行中前一个是正常rtp包的ssrc,后一个是重传包的ssrc a=ssrc:3463951252 cname:sTjtznXLCNH7nbRw a=ssrc:3463951252 msid:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C ead4b4e9-b650-4ed5-86f8-6f5f5806346d a=ssrc:3463951252 mslabel:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C a=ssrc:3463951252 label:ead4b4e9-b650-4ed5-86f8-6f5f5806346d a=ssrc:1461041037 cname:sTjtznXLCNH7nbRw a=ssrc:1461041037 msid:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C ead4b4e9-b650-4ed5-86f8-6f5f5806346d a=ssrc:1461041037 mslabel:h1aZ20mbQB0GSsq0YxLfJmiYWE9CBfGch97C a=ssrc:1461041037 label:ead4b4e9-b650-4ed5-86f8-6f5f5806346d m=application 9 DTLS/SCTP 5000 c=IN IP4 0.0.0.0 a=ice-ufrag:khLS a=ice-pwd:cxLzteJaJBou3DspNaPsJhlQ a=fingerprint:sha-256 FA:14:42:3B:C7:97:1B:E8:AE:0C2:71:03:05:05:16:8F:B9:C7:98:E9:60:43:4B:5B:2C:28:EE:5C:8F3:17 a=setup:actpass a=mid:data a=sctpmap:5000 webrtc-datachannel 1024
10.Pipeline_service
Pipeline是媒体处理的核心流程逻辑。
Pipeline里面定义了两个主要的概念:Service和Handler。
Service负责处理那些不仅要看当前数据包,还要分析之前的数据包的那些业务,比如丢包重传;Handler处理当前的数据包的情形,比如生成填充字节。
在Pipeline里面,Handler和Service是配合起来一起工作的,他们通过一套框架将之关联起来。
void MediaStream::initializePipeline() { if (pipeline_initialized_) { return; } handler_manager_ = std::make_shared<HandlerManager>(shared_from_this()); pipeline_->addService(shared_from_this()); pipeline_->addService(handler_manager_); pipeline_->addService(rtcp_processor_); pipeline_->addService(stats_); pipeline_->addService(quality_manager_); pipeline_->addService(packet_buffer_); pipeline_->addFront(std::make_shared<PacketReader>(this)); pipeline_->addFront(std::make_shared<RtcpProcessorHandler>()); pipeline_->addFront(std::make_shared<FecReceiverHandler>()); pipeline_->addFront(std::make_shared<LayerBitrateCalculationHandler>()); pipeline_->addFront(std::make_shared<QualityFilterHandler>()); pipeline_->addFront(std::make_shared<IncomingStatsHandler>()); pipeline_->addFront(std::make_shared<FakeKeyframeGeneratorHandler>()); pipeline_->addFront(std::make_shared<RtpTrackMuteHandler>()); pipeline_->addFront(std::make_shared<RtpSlideShowHandler>()); pipeline_->addFront(std::make_shared<RtpPaddingGeneratorHandler>()); pipeline_->addFront(std::make_shared<PeriodicPliHandler>()); pipeline_->addFront(std::make_shared<PliPriorityHandler>()); pipeline_->addFront(std::make_shared<PliPacerHandler>()); pipeline_->addFront(std::make_shared<RtpPaddingRemovalHandler>()); pipeline_->addFront(std::make_shared<BandwidthEstimationHandler>()); pipeline_->addFront(std::make_shared<RtcpFeedbackGenerationHandler>()); pipeline_->addFront(std::make_shared<RtpRetransmissionHandler>()); pipeline_->addFront(std::make_shared<SRPacketHandler>()); pipeline_->addFront(std::make_shared<LayerDetectorHandler>()); pipeline_->addFront(std::make_shared<OutgoingStatsHandler>()); pipeline_->addFront(std::make_shared<PacketCodecParser>()); pipeline_->addFront(std::make_shared<PacketWriter>(this)); pipeline_->finalize(); if (connection_) { quality_manager_->setConnectionQualityLevel(connection_->getConnectionQualityLevel()); } pipeline_initialized_ = true; }
在初始化时,pipeline调用了addService和addFront接口,将Service和Handler添加到pipeline中去。在初始化里面,我们可以看到其支持了哪些处理。
在实际使用中,接收到的数据,调用pipeline的read接口,就完成了解析为裸数据的事儿;调用write接口,就完成了fec等处理数据的事儿。
pipeline的数据,read的源需要是srtp解密后的数据,处理后为rtp裸数据;write的源为rtp裸数据,处理后的数据经过srtp加密输出到网络。(网络使用的是DtlsTransport接口对接的)
pipeline的Service部分的继承体系以及数据结构:
结合PipelineBase的addService实现:
template <class S> void PipelineBase::addService(std::shared_ptr<S> service) { typedef typename ServiceContextType<S>::type Context; service_ctxs_.push_back(std::make_shared<Context>(shared_from_this(), std::move(service))); } template <class Service> struct ServiceContextType { typedef ServiceContextImpl<Service> type; };
addService其实就是传递一个Service的子类对象,这个子类对象是用来给Context的构造函数传递参数的;Context就是ServiceContextImpl,也就是说addService里面的参数,就是为了创建一个ServiceContextImpl对象,这个对象创建出来以后,被存储在pipelinebase的service_ctxs_成员中。在addService接口中,还将pipeline自身,作为参数,传递给了ServiceContextImpl。
explicit ServiceContextImpl( std::weak_ptr<PipelineBase> pipeline, std::weak_ptr<S> service) { this->impl_ = this; this->initialize(pipeline, std::move(service)); } void initialize( std::weak_ptr<PipelineBase> pipeline, std::weak_ptr<S> service) { pipeline_weak_ = pipeline; pipeline_raw_ = pipeline.lock().get(); service_ = std::move(service); }
ServiceContextImpl在构造时存储了PipelineBase和Service,这样外面再使用时,可以通过getService来获取到Service的实例。
pipeline的notifyUpdate方法,看实际的处理handler(RtcpProcessorHandler)
void RtcpProcessorHandler::notifyUpdate() { auto pipeline = getContext()->getPipelineShared(); if (pipeline && !stream_) { stream_ = pipeline->getService<MediaStream>().get(); processor_ = pipeline->getService<RtcpProcessor>(); stats_ = pipeline->getService<Stats>(); } }
通过调用getService方法,模板传递不同的类型,则能够获取到不同的对象实例.
template <class S> typename ServiceContextType<S>::type* PipelineBase::getServiceContext() { for (auto pipeline_service_ctx : service_ctxs_) { auto ctx = dynamic_cast<typename ServiceContextType<S>::type*>(pipeline_service_ctx.get()); if (ctx) { return ctx; } } return nullptr; } template <class S> std::shared_ptr<S> PipelineBase::getService() { auto ctx = getServiceContext<S>(); return ctx ? ctx->getService().lock() : std::shared_ptr<S>(); }
在getServiceContext方法里面,遍历了pipeline的service_ctxs_,并对每一个ctx进行dynamic_cast转换,能够成功,就返回,不能成功就继续。形成了一个共享的方式,所有的handler,都可以获得到所有的service的子类实例,在实现过程中就极大的提升了灵活性,每个service独立做自己的事儿,并且由handler直接进行数据驱动。
Service的核心意义是共享,即每个handler都可以通过类型来获取到所有的Service子类实例,进行使用,而不必要为每个Handler定义不同的接口来传递Service对象。Service也为了多个Handler公用数据而提供服务。
11.Pipeline_handle
erizo的pipeline的handle,是媒体数据处理的基本操作,handle分为3类:IN,OUT,BOTH
IN:数据进入handle,handle需要read数据并传递给下一级
OUT:数据进入handle,handle需要write数据并传递给下一级
BOTH:可以同时进行read和write
在宏观语义上,IN的目标是输出rtp裸数据;OUT的目标是封装rtp裸数据
pipeline的handle的继承体系如下:
handle有三种:InboundHandler、Handler、OutboundHandler,他们的工作方式都差不多;他们分别对应一个Context:
InboundHandler==>InboundContextImpl
Handler==>ContextImpl
OutboundHandler==>OutboundContextImpl
pipeline有addFront方法,该方法是注册Handler的,先看看这个方法的相关实现:
template <class H> PipelineBase& PipelineBase::addFront(std::shared_ptr<H> handler) { typedef typename ContextType<H>::type Context; return addHelper( std::make_shared<Context>(shared_from_this(), std::move(handler)), true); } template <class H> PipelineBase& PipelineBase::addFront(H&& handler) { return addFront(std::make_shared<H>(std::forward<H>(handler))); } template <class H> PipelineBase& PipelineBase::addFront(H* handler) { return addFront(std::shared_ptr<H>(handler, [](H*){})); // NOLINT } template <class Context> PipelineBase& PipelineBase::addHelper( std::shared_ptr<Context>&& ctx, // NOLINT bool front) { ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx); if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) { inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get()); } if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) { outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get()); } return *this; }
在licode中输入传参都是shared_ptr,addFront的功能,就是创建一个Context的对象,并将之传递给addHelper为参数。Context的构造还被传递了Handler自己的指针,让Context能够知道handler。
addHelper将Context存储起来,并且判断类型,分别放到inCtxs_和outCtxs_里面,待用
template <class Handler> struct ContextType { typedef typename std::conditional< Handler::dir == HandlerDir_BOTH, ContextImpl<Handler>, typename std::conditional< Handler::dir == HandlerDir_IN, InboundContextImpl<Handler>, OutboundContextImpl<Handler> >::type>::type type; };
Context的实际类型,要依托于Handler::dir成员的值,这个值是一个常量,是每个Handler都有的。
HandlerDir_IN:Context为InboundContextImpl
HandlerDir_BOTH:Context为ContextImpl
HandlerDir_其他:Context为OutboundContextImpl
所以,pipeline的addFront方法,实际上是创建HandlerContext的实例,并且将之存储的过程。
pipeline的数据链路建立:
void Pipeline::finalize() { front_ = nullptr; if (!inCtxs_.empty()) { front_ = dynamic_cast<InboundLink*>(inCtxs_.front()); for (size_t i = 0; i < inCtxs_.size() - 1; i++) { inCtxs_[i]->setNextIn(inCtxs_[i+1]); } inCtxs_.back()->setNextIn(nullptr); } back_ = nullptr; if (!outCtxs_.empty()) { back_ = dynamic_cast<OutboundLink*>(outCtxs_.back()); for (size_t i = outCtxs_.size() - 1; i > 0; i--) { outCtxs_[i]->setNextOut(outCtxs_[i-1]); } outCtxs_.front()->setNextOut(nullptr); } if (!front_) { // detail::logWarningIfNotUnit<R>( // "No inbound handler in Pipeline, inbound operations will throw " // "std::invalid_argument"); } if (!back_) { // detail::logWarningIfNotUnit<W>( // "No outbound handler in Pipeline, outbound operations will throw " // "std::invalid_argument"); } for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) { (*it)->attachPipeline(); } for (auto it = service_ctxs_.rbegin(); it != service_ctxs_.rend(); it++) { (*it)->attachPipeline(); } notifyUpdate(); }
pipeline里面的finalize方法为In和Out的Context设置任务链,并且设置头结点(front_,back_)之后,每个HandlerContext就知道自己的下一级任务是什么了。
MediaStream的OnTransportData获得packet数据:
void MediaStream::onTransportData(std::shared_ptr<DataPacket> incoming_packet, Transport *transport) { if ((audio_sink_ == nullptr && video_sink_ == nullptr && fb_sink_ == nullptr)) { return; } std::shared_ptr<DataPacket> packet = std::make_shared<DataPacket>(*incoming_packet); if (transport->mediaType == AUDIO_TYPE) { packet->type = AUDIO_PACKET; } else if (transport->mediaType == VIDEO_TYPE) { packet->type = VIDEO_PACKET; } auto stream_ptr = shared_from_this(); worker_->task([stream_ptr, packet]{ if (!stream_ptr->pipeline_initialized_) { ELOG_DEBUG("%s message: Pipeline not initialized yet.", stream_ptr->toLog()); return; } char* buf = packet->data; RtpHeader *head = reinterpret_cast<RtpHeader*> (buf); RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf); if (!chead->isRtcp()) { uint32_t recvSSRC = head->getSSRC(); if (stream_ptr->isVideoSourceSSRC(recvSSRC)) { packet->type = VIDEO_PACKET; } else if (stream_ptr->isAudioSourceSSRC(recvSSRC)) { packet->type = AUDIO_PACKET; } } if (stream_ptr->pipeline_) { stream_ptr->pipeline_->read(std::move(packet)); } }); }
在里面调用了pipeline的read方法:
//调用了front的read,front是一个HandlerContext对象,并且是处理链的头结点 void Pipeline::read(std::shared_ptr<DataPacket> packet) { if (!front_) { return; } front_->read(std::move(packet));//InboundLink } //在HandlerContext里面调用了handler_的read方法,并且把自己作为参数也同时传递给了handler。 // InboundLink overrides void read(std::shared_ptr<DataPacket> packet) override { auto guard = this->pipelineWeak_.lock(); this->handler_->read(this, std::move(packet)); } //在handler的read里面,再次调用了ctx的fireRead方法,并且把packet进行传递 void LayerDetectorHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) { RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data); if (!chead->isRtcp() && enabled_ && packet->type == VIDEO_PACKET) { if (packet->codec == "VP8") { parseLayerInfoFromVP8(packet); } else if (packet->codec == "VP9") { parseLayerInfoFromVP9(packet); } else if (packet->codec == "H264") { parseLayerInfoFromH264(packet); } } ctx->fireRead(std::move(packet)); }
在fireRead中,调用了nextIn_的read方法,nextIn_是下一个HandlerContext。
这样就形成了一个任务链。
ContextRead-->HanderRead-->Context fireRead-->Context next read-->ContextRead.........
形成一个read的任务链。write的基本原理也是相似的。
erizo的pipeline的handler是负责实际数据处理的,通过处理链路,将之串联起来