基本概念
在实时流媒体系统中,jitterbuffer是在解码端,起到如下几个作用:
- 对rtp包进行排序
- 对rtp包进行去重
- 去抖动
对于1,2点比较简单。核心的是去抖动,去抖动实现的目标就是使视频能平滑播放,不因为抖动忽快忽慢。
简单的视频jitterbuffer可以只针对rtp包进行处理,只对rtp进行排序,去重。并不处理视频帧。如下图
这种简单的实现并不关心视频帧是否有错,是否可以解码。视频帧间是否可以持续解码。(针对视频帧内的RTP包,如果经过排序和去重,可以认为是可解码的)。这些全部放在解码模块去做。
当然这种形式的Jitterbuffer无法处理抖动。因为对视频帧而言,抖动的处理是针对帧间的,而不是RTP包间的。把它叫做rtp buffer应该更合适些。
webrtc中的视频jitterbuffer
webrtc中的jitterBuffer也是QOS机制中的核心,它会估算抖动,丢包,决定是否通过Nack来重传。这里我先忽略与QOS相关的一些逻辑,先看看jitterBuffer中的一些核心的基础功能。
webrtc中的jitterBuffer是基于视频帧实现,在RTP的接收端存入jitterbuffer的是rtp包,在解码端取出的是视频帧。
输入的是rtp包,输出的是平滑的(去抖动)视频帧,是保证可解码,可连续解码的。针对去抖动或帧错误的处理,都是在该类中实现的。所以如果视频出现卡顿,黑屏等问题。应该在VCMJitterBuffer找原因,查看它的帧错误处理逻辑。
基本数据结构
VCMJitterBuffer 类是具体的实现,因为是针对视频帧,所以基本的数据结构就是帧队列,如下分为三种类型的帧队列:
UnorderedFrameList free_frames_ ;
FrameList decodable_frames_ ;
FrameList incomplete_frames_ ;
free_frames 空帧队列,decodable_frames_ 可解码帧队列,incomplete_frames_ 队列
- 帧
VMCFrameBuffer
就是帧,free_frames_,decodable_frames_,incomplete_frames_分别为VMCFrameBuffer
的list, map。而VMCFrameBuffer
的VCMSessionInfo
类型的成员变量就是具体的帧缓存,查看VCMSessionInfo代码,帧缓存其实就是typedef std::list<VCMPacket> PacketList
是一个Rtp packet的list。VCMSessionInfo类就是处理Rtp包,根据Rtp包来转换帧的状态。
VCMSessionInfo
VCMSessionInfo类的几个核心接口InsertPacket,UpdateCompleteSession,UpdateDecodableSession
- InsertPacket
就是实现rtp包的排序,去重,更新插入Rtp包后的视频帧的状态
int VCMSessionInfo::InsertPacket(const VCMPacket& packet,
uint8_t* frame_buffer,
VCMDecodeErrorMode decode_error_mode,
const FrameData& frame_data) {
if (packet.frameType == kFrameEmpty) {
// Update sequence number of an empty packet.
// Only media packets are inserted into the packet list.
InformOfEmptyPacket(packet.seqNum);
return 0;
}
//一帧视频可以包含的最大rtp packet数(Nalu的最大分包数)
if (packets_.size() == kMaxPacketsInSession) {
LOG(LS_ERROR) << "Max number of packets per frame has been reached.";
return -1;
}
//按SN从小到大的顺序排序
// Find the position of this packet in the packet list in sequence number
// order and insert it. Loop over the list in reverse order.
ReversePacketIterator rit = packets_.rbegin();
for (; rit != packets_.rend(); ++rit)
if (LatestSequenceNumber(packet.seqNum, (*rit).seqNum) == packet.seqNum)
break;
//去重
// Check for duplicate packets.
if (rit != packets_.rend() &&
(*rit).seqNum == packet.seqNum && (*rit).sizeBytes > 0)
return -2;
//rtp包的mark为true,则是一帧的最后一个包
if (packet.codec == kVideoCodecH264) {
frame_type_ = packet.frameType;
if (packet.isFirstPacket &&
(first_packet_seq_num_ == -1 ||
IsNewerSequenceNumber(first_packet_seq_num_, packet.seqNum))) {
first_packet_seq_num_ = packet.seqNum;
}
if (packet.markerBit &&
(last_packet_seq_num_ == -1 ||
IsNewerSequenceNumber(packet.seqNum, last_packet_seq_num_))) {
last_packet_seq_num_ = packet.seqNum;
}
} else {
// Only insert media packets between first and last packets (when
// available).
// Placing check here, as to properly account for duplicate packets.
// Check if this is first packet (only valid for some codecs)
// Should only be set for one packet per session.
if (packet.isFirstPacket && first_packet_seq_num_ == -1) {
// The first packet in a frame signals the frame type.
frame_type_ = packet.frameType;
// Store the sequence number for the first packet.
first_packet_seq_num_ = static_cast<int>(packet.seqNum);
} else if (first_packet_seq_num_ != -1 &&
IsNewerSequenceNumber(first_packet_seq_num_, packet.seqNum)) {
LOG(LS_WARNING) << "Received packet with a sequence number which is out "
"of frame boundaries";
return -3;
} else if (frame_type_ == kFrameEmpty && packet.frameType != kFrameEmpty) {
// Update the frame type with the type of the first media packet.
// TODO(mikhal): Can this trigger?
frame_type_ = packet.frameType;
}
// Track the marker bit, should only be set for one packet per session.
if (packet.markerBit && last_packet_seq_num_ == -1) {
last_packet_seq_num_ = static_cast<int>(packet.seqNum);
} else if (last_packet_seq_num_ != -1 &&
IsNewerSequenceNumber(packet.seqNum, last_packet_seq_num_)) {
LOG(LS_WARNING) << "Received packet with a sequence number which is out "
"of frame boundaries";
return -3;
}
}
// The insert operation invalidates the iterator |rit|.
PacketIterator packet_list_it = packets_.insert(rit.base(), packet);
//插入rtp payload数据
size_t returnLength = InsertBuffer(frame_buffer, packet_list_it);
//在对一帧内的rtp包进行排序,去重后。更新该帧的状态
UpdateCompleteSession();
if (decode_error_mode == kWithErrors)
decodable_ = true;
else if (decode_error_mode == kSelectiveErrors)
UpdateDecodableSession(frame_data);
return static_cast<int>(returnLength);
}
基本流程:
- 按SN从小到大的顺序排序
- 去重
- 插入rtp payload数据
- 更新该帧的状态
- UpdateCompleteSession
更新帧状态为Complete状态,更新为Commplete状态的条件是有第一个包和最后一个包(打了mark的包),并且之间的SeqNum都是连续的,其实这种条件下的帧是满足可解码的。
void VCMSessionInfo::UpdateCompleteSession() {
if (HaveFirstPacket() && HaveLastPacket()) {
// Do we have all the packets in this session?
bool complete_session = true;
PacketIterator it = packets_.begin();
PacketIterator prev_it = it;
++it;
for (; it != packets_.end(); ++it) {
if (!InSequence(it, prev_it)) {
complete_session = false;
break;
}
prev_it = it;
}
complete_ = complete_session;
}
}
- UpdateDecodableSession
这里可解码的条件,我有点搞不明白。按照代码注释来罗列下条件:
- 非关键
- 必须有第一个包,按代码注释说明如下:
It has the first packet: In VP8 the first packet contains all or part of the first partition, which consists of the most relevant information for decoding.
- 根据帧的平均RTP包数判断
Either more than the upper threshold of the average number of packets per frame is present or less than the lower threshold of the average number of packets per frame is present: suggests a small frame.Such a frame is unlikely to contain many motion vectors, so having the first packet will likely suffice.Once we have more than the lower threshold of the frame, we know that the frame is medium or large-sized.
翻译:
存在大于每帧平均包数上限的阈值或小于表示每帧平均包数下限的阈值:建议为一小帧。这样一帧不太可能包含许多运动矢量,因此 拥有第一个数据包就足够了。一旦我们获得了帧的下限阈值以上,我们就知道该帧是中型或大型的。
对于较小的帧,其帧中包含的rtp包的数量是小于每帧平均包的下限阈值,这样的帧不会携带许多运动矢量,拥有第一个数据包就足够了。
对于比较大的帧,包含的rtp包的数量大于每帧平均包数的上限阈值,是不是这样帧是已经携带了大部分的运动矢量。加上第一个包的信息是可以解码的?
RTP包的数量介于这两者之间的是不能解码的。
void VCMSessionInfo::UpdateDecodableSession(const FrameData& frame_data) {
// Irrelevant if session is already complete or decodable
if (complete_ || decodable_)
return;
// TODO(agalusza): Account for bursty loss.
// TODO(agalusza): Refine these values to better approximate optimal ones.
// Do not decode frames if the RTT is lower than this.
const int64_t kRttThreshold = 100;
// Do not decode frames if the number of packets is between these two
// thresholds.
const float kLowPacketPercentageThreshold = 0.2f;
const float kHighPacketPercentageThreshold = 0.8f;
if (frame_data.rtt_ms < kRttThreshold
|| frame_type_ == kVideoFrameKey
|| !HaveFirstPacket()
|| (NumPackets() <= kHighPacketPercentageThreshold
* frame_data.rolling_average_packets_per_frame
&& NumPackets() > kLowPacketPercentageThreshold
* frame_data.rolling_average_packets_per_frame))
return;
decodable_ = true;
}
计算每帧包数的平均值使用了 moving average 算法,该算法是在时间段内取RTP包个数的平均值来估算视频流每帧的平均包数。
- VCMJitterBuffer
VCMJitterBuffer对视频帧进行处理,下面是InsertPacket
方法
VCMFrameBufferEnum VCMJitterBuffer::InsertPacket(const VCMPacket& packet,
bool* retransmitted) {
CriticalSectionScoped cs(crit_sect_);
if (nack_module_)
nack_module_->OnReceivedPacket(packet);
++num_packets_;
if (num_packets_ == 1) {
time_first_packet_ms_ = clock_->TimeInMilliseconds();
}
// Does this packet belong to an old frame?
if (last_decoded_state_.IsOldPacket(&packet)) {
//来的太迟的包,会被丢弃掉
// Account only for media packets.
if (packet.sizeBytes > 0) {
num_discarded_packets_++;
num_consecutive_old_packets_++;
if (stats_callback_ != NULL)
stats_callback_->OnDiscardedPacketsUpdated(num_discarded_packets_);
}
// Update last decoded sequence number if the packet arrived late and
// belongs to a frame with a timestamp equal to the last decoded
// timestamp.
last_decoded_state_.UpdateOldPacket(&packet);
DropPacketsFromNackList(last_decoded_state_.sequence_num());
// Also see if this old packet made more incomplete frames continuous.
FindAndInsertContinuousFramesWithState(last_decoded_state_);
if (num_consecutive_old_packets_ > kMaxConsecutiveOldPackets) {
LOG(LS_WARNING)
<< num_consecutive_old_packets_
<< " consecutive old packets received. Flushing the jitter buffer.";
Flush();
return kFlushIndicator;
}
return kOldPacket;
}
num_consecutive_old_packets_ = 0;
//根据RTP包的时间戳在incomplete_frames_,decodable_frames_,free_frames_ 三种list中找到对应的Frame及Frame List
VCMFrameBuffer* frame;
FrameList* frame_list;
const VCMFrameBufferEnum error = GetFrame(packet, &frame, &frame_list);
if (error != kNoError)
return error;
int64_t now_ms = clock_->TimeInMilliseconds();
// We are keeping track of the first and latest seq numbers, and
// the number of wraps to be able to calculate how many packets we expect.
if (first_packet_since_reset_) {
// Now it's time to start estimating jitter
// reset the delay estimate.
inter_frame_delay_.Reset(now_ms);
}
// Empty packets may bias the jitter estimate (lacking size component),
// therefore don't let empty packet trigger the following updates:
if (packet.frameType != kEmptyFrame) {
if (waiting_for_completion_.timestamp == packet.timestamp) {
// This can get bad if we have a lot of duplicate packets,
// we will then count some packet multiple times.
waiting_for_completion_.frame_size += packet.sizeBytes;
waiting_for_completion_.latest_packet_time = now_ms;
} else if (waiting_for_completion_.latest_packet_time >= 0 &&
waiting_for_completion_.latest_packet_time + 2000 <= now_ms) {
// A packet should never be more than two seconds late
UpdateJitterEstimate(waiting_for_completion_, true);
waiting_for_completion_.latest_packet_time = -1;
waiting_for_completion_.frame_size = 0;
waiting_for_completion_.timestamp = 0;
}
}
//获取在插入该RTP包之前,帧的状态
VCMFrameBufferStateEnum previous_state = frame->GetState();
// Insert packet.
FrameData frame_data;
frame_data.rtt_ms = rtt_ms_;
frame_data.rolling_average_packets_per_frame = average_packets_per_frame_;
//插入RTP包,同时获取帧的最新状态
VCMFrameBufferEnum buffer_state =
frame->InsertPacket(packet, now_ms, decode_error_mode_, frame_data);
if (previous_state != kStateComplete) {
TRACE_EVENT_ASYNC_BEGIN1("webrtc", "Video", frame->TimeStamp(), "timestamp",
frame->TimeStamp());
}
/* buffer_stat大于0 的状态都视频帧的正常状态,包括:
*kIncomplete //Frame incomplete
*kCompleteSession //at least one layer in the frame complete
*kDecodableSession //Frame incomplete, but ready to be decoded
*kDuplicatePacket //We're receiving a duplicate packet
*/
if (buffer_state > 0) {
incoming_bit_count_ += packet.sizeBytes << 3;
if (first_packet_since_reset_) {
latest_received_sequence_number_ = packet.seqNum;
first_packet_since_reset_ = false;
} else {
if (IsPacketRetransmitted(packet)) {
frame->IncrementNackCount();
}
if (!UpdateNackList(packet.seqNum) &&
packet.frameType != kVideoFrameKey) {
buffer_state = kFlushIndicator;
}
latest_received_sequence_number_ =
LatestSequenceNumber(latest_received_sequence_number_, packet.seqNum);
}
}
// Is the frame already in the decodable list?
bool continuous = IsContinuous(*frame);
switch (buffer_state) {
case kGeneralError:
case kTimeStampError:
case kSizeError: {
//帧为错误帧,直接被丢弃掉
free_frames_.push_back(frame);
break;
}
case kCompleteSession: {
if (previous_state != kStateDecodable &&
previous_state != kStateComplete) {
CountFrame(*frame);
if (continuous) {
// Signal that we have a complete session.
frame_event_->Set();
}
}
FALLTHROUGH();
}
// Note: There is no break here - continuing to kDecodableSession.
case kDecodableSession: {
*retransmitted = (frame->GetNackCount() > 0);
if (continuous) {
decodable_frames_.InsertFrame(frame);
FindAndInsertContinuousFrames(*frame);
} else {
incomplete_frames_.InsertFrame(frame);
// If NACKs are enabled, keyframes are triggered by |GetNackList|.
if (nack_mode_ == kNoNack &&
NonContinuousOrIncompleteDuration() >
90 * kMaxDiscontinuousFramesTime) {
return kFlushIndicator;
}
}
break;
}
case kIncomplete: {
if (frame->GetState() == kStateEmpty &&
last_decoded_state_.UpdateEmptyFrame(frame)) {
free_frames_.push_back(frame);
return kNoError;
} else {
incomplete_frames_.InsertFrame(frame);
// If NACKs are enabled, keyframes are triggered by |GetNackList|.
if (nack_mode_ == kNoNack &&
NonContinuousOrIncompleteDuration() >
90 * kMaxDiscontinuousFramesTime) {
return kFlushIndicator;
}
}
break;
}
case kNoError:
case kOutOfBoundsPacket:
case kDuplicatePacket: {
//错误的RTP Packet,相关的Frame还是保持原样
// Put back the frame where it came from.
if (frame_list != NULL) {
frame_list->InsertFrame(frame);
} else {
free_frames_.push_back(frame);
}
++num_duplicated_packets_;
break;
}
case kFlushIndicator:
free_frames_.push_back(frame);
return kFlushIndicator;
default:
assert(false);
}
return buffer_state;
}
基本流程:
- 根据RTP包的时间戳在incomplete_frames_,decodable_frames_,free_frames_ 三种list中找到对应的Frame及Frame List
- 获取在插入该RTP包之前,帧的状态
- 在所属的帧中插入该RTP 包
- 判断插入后的帧的状态,buffer_stat大于0的状态都为正常状态,包括: kIncomplete(Frame incomplete),kCompleteSession(at least one layer in the frame complete),kDecodableSession(Frame incomplete, but ready to be decoded),kDuplicatePacket(收到一个重复的RTP包,对帧的状态并没有影响)
- 判断在decodable_frames_中加入该帧后,g改帧所属的GOP是否可解码(decodable_frames_可以视为视频流,包含多个GOP)
- 判断该帧所属的List,如果所属的GOP可解码则把它放入decodable_frames_ 中,可能是从incomplete_frames_ list中放入decodeable_frames中。需要注意:错误状态(kGeneralError,kTimeStampError,kSizeError)的帧会被丢弃(插入新的RTP包后,造成帧错误),插入FreeFrame List中。kNoError,kOutOfBoundsPacket,kDuplicatePacket只是丢弃了RTP包,对帧没有影响,视频帧原来属于哪个List就放入哪个。
VCMDecodingState
在VCMJitterBuffer中的decodeable_frames_可以认为是一系列GOP,每个GOP包含了多个视频帧。按照编码理论,GOP间是不相互引用的,一个GOP内错误是不会传递到下一GOP。一个GOP的起始帧就是关键帧。对complete和decodeable状态的帧,会判断是否属于一个GOP并且可以解码后,才插入decodeable_frames_。那么VCMDecodingState就是用于判断帧间关系,是否属于一个GOP,可解码。
关于帧是否解码,视频流是否可连续解码
这两种的判断逻辑大部分是针对VP8/9的,特别的是在VCMDecodingState中通过picture id,temporal layers等信息判断,帧之间是否连续。这些信息在H264中好像并不存在?(我不太确定),应该是针对VP8/9的(因为对VP8/9不熟悉,我也不清楚判断的帧是否连续的依据是什么)。所以这里要注意下,如果在启用H264后,出现视频卡顿,黑屏等问题。这些判断帧是否解码以及帧间是否可连续解码的逻辑也可能是原因之一,针对H264这些条件可能并不成立。