diff --git a/README.md b/README.md index f16762f..4e7925d 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ https://github.com/Monibuca/plugin-webrtc ## 插件引入 + ```go import ( _ "m7s.live/plugin/webrtc/v4" ) ``` @@ -33,6 +34,7 @@ webrtc: ## API ### 播放地址 + `/webrtc/play/[streamPath]` Body: `SDP` @@ -50,7 +52,9 @@ Body: `SDP` Content-Type: `application/sdp` Response Body: `SDP` + ## WHIP + WebRTC-HTTP ingestion protocol 用于WebRTC交换SDP信息的规范 diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..6d539c6 --- /dev/null +++ b/config.yaml @@ -0,0 +1,24 @@ +# 支持零配置启动,即无需填写配置默认启用所有的插件。 +# 只需要填写需要修改的配置项即可。不需要将所有的配置都填写进来!!。 +# 全局配置参考 https://m7s.live/guide/config.html +# 插件配置参考各个插件的文档 +# 插件都有一个enable配置,如果为false则不启用该插件,默认为true即不需要配置。 + +# global: +# console: +# secret: "ab0f6913670062af4d2f15c621205178" +# http: +# listenaddrtls: :8081 +# certfile: monibuca.com.pem +# keyfile: monibuca.com.key +webrtc: + iceservers: [ ] + publicip: [ '192.168.1.119' ] # 可以是数组也可以是字符串(内部自动转成数组) + portmin: 10000 + portmax: 20000 + inviteportfixed: false # 设备将流发送的端口,是否固定 on 发送流到多路复用端口 如9000 off 自动从 mix_port - max_port 之间的值中 选一个可以用的端口 + iceudpmux: 9000 # 接收设备端rtp流的多路复用端口 + pli: 2000000000 # 2s +hls: + fragment: 3s # TS分片长度 + window: 3 # 实时流m3u8文件包含的TS文件数 \ No newline at end of file diff --git a/datachannelh265.go b/datachannelh265.go new file mode 100644 index 0000000..ca120f4 --- /dev/null +++ b/datachannelh265.go @@ -0,0 +1,150 @@ +package webrtc + +import ( + "bytes" + "encoding/binary" + "errors" + "strconv" + // "fmt" + . "github.com/pion/webrtc/v3" +) + +// H265 +// https://zhuanlan.zhihu.com/p/458497037 +const ( + NALU_H265_VPS = 0x4001 + NALU_H265_SPS = 0x4201 + NALU_H265_PPS = 0x4401 + NALU_H265_SEI = 0x4e01 + NALU_H265_IFRAME = 0x2601 + NALU_H265_PFRAME = 0x0201 + HEVC_NAL_TRAIL_N = 0 + HEVC_NAL_TRAIL_R = 1 + HEVC_NAL_TSA_N = 2 + HEVC_NAL_TSA_R = 3 + HEVC_NAL_STSA_N = 4 + HEVC_NAL_STSA_R = 5 + HEVC_NAL_BLA_W_LP = 16 + HEVC_NAL_BLA_W_RADL = 17 + HEVC_NAL_BLA_N_LP = 18 + HEVC_NAL_IDR_W_RADL = 19 + HEVC_NAL_IDR_N_LP = 20 + HEVC_NAL_CRA_NUT = 21 + HEVC_NAL_RADL_N = 6 + HEVC_NAL_RADL_R = 7 + HEVC_NAL_RASL_N = 8 + HEVC_NAL_RASL_R = 9 + MAXPACKETSIZE = 65536 +) + +func SendH265FrameData(dc *DataChannel, data []byte, timestamp int64) { + if len(data) > 4 && dc != nil && dc.ReadyState() == DataChannelStateOpen { + var frametypestr string + glength := len(data) + count := glength / MAXPACKETSIZE + rem := glength % MAXPACKETSIZE + packets := count + if rem != 0 { + packets++ + } + temptype, frametype, err := GetFrameType(data) + if err != nil { + + } else { + frametypestr, err = GetFrameTypeName(frametype) + } + + startstr := "h265 start ,FrameType:" + frametypestr + ",nalutype:" + strconv.Itoa(int(temptype)) + ",pts:" + strconv.FormatInt(timestamp, 10) + ",Packetslen:" + strconv.Itoa(glength) + ",packets:" + strconv.Itoa(packets) + ",rem:" + strconv.Itoa(rem) + + _ = dc.SendText(startstr) + i := 0 + for i = 0; i < count; i++ { + length := i * MAXPACKETSIZE + _ = dc.Send(data[length : length+MAXPACKETSIZE]) + } + if rem != 0 { + _ = dc.Send(data[glength-rem : glength]) + } + _ = dc.SendText("h265 end") + } +} + +func GetFrameType(pdata []byte) (uint8, uint16, error) { + var frametype uint16 + + destcount := 0 + if FindStartCode2(pdata) { + destcount = 3 + } else if FindStartCode3(pdata) { + destcount = 4 + } else { + return 0, 0, errors.New("not find") + } + temptype := (pdata[destcount] & 0x7E) >> 1 + bytesBuffer := bytes.NewBuffer(pdata[destcount : destcount+2]) + binary.Read(bytesBuffer, binary.BigEndian, &frametype) + return temptype, frametype, nil +} + +func GetFrameTypeName(frametype uint16) (string, error) { + switch frametype { + case NALU_H265_VPS: + return "H265_FRAME_VPS", nil + case NALU_H265_SPS: + return "H265_FRAME_SPS", nil + case NALU_H265_PPS: + return "H265_FRAME_PPS", nil + case NALU_H265_SEI: + return "H265_FRAME_SEI", nil + case NALU_H265_IFRAME: + return "H265_FRAME_I", nil + case NALU_H265_PFRAME: + return "H265_FRAME_P", nil + default: + return "", errors.New("frametype unsupport") + } +} + +func FindStartCode2(Buf []byte) bool { + if Buf[0] != 0 || Buf[1] != 0 || Buf[2] != 1 { + return false //判断是否为0x000001,如果是返回1 + } else { + return true + } +} + +func FindStartCode3(Buf []byte) bool { + if Buf[0] != 0 || Buf[1] != 0 || Buf[2] != 0 || Buf[3] != 1 { + return false //判断是否为0x00000001,如果是返回1 + } else { + return true + } +} + +func Add3ZoneOne(h265frame []byte) []byte { + var hBuf = [4]byte{0, 0, 0, 1} + var data []byte + for i := range hBuf { + data = append(data, hBuf[i]) + } + for i := range h265frame { + data = append(data, h265frame[i]) + } + return data +} + +func AddBufs(A []byte, B []byte) []byte { + var data []byte + for i := range A { + data = append(data, A[i]) + } + for i := range B { + data = append(data, B[i]) + } + return data +} + +type WebRtcReturn struct { + SessionDescription + IsH265 bool `json:"isH265"` +} diff --git a/extend/extend.go b/extend/extend.go new file mode 100644 index 0000000..89a18d1 --- /dev/null +++ b/extend/extend.go @@ -0,0 +1,50 @@ +package extend + +import ( + "context" + "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/common" + "m7s.live/engine/v4/track" + "m7s.live/engine/v4/util" + "net" +) + +func ReadRing(vt *track.Video) *common.RingBuffer[common.AVFrame] { + return util.Clone(vt.Media.RingBuffer) +} + +func Read(r *common.RingBuffer[common.AVFrame], ctx context.Context) (item *common.AVFrame) { + for item = &r.Value; ctx.Err() == nil && !item.CanRead; { + } + return +} + +// PlayFullAnnexB 订阅annex-b格式的流数据,每一个I帧增加sps、pps头 +func PlayFullAnnexB(vt *track.Video, ctx context.Context, onMedia func(net.Buffers) error) error { + for vr := ReadRing(vt); ctx.Err() == nil; vr.MoveNext() { + vp := Read(vr, ctx) + var data net.Buffers + if vp.IFrame { + for _, nalu := range vt.ParamaterSets { + data = append(data, codec.NALU_Delimiter2, nalu) + } + } + data = append(data, codec.NALU_Delimiter2) + + i := 0 + vp.AUList.Range(func(au *util.BLL) bool { + if i > 0 { + data = append(data, codec.NALU_Delimiter1, au.ToBytes()) + } else { + data = append(data, au.ToBuffers()...) + } + i++ + return true + }) + if err := onMedia(data); err != nil { + // TODO: log err + return err + } + } + return ctx.Err() +} diff --git a/main.go b/main.go index e627d98..47ecfa2 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,7 @@ package webrtc import ( - "io/ioutil" + "io" "net" "net/http" "regexp" @@ -102,7 +102,7 @@ func (conf *WebRTCConfig) OnEvent(event any) { func (conf *WebRTCConfig) Play_(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/sdp") streamPath := r.URL.Path[len("/webrtc/play/"):] - bytes, err := ioutil.ReadAll(r.Body) + bytes, err := io.ReadAll(r.Body) var suber WebRTCSubscriber suber.SDP = string(bytes) if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil { @@ -132,7 +132,7 @@ func (conf *WebRTCConfig) Play_(w http.ResponseWriter, r *http.Request) { func (conf *WebRTCConfig) Push_(w http.ResponseWriter, r *http.Request) { streamPath := r.URL.Path[len("/webrtc/push/"):] w.Header().Set("Content-Type", "application/sdp") - bytes, err := ioutil.ReadAll(r.Body) + bytes, err := io.ReadAll(r.Body) var puber WebRTCPublisher puber.SDP = string(bytes) if puber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil { @@ -178,7 +178,7 @@ var WebRTCPlugin = engine.InstallPlugin(webrtcConfig) func (conf *WebRTCConfig) Batch(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/sdp") - bytes, err := ioutil.ReadAll(r.Body) + bytes, err := io.ReadAll(r.Body) var suber WebRTCBatcher suber.SDP = string(bytes) if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil { diff --git a/subscriberPro.go b/subscriberPro.go new file mode 100644 index 0000000..cf90433 --- /dev/null +++ b/subscriberPro.go @@ -0,0 +1,130 @@ +package webrtc + +import ( + "fmt" + "github.com/pion/rtcp" + . "github.com/pion/webrtc/v3" + . "m7s.live/engine/v4" + "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/track" + "m7s.live/engine/v4/util" + "m7s.live/plugin/webrtc/v4/extend" + "net" + "strings" + "time" +) + +type WebRTCSubscriberPro struct { + Subscriber + WebRTCIO + videoTrack *TrackLocalStaticRTP + audioTrack *TrackLocalStaticRTP + isH265 bool +} + +func (suber *WebRTCSubscriberPro) OnEvent(event any) { + switch v := event.(type) { + case *track.Video: + if v.CodecID == codec.CodecID_H264 { + suber.isH265 = false + pli := "42001f" + //pli = fmt.Sprintf("%x", v.GetDecoderConfiguration().Raw[0][1:4]) + if !strings.Contains(suber.SDP, pli) { + pli = reg_level.FindAllStringSubmatch(suber.SDP, -1)[0][1] + } + suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", "m7s") + rtpSender, _ := suber.PeerConnection.AddTrack(suber.videoTrack) + go func() { + rtcpBuf := make([]byte, 1500) + for { + if n, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { + return + } else { + if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil { + for _, pp := range p { + switch pp.(type) { + case *rtcp.PictureLossIndication: + // fmt.Println("PictureLossIndication") + } + } + } + } + } + }() + suber.Subscriber.AddTrack(v) //接受这个track + } + start := time.Now().UnixMilli() + var rtcDc *DataChannel + if v.CodecID == codec.CodecID_H265 { + suber.isH265 = true + nInSendH265Track := 0 + suber.PeerConnection.OnDataChannel(func(dc *DataChannel) { + rtcDc = dc + rtcDc.OnOpen(func() { + annexB := v.GetAnnexB() + var h265frame []byte + for _, p := range annexB { //拼接消息头 + h265frame = AddBufs(h265frame, p) + } + va := v.IDRing.Value + va.AUList.Range(func(au *util.BLL) bool { + packets := au.ToBuffers() + for _, packet := range packets { + h265frame = AddBufs(h265frame, Add3ZoneOne(packet)) + } + return true + }) + SendH265FrameData(rtcDc, h265frame, va.Timestamp.UnixMilli()-start) + }) + rtcDc.OnMessage(func(msg DataChannelMessage) { + msg_ := string(msg.Data) + fmt.Println(msg_) + }) + rtcDc.OnClose(func() { + nInSendH265Track-- + }) + }) + go extend.PlayFullAnnexB(v, suber.IO, func(frame net.Buffers) error { + var h265frame []byte + for _, packet := range frame { + if len(h265frame) == 0 { + h265frame = packet + } else { + h265frame = AddBufs(h265frame, packet) + } + } + timestamp := time.Now().UnixMilli() + SendH265FrameData(rtcDc, h265frame, timestamp-start) + return nil + }) + } + case *track.Audio: + audioMimeType := MimeTypePCMA + if v.CodecID == codec.CodecID_PCMU { + audioMimeType = MimeTypePCMU + } + if v.CodecID == codec.CodecID_PCMA || v.CodecID == codec.CodecID_PCMU { + suber.audioTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, "audio", "m7s") + suber.PeerConnection.AddTrack(suber.audioTrack) + suber.Subscriber.AddTrack(v) //接受这个track + } + case VideoRTP: + suber.videoTrack.WriteRTP(&v.Packet) + case AudioRTP: + suber.audioTrack.WriteRTP(&v.Packet) + case ISubscriber: + suber.OnConnectionStateChange(func(pcs PeerConnectionState) { + suber.Info("Connection State has changed:" + pcs.String()) + switch pcs { + case PeerConnectionStateConnected: + suber.Info("Connection State has changed:") + go suber.PlayRTP() + case PeerConnectionStateDisconnected, PeerConnectionStateFailed: + suber.Stop() + suber.PeerConnection.Close() + } + }) + default: + suber.Subscriber.OnEvent(event) + } +} diff --git a/test/test.go b/test/test.go new file mode 100644 index 0000000..53146c2 --- /dev/null +++ b/test/test.go @@ -0,0 +1,30 @@ +package main + +import ( + "context" + "m7s.live/engine/v4" + + //_ "m7s.live/plugin/hdl/v4" + _ "m7s.live/plugin/hls/v4" + _ "m7s.live/plugin/hook/v4" + //_ "m7s.live/plugin/jessica/v4" + //_ "m7s.live/plugin/logrotate/v4" + //_ "m7s.live/plugin/preview/v4" + //_ "m7s.live/plugin/record/v4" + _ "m7s.live/plugin/rtmp/v4" + _ "m7s.live/plugin/rtsp/v4" + //_ "m7s.live/plugin/snap/v4" + _ "m7s.live/plugin/webrtc/v4" +) + +// engine Jun 25, 2022 +// webrtc Sep 24, 2022 +// rtmp Sep 24, 2022 +// rtsp Sep 6, 2022 +// go build -ldflags "-w -s" -o build\ test/test.go +func main() { + err := engine.Run(context.Background(), "config.yaml") + if err != nil { + return + } +} diff --git a/webrtcV3.go b/webrtcV3.go new file mode 100644 index 0000000..f8fb06e --- /dev/null +++ b/webrtcV3.go @@ -0,0 +1,71 @@ +package webrtc + +import ( + "encoding/json" + . "github.com/pion/webrtc/v3" + "io" + "net/http" +) + +func (conf *WebRTCConfig) PlayV3_(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Max-Age", "86400") + w.Header().Set("Access-Control-Allow-Methods", "*") + w.Header().Set("Access-Control-Allow-Headers", "Origin, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization") + w.Header().Set("Access-Control-Expose-Headers", "*") + w.Header().Set("Access-Control-Allow-Credentials", "true") + w.Header().Set("Content-Type", "application/json") + streamPath := r.URL.Path[len("/webrtc/playv3/"):] + bytes, err := io.ReadAll(r.Body) + var suber WebRTCSubscriberPro + var offer SessionDescription + if err = json.Unmarshal(bytes, &offer); err != nil { + return + } + suber.SDP = offer.SDP + if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + suber.OnICECandidate(func(ice *ICECandidate) { + if ice != nil { + suber.Info(ice.ToJSON().Candidate) + } + }) + if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: suber.SDP}); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err = WebRTCPlugin.Subscribe(streamPath, &suber); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if sdp, err := suber.GetAnswerV3(); err == nil { + ret := WebRtcReturn{} + _ = json.Unmarshal(sdp, &ret) + ret.IsH265 = suber.isH265 + byt, _ := json.Marshal(ret) + _, _ = w.Write(byt) + // w.Write(sdp) + } else { + http.Error(w, err.Error(), http.StatusBadRequest) + } +} + +func (IO *WebRTCIO) GetAnswerV3() ([]byte, error) { + // Sets the LocalDescription, and starts our UDP listeners + answer, err := IO.CreateAnswer(nil) + if err != nil { + return nil, err + } + gatherComplete := GatheringCompletePromise(IO.PeerConnection) + if err := IO.SetLocalDescription(answer); err != nil { + return nil, err + } + <-gatherComplete + + if bytes, err := json.Marshal(IO.LocalDescription()); err != nil { + return bytes, err + } else { + return bytes, nil + } +}