Browse Source

wertc h265

v4
fajiao 2 years ago
parent
commit
5d8f24236b
  1. 4
      README.md
  2. 24
      config.yaml
  3. 150
      datachannelh265.go
  4. 50
      extend/extend.go
  5. 8
      main.go
  6. 130
      subscriberPro.go
  7. 30
      test/test.go
  8. 71
      webrtcV3.go

4
README.md

@ -7,6 +7,7 @@
https://github.com/Monibuca/plugin-webrtc
## 插件引入
```go
import ( _ "m7s.live/plugin/webrtc/v4" )
```
@ -33,6 +34,7 @@ webrtc:
## API
### 播放地址
`/webrtc/play/[streamPath]`
Body: `SDP`
@ -50,7 +52,9 @@ Body: `SDP`
Content-Type: `application/sdp`
Response Body: `SDP`
## WHIP
WebRTC-HTTP ingestion protocol
用于WebRTC交换SDP信息的规范

24
config.yaml

@ -0,0 +1,24 @@
# 支持零配置启动,即无需填写配置默认启用所有的插件。
# 只需要填写需要修改的配置项即可。不需要将所有的配置都填写进来!!。
# 全局配置参考 https://m7s.live/guide/config.html
# 插件配置参考各个插件的文档
# 插件都有一个enable配置,如果为false则不启用该插件,默认为true即不需要配置。
# global:
# console:
# secret: "ab0f6913670062af4d2f15c621205178"
# http:
# listenaddrtls: :8081
# certfile: monibuca.com.pem
# keyfile: monibuca.com.key
webrtc:
iceservers: [ ]
publicip: [ '192.168.1.119' ] # 可以是数组也可以是字符串(内部自动转成数组)
portmin: 10000
portmax: 20000
inviteportfixed: false # 设备将流发送的端口,是否固定 on 发送流到多路复用端口 如9000 off 自动从 mix_port - max_port 之间的值中 选一个可以用的端口
iceudpmux: 9000 # 接收设备端rtp流的多路复用端口
pli: 2000000000 # 2s
hls:
fragment: 3s # TS分片长度
window: 3 # 实时流m3u8文件包含的TS文件数

150
datachannelh265.go

@ -0,0 +1,150 @@
package webrtc
import (
"bytes"
"encoding/binary"
"errors"
"strconv"
// "fmt"
. "github.com/pion/webrtc/v3"
)
// H265
// https://zhuanlan.zhihu.com/p/458497037
const (
NALU_H265_VPS = 0x4001
NALU_H265_SPS = 0x4201
NALU_H265_PPS = 0x4401
NALU_H265_SEI = 0x4e01
NALU_H265_IFRAME = 0x2601
NALU_H265_PFRAME = 0x0201
HEVC_NAL_TRAIL_N = 0
HEVC_NAL_TRAIL_R = 1
HEVC_NAL_TSA_N = 2
HEVC_NAL_TSA_R = 3
HEVC_NAL_STSA_N = 4
HEVC_NAL_STSA_R = 5
HEVC_NAL_BLA_W_LP = 16
HEVC_NAL_BLA_W_RADL = 17
HEVC_NAL_BLA_N_LP = 18
HEVC_NAL_IDR_W_RADL = 19
HEVC_NAL_IDR_N_LP = 20
HEVC_NAL_CRA_NUT = 21
HEVC_NAL_RADL_N = 6
HEVC_NAL_RADL_R = 7
HEVC_NAL_RASL_N = 8
HEVC_NAL_RASL_R = 9
MAXPACKETSIZE = 65536
)
func SendH265FrameData(dc *DataChannel, data []byte, timestamp int64) {
if len(data) > 4 && dc != nil && dc.ReadyState() == DataChannelStateOpen {
var frametypestr string
glength := len(data)
count := glength / MAXPACKETSIZE
rem := glength % MAXPACKETSIZE
packets := count
if rem != 0 {
packets++
}
temptype, frametype, err := GetFrameType(data)
if err != nil {
} else {
frametypestr, err = GetFrameTypeName(frametype)
}
startstr := "h265 start ,FrameType:" + frametypestr + ",nalutype:" + strconv.Itoa(int(temptype)) + ",pts:" + strconv.FormatInt(timestamp, 10) + ",Packetslen:" + strconv.Itoa(glength) + ",packets:" + strconv.Itoa(packets) + ",rem:" + strconv.Itoa(rem)
_ = dc.SendText(startstr)
i := 0
for i = 0; i < count; i++ {
length := i * MAXPACKETSIZE
_ = dc.Send(data[length : length+MAXPACKETSIZE])
}
if rem != 0 {
_ = dc.Send(data[glength-rem : glength])
}
_ = dc.SendText("h265 end")
}
}
func GetFrameType(pdata []byte) (uint8, uint16, error) {
var frametype uint16
destcount := 0
if FindStartCode2(pdata) {
destcount = 3
} else if FindStartCode3(pdata) {
destcount = 4
} else {
return 0, 0, errors.New("not find")
}
temptype := (pdata[destcount] & 0x7E) >> 1
bytesBuffer := bytes.NewBuffer(pdata[destcount : destcount+2])
binary.Read(bytesBuffer, binary.BigEndian, &frametype)
return temptype, frametype, nil
}
func GetFrameTypeName(frametype uint16) (string, error) {
switch frametype {
case NALU_H265_VPS:
return "H265_FRAME_VPS", nil
case NALU_H265_SPS:
return "H265_FRAME_SPS", nil
case NALU_H265_PPS:
return "H265_FRAME_PPS", nil
case NALU_H265_SEI:
return "H265_FRAME_SEI", nil
case NALU_H265_IFRAME:
return "H265_FRAME_I", nil
case NALU_H265_PFRAME:
return "H265_FRAME_P", nil
default:
return "", errors.New("frametype unsupport")
}
}
func FindStartCode2(Buf []byte) bool {
if Buf[0] != 0 || Buf[1] != 0 || Buf[2] != 1 {
return false //判断是否为0x000001,如果是返回1
} else {
return true
}
}
func FindStartCode3(Buf []byte) bool {
if Buf[0] != 0 || Buf[1] != 0 || Buf[2] != 0 || Buf[3] != 1 {
return false //判断是否为0x00000001,如果是返回1
} else {
return true
}
}
func Add3ZoneOne(h265frame []byte) []byte {
var hBuf = [4]byte{0, 0, 0, 1}
var data []byte
for i := range hBuf {
data = append(data, hBuf[i])
}
for i := range h265frame {
data = append(data, h265frame[i])
}
return data
}
func AddBufs(A []byte, B []byte) []byte {
var data []byte
for i := range A {
data = append(data, A[i])
}
for i := range B {
data = append(data, B[i])
}
return data
}
type WebRtcReturn struct {
SessionDescription
IsH265 bool `json:"isH265"`
}

50
extend/extend.go

@ -0,0 +1,50 @@
package extend
import (
"context"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/common"
"m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
"net"
)
func ReadRing(vt *track.Video) *common.RingBuffer[common.AVFrame] {
return util.Clone(vt.Media.RingBuffer)
}
func Read(r *common.RingBuffer[common.AVFrame], ctx context.Context) (item *common.AVFrame) {
for item = &r.Value; ctx.Err() == nil && !item.CanRead; {
}
return
}
// PlayFullAnnexB 订阅annex-b格式的流数据,每一个I帧增加sps、pps头
func PlayFullAnnexB(vt *track.Video, ctx context.Context, onMedia func(net.Buffers) error) error {
for vr := ReadRing(vt); ctx.Err() == nil; vr.MoveNext() {
vp := Read(vr, ctx)
var data net.Buffers
if vp.IFrame {
for _, nalu := range vt.ParamaterSets {
data = append(data, codec.NALU_Delimiter2, nalu)
}
}
data = append(data, codec.NALU_Delimiter2)
i := 0
vp.AUList.Range(func(au *util.BLL) bool {
if i > 0 {
data = append(data, codec.NALU_Delimiter1, au.ToBytes())
} else {
data = append(data, au.ToBuffers()...)
}
i++
return true
})
if err := onMedia(data); err != nil {
// TODO: log err
return err
}
}
return ctx.Err()
}

8
main.go

@ -1,7 +1,7 @@
package webrtc
import (
"io/ioutil"
"io"
"net"
"net/http"
"regexp"
@ -102,7 +102,7 @@ func (conf *WebRTCConfig) OnEvent(event any) {
func (conf *WebRTCConfig) Play_(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/sdp")
streamPath := r.URL.Path[len("/webrtc/play/"):]
bytes, err := ioutil.ReadAll(r.Body)
bytes, err := io.ReadAll(r.Body)
var suber WebRTCSubscriber
suber.SDP = string(bytes)
if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil {
@ -132,7 +132,7 @@ func (conf *WebRTCConfig) Play_(w http.ResponseWriter, r *http.Request) {
func (conf *WebRTCConfig) Push_(w http.ResponseWriter, r *http.Request) {
streamPath := r.URL.Path[len("/webrtc/push/"):]
w.Header().Set("Content-Type", "application/sdp")
bytes, err := ioutil.ReadAll(r.Body)
bytes, err := io.ReadAll(r.Body)
var puber WebRTCPublisher
puber.SDP = string(bytes)
if puber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil {
@ -178,7 +178,7 @@ var WebRTCPlugin = engine.InstallPlugin(webrtcConfig)
func (conf *WebRTCConfig) Batch(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/sdp")
bytes, err := ioutil.ReadAll(r.Body)
bytes, err := io.ReadAll(r.Body)
var suber WebRTCBatcher
suber.SDP = string(bytes)
if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil {

130
subscriberPro.go

@ -0,0 +1,130 @@
package webrtc
import (
"fmt"
"github.com/pion/rtcp"
. "github.com/pion/webrtc/v3"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
"m7s.live/plugin/webrtc/v4/extend"
"net"
"strings"
"time"
)
type WebRTCSubscriberPro struct {
Subscriber
WebRTCIO
videoTrack *TrackLocalStaticRTP
audioTrack *TrackLocalStaticRTP
isH265 bool
}
func (suber *WebRTCSubscriberPro) OnEvent(event any) {
switch v := event.(type) {
case *track.Video:
if v.CodecID == codec.CodecID_H264 {
suber.isH265 = false
pli := "42001f"
//pli = fmt.Sprintf("%x", v.GetDecoderConfiguration().Raw[0][1:4])
if !strings.Contains(suber.SDP, pli) {
pli = reg_level.FindAllStringSubmatch(suber.SDP, -1)[0][1]
}
suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", "m7s")
rtpSender, _ := suber.PeerConnection.AddTrack(suber.videoTrack)
go func() {
rtcpBuf := make([]byte, 1500)
for {
if n, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
} else {
if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil {
for _, pp := range p {
switch pp.(type) {
case *rtcp.PictureLossIndication:
// fmt.Println("PictureLossIndication")
}
}
}
}
}
}()
suber.Subscriber.AddTrack(v) //接受这个track
}
start := time.Now().UnixMilli()
var rtcDc *DataChannel
if v.CodecID == codec.CodecID_H265 {
suber.isH265 = true
nInSendH265Track := 0
suber.PeerConnection.OnDataChannel(func(dc *DataChannel) {
rtcDc = dc
rtcDc.OnOpen(func() {
annexB := v.GetAnnexB()
var h265frame []byte
for _, p := range annexB { //拼接消息头
h265frame = AddBufs(h265frame, p)
}
va := v.IDRing.Value
va.AUList.Range(func(au *util.BLL) bool {
packets := au.ToBuffers()
for _, packet := range packets {
h265frame = AddBufs(h265frame, Add3ZoneOne(packet))
}
return true
})
SendH265FrameData(rtcDc, h265frame, va.Timestamp.UnixMilli()-start)
})
rtcDc.OnMessage(func(msg DataChannelMessage) {
msg_ := string(msg.Data)
fmt.Println(msg_)
})
rtcDc.OnClose(func() {
nInSendH265Track--
})
})
go extend.PlayFullAnnexB(v, suber.IO, func(frame net.Buffers) error {
var h265frame []byte
for _, packet := range frame {
if len(h265frame) == 0 {
h265frame = packet
} else {
h265frame = AddBufs(h265frame, packet)
}
}
timestamp := time.Now().UnixMilli()
SendH265FrameData(rtcDc, h265frame, timestamp-start)
return nil
})
}
case *track.Audio:
audioMimeType := MimeTypePCMA
if v.CodecID == codec.CodecID_PCMU {
audioMimeType = MimeTypePCMU
}
if v.CodecID == codec.CodecID_PCMA || v.CodecID == codec.CodecID_PCMU {
suber.audioTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, "audio", "m7s")
suber.PeerConnection.AddTrack(suber.audioTrack)
suber.Subscriber.AddTrack(v) //接受这个track
}
case VideoRTP:
suber.videoTrack.WriteRTP(&v.Packet)
case AudioRTP:
suber.audioTrack.WriteRTP(&v.Packet)
case ISubscriber:
suber.OnConnectionStateChange(func(pcs PeerConnectionState) {
suber.Info("Connection State has changed:" + pcs.String())
switch pcs {
case PeerConnectionStateConnected:
suber.Info("Connection State has changed:")
go suber.PlayRTP()
case PeerConnectionStateDisconnected, PeerConnectionStateFailed:
suber.Stop()
suber.PeerConnection.Close()
}
})
default:
suber.Subscriber.OnEvent(event)
}
}

30
test/test.go

@ -0,0 +1,30 @@
package main
import (
"context"
"m7s.live/engine/v4"
//_ "m7s.live/plugin/hdl/v4"
_ "m7s.live/plugin/hls/v4"
_ "m7s.live/plugin/hook/v4"
//_ "m7s.live/plugin/jessica/v4"
//_ "m7s.live/plugin/logrotate/v4"
//_ "m7s.live/plugin/preview/v4"
//_ "m7s.live/plugin/record/v4"
_ "m7s.live/plugin/rtmp/v4"
_ "m7s.live/plugin/rtsp/v4"
//_ "m7s.live/plugin/snap/v4"
_ "m7s.live/plugin/webrtc/v4"
)
// engine Jun 25, 2022
// webrtc Sep 24, 2022
// rtmp Sep 24, 2022
// rtsp Sep 6, 2022
// go build -ldflags "-w -s" -o build\ test/test.go
func main() {
err := engine.Run(context.Background(), "config.yaml")
if err != nil {
return
}
}

71
webrtcV3.go

@ -0,0 +1,71 @@
package webrtc
import (
"encoding/json"
. "github.com/pion/webrtc/v3"
"io"
"net/http"
)
func (conf *WebRTCConfig) PlayV3_(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Max-Age", "86400")
w.Header().Set("Access-Control-Allow-Methods", "*")
w.Header().Set("Access-Control-Allow-Headers", "Origin, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")
w.Header().Set("Access-Control-Expose-Headers", "*")
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Content-Type", "application/json")
streamPath := r.URL.Path[len("/webrtc/playv3/"):]
bytes, err := io.ReadAll(r.Body)
var suber WebRTCSubscriberPro
var offer SessionDescription
if err = json.Unmarshal(bytes, &offer); err != nil {
return
}
suber.SDP = offer.SDP
if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
suber.OnICECandidate(func(ice *ICECandidate) {
if ice != nil {
suber.Info(ice.ToJSON().Candidate)
}
})
if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: suber.SDP}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err = WebRTCPlugin.Subscribe(streamPath, &suber); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if sdp, err := suber.GetAnswerV3(); err == nil {
ret := WebRtcReturn{}
_ = json.Unmarshal(sdp, &ret)
ret.IsH265 = suber.isH265
byt, _ := json.Marshal(ret)
_, _ = w.Write(byt)
// w.Write(sdp)
} else {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}
func (IO *WebRTCIO) GetAnswerV3() ([]byte, error) {
// Sets the LocalDescription, and starts our UDP listeners
answer, err := IO.CreateAnswer(nil)
if err != nil {
return nil, err
}
gatherComplete := GatheringCompletePromise(IO.PeerConnection)
if err := IO.SetLocalDescription(answer); err != nil {
return nil, err
}
<-gatherComplete
if bytes, err := json.Marshal(IO.LocalDescription()); err != nil {
return bytes, err
} else {
return bytes, nil
}
}
Loading…
Cancel
Save