From 8d0225bcbb176029229b56173fbd129e267365c0 Mon Sep 17 00:00:00 2001 From: Senis John Date: Tue, 29 Nov 2022 16:40:46 +0800 Subject: [PATCH] fix: On enable global limit, the online device report feat will failure update: add pushIP method for limiter --- api/proxypanel/model.go | 1 - api/sspanel/sspanel.go | 2 +- app/mydispatcher/default.go | 4 +- common/limiter/limiter.go | 89 +++++++++++++++++--------------- common/limiter/model.go | 5 ++ main/config.yml.example | 17 +++--- panel/config.go | 16 +++--- panel/panel.go | 25 +++++---- service/controller/config.go | 30 +++++------ service/controller/controller.go | 56 ++++++-------------- service/service.go | 2 +- 11 files changed, 119 insertions(+), 128 deletions(-) diff --git a/api/proxypanel/model.go b/api/proxypanel/model.go index 483fb81..82bf112 100644 --- a/api/proxypanel/model.go +++ b/api/proxypanel/model.go @@ -98,7 +98,6 @@ type NodeRuleItem struct { Pattern string `json:"pattern"` } -// IllegalReport type IllegalReport struct { UID int `json:"uid"` RuleID int `json:"rule_id"` diff --git a/api/sspanel/sspanel.go b/api/sspanel/sspanel.go index 0486def..b023252 100644 --- a/api/sspanel/sspanel.go +++ b/api/sspanel/sspanel.go @@ -21,7 +21,7 @@ import ( var ( firstPortRe = regexp.MustCompile(`(?m)port=(?P\d+)#?`) // First Port secondPortRe = regexp.MustCompile(`(?m)port=\d+#(\d+)`) // Second Port - hostRe = regexp.MustCompile(`(?m)host=([\w\.]+)\|?`) // Host + hostRe = regexp.MustCompile(`(?m)host=([\w.]+)\|?`) // Host ) // APIClient create a api client to the panel. diff --git a/app/mydispatcher/default.go b/app/mydispatcher/default.go index a7efa63..086b127 100644 --- a/app/mydispatcher/default.go +++ b/app/mydispatcher/default.go @@ -140,7 +140,9 @@ func (*DefaultDispatcher) Start() error { } // Close implements common.Closable. -func (*DefaultDispatcher) Close() error { return nil } +func (*DefaultDispatcher) Close() error { + return nil +} func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sniffing session.SniffingRequest) (*transport.Link, *transport.Link, error) { downOpt := pipe.OptionsFromContext(ctx) diff --git a/common/limiter/limiter.go b/common/limiter/limiter.go index 951e797..a23eb2e 100644 --- a/common/limiter/limiter.go +++ b/common/limiter/limiter.go @@ -4,12 +4,10 @@ package limiter import ( "context" "fmt" - "log" "strings" "sync" "time" - "github.com/go-redis/redis/v8" "golang.org/x/time/rate" "github.com/XrayR-project/XrayR/api" @@ -27,16 +25,12 @@ type InboundInfo struct { UserInfo *sync.Map // Key: Email value: UserInfo BucketHub *sync.Map // key: Email, value: *rate.Limiter UserOnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID + GlobalOnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID } type Limiter struct { InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo - r *redis.Client - g struct { - enable bool - timeout int - expiry int - } + g *GlobalDeviceLimitConfig } func New() *Limiter { @@ -48,16 +42,7 @@ func New() *Limiter { func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList *[]api.UserInfo, globalDeviceLimit *GlobalDeviceLimitConfig) error { // global limit if globalDeviceLimit.Enable { - log.Printf("[%s] Global limit: enable", tag) - - l.r = redis.NewClient(&redis.Options{ - Addr: globalDeviceLimit.RedisAddr, - Password: globalDeviceLimit.RedisPassword, - DB: globalDeviceLimit.RedisDB, - }) - l.g.enable = globalDeviceLimit.Enable - l.g.timeout = globalDeviceLimit.Timeout - l.g.expiry = globalDeviceLimit.Expiry + l.g = globalDeviceLimit } inboundInfo := &InboundInfo{ @@ -65,6 +50,7 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList NodeSpeedLimit: nodeSpeedLimit, BucketHub: new(sync.Map), UserOnlineIP: new(sync.Map), + GlobalOnlineIP: new(sync.Map), } userMap := new(sync.Map) for _, u := range *userList { @@ -80,7 +66,6 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList } func (l *Limiter) UpdateInboundLimiter(tag string, updatedUserList *[]api.UserInfo) error { - if value, ok := l.InboundInfo.Load(tag); ok { inboundInfo := value.(*InboundInfo) // Update User info @@ -127,6 +112,7 @@ func (l *Limiter) GetOnlineDevice(tag string) (*[]api.OnlineUser, error) { return true }) inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { + email := key.(string) ipMap := value.(*sync.Map) ipMap.Range(func(key, value interface{}) bool { uid := value.(int) @@ -134,6 +120,7 @@ func (l *Limiter) GetOnlineDevice(tag string) (*[]api.OnlineUser, error) { onlineUser = append(onlineUser, api.OnlineUser{UID: uid, IP: ip}) return true }) + inboundInfo.UserOnlineIP.Delete(email) // Reset online device return true }) } else { @@ -159,27 +146,6 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r deviceLimit = u.DeviceLimit } - // Global device limit - if l.g.enable { - email = email[strings.Index(email, "|")+1:] - - go func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(l.g.timeout)) - defer cancel() - - if err := l.r.HSet(ctx, email, ip, uid).Err(); err != nil { - newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog() - } - - // check ttl, if ttl == -1, then set expire time. - if l.r.TTL(ctx, email).Val() == -1 { - if err := l.r.Expire(ctx, email, time.Duration(l.g.expiry)*time.Minute).Err(); err != nil { - newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog() - } - } - }() - } - // Local device limit ipMap := new(sync.Map) ipMap.Store(ip, uid) @@ -187,7 +153,7 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r if v, ok := inboundInfo.UserOnlineIP.LoadOrStore(email, ipMap); ok { ipMap := v.(*sync.Map) // If this is a new ip - if _, ok := ipMap.LoadOrStore(ip, uid); !ok || l.g.enable { + if _, ok := ipMap.LoadOrStore(ip, uid); !ok { counter := 0 ipMap.Range(func(key, value interface{}) bool { counter++ @@ -200,6 +166,30 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r } } + // Global device limit + if l.g.Enable { + email := email[strings.Index(email, "|")+1:] + + if v, ok := inboundInfo.GlobalOnlineIP.Load(email); ok { + ipMap := v.(*sync.Map) + // If this is a new ip + if _, ok := ipMap.LoadOrStore(ip, uid); !ok { + counter := 0 + ipMap.Range(func(key, value interface{}) bool { + counter++ + return true + }) + if counter > deviceLimit && deviceLimit > 0 { + ipMap.Delete(ip) + return nil, false, true + } + go l.pushIP(email, ip, uid) + } + } else { + go l.pushIP(email, ip, uid) + } + } + // Speed limit limit := determineRate(nodeLimit, userLimit) // Determine the speed limit rate if limit > 0 { @@ -219,6 +209,23 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r } } +// Push new IP to redis +func (l *Limiter) pushIP(email string, ip string, uid int) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(l.g.Timeout)) + defer cancel() + + if err := l.g.R.HSet(ctx, email, map[string]any{ip: uid}).Err(); err != nil { + newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog() + } + + // check ttl, if ttl == -1, then set expire time. + if l.g.R.TTL(ctx, email).Val() == -1 { + if err := l.g.R.Expire(ctx, email, time.Duration(l.g.Expiry)*time.Minute).Err(); err != nil { + newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog() + } + } +} + // determineRate returns the minimum non-zero rate func determineRate(nodeLimit, userLimit uint64) (limit uint64) { if nodeLimit == 0 || userLimit == 0 { diff --git a/common/limiter/model.go b/common/limiter/model.go index 569d1fc..3c869c9 100644 --- a/common/limiter/model.go +++ b/common/limiter/model.go @@ -1,5 +1,9 @@ package limiter +import ( + "github.com/go-redis/redis/v8" +) + type GlobalDeviceLimitConfig struct { Enable bool `mapstructure:"Enable"` RedisAddr string `mapstructure:"RedisAddr"` // host:port @@ -7,4 +11,5 @@ type GlobalDeviceLimitConfig struct { RedisDB int `mapstructure:"RedisDB"` Timeout int `mapstructure:"Timeout"` Expiry int `mapstructure:"Expiry"` // second + R *redis.Client } diff --git a/main/config.yml.example b/main/config.yml.example index 321ba31..e2f8c1a 100644 --- a/main/config.yml.example +++ b/main/config.yml.example @@ -11,7 +11,14 @@ ConnectionConfig: ConnIdle: 30 # Connection idle time limit, Second UplinkOnly: 2 # Time limit when the connection downstream is closed, Second DownlinkOnly: 4 # Time limit when the connection is closed after the uplink is closed, Second - BufferSize: 64 # The internal cache size of each connection, kB + BufferSize: 64 # The internal cache size of each connection, kB +GlobalDeviceLimitConfig: + Enable: true # Enable the global device limit of a user + RedisAddr: 10.0.0.188:6379 # The redis server address + RedisPassword: # Redis password + RedisDB: 0 # Redis DB + Timeout: 5 # Timeout for redis request + Expiry: 60 # Expiry time (second) Nodes: - PanelType: "SSpanel" # Panel type: SSpanel, V2board, NewV2board, PMpanel, Proxypanel, V2RaySocks @@ -38,13 +45,6 @@ Nodes: WarnTimes: 0 # After (WarnTimes) consecutive warnings, the user will be limited. Set to 0 to punish overspeed user immediately. LimitSpeed: 0 # The speedlimit of a limited user (unit: mbps) LimitDuration: 0 # How many minutes will the limiting last (unit: minute) - GlobalDeviceLimitConfig: - Enable: false # Enable the global device limit of a user - RedisAddr: 127.0.0.1:6379 # The redis server address - RedisPassword: YOUR PASSWORD # Redis password - RedisDB: 0 # Redis DB - Timeout: 5 # Timeout for redis request - Expiry: 60 # Expiry time (second) EnableFallback: false # Only support for Trojan and Vless FallBackConfigs: # Support multiple fallbacks - @@ -89,4 +89,3 @@ Nodes: # DNSEnv: # DNS ENV option used by DNS provider # ALICLOUD_ACCESS_KEY: aaa # ALICLOUD_SECRET_KEY: bbb - diff --git a/panel/config.go b/panel/config.go index a9aa964..3b8fd93 100644 --- a/panel/config.go +++ b/panel/config.go @@ -2,17 +2,19 @@ package panel import ( "github.com/XrayR-project/XrayR/api" + "github.com/XrayR-project/XrayR/common/limiter" "github.com/XrayR-project/XrayR/service/controller" ) type Config struct { - LogConfig *LogConfig `mapstructure:"Log"` - DnsConfigPath string `mapstructure:"DnsConfigPath"` - InboundConfigPath string `mapstructure:"InboundConfigPath"` - OutboundConfigPath string `mapstructure:"OutboundConfigPath"` - RouteConfigPath string `mapstructure:"RouteConfigPath"` - ConnectionConfig *ConnectionConfig `mapstructure:"ConnectionConfig"` - NodesConfig []*NodesConfig `mapstructure:"Nodes"` + LogConfig *LogConfig `mapstructure:"Log"` + DnsConfigPath string `mapstructure:"DnsConfigPath"` + InboundConfigPath string `mapstructure:"InboundConfigPath"` + OutboundConfigPath string `mapstructure:"OutboundConfigPath"` + RouteConfigPath string `mapstructure:"RouteConfigPath"` + ConnectionConfig *ConnectionConfig `mapstructure:"ConnectionConfig"` + GlobalDeviceLimitConfig *limiter.GlobalDeviceLimitConfig `mapstructure:"GlobalDeviceLimitConfig"` + NodesConfig []*NodesConfig `mapstructure:"Nodes"` } type NodesConfig struct { diff --git a/panel/panel.go b/panel/panel.go index 653309d..b131f57 100644 --- a/panel/panel.go +++ b/panel/panel.go @@ -152,7 +152,7 @@ func (p *Panel) loadCore(panelConfig *Config) *core.Instance { return server } -// Start Start the panel +// Start the panel func (p *Panel) Start() { p.access.Lock() defer p.access.Unlock() @@ -163,6 +163,11 @@ func (p *Panel) Start() { log.Panicf("Failed to start instance: %s", err) } p.Server = server + + if p.panelConfig.GlobalDeviceLimitConfig.Enable { + log.Println("Global limit: Enable") + } + // Load Nodes config for _, nodeConfig := range p.panelConfig.NodesConfig { var apiClient api.API @@ -191,7 +196,7 @@ func (p *Panel) Start() { log.Panicf("Read Controller Config Failed") } } - controllerService = controller.New(server, apiClient, controllerConfig, nodeConfig.PanelType) + controllerService = controller.New(server, apiClient, controllerConfig, nodeConfig.PanelType, p.panelConfig.GlobalDeviceLimitConfig) p.Service = append(p.Service, controllerService) } @@ -207,7 +212,7 @@ func (p *Panel) Start() { return } -// Close Close the panel +// Close the panel func (p *Panel) Close() { p.access.Lock() defer p.access.Unlock() @@ -224,20 +229,20 @@ func (p *Panel) Close() { } func parseConnectionConfig(c *ConnectionConfig) (policy *conf.Policy) { - connetionConfig := getDefaultConnectionConfig() + connectionConfig := getDefaultConnectionConfig() if c != nil { - if _, err := diff.Merge(connetionConfig, c, connetionConfig); err != nil { + if _, err := diff.Merge(connectionConfig, c, connectionConfig); err != nil { log.Panicf("Read ConnectionConfig failed: %s", err) } } policy = &conf.Policy{ StatsUserUplink: true, StatsUserDownlink: true, - Handshake: &connetionConfig.Handshake, - ConnectionIdle: &connetionConfig.ConnIdle, - UplinkOnly: &connetionConfig.UplinkOnly, - DownlinkOnly: &connetionConfig.DownlinkOnly, - BufferSize: &connetionConfig.BufferSize, + Handshake: &connectionConfig.Handshake, + ConnectionIdle: &connectionConfig.ConnIdle, + UplinkOnly: &connectionConfig.UplinkOnly, + DownlinkOnly: &connectionConfig.DownlinkOnly, + BufferSize: &connectionConfig.BufferSize, } return diff --git a/service/controller/config.go b/service/controller/config.go index d1b7080..dee36f7 100644 --- a/service/controller/config.go +++ b/service/controller/config.go @@ -1,26 +1,24 @@ package controller import ( - "github.com/XrayR-project/XrayR/common/limiter" "github.com/XrayR-project/XrayR/common/mylego" ) type Config struct { - ListenIP string `mapstructure:"ListenIP"` - SendIP string `mapstructure:"SendIP"` - UpdatePeriodic int `mapstructure:"UpdatePeriodic"` - CertConfig *mylego.CertConfig `mapstructure:"CertConfig"` - EnableDNS bool `mapstructure:"EnableDNS"` - DNSType string `mapstructure:"DNSType"` - DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"` - DisableGetRule bool `mapstructure:"DisableGetRule"` - EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"` - EnableFallback bool `mapstructure:"EnableFallback"` - DisableIVCheck bool `mapstructure:"DisableIVCheck"` - DisableSniffing bool `mapstructure:"DisableSniffing"` - AutoSpeedLimitConfig *AutoSpeedLimitConfig `mapstructure:"AutoSpeedLimitConfig"` - GlobalDeviceLimitConfig *limiter.GlobalDeviceLimitConfig `mapstructure:"GlobalDeviceLimitConfig"` - FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"` + ListenIP string `mapstructure:"ListenIP"` + SendIP string `mapstructure:"SendIP"` + UpdatePeriodic int `mapstructure:"UpdatePeriodic"` + CertConfig *mylego.CertConfig `mapstructure:"CertConfig"` + EnableDNS bool `mapstructure:"EnableDNS"` + DNSType string `mapstructure:"DNSType"` + DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"` + DisableGetRule bool `mapstructure:"DisableGetRule"` + EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"` + EnableFallback bool `mapstructure:"EnableFallback"` + DisableIVCheck bool `mapstructure:"DisableIVCheck"` + DisableSniffing bool `mapstructure:"DisableSniffing"` + AutoSpeedLimitConfig *AutoSpeedLimitConfig `mapstructure:"AutoSpeedLimitConfig"` + FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"` } type AutoSpeedLimitConfig struct { diff --git a/service/controller/controller.go b/service/controller/controller.go index 01a7b1c..aed1c83 100644 --- a/service/controller/controller.go +++ b/service/controller/controller.go @@ -48,7 +48,7 @@ type Controller struct { stm stats.Manager dispatcher *mydispatcher.DefaultDispatcher startAt time.Time - r *redis.Client + g *limiter.GlobalDeviceLimitConfig } type periodicTask struct { @@ -57,7 +57,7 @@ type periodicTask struct { } // New return a Controller service with default parameters. -func New(server *core.Instance, api api.API, config *Config, panelType string) *Controller { +func New(server *core.Instance, api api.API, config *Config, panelType string, globalConfig *limiter.GlobalDeviceLimitConfig) *Controller { controller := &Controller{ server: server, config: config, @@ -71,12 +71,13 @@ func New(server *core.Instance, api api.API, config *Config, panelType string) * } // Init global limit redis client - if config.GlobalDeviceLimitConfig.Enable { - controller.r = redis.NewClient(&redis.Options{ - Addr: config.GlobalDeviceLimitConfig.RedisAddr, - Password: config.GlobalDeviceLimitConfig.RedisPassword, - DB: config.GlobalDeviceLimitConfig.RedisDB, + if globalConfig.Enable { + globalConfig.R = redis.NewClient(&redis.Options{ + Addr: globalConfig.RedisAddr, + Password: globalConfig.RedisPassword, + DB: globalConfig.RedisDB, }) + controller.g = globalConfig } return controller @@ -113,7 +114,7 @@ func (c *Controller) Start() error { } // Add Limiter - if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, userInfo, c.config.GlobalDeviceLimitConfig); err != nil { + if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, userInfo, c.g); err != nil { log.Print(err) } @@ -164,7 +165,7 @@ func (c *Controller) Start() error { } // Check global limit in need - if c.config.GlobalDeviceLimitConfig.Enable { + if c.g.Enable { c.tasks = append(c.tasks, periodicTask{ tag: "global limit", @@ -175,16 +176,6 @@ func (c *Controller) Start() error { }) } - // Reset online user - c.tasks = append(c.tasks, - periodicTask{ - tag: "reset online user", - Periodic: &task.Periodic{ - Interval: time.Duration(c.config.UpdatePeriodic) * time.Second * 15, - Execute: c.resetOnlineUser, - }, - }) - // Start periodic tasks for i := range c.tasks { log.Printf("%s Start %s periodic task", c.logPrefix(), c.tasks[i].tag) @@ -284,7 +275,7 @@ func (c *Controller) nodeInfoMonitor() (err error) { return nil } // Add Limiter - if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, newUserInfo, c.config.GlobalDeviceLimitConfig); err != nil { + if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, newUserInfo, c.g); err != nil { log.Print(err) return nil } @@ -663,10 +654,10 @@ func (c *Controller) globalLimitFetch() (err error) { inboundInfo := value.(*limiter.InboundInfo) for { - if emails, cursor, err = c.r.Scan(ctx, cursor, "*", 1000).Result(); err != nil { + if emails, cursor, err = c.g.R.Scan(ctx, cursor, "*", 1000).Result(); err != nil { newError(err).AtError().WriteToLog() } - pipe := c.r.Pipeline() + pipe := c.g.R.Pipeline() cmdMap := make(map[string]*redis.StringStringMapCmd) for i := range emails { @@ -677,13 +668,14 @@ func (c *Controller) globalLimitFetch() (err error) { if _, err := pipe.Exec(ctx); err != nil { newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog() } else { + inboundInfo.GlobalOnlineIP = new(sync.Map) for k := range cmdMap { ips := cmdMap[k].Val() ipMap := new(sync.Map) for i := range ips { uid, _ := strconv.Atoi(ips[i]) ipMap.Store(i, uid) - inboundInfo.UserOnlineIP.LoadOrStore(k, ipMap) + inboundInfo.GlobalOnlineIP.Store(k, ipMap) } } } @@ -696,21 +688,3 @@ func (c *Controller) globalLimitFetch() (err error) { return nil } - -func (c *Controller) resetOnlineUser() error { - // delay to start - if time.Since(c.startAt) < time.Duration(c.config.UpdatePeriodic)*time.Second*15 { - return nil - } - - if value, ok := c.dispatcher.Limiter.InboundInfo.Load(c.Tag); ok { - inboundInfo := value.(*limiter.InboundInfo) - inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { - email := key.(string) - inboundInfo.UserOnlineIP.Delete(email) // Reset online device - return true - }) - } - - return nil -} diff --git a/service/service.go b/service/service.go index 82a975e..7ec69e7 100644 --- a/service/service.go +++ b/service/service.go @@ -1,5 +1,5 @@ // Package service contains all the services used by XrayR -// To implement an service, one needs to implement the interface below. +// To implement a service, one needs to implement the interface below. package service // Service is the interface of all the services running in the panel