Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FlexFec] Adding support for FlexFec RFC-03 #214

Merged
merged 1 commit into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions pkg/flexfec/encoder_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package flexfec

import (
"github.com/pion/interceptor"
"github.com/pion/rtp"
)

// FecInterceptor implements FlexFec.
type FecInterceptor struct {
interceptor.NoOp
flexFecEncoder FlexEncoder
packetBuffer []rtp.Packet
minNumMediaPackets uint32
}

// FecOption can be used to set initial options on Fec encoder interceptors.
type FecOption func(d *FecInterceptor) error

// FecInterceptorFactory creates new FecInterceptors.
type FecInterceptorFactory struct {
opts []FecOption
}

// NewFecInterceptor returns a new Fec interceptor factory.
func NewFecInterceptor(opts ...FecOption) (*FecInterceptorFactory, error) {
return &FecInterceptorFactory{opts: opts}, nil
}

// NewInterceptor constructs a new FecInterceptor.
func (r *FecInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
// Hardcoded for now:
// Min num media packets to encode FEC -> 5
// Min num fec packets -> 1

interceptor := &FecInterceptor{
packetBuffer: make([]rtp.Packet, 0),
minNumMediaPackets: 5,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be configurable

}
return interceptor, nil
}

// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (r *FecInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
// Chromium supports version flexfec-03 of existing draft, this is the one we will configure by default
// although we should support configuring the latest (flexfec-20) as well.
r.flexFecEncoder = NewFlexEncoder03(info.PayloadType, info.SSRC)

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
r.packetBuffer = append(r.packetBuffer, rtp.Packet{
Header: *header,
Payload: payload,
})

// Send the media RTP packet
result, err := writer.Write(header, payload, attributes)

// Send the FEC packets
var fecPackets []rtp.Packet
if len(r.packetBuffer) == int(r.minNumMediaPackets) {
fecPackets = r.flexFecEncoder.EncodeFec(r.packetBuffer, 2)

for _, fecPacket := range fecPackets {
fecResult, fecErr := writer.Write(&fecPacket.Header, fecPacket.Payload, attributes)

if fecErr != nil && fecResult == 0 {
break
}
}
// Reset the packet buffer now that we've sent the corresponding FEC packets.
r.packetBuffer = nil
}

return result, err
})
}
96 changes: 72 additions & 24 deletions pkg/flexfec/flexfec_coverage.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,36 +42,64 @@ func NewCoverage(mediaPackets []rtp.Packet, numFecPackets uint32) *ProtectionCov
// We allocate the biggest array of bitmasks that respects the max constraints.
var packetMasks [MaxFecPackets]util.BitArray
for i := 0; i < int(MaxFecPackets); i++ {
packetMasks[i] = util.NewBitArray(MaxMediaPackets)
packetMasks[i] = util.BitArray{}
}

coverage := &ProtectionCoverage{
packetMasks: packetMasks,
numFecPackets: 0,
numMediaPackets: 0,
mediaPackets: nil,
}

coverage.UpdateCoverage(mediaPackets, numFecPackets)
return coverage
}

// UpdateCoverage updates the ProtectionCoverage object with new bitmasks accounting for the numFecPackets
// we want to use to protect the batch media packets.
func (p *ProtectionCoverage) UpdateCoverage(mediaPackets []rtp.Packet, numFecPackets uint32) {
numMediaPackets := uint32(len(mediaPackets))

// Basic sanity checks
if numMediaPackets <= 0 || numMediaPackets > MaxMediaPackets {
return
}

p.mediaPackets = mediaPackets

if numFecPackets == p.numFecPackets && numMediaPackets == p.numMediaPackets {
// We have the same number of FEC packets covering the same number of media packets, we can simply
// reuse the previous coverage map with the updated media packets.
return
}

p.numFecPackets = numFecPackets
p.numMediaPackets = numMediaPackets

// The number of FEC packets and/or the number of packets has changed, we need to update the coverage map
// to reflect these new values.
p.resetCoverage()

// Generate FEC bit mask where numFecPackets FEC packets are covering numMediaPackets Media packets.
// In the packetMasks array, each FEC packet is represented by a single BitArray, each bit in a given BitArray
// corresponds to a specific Media packet.
// Ex: Row I, Col J is set to 1 -> FEC packet I will protect media packet J.
for fecPacketIndex := uint32(0); fecPacketIndex < numFecPackets; fecPacketIndex++ {
// We use an interleaved method to determine coverage. Given N FEC packets, Media packet X will be
// covered by FEC packet X % N.
for mediaPacketIndex := uint32(0); mediaPacketIndex < numMediaPackets; mediaPacketIndex++ {
coveringFecPktIndex := mediaPacketIndex % numFecPackets
packetMasks[coveringFecPktIndex].SetBit(mediaPacketIndex, 1)
coveredMediaPacketIndex := fecPacketIndex
for coveredMediaPacketIndex < numMediaPackets {
p.packetMasks[fecPacketIndex].SetBit(coveredMediaPacketIndex)
coveredMediaPacketIndex += numFecPackets
}
}

return &ProtectionCoverage{
packetMasks: packetMasks,
numFecPackets: numFecPackets,
numMediaPackets: numMediaPackets,
mediaPackets: mediaPackets,
}
}

// ResetCoverage clears the underlying map so that we can reuse it for new batches of RTP packets.
func (p *ProtectionCoverage) ResetCoverage() {
func (p *ProtectionCoverage) resetCoverage() {
for i := uint32(0); i < MaxFecPackets; i++ {
for j := uint32(0); j < MaxMediaPackets; j++ {
p.packetMasks[i].SetBit(j, 0)
}
p.packetMasks[i].Reset()
}
}

Expand All @@ -86,26 +114,46 @@ func (p *ProtectionCoverage) GetCoveredBy(fecPacketIndex uint32) *util.MediaPack
return util.NewMediaPacketIterator(p.mediaPackets, coverage)
}

// MarshalBitmasks returns the underlying bitmask that defines which media packets are protected by the
// specified fecPacketIndex.
func (p *ProtectionCoverage) MarshalBitmasks(fecPacketIndex uint32) []byte {
return p.packetMasks[fecPacketIndex].Marshal()
}

// ExtractMask1 returns the first section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
func (p *ProtectionCoverage) ExtractMask1(fecPacketIndex uint32) uint16 {
return uint16(p.packetMasks[fecPacketIndex].GetBitValue(0, 14))
mask := p.packetMasks[fecPacketIndex]
// We get the first 16 bits (64 - 16 -> shift by 48) and we shift once more for K field
mask1 := mask.Lo >> 49
return uint16(mask1)
}

// ExtractMask2 returns the second section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
func (p *ProtectionCoverage) ExtractMask2(fecPacketIndex uint32) uint32 {
return uint32(p.packetMasks[fecPacketIndex].GetBitValue(15, 45))
mask := p.packetMasks[fecPacketIndex]
// We remove the first 15 bits
mask2 := mask.Lo << 15
// We get the first 31 bits (64 - 31 -> shift by 33) and we shift once more for K field
mask2 >>= 34
return uint32(mask2)
}

// ExtractMask3 returns the third section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/rfc8627#section-4.2.2.1
func (p *ProtectionCoverage) ExtractMask3(fecPacketIndex uint32) uint64 {
return p.packetMasks[fecPacketIndex].GetBitValue(46, 109)
mask := p.packetMasks[fecPacketIndex]
// We remove the first 46 bits
maskLo := mask.Lo << 46
maskHi := mask.Hi >> 18
mask3 := maskLo | maskHi
return mask3
}

// ExtractMask3_03 returns the third section of the bitmask as defined by the FEC header.
// https://datatracker.ietf.org/doc/html/draft-ietf-payload-flexible-fec-scheme-03#section-4.2
func (p *ProtectionCoverage) ExtractMask3_03(fecPacketIndex uint32) uint64 {
mask := p.packetMasks[fecPacketIndex]
// We remove the first 46 bits
maskLo := mask.Lo << 46
maskHi := mask.Hi >> 18
mask3 := maskLo | maskHi
// We shift once for the K bit.
mask3 >>= 1
return mask3
}
42 changes: 26 additions & 16 deletions pkg/flexfec/flexfec_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,37 @@ const (
BaseFecHeaderSize = 12
)

// FlexEncoder implements the Fec encoding mechanism for the "Flex" variant of FlexFec.
type FlexEncoder struct {
baseSN uint16
// FlexEncoder is the interface that FecInterceptor uses to encode Fec packets.
type FlexEncoder interface {
EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint32) []rtp.Packet
}

// FlexEncoder20 implements the Fec encoding mechanism for the "Flex" variant of FlexFec.
type FlexEncoder20 struct {
fecBaseSn uint16
payloadType uint8
ssrc uint32
coverage *ProtectionCoverage
}

// NewFlexEncoder returns a new FlexFecEncer.
func NewFlexEncoder(baseSN uint16, payloadType uint8, ssrc uint32) *FlexEncoder {
return &FlexEncoder{
baseSN: baseSN,
func NewFlexEncoder(payloadType uint8, ssrc uint32) *FlexEncoder20 {
return &FlexEncoder20{
payloadType: payloadType,
ssrc: ssrc,
coverage: nil,
fecBaseSn: uint16(1000),
}
}

// EncodeFec returns a list of generated RTP packets with FEC payloads that protect the specified mediaPackets.
// This method does not account for missing RTP packets in the mediaPackets array nor does it account for
// them being passed out of order.
func (flex *FlexEncoder) EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint32) []rtp.Packet {
func (flex *FlexEncoder20) EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint32) []rtp.Packet {
// Start by defining which FEC packets cover which media packets
if flex.coverage == nil {
flex.coverage = NewCoverage(mediaPackets, numFecPackets)
} else {
flex.coverage.ResetCoverage()
flex.coverage.UpdateCoverage(mediaPackets, numFecPackets)
}

if flex.coverage == nil {
Expand All @@ -56,39 +60,42 @@ func (flex *FlexEncoder) EncodeFec(mediaPackets []rtp.Packet, numFecPackets uint
// Generate FEC payloads
fecPackets := make([]rtp.Packet, numFecPackets)
for fecPacketIndex := uint32(0); fecPacketIndex < numFecPackets; fecPacketIndex++ {
fecPackets[fecPacketIndex] = flex.encodeFlexFecPacket(fecPacketIndex)
fecPackets[fecPacketIndex] = flex.encodeFlexFecPacket(fecPacketIndex, mediaPackets[0].SequenceNumber)
}

return fecPackets
}

func (flex *FlexEncoder) encodeFlexFecPacket(fecPacketIndex uint32) rtp.Packet {
func (flex *FlexEncoder20) encodeFlexFecPacket(fecPacketIndex uint32, mediaBaseSn uint16) rtp.Packet {
mediaPacketsIt := flex.coverage.GetCoveredBy(fecPacketIndex)
flexFecHeader := flex.encodeFlexFecHeader(
mediaPacketsIt,
flex.coverage.ExtractMask1(fecPacketIndex),
flex.coverage.ExtractMask2(fecPacketIndex),
flex.coverage.ExtractMask3(fecPacketIndex),
mediaBaseSn,
)
flexFecRepairPayload := flex.encodeFlexFecRepairPayload(mediaPacketsIt.Reset())

return rtp.Packet{
packet := rtp.Packet{
Header: rtp.Header{
Version: 2,
Padding: false,
Extension: false,
Marker: false,
PayloadType: flex.payloadType,
SequenceNumber: flex.baseSN,
SequenceNumber: flex.fecBaseSn,
Timestamp: 54243243,
SSRC: flex.ssrc,
CSRC: []uint32{},
},
Payload: append(flexFecHeader, flexFecRepairPayload...),
}
flex.fecBaseSn++
return packet
}

func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketIterator, mask1 uint16, optionalMask2 uint32, optionalMask3 uint64) []byte {
func (flex *FlexEncoder20) encodeFlexFecHeader(mediaPackets *util.MediaPacketIterator, mask1 uint16, optionalMask2 uint32, optionalMask3 uint64, mediaBaseSn uint16) []byte {
/*
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
Expand Down Expand Up @@ -119,7 +126,7 @@ func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketItera
headerSize += 8
}

// Allocate a the FlexFec header
// Allocate the FlexFec header
flexFecHeader := make([]byte, headerSize)

// XOR the relevant fields for the header
Expand Down Expand Up @@ -149,6 +156,9 @@ func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketItera
flexFecHeader[7] ^= flexFecHeader[7]
}

// Write the base SN for the batch of media packets
binary.BigEndian.PutUint16(flexFecHeader[8:10], mediaBaseSn)

// Write the bitmasks to the header
binary.BigEndian.PutUint16(flexFecHeader[10:12], mask1)

Expand All @@ -163,7 +173,7 @@ func (flex *FlexEncoder) encodeFlexFecHeader(mediaPackets *util.MediaPacketItera
return flexFecHeader
}

func (flex *FlexEncoder) encodeFlexFecRepairPayload(mediaPackets *util.MediaPacketIterator) []byte {
func (flex *FlexEncoder20) encodeFlexFecRepairPayload(mediaPackets *util.MediaPacketIterator) []byte {
flexFecPayload := make([]byte, len(mediaPackets.First().Payload))

for mediaPackets.HasNext() {
Expand Down
Loading
Loading