fix: On enable global limit, the online device report feat will failure

update: add pushIP method for limiter
This commit is contained in:
Senis John 2022-11-29 16:40:46 +08:00
parent ce5fe799f4
commit 8d0225bcbb
11 changed files with 119 additions and 128 deletions

View File

@ -98,7 +98,6 @@ type NodeRuleItem struct {
Pattern string `json:"pattern"` Pattern string `json:"pattern"`
} }
// IllegalReport
type IllegalReport struct { type IllegalReport struct {
UID int `json:"uid"` UID int `json:"uid"`
RuleID int `json:"rule_id"` RuleID int `json:"rule_id"`

View File

@ -21,7 +21,7 @@ import (
var ( var (
firstPortRe = regexp.MustCompile(`(?m)port=(?P<outport>\d+)#?`) // First Port firstPortRe = regexp.MustCompile(`(?m)port=(?P<outport>\d+)#?`) // First Port
secondPortRe = regexp.MustCompile(`(?m)port=\d+#(\d+)`) // Second Port secondPortRe = regexp.MustCompile(`(?m)port=\d+#(\d+)`) // Second Port
hostRe = regexp.MustCompile(`(?m)host=([\w\.]+)\|?`) // Host hostRe = regexp.MustCompile(`(?m)host=([\w.]+)\|?`) // Host
) )
// APIClient create a api client to the panel. // APIClient create a api client to the panel.

View File

@ -140,7 +140,9 @@ func (*DefaultDispatcher) Start() error {
} }
// Close implements common.Closable. // Close implements common.Closable.
func (*DefaultDispatcher) Close() error { return nil } func (*DefaultDispatcher) Close() error {
return nil
}
func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sniffing session.SniffingRequest) (*transport.Link, *transport.Link, error) { func (d *DefaultDispatcher) getLink(ctx context.Context, network net.Network, sniffing session.SniffingRequest) (*transport.Link, *transport.Link, error) {
downOpt := pipe.OptionsFromContext(ctx) downOpt := pipe.OptionsFromContext(ctx)

View File

@ -4,12 +4,10 @@ package limiter
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/go-redis/redis/v8"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/XrayR-project/XrayR/api" "github.com/XrayR-project/XrayR/api"
@ -27,16 +25,12 @@ type InboundInfo struct {
UserInfo *sync.Map // Key: Email value: UserInfo UserInfo *sync.Map // Key: Email value: UserInfo
BucketHub *sync.Map // key: Email, value: *rate.Limiter BucketHub *sync.Map // key: Email, value: *rate.Limiter
UserOnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID UserOnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID
GlobalOnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID
} }
type Limiter struct { type Limiter struct {
InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo
r *redis.Client g *GlobalDeviceLimitConfig
g struct {
enable bool
timeout int
expiry int
}
} }
func New() *Limiter { func New() *Limiter {
@ -48,16 +42,7 @@ func New() *Limiter {
func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList *[]api.UserInfo, globalDeviceLimit *GlobalDeviceLimitConfig) error { func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList *[]api.UserInfo, globalDeviceLimit *GlobalDeviceLimitConfig) error {
// global limit // global limit
if globalDeviceLimit.Enable { if globalDeviceLimit.Enable {
log.Printf("[%s] Global limit: enable", tag) l.g = globalDeviceLimit
l.r = redis.NewClient(&redis.Options{
Addr: globalDeviceLimit.RedisAddr,
Password: globalDeviceLimit.RedisPassword,
DB: globalDeviceLimit.RedisDB,
})
l.g.enable = globalDeviceLimit.Enable
l.g.timeout = globalDeviceLimit.Timeout
l.g.expiry = globalDeviceLimit.Expiry
} }
inboundInfo := &InboundInfo{ inboundInfo := &InboundInfo{
@ -65,6 +50,7 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList
NodeSpeedLimit: nodeSpeedLimit, NodeSpeedLimit: nodeSpeedLimit,
BucketHub: new(sync.Map), BucketHub: new(sync.Map),
UserOnlineIP: new(sync.Map), UserOnlineIP: new(sync.Map),
GlobalOnlineIP: new(sync.Map),
} }
userMap := new(sync.Map) userMap := new(sync.Map)
for _, u := range *userList { for _, u := range *userList {
@ -80,7 +66,6 @@ func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList
} }
func (l *Limiter) UpdateInboundLimiter(tag string, updatedUserList *[]api.UserInfo) error { func (l *Limiter) UpdateInboundLimiter(tag string, updatedUserList *[]api.UserInfo) error {
if value, ok := l.InboundInfo.Load(tag); ok { if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo) inboundInfo := value.(*InboundInfo)
// Update User info // Update User info
@ -127,6 +112,7 @@ func (l *Limiter) GetOnlineDevice(tag string) (*[]api.OnlineUser, error) {
return true return true
}) })
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
email := key.(string)
ipMap := value.(*sync.Map) ipMap := value.(*sync.Map)
ipMap.Range(func(key, value interface{}) bool { ipMap.Range(func(key, value interface{}) bool {
uid := value.(int) uid := value.(int)
@ -134,6 +120,7 @@ func (l *Limiter) GetOnlineDevice(tag string) (*[]api.OnlineUser, error) {
onlineUser = append(onlineUser, api.OnlineUser{UID: uid, IP: ip}) onlineUser = append(onlineUser, api.OnlineUser{UID: uid, IP: ip})
return true return true
}) })
inboundInfo.UserOnlineIP.Delete(email) // Reset online device
return true return true
}) })
} else { } else {
@ -159,27 +146,6 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r
deviceLimit = u.DeviceLimit deviceLimit = u.DeviceLimit
} }
// Global device limit
if l.g.enable {
email = email[strings.Index(email, "|")+1:]
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(l.g.timeout))
defer cancel()
if err := l.r.HSet(ctx, email, ip, uid).Err(); err != nil {
newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog()
}
// check ttl, if ttl == -1, then set expire time.
if l.r.TTL(ctx, email).Val() == -1 {
if err := l.r.Expire(ctx, email, time.Duration(l.g.expiry)*time.Minute).Err(); err != nil {
newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog()
}
}
}()
}
// Local device limit // Local device limit
ipMap := new(sync.Map) ipMap := new(sync.Map)
ipMap.Store(ip, uid) ipMap.Store(ip, uid)
@ -187,7 +153,7 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r
if v, ok := inboundInfo.UserOnlineIP.LoadOrStore(email, ipMap); ok { if v, ok := inboundInfo.UserOnlineIP.LoadOrStore(email, ipMap); ok {
ipMap := v.(*sync.Map) ipMap := v.(*sync.Map)
// If this is a new ip // If this is a new ip
if _, ok := ipMap.LoadOrStore(ip, uid); !ok || l.g.enable { if _, ok := ipMap.LoadOrStore(ip, uid); !ok {
counter := 0 counter := 0
ipMap.Range(func(key, value interface{}) bool { ipMap.Range(func(key, value interface{}) bool {
counter++ counter++
@ -200,6 +166,30 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r
} }
} }
// Global device limit
if l.g.Enable {
email := email[strings.Index(email, "|")+1:]
if v, ok := inboundInfo.GlobalOnlineIP.Load(email); ok {
ipMap := v.(*sync.Map)
// If this is a new ip
if _, ok := ipMap.LoadOrStore(ip, uid); !ok {
counter := 0
ipMap.Range(func(key, value interface{}) bool {
counter++
return true
})
if counter > deviceLimit && deviceLimit > 0 {
ipMap.Delete(ip)
return nil, false, true
}
go l.pushIP(email, ip, uid)
}
} else {
go l.pushIP(email, ip, uid)
}
}
// Speed limit // Speed limit
limit := determineRate(nodeLimit, userLimit) // Determine the speed limit rate limit := determineRate(nodeLimit, userLimit) // Determine the speed limit rate
if limit > 0 { if limit > 0 {
@ -219,6 +209,23 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r
} }
} }
// Push new IP to redis
func (l *Limiter) pushIP(email string, ip string, uid int) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(l.g.Timeout))
defer cancel()
if err := l.g.R.HSet(ctx, email, map[string]any{ip: uid}).Err(); err != nil {
newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog()
}
// check ttl, if ttl == -1, then set expire time.
if l.g.R.TTL(ctx, email).Val() == -1 {
if err := l.g.R.Expire(ctx, email, time.Duration(l.g.Expiry)*time.Minute).Err(); err != nil {
newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog()
}
}
}
// determineRate returns the minimum non-zero rate // determineRate returns the minimum non-zero rate
func determineRate(nodeLimit, userLimit uint64) (limit uint64) { func determineRate(nodeLimit, userLimit uint64) (limit uint64) {
if nodeLimit == 0 || userLimit == 0 { if nodeLimit == 0 || userLimit == 0 {

View File

@ -1,5 +1,9 @@
package limiter package limiter
import (
"github.com/go-redis/redis/v8"
)
type GlobalDeviceLimitConfig struct { type GlobalDeviceLimitConfig struct {
Enable bool `mapstructure:"Enable"` Enable bool `mapstructure:"Enable"`
RedisAddr string `mapstructure:"RedisAddr"` // host:port RedisAddr string `mapstructure:"RedisAddr"` // host:port
@ -7,4 +11,5 @@ type GlobalDeviceLimitConfig struct {
RedisDB int `mapstructure:"RedisDB"` RedisDB int `mapstructure:"RedisDB"`
Timeout int `mapstructure:"Timeout"` Timeout int `mapstructure:"Timeout"`
Expiry int `mapstructure:"Expiry"` // second Expiry int `mapstructure:"Expiry"` // second
R *redis.Client
} }

View File

@ -11,7 +11,14 @@ ConnectionConfig:
ConnIdle: 30 # Connection idle time limit, Second ConnIdle: 30 # Connection idle time limit, Second
UplinkOnly: 2 # Time limit when the connection downstream is closed, Second UplinkOnly: 2 # Time limit when the connection downstream is closed, Second
DownlinkOnly: 4 # Time limit when the connection is closed after the uplink is closed, Second DownlinkOnly: 4 # Time limit when the connection is closed after the uplink is closed, Second
BufferSize: 64 # The internal cache size of each connection, kB BufferSize: 64 # The internal cache size of each connection, kB
GlobalDeviceLimitConfig:
Enable: true # Enable the global device limit of a user
RedisAddr: 10.0.0.188:6379 # The redis server address
RedisPassword: # Redis password
RedisDB: 0 # Redis DB
Timeout: 5 # Timeout for redis request
Expiry: 60 # Expiry time (second)
Nodes: Nodes:
- -
PanelType: "SSpanel" # Panel type: SSpanel, V2board, NewV2board, PMpanel, Proxypanel, V2RaySocks PanelType: "SSpanel" # Panel type: SSpanel, V2board, NewV2board, PMpanel, Proxypanel, V2RaySocks
@ -38,13 +45,6 @@ Nodes:
WarnTimes: 0 # After (WarnTimes) consecutive warnings, the user will be limited. Set to 0 to punish overspeed user immediately. WarnTimes: 0 # After (WarnTimes) consecutive warnings, the user will be limited. Set to 0 to punish overspeed user immediately.
LimitSpeed: 0 # The speedlimit of a limited user (unit: mbps) LimitSpeed: 0 # The speedlimit of a limited user (unit: mbps)
LimitDuration: 0 # How many minutes will the limiting last (unit: minute) LimitDuration: 0 # How many minutes will the limiting last (unit: minute)
GlobalDeviceLimitConfig:
Enable: false # Enable the global device limit of a user
RedisAddr: 127.0.0.1:6379 # The redis server address
RedisPassword: YOUR PASSWORD # Redis password
RedisDB: 0 # Redis DB
Timeout: 5 # Timeout for redis request
Expiry: 60 # Expiry time (second)
EnableFallback: false # Only support for Trojan and Vless EnableFallback: false # Only support for Trojan and Vless
FallBackConfigs: # Support multiple fallbacks FallBackConfigs: # Support multiple fallbacks
- -
@ -89,4 +89,3 @@ Nodes:
# DNSEnv: # DNS ENV option used by DNS provider # DNSEnv: # DNS ENV option used by DNS provider
# ALICLOUD_ACCESS_KEY: aaa # ALICLOUD_ACCESS_KEY: aaa
# ALICLOUD_SECRET_KEY: bbb # ALICLOUD_SECRET_KEY: bbb

View File

@ -2,17 +2,19 @@ package panel
import ( import (
"github.com/XrayR-project/XrayR/api" "github.com/XrayR-project/XrayR/api"
"github.com/XrayR-project/XrayR/common/limiter"
"github.com/XrayR-project/XrayR/service/controller" "github.com/XrayR-project/XrayR/service/controller"
) )
type Config struct { type Config struct {
LogConfig *LogConfig `mapstructure:"Log"` LogConfig *LogConfig `mapstructure:"Log"`
DnsConfigPath string `mapstructure:"DnsConfigPath"` DnsConfigPath string `mapstructure:"DnsConfigPath"`
InboundConfigPath string `mapstructure:"InboundConfigPath"` InboundConfigPath string `mapstructure:"InboundConfigPath"`
OutboundConfigPath string `mapstructure:"OutboundConfigPath"` OutboundConfigPath string `mapstructure:"OutboundConfigPath"`
RouteConfigPath string `mapstructure:"RouteConfigPath"` RouteConfigPath string `mapstructure:"RouteConfigPath"`
ConnectionConfig *ConnectionConfig `mapstructure:"ConnectionConfig"` ConnectionConfig *ConnectionConfig `mapstructure:"ConnectionConfig"`
NodesConfig []*NodesConfig `mapstructure:"Nodes"` GlobalDeviceLimitConfig *limiter.GlobalDeviceLimitConfig `mapstructure:"GlobalDeviceLimitConfig"`
NodesConfig []*NodesConfig `mapstructure:"Nodes"`
} }
type NodesConfig struct { type NodesConfig struct {

View File

@ -152,7 +152,7 @@ func (p *Panel) loadCore(panelConfig *Config) *core.Instance {
return server return server
} }
// Start Start the panel // Start the panel
func (p *Panel) Start() { func (p *Panel) Start() {
p.access.Lock() p.access.Lock()
defer p.access.Unlock() defer p.access.Unlock()
@ -163,6 +163,11 @@ func (p *Panel) Start() {
log.Panicf("Failed to start instance: %s", err) log.Panicf("Failed to start instance: %s", err)
} }
p.Server = server p.Server = server
if p.panelConfig.GlobalDeviceLimitConfig.Enable {
log.Println("Global limit: Enable")
}
// Load Nodes config // Load Nodes config
for _, nodeConfig := range p.panelConfig.NodesConfig { for _, nodeConfig := range p.panelConfig.NodesConfig {
var apiClient api.API var apiClient api.API
@ -191,7 +196,7 @@ func (p *Panel) Start() {
log.Panicf("Read Controller Config Failed") log.Panicf("Read Controller Config Failed")
} }
} }
controllerService = controller.New(server, apiClient, controllerConfig, nodeConfig.PanelType) controllerService = controller.New(server, apiClient, controllerConfig, nodeConfig.PanelType, p.panelConfig.GlobalDeviceLimitConfig)
p.Service = append(p.Service, controllerService) p.Service = append(p.Service, controllerService)
} }
@ -207,7 +212,7 @@ func (p *Panel) Start() {
return return
} }
// Close Close the panel // Close the panel
func (p *Panel) Close() { func (p *Panel) Close() {
p.access.Lock() p.access.Lock()
defer p.access.Unlock() defer p.access.Unlock()
@ -224,20 +229,20 @@ func (p *Panel) Close() {
} }
func parseConnectionConfig(c *ConnectionConfig) (policy *conf.Policy) { func parseConnectionConfig(c *ConnectionConfig) (policy *conf.Policy) {
connetionConfig := getDefaultConnectionConfig() connectionConfig := getDefaultConnectionConfig()
if c != nil { if c != nil {
if _, err := diff.Merge(connetionConfig, c, connetionConfig); err != nil { if _, err := diff.Merge(connectionConfig, c, connectionConfig); err != nil {
log.Panicf("Read ConnectionConfig failed: %s", err) log.Panicf("Read ConnectionConfig failed: %s", err)
} }
} }
policy = &conf.Policy{ policy = &conf.Policy{
StatsUserUplink: true, StatsUserUplink: true,
StatsUserDownlink: true, StatsUserDownlink: true,
Handshake: &connetionConfig.Handshake, Handshake: &connectionConfig.Handshake,
ConnectionIdle: &connetionConfig.ConnIdle, ConnectionIdle: &connectionConfig.ConnIdle,
UplinkOnly: &connetionConfig.UplinkOnly, UplinkOnly: &connectionConfig.UplinkOnly,
DownlinkOnly: &connetionConfig.DownlinkOnly, DownlinkOnly: &connectionConfig.DownlinkOnly,
BufferSize: &connetionConfig.BufferSize, BufferSize: &connectionConfig.BufferSize,
} }
return return

View File

@ -1,26 +1,24 @@
package controller package controller
import ( import (
"github.com/XrayR-project/XrayR/common/limiter"
"github.com/XrayR-project/XrayR/common/mylego" "github.com/XrayR-project/XrayR/common/mylego"
) )
type Config struct { type Config struct {
ListenIP string `mapstructure:"ListenIP"` ListenIP string `mapstructure:"ListenIP"`
SendIP string `mapstructure:"SendIP"` SendIP string `mapstructure:"SendIP"`
UpdatePeriodic int `mapstructure:"UpdatePeriodic"` UpdatePeriodic int `mapstructure:"UpdatePeriodic"`
CertConfig *mylego.CertConfig `mapstructure:"CertConfig"` CertConfig *mylego.CertConfig `mapstructure:"CertConfig"`
EnableDNS bool `mapstructure:"EnableDNS"` EnableDNS bool `mapstructure:"EnableDNS"`
DNSType string `mapstructure:"DNSType"` DNSType string `mapstructure:"DNSType"`
DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"` DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"`
DisableGetRule bool `mapstructure:"DisableGetRule"` DisableGetRule bool `mapstructure:"DisableGetRule"`
EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"` EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"`
EnableFallback bool `mapstructure:"EnableFallback"` EnableFallback bool `mapstructure:"EnableFallback"`
DisableIVCheck bool `mapstructure:"DisableIVCheck"` DisableIVCheck bool `mapstructure:"DisableIVCheck"`
DisableSniffing bool `mapstructure:"DisableSniffing"` DisableSniffing bool `mapstructure:"DisableSniffing"`
AutoSpeedLimitConfig *AutoSpeedLimitConfig `mapstructure:"AutoSpeedLimitConfig"` AutoSpeedLimitConfig *AutoSpeedLimitConfig `mapstructure:"AutoSpeedLimitConfig"`
GlobalDeviceLimitConfig *limiter.GlobalDeviceLimitConfig `mapstructure:"GlobalDeviceLimitConfig"` FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"`
FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"`
} }
type AutoSpeedLimitConfig struct { type AutoSpeedLimitConfig struct {

View File

@ -48,7 +48,7 @@ type Controller struct {
stm stats.Manager stm stats.Manager
dispatcher *mydispatcher.DefaultDispatcher dispatcher *mydispatcher.DefaultDispatcher
startAt time.Time startAt time.Time
r *redis.Client g *limiter.GlobalDeviceLimitConfig
} }
type periodicTask struct { type periodicTask struct {
@ -57,7 +57,7 @@ type periodicTask struct {
} }
// New return a Controller service with default parameters. // New return a Controller service with default parameters.
func New(server *core.Instance, api api.API, config *Config, panelType string) *Controller { func New(server *core.Instance, api api.API, config *Config, panelType string, globalConfig *limiter.GlobalDeviceLimitConfig) *Controller {
controller := &Controller{ controller := &Controller{
server: server, server: server,
config: config, config: config,
@ -71,12 +71,13 @@ func New(server *core.Instance, api api.API, config *Config, panelType string) *
} }
// Init global limit redis client // Init global limit redis client
if config.GlobalDeviceLimitConfig.Enable { if globalConfig.Enable {
controller.r = redis.NewClient(&redis.Options{ globalConfig.R = redis.NewClient(&redis.Options{
Addr: config.GlobalDeviceLimitConfig.RedisAddr, Addr: globalConfig.RedisAddr,
Password: config.GlobalDeviceLimitConfig.RedisPassword, Password: globalConfig.RedisPassword,
DB: config.GlobalDeviceLimitConfig.RedisDB, DB: globalConfig.RedisDB,
}) })
controller.g = globalConfig
} }
return controller return controller
@ -113,7 +114,7 @@ func (c *Controller) Start() error {
} }
// Add Limiter // Add Limiter
if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, userInfo, c.config.GlobalDeviceLimitConfig); err != nil { if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, userInfo, c.g); err != nil {
log.Print(err) log.Print(err)
} }
@ -164,7 +165,7 @@ func (c *Controller) Start() error {
} }
// Check global limit in need // Check global limit in need
if c.config.GlobalDeviceLimitConfig.Enable { if c.g.Enable {
c.tasks = append(c.tasks, c.tasks = append(c.tasks,
periodicTask{ periodicTask{
tag: "global limit", tag: "global limit",
@ -175,16 +176,6 @@ func (c *Controller) Start() error {
}) })
} }
// Reset online user
c.tasks = append(c.tasks,
periodicTask{
tag: "reset online user",
Periodic: &task.Periodic{
Interval: time.Duration(c.config.UpdatePeriodic) * time.Second * 15,
Execute: c.resetOnlineUser,
},
})
// Start periodic tasks // Start periodic tasks
for i := range c.tasks { for i := range c.tasks {
log.Printf("%s Start %s periodic task", c.logPrefix(), c.tasks[i].tag) log.Printf("%s Start %s periodic task", c.logPrefix(), c.tasks[i].tag)
@ -284,7 +275,7 @@ func (c *Controller) nodeInfoMonitor() (err error) {
return nil return nil
} }
// Add Limiter // Add Limiter
if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, newUserInfo, c.config.GlobalDeviceLimitConfig); err != nil { if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, newUserInfo, c.g); err != nil {
log.Print(err) log.Print(err)
return nil return nil
} }
@ -663,10 +654,10 @@ func (c *Controller) globalLimitFetch() (err error) {
inboundInfo := value.(*limiter.InboundInfo) inboundInfo := value.(*limiter.InboundInfo)
for { for {
if emails, cursor, err = c.r.Scan(ctx, cursor, "*", 1000).Result(); err != nil { if emails, cursor, err = c.g.R.Scan(ctx, cursor, "*", 1000).Result(); err != nil {
newError(err).AtError().WriteToLog() newError(err).AtError().WriteToLog()
} }
pipe := c.r.Pipeline() pipe := c.g.R.Pipeline()
cmdMap := make(map[string]*redis.StringStringMapCmd) cmdMap := make(map[string]*redis.StringStringMapCmd)
for i := range emails { for i := range emails {
@ -677,13 +668,14 @@ func (c *Controller) globalLimitFetch() (err error) {
if _, err := pipe.Exec(ctx); err != nil { if _, err := pipe.Exec(ctx); err != nil {
newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog() newError(fmt.Sprintf("Redis: %v", err)).AtError().WriteToLog()
} else { } else {
inboundInfo.GlobalOnlineIP = new(sync.Map)
for k := range cmdMap { for k := range cmdMap {
ips := cmdMap[k].Val() ips := cmdMap[k].Val()
ipMap := new(sync.Map) ipMap := new(sync.Map)
for i := range ips { for i := range ips {
uid, _ := strconv.Atoi(ips[i]) uid, _ := strconv.Atoi(ips[i])
ipMap.Store(i, uid) ipMap.Store(i, uid)
inboundInfo.UserOnlineIP.LoadOrStore(k, ipMap) inboundInfo.GlobalOnlineIP.Store(k, ipMap)
} }
} }
} }
@ -696,21 +688,3 @@ func (c *Controller) globalLimitFetch() (err error) {
return nil return nil
} }
func (c *Controller) resetOnlineUser() error {
// delay to start
if time.Since(c.startAt) < time.Duration(c.config.UpdatePeriodic)*time.Second*15 {
return nil
}
if value, ok := c.dispatcher.Limiter.InboundInfo.Load(c.Tag); ok {
inboundInfo := value.(*limiter.InboundInfo)
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
email := key.(string)
inboundInfo.UserOnlineIP.Delete(email) // Reset online device
return true
})
}
return nil
}

View File

@ -1,5 +1,5 @@
// Package service contains all the services used by XrayR // Package service contains all the services used by XrayR
// To implement an service, one needs to implement the interface below. // To implement a service, one needs to implement the interface below.
package service package service
// Service is the interface of all the services running in the panel // Service is the interface of all the services running in the panel