From 2cb81675b010bb04288efd0bb2c23b3218e90efc Mon Sep 17 00:00:00 2001 From: fajiao <1519100073@qq.com> Date: Tue, 14 Mar 2023 11:21:20 +0800 Subject: [PATCH] update scaffold and serivce --- cmd/run.go | 82 ++++++++++++++++ config/config.dev.yaml | 7 +- go.mod | 7 ++ go.sum | 22 ++++- internal/app/ffmpeg/service.go | 73 +++++++------- internal/pkg/config/config.go | 7 +- internal/pkg/global/global.go | 61 +++--------- internal/pkg/result/result.go | 41 +++++--- internal/pkg/server/ffmpegServer/baseCmd.go | 48 +++++++++ .../{ffmpegCmd.go => toHlsCmd.go} | 98 ++++++------------- internal/pkg/server/systemServer/config.go | 11 +++ internal/pkg/server/systemServer/payload.go | 80 +++++++++++++++ .../pkg/server/systemServer/payload_test.go | 15 +++ internal/pkg/util/channel.go | 47 +++++++++ main.go | 4 +- middleware/interceptor.go | 21 +++- 16 files changed, 451 insertions(+), 173 deletions(-) create mode 100644 cmd/run.go create mode 100644 internal/pkg/server/ffmpegServer/baseCmd.go rename internal/pkg/server/ffmpegServer/{ffmpegCmd.go => toHlsCmd.go} (62%) create mode 100644 internal/pkg/server/systemServer/config.go create mode 100644 internal/pkg/server/systemServer/payload.go create mode 100644 internal/pkg/server/systemServer/payload_test.go create mode 100644 internal/pkg/util/channel.go diff --git a/cmd/run.go b/cmd/run.go new file mode 100644 index 0000000..95488f3 --- /dev/null +++ b/cmd/run.go @@ -0,0 +1,82 @@ +package cmd + +import ( + "context" + "fmt" + "net/http" + "path/filepath" + "ycmediakit/internal/app/ffmpeg" + "ycmediakit/internal/pkg/config" + "ycmediakit/internal/pkg/global" + "ycmediakit/internal/pkg/server/systemServer" + "ycmediakit/internal/pkg/util" + "ycmediakit/middleware" + + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +var ( + rCfg *config.RunConfig + aCfg *config.AppConfig + log *zap.Logger + ctx context.Context + engine *gin.Engine +) + +var ( + payloadServer *systemServer.PayloadServer +) + +func init() { + initAttr() + initServer() + initEngine() +} + +func initAttr() { + rCfg = global.RunConfig + aCfg = global.AppConfig + log = global.Logger +} + +func initServer() { + payloadServer = global.PayloadServer +} + +func initEngine() { + if !rCfg.Debug { + gin.SetMode(gin.ReleaseMode) + } + engine = gin.New() + engine.Use(middleware.Cors()) + engine.Use(middleware.GinLogger(log), middleware.GinRecovery(log, true)) + engine.Use(middleware.HttpInterceptor()) + engineBindFS() + engineBindRouters() +} + +func engineBindFS() { + fCfg := &aCfg.Ffmpeg + hlsPath := filepath.Join(util.GetWorkPath(), fCfg.ToHls.HlsRoot) + _, _ = util.CheckDir(hlsPath) + engine.StaticFS("/"+fCfg.ToHls.HlsRoot, http.Dir(hlsPath)) +} + +func engineBindRouters() { + group := engine.Group("/ffmpeg") + group.Use(middleware.ProbePayloadInterceptor(payloadServer)) + ffmpeg.BindRouters(group) +} + +func Run(ctx_ context.Context) { + ctx = ctx_ + port := aCfg.Server.Port + runMsg := "Listening and serving HTTP on :" + port + fmt.Printf(runMsg + "\n") + log.Info(runMsg) + if err := engine.Run(":" + port); err != nil { + log.Error(err.Error()) + return + } +} diff --git a/config/config.dev.yaml b/config/config.dev.yaml index 001941c..8e3b406 100644 --- a/config/config.dev.yaml +++ b/config/config.dev.yaml @@ -1,8 +1,13 @@ server: port: 8888 +system: + payload: + maxRetryTime: 3 + maxCpuUsage: 95.0 + maxMemoryUsage: 95.0 ffmpeg: timeout: 5000 - ToHls: + toHls: base: encodeCodec: h264 profile: baseline diff --git a/go.mod b/go.mod index 71c50a1..d206939 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( github.com/gin-gonic/gin v1.9.0 + github.com/shirou/gopsutil/v3 v3.23.2 github.com/spf13/viper v1.15.0 github.com/u2takey/ffmpeg-go v0.4.1 go.uber.org/zap v1.24.0 @@ -16,6 +17,7 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.11.2 // indirect @@ -25,20 +27,25 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/leodido/go-urn v1.2.1 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-isatty v0.0.17 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/spf13/afero v1.9.3 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.4.2 // indirect + github.com/tklauser/go-sysconf v0.3.11 // indirect + github.com/tklauser/numcpus v0.6.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/u2takey/go-utils v0.3.1 // indirect github.com/ugorji/go/codec v1.2.9 // indirect + github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect diff --git a/go.sum b/go.sum index 3efb3a2..fa90a55 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,8 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= @@ -125,7 +127,9 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -174,6 +178,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= @@ -196,9 +202,13 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= +github.com/shirou/gopsutil/v3 v3.23.2 h1:PAWSuiAszn7IhPMBtXsbSCafej7PqUOvY6YywlQUExU= +github.com/shirou/gopsutil/v3 v3.23.2/go.mod h1:gv0aQw33GLo3pG8SiWKiQrbDzbRY1K80RyZJ7V4Th1M= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk= github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= @@ -221,10 +231,15 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= +github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= +github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= +github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms= +github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/u2takey/ffmpeg-go v0.4.1 h1:l5ClIwL3N2LaH1zF3xivb3kP2HW95eyG5xhHE1JdZ9Y= @@ -237,6 +252,8 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= +github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -359,6 +376,7 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -380,6 +398,7 @@ golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -387,6 +406,7 @@ golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/app/ffmpeg/service.go b/internal/app/ffmpeg/service.go index 6ae237c..6663108 100644 --- a/internal/app/ffmpeg/service.go +++ b/internal/app/ffmpeg/service.go @@ -1,92 +1,95 @@ package ffmpeg import ( - "github.com/gin-gonic/gin" - "go.uber.org/zap" + "ycmediakit/internal/pkg/global" "ycmediakit/internal/pkg/result" "ycmediakit/internal/pkg/server/ffmpegServer" + + "github.com/gin-gonic/gin" + "go.uber.org/zap" ) var ( - cfg *ffmpegServer.FfmpegConfig - logger *zap.Logger - server *ffmpegServer.ToHlsServer + cfg *ffmpegServer.FfmpegConfig + log *zap.Logger + toHlsServer *ffmpegServer.ToHlsServer ) -func Init(cfg_ *ffmpegServer.FfmpegConfig) { - cfg = cfg_ - logger = zap.L() - server = ffmpegServer.NewToHlsServer(&cfg.ToHls) +func init() { + cfg = &global.AppConfig.Ffmpeg + log = zap.L() + toHlsServer = ffmpegServer.NewToHlsServer(&cfg.ToHls) + global.ToHlsServer = toHlsServer } func StartToHls(c *gin.Context) { target := c.Query("target") streamPath := c.Query("streamPath") - target, ok := server.PrepareUrl(target) + target, ok := ffmpegServer.PrepareUrl(target) if !ok { - result.InvalidParams.WithFlag(false).Failure(c) + result.InvalidParams.WithVoidFV().Failure(c) return } - streamPath, ok = server.PrepareStreamPath(streamPath) + streamPath, ok = ffmpegServer.PrepareStreamPath(streamPath) if !ok { - result.InvalidParams.WithFlag(false).Failure(c) + result.InvalidParams.WithVoidFV().Failure(c) return } - ok = server.Exists(streamPath) + ok = toHlsServer.Exists(streamPath) if ok { - result.Created.WithFlagValue(true, server.BuildHlsPath(streamPath)).Success(c) + result.Created.WithFullFV(toHlsServer.BuildHlsPath(streamPath)).Success(c) return } - err := server.ProbeWithTimeout(target, cfg.Timeout) + err := ffmpegServer.ProbeWithTimeout(target, cfg.Timeout) if err != nil { - logger.Error(err.Error()) - result.Wrong.WithMsg(err.Error()).Error(c) + log.Error(err.Error()) + result.Wrong.WithVoidFV().WithMsg(err.Error()).Error(c) return } - ok = server.Add(target, streamPath, cfg.Timeout, &cfg.ToHls) + ok = toHlsServer.Add(target, streamPath, cfg.Timeout, &cfg.ToHls) if !ok { - result.Ok.WithFlag(false).Failure(c) + result.Ok.WithVoidFV().Failure(c) return } go func() { - _, err = server.RunToHlsCmd(streamPath) + _, err = toHlsServer.RunToHlsCmd(streamPath) if err != nil { - logger.Error(err.Error()) - err = server.DeleteAndStop(streamPath) + log.Error(err.Error()) + err = toHlsServer.DeleteAndStop(streamPath) if err != nil { - logger.Error(err.Error()) + log.Error(err.Error()) } } }() - result.Ok.WithFlagValue(true, server.BuildHlsPath(streamPath)).Success(c) + result.Ok.WithFullFV(toHlsServer.BuildHlsPath(streamPath)).Success(c) } func ExistsToHls(c *gin.Context) { streamPath := c.Query("streamPath") - streamPath, ok := server.PrepareStreamPath(streamPath) + streamPath, ok := ffmpegServer.PrepareStreamPath(streamPath) if !ok { - result.InvalidParams.WithFlag(false).Failure(c) + result.InvalidParams.Failure(c) return } - ok = server.Exists(streamPath) + ok = toHlsServer.Exists(streamPath) result.Ok.WithFlag(ok).Success(c) } func CloseToHls(c *gin.Context) { streamPath := c.Query("streamPath") - streamPath, ok := server.PrepareStreamPath(streamPath) + streamPath, ok := ffmpegServer.PrepareStreamPath(streamPath) if !ok { - result.InvalidParams.WithFlag(false).Failure(c) + result.InvalidParams.Failure(c) return } - ok = server.Exists(streamPath) + ok = toHlsServer.Exists(streamPath) if !ok { - result.NotFound.WithFlag(false).Failure(c) + result.NotFound.Failure(c) return } - err := server.DeleteAndStop(streamPath) + err := toHlsServer.DeleteAndStop(streamPath) if err != nil { - logger.Error(err.Error()) + log.Error(err.Error()) result.Wrong.WithMsg(err.Error()).Error(c) return } @@ -94,5 +97,5 @@ func CloseToHls(c *gin.Context) { } func GetToHlsList(c *gin.Context) { - result.Ok.WithFlagValue(true, server.GetList()).Success(c) + result.Ok.WithData(toHlsServer.GetList()).Success(c) } diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 8b24989..2e2937a 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -7,6 +7,7 @@ import ( "strings" "ycmediakit/internal/pkg/logger" "ycmediakit/internal/pkg/server/ffmpegServer" + "ycmediakit/internal/pkg/server/systemServer" "ycmediakit/internal/pkg/util" ) @@ -18,6 +19,7 @@ type RunConfig struct { type AppConfig struct { Server ServerConfig `` + System systemServer.SystemConfig `` Ffmpeg ffmpegServer.FfmpegConfig `` } @@ -25,9 +27,6 @@ type ServerConfig struct { Port string } -func init() { -} - func NewRunConfig() *RunConfig { v := viper.New() v.AddConfigPath(filepath.Join(util.GetWorkPath(), "config")) @@ -38,6 +37,7 @@ func NewRunConfig() *RunConfig { } var cfg RunConfig if err := v.Unmarshal(&cfg); err != nil { + log.Fatalf("read config failed: %v", err) } return &cfg } @@ -52,6 +52,7 @@ func NewAppConfig(runConfig *RunConfig) *AppConfig { } var cfg AppConfig if err := v.Unmarshal(&cfg); err != nil { + log.Fatalf("read config failed: %v", err) } return &cfg } diff --git a/internal/pkg/global/global.go b/internal/pkg/global/global.go index 55d82e3..4bc0596 100644 --- a/internal/pkg/global/global.go +++ b/internal/pkg/global/global.go @@ -1,31 +1,29 @@ package global import ( - "context" - "fmt" - "github.com/gin-gonic/gin" - "go.uber.org/zap" - "net/http" - "path/filepath" - "ycmediakit/internal/app/ffmpeg" "ycmediakit/internal/pkg/config" "ycmediakit/internal/pkg/logger" - "ycmediakit/internal/pkg/util" - "ycmediakit/middleware" + "ycmediakit/internal/pkg/server/ffmpegServer" + "ycmediakit/internal/pkg/server/systemServer" + + "go.uber.org/zap" ) var ( RunConfig *config.RunConfig AppConfig *config.AppConfig Logger *zap.Logger - Ctx context.Context - engine *gin.Engine +) + +var ( + PayloadServer *systemServer.PayloadServer + ToHlsServer *ffmpegServer.ToHlsServer ) func init() { initConfig() initLogger() - initEngine() + initServer() } func initConfig() { @@ -45,41 +43,6 @@ func initLogger() { zap.ReplaceGlobals(Logger) // 替换 zap 库中全局变量,可以直接通过 zap.L() 访问 } -func initEngine() { - if !RunConfig.Debug { - gin.SetMode(gin.ReleaseMode) - } - engine = gin.New() - engine.Use(middleware.Cors()) - engine.Use(middleware.GinLogger(Logger), middleware.GinRecovery(Logger, true)) - engine.Use(middleware.HttpInterceptor()) - engineBindFS() - engineBindRouters() -} - -func engineBindFS() { - fCfg := &AppConfig.Ffmpeg - hlsPath := filepath.Join(util.GetWorkPath(), fCfg.ToHls.HlsRoot) - _, _ = util.CheckDir(hlsPath) - engine.StaticFS("/"+fCfg.ToHls.HlsRoot, http.Dir(hlsPath)) -} - -func engineBindRouters() { - fCfg := &AppConfig.Ffmpeg - group := engine.Group("/ffmpeg") - group.Use(middleware.ProbePayloadInterceptor()) - ffmpeg.Init(fCfg) - ffmpeg.BindRouters(group) -} - -func Run(ctx context.Context) { - Ctx = ctx - port := AppConfig.Server.Port - runMsg := "Listening and serving HTTP on :" + port - fmt.Printf(runMsg + "\n") - Logger.Info(runMsg) - if err := engine.Run(":" + port); err != nil { - Logger.Error(err.Error()) - return - } +func initServer() { + PayloadServer = systemServer.NewPayloadServer(&AppConfig.System.Payload) } diff --git a/internal/pkg/result/result.go b/internal/pkg/result/result.go index 1a3b176..2ac9c97 100644 --- a/internal/pkg/result/result.go +++ b/internal/pkg/result/result.go @@ -9,13 +9,13 @@ import ( var ( Ok = NewResponse(WithCode(200), WithMsg("success")) - Created = NewResponse(WithCode(201), WithMsg("created")) - InvalidParams = NewResponse(WithCode(400), WithMsg("invalid params")) - Unauthorized = NewResponse(WithCode(401), WithMsg("unauthorized")) - Forbidden = NewResponse(WithCode(403), WithMsg("forbidden")) - NotFound = NewResponse(WithCode(404), WithMsg("not found")) - TooManyRequests = NewResponse(WithCode(429), WithMsg("too many requests")) - Wrong = NewResponse(WithCode(500), WithMsg("something wrong")) + Created = NewResponse(WithCode(201), WithFlag(true), WithMsg("created")) + InvalidParams = NewResponse(WithCode(400), WithFlag(false), WithMsg("invalid params")) + Unauthorized = NewResponse(WithCode(401), WithFlag(false), WithMsg("unauthorized")) + Forbidden = NewResponse(WithCode(403), WithFlag(false), WithMsg("forbidden")) + NotFound = NewResponse(WithCode(404), WithFlag(false), WithMsg("not found")) + TooManyRequests = NewResponse(WithCode(429), WithFlag(false), WithMsg("too many requests")) + Wrong = NewResponse(WithCode(500), WithFlag(false), WithMsg("something wrong")) ) type Response struct { @@ -49,18 +49,23 @@ func (r *Response) WithData(data interface{}) *Response { return r } -func (r *Response) WithFlag(flag interface{}) *Response { +func (r *Response) WithFlag(flag bool) *Response { r.Data = gin.H{"flag": flag} return r } -func (r *Response) WithValue(value interface{}) *Response { - r.Data = gin.H{"value": value} +func (r *Response) WithFlagValue(flag bool, value interface{}) *Response { + r.Data = gin.H{"flag": flag, "value": value} return r } -func (r *Response) WithFlagValue(flag interface{}, value interface{}) *Response { - r.Data = gin.H{"flag": flag, "value": value} +func (r *Response) WithVoidFV() *Response { + r.Data = gin.H{"flag": false, "value": nil} + return r +} + +func (r *Response) WithFullFV(value interface{}) *Response { + r.Data = gin.H{"flag": true, "value": value} return r } @@ -82,6 +87,18 @@ func WithData(data interface{}) ResponseOption { } } +func WithFlag(flag bool) ResponseOption { + return func(resp *Response) { + resp.Data = gin.H{"flag": flag} + } +} + +func WithFlagValue(flag bool, value interface{}) ResponseOption { + return func(resp *Response) { + resp.Data = gin.H{"flag": flag, "value": value} + } +} + func (r *Response) Response(c *gin.Context, httpCode int) { c.JSON(httpCode, &r) } diff --git a/internal/pkg/server/ffmpegServer/baseCmd.go b/internal/pkg/server/ffmpegServer/baseCmd.go new file mode 100644 index 0000000..30b1ca1 --- /dev/null +++ b/internal/pkg/server/ffmpegServer/baseCmd.go @@ -0,0 +1,48 @@ +package ffmpegServer + +import ( + "context" + "errors" + ffmpegCmd "github.com/u2takey/ffmpeg-go" + "os/exec" + "path/filepath" + "strings" +) + +func PrepareUrl(url string) (string, bool) { + if url == "" { + return "", false + } + urlArr := strings.Split(url, "?") + if len(urlArr) > 2 { + return "", false + } + url = urlArr[0] + return url, true +} +func PrepareStreamPath(streamPath string) (string, bool) { + streamArr := strings.Split(streamPath, "/") + if len(streamArr) != 2 { + return "", false + } + streamPath = filepath.Join(streamArr...) + return streamPath, true +} + +func ProbeWithTimeout(url string, timeoutMs int) error { + args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{ + "stimeout": timeoutMs * 1000, + "loglevel": "error", + }) + args = append(args, url) + ctx := context.Background() + cmd := exec.CommandContext(ctx, "ffprobe", args...) + errBuf := new(strings.Builder) + cmd.Stderr = errBuf + err := cmd.Run() + if err != nil { + errStr := strings.TrimSpace(errBuf.String()) + return errors.New(errStr) + } + return nil +} diff --git a/internal/pkg/server/ffmpegServer/ffmpegCmd.go b/internal/pkg/server/ffmpegServer/toHlsCmd.go similarity index 62% rename from internal/pkg/server/ffmpegServer/ffmpegCmd.go rename to internal/pkg/server/ffmpegServer/toHlsCmd.go index c00a391..d2d31da 100644 --- a/internal/pkg/server/ffmpegServer/ffmpegCmd.go +++ b/internal/pkg/server/ffmpegServer/toHlsCmd.go @@ -1,31 +1,30 @@ package ffmpegServer import ( - "context" "errors" "fmt" - ffmpegCmd "github.com/u2takey/ffmpeg-go" - "go.uber.org/zap" "os/exec" "path/filepath" "sort" "strings" "sync" "ycmediakit/internal/pkg/util" + + ffmpegCmd "github.com/u2takey/ffmpeg-go" + "go.uber.org/zap" ) -type FfmpegCmdServer struct { - m sync.Mutex - cmdMap map[string]*CmdBaseInfo -} +var ( + log *zap.Logger +) -type CmdBaseInfo struct { - CmdBaseConfig +func init() { + log = zap.L() } type ToHlsServer struct { - hlsCfg *ToHlsConfig - lock sync.Mutex + cfg *ToHlsConfig + m sync.Mutex cmdMap map[string]*ToHlsInfo } @@ -38,19 +37,19 @@ type ToHlsInfo struct { func NewToHlsServer(hlsCfg *ToHlsConfig) *ToHlsServer { return &ToHlsServer{ - hlsCfg: hlsCfg, + cfg: hlsCfg, cmdMap: make(map[string]*ToHlsInfo), } } func (server *ToHlsServer) Add(url string, streamPath string, timeoutMs int, curHlsCfg *ToHlsConfig) bool { - server.lock.Lock() - defer server.lock.Unlock() + server.m.Lock() + defer server.m.Unlock() _, ok := server.cmdMap[streamPath] if ok { return true } - // merge curHlsCfg, hlsCfg + // merge curHlsCfg, cfg cmd := server.BuildToHlsCmd(url, streamPath, timeoutMs, curHlsCfg) info := &ToHlsInfo{ ToHlsConfig: *curHlsCfg, @@ -63,27 +62,27 @@ func (server *ToHlsServer) Add(url string, streamPath string, timeoutMs int, cur } func (server *ToHlsServer) Exists(streamPath string) bool { - server.lock.Lock() - defer server.lock.Unlock() + server.m.Lock() + defer server.m.Unlock() _, ok := server.cmdMap[streamPath] return ok } func (server *ToHlsServer) Delete(streamPath string) bool { - server.lock.Lock() - defer server.lock.Unlock() + server.m.Lock() + defer server.m.Unlock() delete(server.cmdMap, streamPath) - path := filepath.Join(util.GetWorkPath(), server.hlsCfg.HlsRoot, streamPath, "..") + path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath, "..") err := util.RemoveDir(path) if err != nil { - zap.L().Error(err.Error()) + log.Error(err.Error()) } return true } func (server *ToHlsServer) GetList() []string { - server.lock.Lock() - defer server.lock.Unlock() + server.m.Lock() + defer server.m.Unlock() var slice []string for _, elm := range server.cmdMap { slice = append(slice, elm.streamPath) @@ -93,65 +92,26 @@ func (server *ToHlsServer) GetList() []string { } func (server *ToHlsServer) DeleteAndStop(streamPath string) error { - server.lock.Lock() - defer server.lock.Unlock() + server.m.Lock() + defer server.m.Unlock() delete(server.cmdMap, streamPath) _, err := server.StopToHlsCmd(streamPath) if err != nil { return err } - path := filepath.Join(util.GetWorkPath(), server.hlsCfg.HlsRoot, streamPath) + path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath) err = util.RemoveDir(path) if err != nil { - zap.L().Error(err.Error()) - } - return nil -} - -func (server *ToHlsServer) PrepareUrl(url string) (string, bool) { - if url == "" { - return "", false - } - urlArr := strings.Split(url, "?") - if len(urlArr) > 2 { - return "", false - } - url = urlArr[0] - return url, true -} - -func (server *ToHlsServer) PrepareStreamPath(streamPath string) (string, bool) { - streamArr := strings.Split(streamPath, "/") - if len(streamArr) != 2 { - return "", false - } - streamPath = filepath.Join(streamArr...) - return streamPath, true -} - -func (server *ToHlsServer) ProbeWithTimeout(url string, timeoutMs int) error { - args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{ - "stimeout": timeoutMs * 1000, - "loglevel": "error", - }) - args = append(args, url) - ctx := context.Background() - cmd := exec.CommandContext(ctx, "ffprobe", args...) - errBuf := new(strings.Builder) - cmd.Stderr = errBuf - err := cmd.Run() - if err != nil { - errStr := strings.TrimSpace(errBuf.String()) - return errors.New(errStr) + log.Error(err.Error()) } return nil } func (server *ToHlsServer) BuildToHlsCmd(url string, streamPath string, timeoutMs int, hlsCfg *ToHlsConfig) *exec.Cmd { - path := filepath.Join(util.GetWorkPath(), server.hlsCfg.HlsRoot, streamPath) + path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath) _, err := util.CheckDir(path) if err != nil { - zap.L().Error(err.Error()) + log.Error(err.Error()) } path = filepath.Join(path, hlsCfg.HlsName+".m3u8") buffer := new(strings.Builder) @@ -200,5 +160,5 @@ func (server *ToHlsServer) StopToHlsCmd(streamPath string) (bool, error) { } func (server *ToHlsServer) BuildHlsPath(streamPath string) string { - return filepath.Join(server.hlsCfg.HlsRoot, streamPath, server.hlsCfg.HlsName+".m3u8") + return filepath.Join(server.cfg.HlsRoot, streamPath, server.cfg.HlsName+".m3u8") } diff --git a/internal/pkg/server/systemServer/config.go b/internal/pkg/server/systemServer/config.go new file mode 100644 index 0000000..9ab39e0 --- /dev/null +++ b/internal/pkg/server/systemServer/config.go @@ -0,0 +1,11 @@ +package systemServer + +type SystemConfig struct { + Payload PayloadConfig +} + +type PayloadConfig struct { + MaxRetryTime int + MaxCpuUsage float32 + MaxMemoryUsage float32 +} diff --git a/internal/pkg/server/systemServer/payload.go b/internal/pkg/server/systemServer/payload.go new file mode 100644 index 0000000..b4d4860 --- /dev/null +++ b/internal/pkg/server/systemServer/payload.go @@ -0,0 +1,80 @@ +package systemServer + +import ( + "time" + "ycmediakit/internal/pkg/util" + + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/mem" +) + +type PayloadServer struct { + Cfg *PayloadConfig + ref *util.Channel + p Payload +} + +type Payload struct { + Memory struct { + Total uint64 + Free uint64 + Used uint64 + Usage float32 + } + Cpu struct { + Usage float32 + } +} + +func NewPayloadServer(plCfg *PayloadConfig) *PayloadServer { + s := &PayloadServer{ + Cfg: plCfg, + ref: util.NewChannel(), + } + go s.start() + return s +} + +func (ps *PayloadServer) start() { + for range time.Tick(time.Second) { + if ps.running() { + ps.collect() + ps.done() + } + } +} + +// running 是否正在采集数据 +func (ps *PayloadServer) running() bool { + return !ps.ref.IsClosed() +} + +// Wait 等待 +func (ps *PayloadServer) Wait() chan interface{} { + ps.ref.Open() + return ps.ref.C +} + +// done 结束 +func (ps *PayloadServer) done() { + ps.ref.Close() +} + +// collect 采集数据 +func (ps *PayloadServer) collect() { + var p Payload + v, _ := mem.VirtualMemory() + p.Memory.Total = v.Total >> 20 + p.Memory.Free = v.Available >> 20 + p.Memory.Used = v.Used >> 20 + p.Memory.Usage = float32(v.UsedPercent) + if cc, _ := cpu.Percent(time.Second, false); len(cc) > 0 { + p.Cpu.Usage = float32(cc[0]) + } + ps.p = p +} + +// GetPayload 获取采集数据 +func (ps *PayloadServer) GetPayload() Payload { + return ps.p +} diff --git a/internal/pkg/server/systemServer/payload_test.go b/internal/pkg/server/systemServer/payload_test.go new file mode 100644 index 0000000..082ffc7 --- /dev/null +++ b/internal/pkg/server/systemServer/payload_test.go @@ -0,0 +1,15 @@ +package systemServer + +import ( + "fmt" + "testing" +) + +func TestPayloadServer(t *testing.T) { + ps := NewPayloadServer(nil) + for { + <-ps.Wait() + p := ps.GetPayload() + fmt.Println(p) + } +} diff --git a/internal/pkg/util/channel.go b/internal/pkg/util/channel.go new file mode 100644 index 0000000..711216b --- /dev/null +++ b/internal/pkg/util/channel.go @@ -0,0 +1,47 @@ +package util + +import ( + "sync" +) + +type Channel struct { + m sync.Mutex + closed bool + C chan interface{} +} + +func NewChannel() *Channel { + return &Channel{ + C: nil, + closed: true, + } +} + +func (c *Channel) Open() { + c.m.Lock() + defer c.m.Unlock() + if c.isClosed() { + c.C = make(chan interface{}) + c.closed = false + } +} + +func (c *Channel) Close() { + c.m.Lock() + defer c.m.Unlock() + if !c.isClosed() { + close(c.C) + c.C = nil + c.closed = true + } +} + +func (c *Channel) IsClosed() bool { + c.m.Lock() + defer c.m.Unlock() + return c.isClosed() +} + +func (c *Channel) isClosed() bool { + return c.C == nil && c.closed +} diff --git a/main.go b/main.go index b86637b..8df1c04 100644 --- a/main.go +++ b/main.go @@ -18,12 +18,12 @@ The ffmpeg server for Go import ( "context" - "ycmediakit/internal/pkg/global" + "ycmediakit/cmd" "ycmediakit/internal/pkg/util" ) func main() { ctx, cancel := context.WithCancel(context.Background()) go util.WaitTerminal(cancel) - global.Run(ctx) + cmd.Run(ctx) } diff --git a/middleware/interceptor.go b/middleware/interceptor.go index a042ff2..6dc0a1e 100644 --- a/middleware/interceptor.go +++ b/middleware/interceptor.go @@ -2,6 +2,10 @@ package middleware import ( "github.com/gin-gonic/gin" + "strconv" + "strings" + "ycmediakit/internal/pkg/result" + "ycmediakit/internal/pkg/server/systemServer" ) // HttpInterceptor 可自定义鉴权等操作 @@ -14,8 +18,23 @@ func HttpInterceptor() gin.HandlerFunc { // ProbePayloadInterceptor 探测 ffmpeg 命令服务器是否有资源得以运行 // TODO -func ProbePayloadInterceptor() gin.HandlerFunc { +func ProbePayloadInterceptor(ps *systemServer.PayloadServer) gin.HandlerFunc { return func(c *gin.Context) { + cfg := ps.Cfg + if strings.Contains(c.FullPath(), "/start") { + for i := 0; i < cfg.MaxRetryTime; i++ { + <-ps.Wait() + p := ps.GetPayload() + if p.Cpu.Usage <= cfg.MaxCpuUsage && p.Memory.Usage <= cfg.MaxMemoryUsage { + c.Next() + return + } + } + errMsg := "cpu or memory overload, retry time " + strconv.Itoa(cfg.MaxRetryTime) + result.Forbidden.WithMsg(errMsg).Failure(c) + c.Abort() + return + } c.Next() } }