mirror of
https://github.com/XrayR-project/XrayR.git
synced 2025-06-08 13:29:54 +00:00
289 lines
8.0 KiB
Go
289 lines
8.0 KiB
Go
// Package limiter is to control the links that go into the dispatcher
|
|
package limiter
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/eko/gocache/lib/v4/cache"
|
|
"github.com/eko/gocache/lib/v4/marshaler"
|
|
"github.com/eko/gocache/lib/v4/store"
|
|
goCacheStore "github.com/eko/gocache/store/go_cache/v4"
|
|
redisStore "github.com/eko/gocache/store/redis/v4"
|
|
"github.com/go-redis/redis/v8"
|
|
goCache "github.com/patrickmn/go-cache"
|
|
"golang.org/x/time/rate"
|
|
|
|
"github.com/XrayR-project/XrayR/api"
|
|
)
|
|
|
|
type UserInfo struct {
|
|
UID int
|
|
SpeedLimit uint64
|
|
DeviceLimit int
|
|
}
|
|
|
|
type InboundInfo struct {
|
|
Tag string
|
|
NodeSpeedLimit uint64
|
|
UserInfo *sync.Map // Key: Email value: UserInfo
|
|
BucketHub *sync.Map // key: Email, value: *rate.Limiter
|
|
UserOnlineIP *sync.Map // Key: Email, value: {Key: IP, value: UID}
|
|
GlobalLimit struct {
|
|
config *GlobalDeviceLimitConfig
|
|
globalOnlineIP *marshaler.Marshaler
|
|
}
|
|
}
|
|
|
|
type Limiter struct {
|
|
InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo
|
|
}
|
|
|
|
func New() *Limiter {
|
|
return &Limiter{
|
|
InboundInfo: new(sync.Map),
|
|
}
|
|
}
|
|
|
|
func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList *[]api.UserInfo, globalLimit *GlobalDeviceLimitConfig) error {
|
|
inboundInfo := &InboundInfo{
|
|
Tag: tag,
|
|
NodeSpeedLimit: nodeSpeedLimit,
|
|
BucketHub: new(sync.Map),
|
|
UserOnlineIP: new(sync.Map),
|
|
}
|
|
|
|
if globalLimit != nil && globalLimit.Enable {
|
|
inboundInfo.GlobalLimit.config = globalLimit
|
|
|
|
// init local store
|
|
gs := goCacheStore.NewGoCache(goCache.New(time.Duration(globalLimit.Expiry)*time.Second, 1*time.Minute))
|
|
|
|
// init redis store
|
|
rs := redisStore.NewRedis(redis.NewClient(
|
|
&redis.Options{
|
|
Addr: globalLimit.RedisAddr,
|
|
Password: globalLimit.RedisPassword,
|
|
DB: globalLimit.RedisDB,
|
|
}),
|
|
store.WithExpiration(time.Duration(globalLimit.Expiry)*time.Second))
|
|
|
|
// init chained cache. First use local go-cache, if go-cache is nil, then use redis cache
|
|
cacheManager := cache.NewChain[any](
|
|
cache.New[any](gs), // go-cache is priority
|
|
cache.New[any](rs),
|
|
)
|
|
inboundInfo.GlobalLimit.globalOnlineIP = marshaler.New(cacheManager)
|
|
}
|
|
|
|
userMap := new(sync.Map)
|
|
for _, u := range *userList {
|
|
userMap.Store(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID), UserInfo{
|
|
UID: u.UID,
|
|
SpeedLimit: u.SpeedLimit,
|
|
DeviceLimit: u.DeviceLimit,
|
|
})
|
|
}
|
|
inboundInfo.UserInfo = userMap
|
|
l.InboundInfo.Store(tag, inboundInfo) // Replace the old inbound info
|
|
return nil
|
|
}
|
|
|
|
func (l *Limiter) UpdateInboundLimiter(tag string, updatedUserList *[]api.UserInfo) error {
|
|
if value, ok := l.InboundInfo.Load(tag); ok {
|
|
inboundInfo := value.(*InboundInfo)
|
|
// Update User info
|
|
for _, u := range *updatedUserList {
|
|
inboundInfo.UserInfo.Store(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID), UserInfo{
|
|
UID: u.UID,
|
|
SpeedLimit: u.SpeedLimit,
|
|
DeviceLimit: u.DeviceLimit,
|
|
})
|
|
// Update old limiter bucket
|
|
limit := determineRate(inboundInfo.NodeSpeedLimit, u.SpeedLimit)
|
|
if limit > 0 {
|
|
if bucket, ok := inboundInfo.BucketHub.Load(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID)); ok {
|
|
limiter := bucket.(*rate.Limiter)
|
|
limiter.SetLimit(rate.Limit(limit))
|
|
limiter.SetBurst(int(limit))
|
|
}
|
|
} else {
|
|
inboundInfo.BucketHub.Delete(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID))
|
|
}
|
|
}
|
|
} else {
|
|
return fmt.Errorf("no such inbound in limiter: %s", tag)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (l *Limiter) DeleteInboundLimiter(tag string) error {
|
|
l.InboundInfo.Delete(tag)
|
|
return nil
|
|
}
|
|
|
|
func (l *Limiter) GetOnlineDevice(tag string) (*[]api.OnlineUser, error) {
|
|
var onlineUser []api.OnlineUser
|
|
|
|
if value, ok := l.InboundInfo.Load(tag); ok {
|
|
inboundInfo := value.(*InboundInfo)
|
|
// Clear Speed Limiter bucket for users who are not online
|
|
inboundInfo.BucketHub.Range(func(key, value interface{}) bool {
|
|
email := key.(string)
|
|
if _, exists := inboundInfo.UserOnlineIP.Load(email); !exists {
|
|
inboundInfo.BucketHub.Delete(email)
|
|
}
|
|
return true
|
|
})
|
|
inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool {
|
|
email := key.(string)
|
|
ipMap := value.(*sync.Map)
|
|
ipMap.Range(func(key, value interface{}) bool {
|
|
uid := value.(int)
|
|
ip := key.(string)
|
|
onlineUser = append(onlineUser, api.OnlineUser{UID: uid, IP: ip})
|
|
return true
|
|
})
|
|
inboundInfo.UserOnlineIP.Delete(email) // Reset online device
|
|
return true
|
|
})
|
|
} else {
|
|
return nil, fmt.Errorf("no such inbound in limiter: %s", tag)
|
|
}
|
|
|
|
return &onlineUser, nil
|
|
}
|
|
|
|
func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *rate.Limiter, SpeedLimit bool, Reject bool) {
|
|
if value, ok := l.InboundInfo.Load(tag); ok {
|
|
var (
|
|
userLimit uint64 = 0
|
|
deviceLimit, uid int
|
|
)
|
|
|
|
inboundInfo := value.(*InboundInfo)
|
|
nodeLimit := inboundInfo.NodeSpeedLimit
|
|
|
|
if v, ok := inboundInfo.UserInfo.Load(email); ok {
|
|
u := v.(UserInfo)
|
|
uid = u.UID
|
|
userLimit = u.SpeedLimit
|
|
deviceLimit = u.DeviceLimit
|
|
}
|
|
|
|
// Local device limit
|
|
ipMap := new(sync.Map)
|
|
ipMap.Store(ip, uid)
|
|
// If any device is online
|
|
if v, ok := inboundInfo.UserOnlineIP.LoadOrStore(email, ipMap); ok {
|
|
ipMap := v.(*sync.Map)
|
|
// If this is a new ip
|
|
if _, ok := ipMap.LoadOrStore(ip, uid); !ok {
|
|
counter := 0
|
|
ipMap.Range(func(key, value interface{}) bool {
|
|
counter++
|
|
return true
|
|
})
|
|
if counter > deviceLimit && deviceLimit > 0 {
|
|
ipMap.Delete(ip)
|
|
return nil, false, true
|
|
}
|
|
}
|
|
}
|
|
|
|
// GlobalLimit
|
|
if inboundInfo.GlobalLimit.config != nil && inboundInfo.GlobalLimit.config.Enable {
|
|
if reject := globalLimit(inboundInfo, email, uid, ip, deviceLimit); reject {
|
|
return nil, false, true
|
|
}
|
|
}
|
|
|
|
// Speed limit
|
|
limit := determineRate(nodeLimit, userLimit) // Determine the speed limit rate
|
|
if limit > 0 {
|
|
limiter := rate.NewLimiter(rate.Limit(limit), int(limit)) // Byte/s
|
|
if v, ok := inboundInfo.BucketHub.LoadOrStore(email, limiter); ok {
|
|
bucket := v.(*rate.Limiter)
|
|
return bucket, true, false
|
|
} else {
|
|
return limiter, true, false
|
|
}
|
|
} else {
|
|
return nil, false, false
|
|
}
|
|
} else {
|
|
newError("Get Inbound Limiter information failed").AtDebug().WriteToLog()
|
|
return nil, false, false
|
|
}
|
|
}
|
|
|
|
// Global device limit
|
|
func globalLimit(inboundInfo *InboundInfo, email string, uid int, ip string, deviceLimit int) bool {
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(inboundInfo.GlobalLimit.config.Timeout)*time.Second)
|
|
defer cancel()
|
|
|
|
// reformat email for unique key
|
|
uniqueKey := strings.Replace(email, inboundInfo.Tag, strconv.Itoa(deviceLimit), 1)
|
|
|
|
v, err := inboundInfo.GlobalLimit.globalOnlineIP.Get(ctx, uniqueKey, new(map[string]int))
|
|
if err != nil {
|
|
if _, ok := err.(*store.NotFound); ok {
|
|
// If the email is a new device
|
|
go pushIP(inboundInfo, uniqueKey, &map[string]int{ip: uid})
|
|
} else {
|
|
newError("cache service").Base(err).AtError().WriteToLog()
|
|
}
|
|
return false
|
|
}
|
|
|
|
ipMap := v.(*map[string]int)
|
|
// Reject device reach limit directly
|
|
if deviceLimit > 0 && len(*ipMap) > deviceLimit {
|
|
return true
|
|
}
|
|
|
|
// If the ip is not in cache
|
|
if _, ok := (*ipMap)[ip]; !ok {
|
|
(*ipMap)[ip] = uid
|
|
go pushIP(inboundInfo, email, ipMap)
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// push the ip to cache
|
|
func pushIP(inboundInfo *InboundInfo, email string, ipMap *map[string]int) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(inboundInfo.GlobalLimit.config.Timeout)*time.Second)
|
|
defer cancel()
|
|
|
|
if err := inboundInfo.GlobalLimit.globalOnlineIP.Set(ctx, email, ipMap); err != nil {
|
|
newError("cache service").Base(err).AtError().WriteToLog()
|
|
}
|
|
}
|
|
|
|
// determineRate returns the minimum non-zero rate
|
|
func determineRate(nodeLimit, userLimit uint64) (limit uint64) {
|
|
if nodeLimit == 0 || userLimit == 0 {
|
|
if nodeLimit > userLimit {
|
|
return nodeLimit
|
|
} else if nodeLimit < userLimit {
|
|
return userLimit
|
|
} else {
|
|
return 0
|
|
}
|
|
} else {
|
|
if nodeLimit > userLimit {
|
|
return userLimit
|
|
} else if nodeLimit < userLimit {
|
|
return nodeLimit
|
|
} else {
|
|
return nodeLimit
|
|
}
|
|
}
|
|
}
|