diff --git a/internal/app/ffmpeg/baseService.go b/internal/app/ffmpeg/baseService.go new file mode 100644 index 0000000..22fcfd1 --- /dev/null +++ b/internal/app/ffmpeg/baseService.go @@ -0,0 +1,35 @@ +package ffmpeg + +import ( + "go.uber.org/zap" + "ycmediakit/internal/pkg/global" + "ycmediakit/internal/pkg/result" + "ycmediakit/internal/pkg/server/ffmpegServer" + + "github.com/gin-gonic/gin" +) + +var ( + cfg *ffmpegServer.FfmpegConfig + log *zap.Logger +) + +func init() { + cfg = &global.AppConfig.Ffmpeg + log = zap.L() +} + +func ProbeStream(c *gin.Context) { + target := c.Query("target") + target, ok := ffmpegServer.PrepareUrl(target) + if !ok { + result.InvalidParams.WithVoidData().Failure(c) + return + } + jsonStr, err := ffmpegServer.ProbeStreamsWithTimeout(target, cfg.Timeout) + if err != nil { + result.Wrong.WithVoidData().WithMsg(err.Error()).Error(c) + return + } + result.Ok.WithData(jsonStr).Success(c) +} diff --git a/internal/app/ffmpeg/router.go b/internal/app/ffmpeg/router.go index bbf0acb..5d94480 100644 --- a/internal/app/ffmpeg/router.go +++ b/internal/app/ffmpeg/router.go @@ -3,6 +3,10 @@ package ffmpeg import "github.com/gin-gonic/gin" func BindRouters(group *gin.RouterGroup) { + // BaseService + group.GET("/probeStream", ProbeStream) + + // ToHlsService group.GET("/startToHls", StartToHls) group.GET("/existsToHls", ExistsToHls) group.GET("/closeToHls", CloseToHls) diff --git a/internal/app/ffmpeg/service.go b/internal/app/ffmpeg/toHlsService.go similarity index 94% rename from internal/app/ffmpeg/service.go rename to internal/app/ffmpeg/toHlsService.go index 6663108..3767cc4 100644 --- a/internal/app/ffmpeg/service.go +++ b/internal/app/ffmpeg/toHlsService.go @@ -6,18 +6,13 @@ import ( "ycmediakit/internal/pkg/server/ffmpegServer" "github.com/gin-gonic/gin" - "go.uber.org/zap" ) var ( - cfg *ffmpegServer.FfmpegConfig - log *zap.Logger toHlsServer *ffmpegServer.ToHlsServer ) func init() { - cfg = &global.AppConfig.Ffmpeg - log = zap.L() toHlsServer = ffmpegServer.NewToHlsServer(&cfg.ToHls) global.ToHlsServer = toHlsServer } diff --git a/internal/pkg/global/global.go b/internal/pkg/global/global.go index 4bc0596..fec5782 100644 --- a/internal/pkg/global/global.go +++ b/internal/pkg/global/global.go @@ -1,6 +1,8 @@ package global import ( + "fmt" + "github.com/go-playground/validator/v10" "ycmediakit/internal/pkg/config" "ycmediakit/internal/pkg/logger" "ycmediakit/internal/pkg/server/ffmpegServer" @@ -29,6 +31,15 @@ func init() { func initConfig() { rCfg := config.NewRunConfig() aCfg := config.NewAppConfig(rCfg) + validate := validator.New() + err := validate.Struct(rCfg) + if err != nil { + fmt.Println(err) + } + err = validate.Struct(aCfg) + if err != nil { + fmt.Println(err) + } RunConfig = rCfg AppConfig = aCfg } diff --git a/internal/pkg/result/result.go b/internal/pkg/result/result.go index 2ac9c97..6d89fb7 100644 --- a/internal/pkg/result/result.go +++ b/internal/pkg/result/result.go @@ -34,36 +34,49 @@ func NewResponse(options ...ResponseOption) *Response { return resp } +// WithCode code:code func (r *Response) WithCode(code int) *Response { r.Code = code return r } +// WithMsg msg:msg func (r *Response) WithMsg(msg string) *Response { r.Msg = msg return r } +// WithData data:data func (r *Response) WithData(data interface{}) *Response { r.Data = data return r } +// WithVoidData data:nil +func (r *Response) WithVoidData() *Response { + r.Data = nil + return r +} + +// WithFlag data:{"flag":flag} func (r *Response) WithFlag(flag bool) *Response { r.Data = gin.H{"flag": flag} return r } -func (r *Response) WithFlagValue(flag bool, value interface{}) *Response { +// WithFV data:{"flag":flag, "value":value} +func (r *Response) WithFV(flag bool, value interface{}) *Response { r.Data = gin.H{"flag": flag, "value": value} return r } +// WithVoidFV data:{"flag":false, "value":nil} func (r *Response) WithVoidFV() *Response { r.Data = gin.H{"flag": false, "value": nil} return r } +// WithFullFV data:{"flag":true, "value":value} func (r *Response) WithFullFV(value interface{}) *Response { r.Data = gin.H{"flag": true, "value": value} return r diff --git a/internal/pkg/server/ffmpegServer/baseCmd.go b/internal/pkg/server/ffmpegServer/baseCmd.go index 30b1ca1..6a2c41f 100644 --- a/internal/pkg/server/ffmpegServer/baseCmd.go +++ b/internal/pkg/server/ffmpegServer/baseCmd.go @@ -20,6 +20,7 @@ func PrepareUrl(url string) (string, bool) { url = urlArr[0] return url, true } + func PrepareStreamPath(streamPath string) (string, bool) { streamArr := strings.Split(streamPath, "/") if len(streamArr) != 2 { @@ -31,10 +32,10 @@ func PrepareStreamPath(streamPath string) (string, bool) { func ProbeWithTimeout(url string, timeoutMs int) error { args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{ + "i": url, "stimeout": timeoutMs * 1000, "loglevel": "error", }) - args = append(args, url) ctx := context.Background() cmd := exec.CommandContext(ctx, "ffprobe", args...) errBuf := new(strings.Builder) @@ -46,3 +47,25 @@ func ProbeWithTimeout(url string, timeoutMs int) error { } return nil } + +func ProbeStreamsWithTimeout(url string, timeoutMs int) (string, error) { + args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{ + "i": url, + "show_streams": "", + "stimeout": timeoutMs * 1000, + "of": "json", + "loglevel": "error", + }) + ctx := context.Background() + cmd := exec.CommandContext(ctx, "ffprobe", args...) + infoBuf, errBuf := new(strings.Builder), new(strings.Builder) + cmd.Stdout = infoBuf + cmd.Stderr = errBuf + err := cmd.Run() + if err != nil { + errStr := strings.TrimSpace(errBuf.String()) + return "", errors.New(errStr) + } + infoStr := strings.TrimSpace(infoBuf.String()) + return infoStr, nil +} diff --git a/internal/pkg/server/ffmpegServer/baseCmd_test.go b/internal/pkg/server/ffmpegServer/baseCmd_test.go new file mode 100644 index 0000000..800d214 --- /dev/null +++ b/internal/pkg/server/ffmpegServer/baseCmd_test.go @@ -0,0 +1,17 @@ +package ffmpegServer + +import ( + "fmt" + "testing" +) + +func TestProbeStreamsWithTimeout(t *testing.T) { + url := "rtsp://admin:hk123456@192.168.1.65:554/Streaming/Channels/101" + timeout := 5000 + str, err := ProbeStreamsWithTimeout(url, timeout) + if err != nil { + fmt.Println(err) + } else { + fmt.Println(str) + } +} diff --git a/internal/pkg/server/systemServer/config.go b/internal/pkg/server/systemServer/config.go index 9ab39e0..45ec674 100644 --- a/internal/pkg/server/systemServer/config.go +++ b/internal/pkg/server/systemServer/config.go @@ -5,7 +5,7 @@ type SystemConfig struct { } type PayloadConfig struct { - MaxRetryTime int - MaxCpuUsage float32 - MaxMemoryUsage float32 + MaxRetryTime int `validate:"min=1,max=5"` + MaxCpuUsage float32 `validate:"min=60,max=100"` + MaxMemoryUsage float32 `validate:"min=60,max=100"` } diff --git a/internal/pkg/server/systemServer/payload_test.go b/internal/pkg/server/systemServer/payload_test.go index 082ffc7..3d016fa 100644 --- a/internal/pkg/server/systemServer/payload_test.go +++ b/internal/pkg/server/systemServer/payload_test.go @@ -3,6 +3,7 @@ package systemServer import ( "fmt" "testing" + "time" ) func TestPayloadServer(t *testing.T) { @@ -13,3 +14,17 @@ func TestPayloadServer(t *testing.T) { fmt.Println(p) } } + +func TestPayloadServerV2(t *testing.T) { + ps := NewPayloadServer(nil) + var p Payload + for i := 0; i < 1000; i++ { + if i%2 == 0 { + time.Sleep(time.Second) + } else { + <-ps.Wait() + } + p = ps.GetPayload() + fmt.Println(p) + } +}