fix: refactor the InboundInfo struct

Global limit can separate settings for each node
This commit is contained in:
Senis John 2022-11-30 19:03:43 +08:00
parent b6600729b2
commit 5b45b8ffe8
No known key found for this signature in database
GPG Key ID: 845E9E4727C3E1A4
9 changed files with 89 additions and 85 deletions

View File

@ -35,7 +35,7 @@ jobs:
steps: steps:
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v2 uses: actions/checkout@v3
# Initializes the CodeQL tools for scanning. # Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL - name: Initialize CodeQL

View File

@ -20,9 +20,9 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Check out the repo - name: Check out the repo
uses: actions/checkout@v2 uses: actions/checkout@v3
- name: Set up Docker Buildx - name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1 uses: docker/setup-buildx-action@v2
- name: Log in to the Container registry - name: Log in to the Container registry
uses: docker/login-action@v2 uses: docker/login-action@v2
with: with:
@ -35,7 +35,7 @@ jobs:
with: with:
images: ghcr.io/${{ github.repository }} images: ghcr.io/${{ github.repository }}
- name: Build and push - name: Build and push
uses: docker/build-push-action@v2 uses: docker/build-push-action@v3
with: with:
context: . context: .
platforms: linux/arm/v7,linux/arm64,linux/amd64,linux/s390x platforms: linux/arm/v7,linux/arm64,linux/amd64,linux/s390x

View File

@ -101,17 +101,17 @@ jobs:
CGO_ENABLED: 0 CGO_ENABLED: 0
steps: steps:
- name: Checkout codebase - name: Checkout codebase
uses: actions/checkout@v2 uses: actions/checkout@v3
- name: Show workflow information - name: Show workflow information
id: get_filename id: get_filename
run: | run: |
export _NAME=$(jq ".[\"$GOOS-$GOARCH$GOARM$GOMIPS\"].friendlyName" -r < .github/build/friendly-filenames.json) export _NAME=$(jq ".[\"$GOOS-$GOARCH$GOARM$GOMIPS\"].friendlyName" -r < .github/build/friendly-filenames.json)
echo "GOOS: $GOOS, GOARCH: $GOARCH, GOARM: $GOARM, GOMIPS: $GOMIPS, RELEASE_NAME: $_NAME" echo "GOOS: $GOOS, GOARCH: $GOARCH, GOARM: $GOARM, GOMIPS: $GOMIPS, RELEASE_NAME: $_NAME"
echo "::set-output name=ASSET_NAME::$_NAME" echo "ASSET_NAME=$_NAME" >> $GITHUB_OUTPUT
echo "ASSET_NAME=$_NAME" >> $GITHUB_ENV echo "ASSET_NAME=$_NAME" >> $GITHUB_ENV
- name: Set up Go - name: Set up Go
uses: actions/setup-go@v2 uses: actions/setup-go@v3
with: with:
go-version: ^1.19 go-version: ^1.19
@ -173,7 +173,7 @@ jobs:
run: | run: |
mv build_assets XrayR-$ASSET_NAME mv build_assets XrayR-$ASSET_NAME
- name: Upload files to Artifacts - name: Upload files to Artifacts
uses: actions/upload-artifact@v2 uses: actions/upload-artifact@v3
with: with:
name: XrayR-${{ steps.get_filename.outputs.ASSET_NAME }} name: XrayR-${{ steps.get_filename.outputs.ASSET_NAME }}
path: | path: |

View File

@ -25,12 +25,16 @@ type InboundInfo struct {
UserInfo *sync.Map // Key: Email value: UserInfo UserInfo *sync.Map // Key: Email value: UserInfo
BucketHub *sync.Map // key: Email, value: *rate.Limiter BucketHub *sync.Map // key: Email, value: *rate.Limiter
UserOnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID UserOnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID
GlobalOnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID GlobalLimit *GlobalLimit
} }
type Limiter struct { type Limiter struct {
InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo
g *GlobalDeviceLimitConfig }
type GlobalLimit struct {
*GlobalDeviceLimitConfig
OnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID
} }
func New() *Limiter { func New() *Limiter {
@ -39,19 +43,18 @@ func New() *Limiter {
} }
} }
func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList *[]api.UserInfo, globalDeviceLimit *GlobalDeviceLimitConfig) error { func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList *[]api.UserInfo, globalLimit *GlobalDeviceLimitConfig) error {
// global limit
if globalDeviceLimit.Enable {
l.g = globalDeviceLimit
}
inboundInfo := &InboundInfo{ inboundInfo := &InboundInfo{
Tag: tag, Tag: tag,
NodeSpeedLimit: nodeSpeedLimit, NodeSpeedLimit: nodeSpeedLimit,
BucketHub: new(sync.Map), BucketHub: new(sync.Map),
UserOnlineIP: new(sync.Map), UserOnlineIP: new(sync.Map),
GlobalOnlineIP: new(sync.Map), GlobalLimit: &GlobalLimit{
GlobalDeviceLimitConfig: globalLimit,
OnlineIP: new(sync.Map),
},
} }
userMap := new(sync.Map) userMap := new(sync.Map)
for _, u := range *userList { for _, u := range *userList {
userMap.Store(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID), UserInfo{ userMap.Store(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID), UserInfo{
@ -132,13 +135,14 @@ func (l *Limiter) GetOnlineDevice(tag string) (*[]api.OnlineUser, error) {
func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *rate.Limiter, SpeedLimit bool, Reject bool) { func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *rate.Limiter, SpeedLimit bool, Reject bool) {
if value, ok := l.InboundInfo.Load(tag); ok { if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo)
nodeLimit := inboundInfo.NodeSpeedLimit
var ( var (
userLimit uint64 = 0 userLimit uint64 = 0
deviceLimit, uid int deviceLimit, uid int
) )
inboundInfo := value.(*InboundInfo)
nodeLimit := inboundInfo.NodeSpeedLimit
if v, ok := inboundInfo.UserInfo.Load(email); ok { if v, ok := inboundInfo.UserInfo.Load(email); ok {
u := v.(UserInfo) u := v.(UserInfo)
uid = u.UID uid = u.UID
@ -167,10 +171,10 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r
} }
// Global device limit // Global device limit
if l.g.Enable { if inboundInfo.GlobalLimit.Enable {
email := email[strings.Index(email, "|")+1:] email := email[strings.Index(email, "|")+1:]
if v, ok := inboundInfo.GlobalOnlineIP.Load(email); ok { if v, ok := inboundInfo.GlobalLimit.OnlineIP.Load(email); ok {
ipMap := v.(*sync.Map) ipMap := v.(*sync.Map)
// If this is a new ip // If this is a new ip
if _, ok := ipMap.LoadOrStore(ip, uid); !ok { if _, ok := ipMap.LoadOrStore(ip, uid); !ok {
@ -183,10 +187,10 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r
ipMap.Delete(ip) ipMap.Delete(ip)
return nil, false, true return nil, false, true
} }
go l.pushIP(email, ip) go pushIP(email, ip, inboundInfo.GlobalLimit)
} }
} else { } else {
go l.pushIP(email, ip) go pushIP(email, ip, inboundInfo.GlobalLimit)
} }
} }
@ -210,17 +214,17 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r
} }
// Push new IP to redis // Push new IP to redis
func (l *Limiter) pushIP(email string, ip string) { func pushIP(email string, ip string, g *GlobalLimit) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(l.g.Timeout)) ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(g.Timeout))
defer cancel() defer cancel()
if err := l.g.R.HSet(ctx, email, map[string]any{ip: 0}).Err(); err != nil { if err := g.R.SAdd(ctx, email, ip).Err(); err != nil {
newError(fmt.Errorf("redis: %v", err)).AtError().WriteToLog() newError(fmt.Errorf("redis: %v", err)).AtError().WriteToLog()
} }
// check ttl, if ttl == -1, then set expire time. // check ttl, if ttl == -1, then set expire time.
if l.g.R.TTL(ctx, email).Val() == -1 { if g.R.TTL(ctx, email).Val() == -1 {
if err := l.g.R.Expire(ctx, email, time.Duration(l.g.Expiry)*time.Minute).Err(); err != nil { if err := g.R.Expire(ctx, email, time.Duration(g.Expiry)*time.Minute).Err(); err != nil {
newError(fmt.Errorf("redis: %v", err)).AtError().WriteToLog() newError(fmt.Errorf("redis: %v", err)).AtError().WriteToLog()
} }
} }

View File

@ -12,13 +12,6 @@ ConnectionConfig:
UplinkOnly: 2 # Time limit when the connection downstream is closed, Second UplinkOnly: 2 # Time limit when the connection downstream is closed, Second
DownlinkOnly: 4 # Time limit when the connection is closed after the uplink is closed, Second DownlinkOnly: 4 # Time limit when the connection is closed after the uplink is closed, Second
BufferSize: 64 # The internal cache size of each connection, kB BufferSize: 64 # The internal cache size of each connection, kB
GlobalDeviceLimitConfig:
Enable: true # Enable the global device limit of a user
RedisAddr: 10.0.0.188:6379 # The redis server address
RedisPassword: # Redis password
RedisDB: 0 # Redis DB
Timeout: 5 # Timeout for redis request
Expiry: 60 # Expiry time (second)
Nodes: Nodes:
- -
PanelType: "SSpanel" # Panel type: SSpanel, V2board, NewV2board, PMpanel, Proxypanel, V2RaySocks PanelType: "SSpanel" # Panel type: SSpanel, V2board, NewV2board, PMpanel, Proxypanel, V2RaySocks
@ -45,6 +38,13 @@ Nodes:
WarnTimes: 0 # After (WarnTimes) consecutive warnings, the user will be limited. Set to 0 to punish overspeed user immediately. WarnTimes: 0 # After (WarnTimes) consecutive warnings, the user will be limited. Set to 0 to punish overspeed user immediately.
LimitSpeed: 0 # The speedlimit of a limited user (unit: mbps) LimitSpeed: 0 # The speedlimit of a limited user (unit: mbps)
LimitDuration: 0 # How many minutes will the limiting last (unit: minute) LimitDuration: 0 # How many minutes will the limiting last (unit: minute)
GlobalDeviceLimitConfig:
Enable: false # Enable the global device limit of a user
RedisAddr: 127.0.0.1:6379 # The redis server address
RedisPassword: YOUR PASSWORD # Redis password
RedisDB: 0 # Redis DB
Timeout: 5 # Timeout for redis request
Expiry: 60 # Expiry time (second)
EnableFallback: false # Only support for Trojan and Vless EnableFallback: false # Only support for Trojan and Vless
FallBackConfigs: # Support multiple fallbacks FallBackConfigs: # Support multiple fallbacks
- -

View File

@ -2,19 +2,17 @@ package panel
import ( import (
"github.com/XrayR-project/XrayR/api" "github.com/XrayR-project/XrayR/api"
"github.com/XrayR-project/XrayR/common/limiter"
"github.com/XrayR-project/XrayR/service/controller" "github.com/XrayR-project/XrayR/service/controller"
) )
type Config struct { type Config struct {
LogConfig *LogConfig `mapstructure:"Log"` LogConfig *LogConfig `mapstructure:"Log"`
DnsConfigPath string `mapstructure:"DnsConfigPath"` DnsConfigPath string `mapstructure:"DnsConfigPath"`
InboundConfigPath string `mapstructure:"InboundConfigPath"` InboundConfigPath string `mapstructure:"InboundConfigPath"`
OutboundConfigPath string `mapstructure:"OutboundConfigPath"` OutboundConfigPath string `mapstructure:"OutboundConfigPath"`
RouteConfigPath string `mapstructure:"RouteConfigPath"` RouteConfigPath string `mapstructure:"RouteConfigPath"`
ConnectionConfig *ConnectionConfig `mapstructure:"ConnectionConfig"` ConnectionConfig *ConnectionConfig `mapstructure:"ConnectionConfig"`
GlobalDeviceLimitConfig *limiter.GlobalDeviceLimitConfig `mapstructure:"GlobalDeviceLimitConfig"` NodesConfig []*NodesConfig `mapstructure:"Nodes"`
NodesConfig []*NodesConfig `mapstructure:"Nodes"`
} }
type NodesConfig struct { type NodesConfig struct {

View File

@ -164,10 +164,6 @@ func (p *Panel) Start() {
} }
p.Server = server p.Server = server
if p.panelConfig.GlobalDeviceLimitConfig.Enable {
log.Println("Global limit: Enable")
}
// Load Nodes config // Load Nodes config
for _, nodeConfig := range p.panelConfig.NodesConfig { for _, nodeConfig := range p.panelConfig.NodesConfig {
var apiClient api.API var apiClient api.API
@ -196,7 +192,7 @@ func (p *Panel) Start() {
log.Panicf("Read Controller Config Failed") log.Panicf("Read Controller Config Failed")
} }
} }
controllerService = controller.New(server, apiClient, controllerConfig, nodeConfig.PanelType, p.panelConfig.GlobalDeviceLimitConfig) controllerService = controller.New(server, apiClient, controllerConfig, nodeConfig.PanelType)
p.Service = append(p.Service, controllerService) p.Service = append(p.Service, controllerService)
} }

View File

@ -1,24 +1,26 @@
package controller package controller
import ( import (
"github.com/XrayR-project/XrayR/common/limiter"
"github.com/XrayR-project/XrayR/common/mylego" "github.com/XrayR-project/XrayR/common/mylego"
) )
type Config struct { type Config struct {
ListenIP string `mapstructure:"ListenIP"` ListenIP string `mapstructure:"ListenIP"`
SendIP string `mapstructure:"SendIP"` SendIP string `mapstructure:"SendIP"`
UpdatePeriodic int `mapstructure:"UpdatePeriodic"` UpdatePeriodic int `mapstructure:"UpdatePeriodic"`
CertConfig *mylego.CertConfig `mapstructure:"CertConfig"` CertConfig *mylego.CertConfig `mapstructure:"CertConfig"`
EnableDNS bool `mapstructure:"EnableDNS"` EnableDNS bool `mapstructure:"EnableDNS"`
DNSType string `mapstructure:"DNSType"` DNSType string `mapstructure:"DNSType"`
DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"` DisableUploadTraffic bool `mapstructure:"DisableUploadTraffic"`
DisableGetRule bool `mapstructure:"DisableGetRule"` DisableGetRule bool `mapstructure:"DisableGetRule"`
EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"` EnableProxyProtocol bool `mapstructure:"EnableProxyProtocol"`
EnableFallback bool `mapstructure:"EnableFallback"` EnableFallback bool `mapstructure:"EnableFallback"`
DisableIVCheck bool `mapstructure:"DisableIVCheck"` DisableIVCheck bool `mapstructure:"DisableIVCheck"`
DisableSniffing bool `mapstructure:"DisableSniffing"` DisableSniffing bool `mapstructure:"DisableSniffing"`
AutoSpeedLimitConfig *AutoSpeedLimitConfig `mapstructure:"AutoSpeedLimitConfig"` AutoSpeedLimitConfig *AutoSpeedLimitConfig `mapstructure:"AutoSpeedLimitConfig"`
FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"` GlobalDeviceLimitConfig *limiter.GlobalDeviceLimitConfig `mapstructure:"GlobalDeviceLimitConfig"`
FallBackConfigs []*FallBackConfig `mapstructure:"FallBackConfigs"`
} }
type AutoSpeedLimitConfig struct { type AutoSpeedLimitConfig struct {

View File

@ -47,7 +47,6 @@ type Controller struct {
stm stats.Manager stm stats.Manager
dispatcher *mydispatcher.DefaultDispatcher dispatcher *mydispatcher.DefaultDispatcher
startAt time.Time startAt time.Time
g *limiter.GlobalDeviceLimitConfig
} }
type periodicTask struct { type periodicTask struct {
@ -56,7 +55,7 @@ type periodicTask struct {
} }
// New return a Controller service with default parameters. // New return a Controller service with default parameters.
func New(server *core.Instance, api api.API, config *Config, panelType string, globalConfig *limiter.GlobalDeviceLimitConfig) *Controller { func New(server *core.Instance, api api.API, config *Config, panelType string) *Controller {
controller := &Controller{ controller := &Controller{
server: server, server: server,
config: config, config: config,
@ -69,16 +68,6 @@ func New(server *core.Instance, api api.API, config *Config, panelType string, g
startAt: time.Now(), startAt: time.Now(),
} }
// Init global limit redis client
if globalConfig.Enable {
globalConfig.R = redis.NewClient(&redis.Options{
Addr: globalConfig.RedisAddr,
Password: globalConfig.RedisPassword,
DB: globalConfig.RedisDB,
})
controller.g = globalConfig
}
return controller return controller
} }
@ -113,7 +102,7 @@ func (c *Controller) Start() error {
} }
// Add Limiter // Add Limiter
if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, userInfo, c.g); err != nil { if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, userInfo, c.initGlobal()); err != nil {
log.Print(err) log.Print(err)
} }
@ -164,7 +153,7 @@ func (c *Controller) Start() error {
} }
// Check global limit in need // Check global limit in need
if c.g.Enable { if c.config.GlobalDeviceLimitConfig.Enable {
c.tasks = append(c.tasks, c.tasks = append(c.tasks,
periodicTask{ periodicTask{
tag: "global limit", tag: "global limit",
@ -273,8 +262,9 @@ func (c *Controller) nodeInfoMonitor() (err error) {
log.Print(err) log.Print(err)
return nil return nil
} }
// Add Limiter // Add Limiter
if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, newUserInfo, c.g); err != nil { if err := c.AddInboundLimiter(c.Tag, newNodeInfo.SpeedLimit, newUserInfo, c.initGlobal()); err != nil {
log.Print(err) log.Print(err)
return nil return nil
} }
@ -309,6 +299,20 @@ func (c *Controller) nodeInfoMonitor() (err error) {
return nil return nil
} }
func (c *Controller) initGlobal() *limiter.GlobalDeviceLimitConfig {
// Init global limit redis client
globalConfig := c.config.GlobalDeviceLimitConfig
if c.config.GlobalDeviceLimitConfig.Enable {
log.Printf("[%s] Global limit: enable", c.Tag)
globalConfig.R = redis.NewClient(&redis.Options{
Addr: globalConfig.RedisAddr,
Password: globalConfig.RedisPassword,
DB: globalConfig.RedisDB,
})
}
return globalConfig
}
func (c *Controller) removeOldTag(oldTag string) (err error) { func (c *Controller) removeOldTag(oldTag string) (err error) {
err = c.removeInbound(oldTag) err = c.removeInbound(oldTag)
if err != nil { if err != nil {
@ -653,27 +657,27 @@ func (c *Controller) globalLimitFetch() (err error) {
inboundInfo := value.(*limiter.InboundInfo) inboundInfo := value.(*limiter.InboundInfo)
for { for {
if emails, cursor, err = c.g.R.Scan(ctx, cursor, "*", 10000).Result(); err != nil { if emails, cursor, err = c.config.GlobalDeviceLimitConfig.R.Scan(ctx, cursor, "*", 10000).Result(); err != nil {
newError(err).AtError().WriteToLog() newError(err).AtError().WriteToLog()
} }
pipe := c.g.R.Pipeline() pipe := c.config.GlobalDeviceLimitConfig.R.Pipeline()
cmdMap := make(map[string]*redis.StringStringMapCmd) cmdMap := make(map[string]*redis.StringSliceCmd)
for i := range emails { for i := range emails {
email := emails[i] email := emails[i]
cmdMap[email] = pipe.HGetAll(ctx, email) cmdMap[email] = pipe.SMembers(ctx, email)
} }
if _, err := pipe.Exec(ctx); err != nil { if _, err := pipe.Exec(ctx); err != nil {
newError(fmt.Errorf("redis: %v", err)).AtError().WriteToLog() newError(fmt.Errorf("redis: %v", err)).AtError().WriteToLog()
} else { } else {
inboundInfo.GlobalOnlineIP = new(sync.Map) inboundInfo.GlobalLimit.OnlineIP = new(sync.Map)
for k := range cmdMap { for k := range cmdMap {
ips := cmdMap[k].Val() ips := cmdMap[k].Val()
ipMap := new(sync.Map) ipMap := new(sync.Map)
for i := range ips { for i := range ips {
ipMap.Store(i, 0) ipMap.Store(ips[i], 0)
inboundInfo.GlobalOnlineIP.Store(k, ipMap) inboundInfo.GlobalLimit.OnlineIP.Store(k, ipMap)
} }
} }
} }