diff --git a/cmd/run.go b/cmd/run.go index 95488f3..5f58f3a 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -8,6 +8,7 @@ import ( "ycmediakit/internal/app/ffmpeg" "ycmediakit/internal/pkg/config" "ycmediakit/internal/pkg/global" + "ycmediakit/internal/pkg/server/ffmpegServer" "ycmediakit/internal/pkg/server/systemServer" "ycmediakit/internal/pkg/util" "ycmediakit/middleware" @@ -26,6 +27,7 @@ var ( var ( payloadServer *systemServer.PayloadServer + toHlsServer *ffmpegServer.ToHlsServer ) func init() { @@ -42,6 +44,7 @@ func initAttr() { func initServer() { payloadServer = global.PayloadServer + toHlsServer = global.ToHlsServer } func initEngine() { @@ -52,21 +55,26 @@ func initEngine() { engine.Use(middleware.Cors()) engine.Use(middleware.GinLogger(log), middleware.GinRecovery(log, true)) engine.Use(middleware.HttpInterceptor()) - engineBindFS() - engineBindRouters() + initFfmpegServices() } -func engineBindFS() { +func initFfmpegServices() { fCfg := &aCfg.Ffmpeg - hlsPath := filepath.Join(util.GetWorkPath(), fCfg.ToHls.HlsRoot) - _, _ = util.CheckDir(hlsPath) - engine.StaticFS("/"+fCfg.ToHls.HlsRoot, http.Dir(hlsPath)) -} + ffmpegGroup := engine.Group("/ffmpeg") + if payloadServer.Cfg.Enable { + ffmpegGroup.Use(middleware.ProbePayloadInterceptor(payloadServer)) + } + ffmpeg.BindRouters(ffmpegGroup) -func engineBindRouters() { - group := engine.Group("/ffmpeg") - group.Use(middleware.ProbePayloadInterceptor(payloadServer)) - ffmpeg.BindRouters(group) + toHlsCfg := &fCfg.ToHls + hlsPath := filepath.Join(util.GetWorkPath(), toHlsCfg.HlsRoot) + _, _ = util.CheckDir(hlsPath) + hlsGroup := engine.Group("/" + toHlsCfg.HlsRoot) + if toHlsServer.Cfg.Keepalive.Enable { + hlsGroup.Use(middleware.KeepAliveToHlsInterceptor(toHlsServer.Supervisor)) + ffmpegGroup.Use(middleware.ProbePayloadInterceptor(payloadServer)) + } + hlsGroup.StaticFS("/", http.Dir(hlsPath)) } func Run(ctx_ context.Context) { diff --git a/config/config.dev.yaml b/config/config.dev.yaml index 8e3b406..e2c9939 100644 --- a/config/config.dev.yaml +++ b/config/config.dev.yaml @@ -2,11 +2,12 @@ server: port: 8888 system: payload: + enable: true maxRetryTime: 3 maxCpuUsage: 95.0 maxMemoryUsage: 95.0 ffmpeg: - timeout: 5000 + timeout: 5000 # millisecond toHls: base: encodeCodec: h264 @@ -16,8 +17,12 @@ ffmpeg: bitrate: 1M vfScale: -1:640 gopSize: 3 - HlsRoot: hls - HlsName: trans + keepalive: + enable: true + pollInterval: 5 # minute + checkInInterval: 1 # minute + hlsRoot: hls + hlsName: trans hlsTime: 3.0 hlsListSize: 3 hlsWarp: 15 diff --git a/internal/app/ffmpeg/baseService.go b/internal/app/ffmpeg/baseService.go index 22fcfd1..099001c 100644 --- a/internal/app/ffmpeg/baseService.go +++ b/internal/app/ffmpeg/baseService.go @@ -1,24 +1,12 @@ package ffmpeg import ( - "go.uber.org/zap" - "ycmediakit/internal/pkg/global" - "ycmediakit/internal/pkg/result" "ycmediakit/internal/pkg/server/ffmpegServer" + "ycmediakit/internal/pkg/unit/result" "github.com/gin-gonic/gin" ) -var ( - cfg *ffmpegServer.FfmpegConfig - log *zap.Logger -) - -func init() { - cfg = &global.AppConfig.Ffmpeg - log = zap.L() -} - func ProbeStream(c *gin.Context) { target := c.Query("target") target, ok := ffmpegServer.PrepareUrl(target) diff --git a/internal/app/ffmpeg/index.go b/internal/app/ffmpeg/index.go new file mode 100644 index 0000000..3a43501 --- /dev/null +++ b/internal/app/ffmpeg/index.go @@ -0,0 +1,21 @@ +package ffmpeg + +import ( + "go.uber.org/zap" + "ycmediakit/internal/pkg/global" + "ycmediakit/internal/pkg/server/ffmpegServer" +) + +var ( + cfg *ffmpegServer.FfmpegConfig + log *zap.Logger + + toHlsServer *ffmpegServer.ToHlsServer +) + +func init() { + cfg = &global.AppConfig.Ffmpeg + log = zap.L() + toHlsServer = ffmpegServer.NewToHlsServer(&cfg.ToHls) + global.ToHlsServer = toHlsServer +} diff --git a/internal/app/ffmpeg/toHlsService.go b/internal/app/ffmpeg/toHlsService.go index 3767cc4..ce69141 100644 --- a/internal/app/ffmpeg/toHlsService.go +++ b/internal/app/ffmpeg/toHlsService.go @@ -1,22 +1,12 @@ package ffmpeg import ( - "ycmediakit/internal/pkg/global" - "ycmediakit/internal/pkg/result" "ycmediakit/internal/pkg/server/ffmpegServer" + "ycmediakit/internal/pkg/unit/result" "github.com/gin-gonic/gin" ) -var ( - toHlsServer *ffmpegServer.ToHlsServer -) - -func init() { - toHlsServer = ffmpegServer.NewToHlsServer(&cfg.ToHls) - global.ToHlsServer = toHlsServer -} - func StartToHls(c *gin.Context) { target := c.Query("target") streamPath := c.Query("streamPath") diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 2e2937a..b6aae3b 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -1,20 +1,21 @@ package config import ( - "github.com/spf13/viper" "log" "path/filepath" "strings" - "ycmediakit/internal/pkg/logger" "ycmediakit/internal/pkg/server/ffmpegServer" "ycmediakit/internal/pkg/server/systemServer" + "ycmediakit/internal/pkg/unit/logger" "ycmediakit/internal/pkg/util" + + "github.com/spf13/viper" ) type RunConfig struct { Debug bool `` Type string `` - Log logger.LogConfig + Log logger.Config } type AppConfig struct { @@ -42,17 +43,17 @@ func NewRunConfig() *RunConfig { return &cfg } -func NewAppConfig(runConfig *RunConfig) *AppConfig { +func NewAppConfig(cfg *RunConfig) *AppConfig { v := viper.New() v.AddConfigPath(filepath.Join(util.GetWorkPath(), "config")) - v.SetConfigName(strings.Join([]string{"config", ".", runConfig.Type}, "")) + v.SetConfigName(strings.Join([]string{"config", ".", cfg.Type}, "")) v.SetConfigType("yaml") if err := v.ReadInConfig(); err != nil { log.Fatalf("read config failed: %v", err) } - var cfg AppConfig - if err := v.Unmarshal(&cfg); err != nil { + var appCfg AppConfig + if err := v.Unmarshal(&appCfg); err != nil { log.Fatalf("read config failed: %v", err) } - return &cfg + return &appCfg } diff --git a/internal/pkg/global/global.go b/internal/pkg/global/global.go index fec5782..85f1001 100644 --- a/internal/pkg/global/global.go +++ b/internal/pkg/global/global.go @@ -2,12 +2,12 @@ package global import ( "fmt" - "github.com/go-playground/validator/v10" "ycmediakit/internal/pkg/config" - "ycmediakit/internal/pkg/logger" "ycmediakit/internal/pkg/server/ffmpegServer" "ycmediakit/internal/pkg/server/systemServer" + "ycmediakit/internal/pkg/unit/logger" + "github.com/go-playground/validator/v10" "go.uber.org/zap" ) diff --git a/internal/pkg/server/ffmpegServer/baseCmd.go b/internal/pkg/server/ffmpegServer/baseCmd.go index 6a2c41f..9ad6b6e 100644 --- a/internal/pkg/server/ffmpegServer/baseCmd.go +++ b/internal/pkg/server/ffmpegServer/baseCmd.go @@ -5,6 +5,7 @@ import ( "errors" ffmpegCmd "github.com/u2takey/ffmpeg-go" "os/exec" + "path" "path/filepath" "strings" ) @@ -26,10 +27,15 @@ func PrepareStreamPath(streamPath string) (string, bool) { if len(streamArr) != 2 { return "", false } - streamPath = filepath.Join(streamArr...) + streamPath = path.Join(streamArr...) return streamPath, true } +func BuildStreamFilePath(streamPath string) string { + streamArr := strings.Split(streamPath, "/") + return filepath.Join(streamArr...) +} + func ProbeWithTimeout(url string, timeoutMs int) error { args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{ "i": url, diff --git a/internal/pkg/server/ffmpegServer/config.go b/internal/pkg/server/ffmpegServer/config.go index 1593606..0a3a32f 100644 --- a/internal/pkg/server/ffmpegServer/config.go +++ b/internal/pkg/server/ffmpegServer/config.go @@ -1,5 +1,7 @@ package ffmpegServer +import "ycmediakit/internal/pkg/unit/keepalive" + type FfmpegConfig struct { Timeout int ToHls ToHlsConfig @@ -18,6 +20,7 @@ type CmdBaseConfig struct { type ToHlsConfig struct { CmdBaseConfig `mapstructure:"base"` + Keepalive keepalive.Config HlsRoot string HlsName string HlsTime string diff --git a/internal/pkg/server/ffmpegServer/toHlsCmd.go b/internal/pkg/server/ffmpegServer/toHlsCmd.go index d2d31da..fa5f653 100644 --- a/internal/pkg/server/ffmpegServer/toHlsCmd.go +++ b/internal/pkg/server/ffmpegServer/toHlsCmd.go @@ -4,10 +4,12 @@ import ( "errors" "fmt" "os/exec" + "path" "path/filepath" "sort" "strings" "sync" + "ycmediakit/internal/pkg/unit/keepalive" "ycmediakit/internal/pkg/util" ffmpegCmd "github.com/u2takey/ffmpeg-go" @@ -23,9 +25,10 @@ func init() { } type ToHlsServer struct { - cfg *ToHlsConfig - m sync.Mutex - cmdMap map[string]*ToHlsInfo + Cfg *ToHlsConfig + m sync.Mutex + cmdMap map[string]*ToHlsInfo + Supervisor *keepalive.Supervisor } type ToHlsInfo struct { @@ -35,92 +38,98 @@ type ToHlsInfo struct { cmd *exec.Cmd } -func NewToHlsServer(hlsCfg *ToHlsConfig) *ToHlsServer { - return &ToHlsServer{ - cfg: hlsCfg, +func NewToHlsServer(cfg *ToHlsConfig) *ToHlsServer { + s := &ToHlsServer{ + Cfg: cfg, cmdMap: make(map[string]*ToHlsInfo), } + if s.Cfg.Keepalive.Enable { + s.Supervisor = keepalive.NewSupervisor(&s.Cfg.Keepalive) + handlePath = "/" + cfg.HlsRoot + s.Supervisor.Register(handlePath, s) + } + return s } -func (server *ToHlsServer) Add(url string, streamPath string, timeoutMs int, curHlsCfg *ToHlsConfig) bool { - server.m.Lock() - defer server.m.Unlock() - _, ok := server.cmdMap[streamPath] +func (s *ToHlsServer) Add(url string, streamPath string, timeoutMs int, curHlsCfg *ToHlsConfig) bool { + s.m.Lock() + defer s.m.Unlock() + _, ok := s.cmdMap[streamPath] if ok { return true } - // merge curHlsCfg, cfg - cmd := server.BuildToHlsCmd(url, streamPath, timeoutMs, curHlsCfg) + // merge curHlsCfg, Cfg + cmd := s.BuildToHlsCmd(url, streamPath, timeoutMs, curHlsCfg) info := &ToHlsInfo{ ToHlsConfig: *curHlsCfg, url: url, streamPath: streamPath, cmd: cmd, } - server.cmdMap[streamPath] = info + s.cmdMap[streamPath] = info return true } -func (server *ToHlsServer) Exists(streamPath string) bool { - server.m.Lock() - defer server.m.Unlock() - _, ok := server.cmdMap[streamPath] +func (s *ToHlsServer) Exists(streamPath string) bool { + s.m.Lock() + defer s.m.Unlock() + _, ok := s.cmdMap[streamPath] return ok } -func (server *ToHlsServer) Delete(streamPath string) bool { - server.m.Lock() - defer server.m.Unlock() - delete(server.cmdMap, streamPath) - path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath, "..") - err := util.RemoveDir(path) +func (s *ToHlsServer) Delete(streamPath string) bool { + s.m.Lock() + defer s.m.Unlock() + delete(s.cmdMap, streamPath) + dir := filepath.Join(util.GetWorkPath(), s.Cfg.HlsRoot, BuildStreamFilePath(streamPath), "..") + err := util.RemoveDir(dir) if err != nil { log.Error(err.Error()) } return true } -func (server *ToHlsServer) GetList() []string { - server.m.Lock() - defer server.m.Unlock() +func (s *ToHlsServer) GetList() []string { + s.m.Lock() + defer s.m.Unlock() var slice []string - for _, elm := range server.cmdMap { + for _, elm := range s.cmdMap { slice = append(slice, elm.streamPath) } sort.Strings(slice) return slice } -func (server *ToHlsServer) DeleteAndStop(streamPath string) error { - server.m.Lock() - defer server.m.Unlock() - delete(server.cmdMap, streamPath) - _, err := server.StopToHlsCmd(streamPath) +func (s *ToHlsServer) DeleteAndStop(streamPath string) error { + s.m.Lock() + defer s.m.Unlock() + delete(s.cmdMap, streamPath) + _, err := s.StopToHlsCmd(streamPath) if err != nil { return err } - path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath) - err = util.RemoveDir(path) + dir := filepath.Join(util.GetWorkPath(), s.Cfg.HlsRoot, BuildStreamFilePath(streamPath)) + err = util.RemoveDir(dir) if err != nil { log.Error(err.Error()) } return nil } -func (server *ToHlsServer) BuildToHlsCmd(url string, streamPath string, timeoutMs int, hlsCfg *ToHlsConfig) *exec.Cmd { - path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath) - _, err := util.CheckDir(path) +func (s *ToHlsServer) BuildToHlsCmd(url string, streamPath string, timeoutMs int, hlsCfg *ToHlsConfig) *exec.Cmd { + dir := filepath.Join(util.GetWorkPath(), s.Cfg.HlsRoot, BuildStreamFilePath(streamPath)) + _, err := util.CheckDir(dir) if err != nil { log.Error(err.Error()) } - path = filepath.Join(path, hlsCfg.HlsName+".m3u8") + p := filepath.Join(dir, hlsCfg.HlsName+".m3u8") buffer := new(strings.Builder) cmd := ffmpegCmd. Input(url, ffmpegCmd.KwArgs{ "stimeout": timeoutMs * 1000, "loglevel": "error", }). - Output(path, ffmpegCmd.KwArgs{ + Output(p, ffmpegCmd.KwArgs{ "c:v": hlsCfg.EncodeCodec, "profile:v": hlsCfg.Profile, "vf": util.Merge("scale=", hlsCfg.VfScale), @@ -134,11 +143,13 @@ func (server *ToHlsServer) BuildToHlsCmd(url string, streamPath string, timeoutM return cmd } -func (server *ToHlsServer) RunToHlsCmd(streamPath string) (bool, error) { - info, ok := server.cmdMap[streamPath] +func (s *ToHlsServer) RunToHlsCmd(streamPath string) (bool, error) { + info, ok := s.cmdMap[streamPath] if !ok { return false, nil } + s.Supervisor.Activate(streamPath) + defer s.Supervisor.DeActivate(streamPath) err := info.cmd.Run() if err != nil { errStr := strings.TrimSpace(fmt.Sprint(info.cmd.Stderr)) @@ -147,8 +158,8 @@ func (server *ToHlsServer) RunToHlsCmd(streamPath string) (bool, error) { return true, nil } -func (server *ToHlsServer) StopToHlsCmd(streamPath string) (bool, error) { - info, ok := server.cmdMap[streamPath] +func (s *ToHlsServer) StopToHlsCmd(streamPath string) (bool, error) { + info, ok := s.cmdMap[streamPath] if !ok { return true, nil } @@ -159,6 +170,29 @@ func (server *ToHlsServer) StopToHlsCmd(streamPath string) (bool, error) { return true, nil } -func (server *ToHlsServer) BuildHlsPath(streamPath string) string { - return filepath.Join(server.cfg.HlsRoot, streamPath, server.cfg.HlsName+".m3u8") +func (s *ToHlsServer) BuildHlsPath(streamPath string) string { + return path.Join("/"+s.Cfg.HlsRoot, streamPath, s.Cfg.HlsName+".m3u8") +} + +var ( + handlePath string +) + +func (s *ToHlsServer) CreateKey(oriUrl string) (string, bool) { + url := oriUrl + paramArr := strings.Split(url, "/") + if len(paramArr) != 5 || paramArr[1] != s.Cfg.HlsRoot { + return "", false + } + streamPath := paramArr[2] + "/" + paramArr[3] + return streamPath, true +} + +func (s *ToHlsServer) Expire(key string) bool { + _, ok := s.cmdMap[key] + if !ok { + return false + } + _ = s.DeleteAndStop(key) + return true } diff --git a/internal/pkg/server/systemServer/config.go b/internal/pkg/server/systemServer/config.go index 45ec674..95604a8 100644 --- a/internal/pkg/server/systemServer/config.go +++ b/internal/pkg/server/systemServer/config.go @@ -5,6 +5,7 @@ type SystemConfig struct { } type PayloadConfig struct { + Enable bool MaxRetryTime int `validate:"min=1,max=5"` MaxCpuUsage float32 `validate:"min=60,max=100"` MaxMemoryUsage float32 `validate:"min=60,max=100"` diff --git a/internal/pkg/server/systemServer/payload.go b/internal/pkg/server/systemServer/payload.go index b4d4860..191c42f 100644 --- a/internal/pkg/server/systemServer/payload.go +++ b/internal/pkg/server/systemServer/payload.go @@ -26,12 +26,14 @@ type Payload struct { } } -func NewPayloadServer(plCfg *PayloadConfig) *PayloadServer { +func NewPayloadServer(cfg *PayloadConfig) *PayloadServer { s := &PayloadServer{ - Cfg: plCfg, + Cfg: cfg, ref: util.NewChannel(), } - go s.start() + if s.Cfg.Enable { + go s.start() + } return s } diff --git a/internal/pkg/unit/keepalive/config.go b/internal/pkg/unit/keepalive/config.go new file mode 100644 index 0000000..dccc661 --- /dev/null +++ b/internal/pkg/unit/keepalive/config.go @@ -0,0 +1,7 @@ +package keepalive + +type Config struct { + Enable bool + PollInterval int `validate:"min=1,max=60"` + CheckInInterval int `validate:"min=1,max=30"` +} diff --git a/internal/pkg/unit/keepalive/keepalive.go b/internal/pkg/unit/keepalive/keepalive.go new file mode 100644 index 0000000..17f8c2a --- /dev/null +++ b/internal/pkg/unit/keepalive/keepalive.go @@ -0,0 +1,91 @@ +package keepalive + +import ( + "log" + "time" + "ycmediakit/internal/pkg/util" +) + +type Supervisor struct { + Cfg *Config + HandlePath string + guarder Guarder + fastMap *util.RwMap[string, time.Time] +} + +type Guarder interface { + CreateKey(string) (string, bool) + Expire(string) bool +} + +func NewSupervisor(cfg *Config) *Supervisor { + s := &Supervisor{ + Cfg: cfg, + fastMap: util.NewRwMap[string, time.Time](), + } + if s.Cfg.Enable { + go s.start() + } + return s +} + +func (s *Supervisor) start() { + i := time.Duration(s.Cfg.PollInterval) * time.Minute + for range time.Tick(i) { + go s.inspect() + } +} + +func (s *Supervisor) inspect() { + g := s.guarder + if g == nil { + return + } + nowTime := time.Now() + pollTime := time.Duration(s.Cfg.PollInterval) * time.Minute + s.fastMap.ModifyRange(func(key string, value time.Time) { + if nowTime.Sub(value) >= pollTime { + g.Expire(key) + delete(s.fastMap.Map, key) + } + }) +} + +func (s *Supervisor) Register(handlePath string, i interface{}) { + g, ok := i.(Guarder) + if !ok { + log.Panicf("%v is not implements Guarder\n", i) + } + if s.guarder == nil { + s.HandlePath = handlePath + s.guarder = g + } +} + +func (s *Supervisor) CheckIn(oriUrl string) { + g := s.guarder + if g == nil { + return + } + key, ok := g.CreateKey(oriUrl) + if !ok { + return + } + oldTime, ok := s.fastMap.Get(key) + newTime := time.Now() + if ok && newTime.Sub(oldTime) < time.Duration(s.Cfg.CheckInInterval)*time.Minute { + return + } + s.Activate(key) +} + +func (s *Supervisor) Activate(key string) { + s.fastMap.Set(key, time.Now()) +} + +func (s *Supervisor) DeActivate(key string) { + if !s.fastMap.Has(key) { + return + } + s.fastMap.Delete(key) +} diff --git a/internal/pkg/logger/config.go b/internal/pkg/unit/logger/config.go similarity index 96% rename from internal/pkg/logger/config.go rename to internal/pkg/unit/logger/config.go index 1808ead..48abee7 100644 --- a/internal/pkg/logger/config.go +++ b/internal/pkg/unit/logger/config.go @@ -1,6 +1,6 @@ package logger -type LogConfig struct { +type Config struct { Level string // Level 最低日志等级,DEBUG= level && lvl < zap.ErrorLevel && lvl >= zap.DebugLevel @@ -70,13 +71,13 @@ func NewMultiCore(lCfg *LogConfig) zapcore.Core { infoCore := zapcore.NewCore(infoEncoder, zapcore.AddSync(infoLogger), infoPrior) errEncoder := GetEncoder(true) - errFileName := filepath.Join(lCfg.FilePath, now+"-error.log") + errFileName := filepath.Join(cfg.FilePath, now+"-error.log") errLogger := &lumberjack.Logger{ - Filename: errFileName, // 文件位置 - MaxSize: lCfg.MaxSize, // 进行切割之前,日志文件的最大大小(MB为单位) - MaxAge: lCfg.MaxAge, // 保留旧文件的最大天数 - MaxBackups: lCfg.MaxBackups, // 保留旧文件的最大个数 - Compress: lCfg.Compress, // 是否压缩/归档旧文件 + Filename: errFileName, // 文件位置 + MaxSize: cfg.MaxSize, // 进行切割之前,日志文件的最大大小(MB为单位) + MaxAge: cfg.MaxAge, // 保留旧文件的最大天数 + MaxBackups: cfg.MaxBackups, // 保留旧文件的最大个数 + Compress: cfg.Compress, // 是否压缩/归档旧文件 } errPrior := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool { return lvl >= level && lvl >= zap.ErrorLevel diff --git a/internal/pkg/result/result.go b/internal/pkg/unit/result/result.go similarity index 100% rename from internal/pkg/result/result.go rename to internal/pkg/unit/result/result.go diff --git a/internal/pkg/util/RwMap.go b/internal/pkg/util/RwMap.go new file mode 100644 index 0000000..2b152d8 --- /dev/null +++ b/internal/pkg/util/RwMap.go @@ -0,0 +1,106 @@ +package util + +import ( + "sync" + "time" +) + +type RwMap[K comparable, V any] struct { + sync.RWMutex + Map map[K]V +} + +func NewRwMap[K comparable, V any]() *RwMap[K, V] { + return &RwMap[K, V]{ + Map: make(map[K]V), + } +} + +func (m *RwMap[K, V]) Add(k K, v V) bool { + m.Lock() + defer m.Unlock() + if _, ok := m.Map[k]; ok { + return false + } + m.Map[k] = v + return true +} + +func (m *RwMap[K, V]) Set(k K, v V) { + m.Lock() + m.Map[k] = v + m.Unlock() +} + +func (m *RwMap[K, V]) Has(k K) (ok bool) { + m.RLock() + defer m.RUnlock() + _, ok = m.Map[k] + return +} + +func (m *RwMap[K, V]) Len() int { + return len(m.Map) +} + +func (m *RwMap[K, V]) Get(k K) (V, bool) { + m.RLock() + defer m.RUnlock() + v, ok := m.Map[k] + return v, ok +} + +func (m *RwMap[K, V]) Delete(k K) (v V, ok bool) { + m.RLock() + v, ok = m.Map[k] + m.RUnlock() + if ok { + m.Lock() + delete(m.Map, k) + m.Unlock() + } + return +} + +func (m *RwMap[K, V]) ToList() (r []V) { + m.RLock() + defer m.RUnlock() + for _, s := range m.Map { + r = append(r, s) + } + return +} + +func MapList[K comparable, V any, R any](m *RwMap[K, V], f func(K, V) R) (r []R) { + m.RLock() + defer m.RUnlock() + for k, v := range m.Map { + r = append(r, f(k, v)) + } + return +} + +func (m *RwMap[K, V]) Range(f func(K, V)) { + m.RLock() + defer m.RUnlock() + for k, v := range m.Map { + f(k, v) + } +} + +func (m *RwMap[K, V]) RangeInterval(f func(K, V), d time.Duration) { + m.RLock() + defer m.RUnlock() + for k, v := range m.Map { + f(k, v) + time.Sleep(d) + } +} + +func (m *RwMap[K, V]) ModifyRange(f func(K, V)) { + m.Lock() + defer m.Unlock() + for k, v := range m.Map { + f(k, v) + } +} diff --git a/middleware/interceptor.go b/middleware/interceptor.go index 6dc0a1e..a73b782 100644 --- a/middleware/interceptor.go +++ b/middleware/interceptor.go @@ -4,20 +4,27 @@ import ( "github.com/gin-gonic/gin" "strconv" "strings" - "ycmediakit/internal/pkg/result" "ycmediakit/internal/pkg/server/systemServer" + "ycmediakit/internal/pkg/unit/keepalive" + "ycmediakit/internal/pkg/unit/result" ) // HttpInterceptor 可自定义鉴权等操作 -// TODO func HttpInterceptor() gin.HandlerFunc { return func(c *gin.Context) { c.Next() } } +// KeepAliveToHlsInterceptor 保活 toHls 服务 +func KeepAliveToHlsInterceptor(kas *keepalive.Supervisor) gin.HandlerFunc { + return func(c *gin.Context) { + kas.CheckIn(c.Request.RequestURI) + c.Next() + } +} + // ProbePayloadInterceptor 探测 ffmpeg 命令服务器是否有资源得以运行 -// TODO func ProbePayloadInterceptor(ps *systemServer.PayloadServer) gin.HandlerFunc { return func(c *gin.Context) { cfg := ps.Cfg