update: refactor global limit cache. Use gocache lib for level 2 cache

fix: shadowsocks2022 init feat
This commit is contained in:
Senis John
2022-12-07 12:56:07 +08:00
parent fc16cb0972
commit 85d73408c3
9 changed files with 154 additions and 148 deletions

View File

@@ -1,14 +1,11 @@
package controller
import (
"context"
"fmt"
"log"
"reflect"
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/core"
@@ -19,7 +16,6 @@ import (
"github.com/XrayR-project/XrayR/api"
"github.com/XrayR-project/XrayR/app/mydispatcher"
"github.com/XrayR-project/XrayR/common/limiter"
"github.com/XrayR-project/XrayR/common/mylego"
"github.com/XrayR-project/XrayR/common/serverstatus"
)
@@ -102,7 +98,7 @@ func (c *Controller) Start() error {
}
// Add Limiter
if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, userInfo, c.initGlobal()); err != nil {
if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, userInfo, c.config.GlobalDeviceLimitConfig); err != nil {
log.Print(err)
}
@@ -152,18 +148,6 @@ func (c *Controller) Start() error {
}})
}
// Check global limit in need
if c.config.GlobalDeviceLimitConfig.Enable {
c.tasks = append(c.tasks,
periodicTask{
tag: "global limit",
Periodic: &task.Periodic{
Interval: time.Duration(c.config.UpdatePeriodic) * time.Second,
Execute: c.globalLimitFetch,
},
})
}
// Start periodic tasks
for i := range c.tasks {
log.Printf("%s Start %s periodic task", c.logPrefix(), c.tasks[i].tag)
@@ -264,7 +248,7 @@ func (c *Controller) nodeInfoMonitor() (err error) {
}
// Add Limiter
if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, newUserInfo, c.initGlobal()); err != nil {
if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, newUserInfo, c.config.GlobalDeviceLimitConfig); err != nil {
log.Print(err)
return nil
}
@@ -299,20 +283,6 @@ func (c *Controller) nodeInfoMonitor() (err error) {
return nil
}
func (c *Controller) initGlobal() *limiter.GlobalDeviceLimitConfig {
// Init global limit redis client
globalConfig := c.config.GlobalDeviceLimitConfig
if c.config.GlobalDeviceLimitConfig.Enable {
log.Printf("[%s] Global limit: enable", c.Tag)
globalConfig.R = redis.NewClient(&redis.Options{
Addr: globalConfig.RedisAddr,
Password: globalConfig.RedisPassword,
DB: globalConfig.RedisDB,
})
}
return globalConfig
}
func (c *Controller) removeOldTag(oldTag string) (err error) {
err = c.removeInbound(oldTag)
if err != nil {
@@ -643,50 +613,3 @@ func (c *Controller) certMonitor() error {
}
return nil
}
// Fetch global limit periodically
func (c *Controller) globalLimitFetch() (err error) {
if value, ok := c.dispatcher.Limiter.InboundInfo.Load(c.Tag); ok {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
var (
cursor uint64
emails []string
)
inboundInfo := value.(*limiter.InboundInfo)
for {
if emails, cursor, err = c.config.GlobalDeviceLimitConfig.R.Scan(ctx, cursor, "*", 10000).Result(); err != nil {
newError(err).AtError().WriteToLog()
}
pipe := c.config.GlobalDeviceLimitConfig.R.Pipeline()
cmdMap := make(map[string]*redis.StringSliceCmd)
for i := range emails {
email := emails[i]
cmdMap[email] = pipe.SMembers(ctx, email)
}
if _, err := pipe.Exec(ctx); err != nil {
newError(fmt.Errorf("redis: %v", err)).AtError().WriteToLog()
} else {
inboundInfo.GlobalLimit.OnlineIP = new(sync.Map)
for email := range cmdMap {
ips := cmdMap[email].Val()
ipMap := new(sync.Map)
for i := range ips {
ipMap.Store(ips[i], 0)
inboundInfo.GlobalLimit.OnlineIP.Store(email, ipMap)
}
}
}
if cursor == 0 {
break
}
}
}
return nil
}