// Package limiter is to control the links that go into the dispatcher package limiter import ( "context" "fmt" "strconv" "strings" "sync" "time" "github.com/eko/gocache/lib/v4/cache" "github.com/eko/gocache/lib/v4/marshaler" "github.com/eko/gocache/lib/v4/store" goCacheStore "github.com/eko/gocache/store/go_cache/v4" redisStore "github.com/eko/gocache/store/redis/v4" goCache "github.com/patrickmn/go-cache" "github.com/redis/go-redis/v9" "golang.org/x/time/rate" "github.com/XrayR-project/XrayR/api" ) type UserInfo struct { UID int SpeedLimit uint64 DeviceLimit int } type InboundInfo struct { Tag string NodeSpeedLimit uint64 UserInfo *sync.Map // Key: Email value: UserInfo BucketHub *sync.Map // key: Email, value: *rate.Limiter UserOnlineIP *sync.Map // Key: Email, value: {Key: IP, value: UID} GlobalLimit struct { config *GlobalDeviceLimitConfig globalOnlineIP *marshaler.Marshaler } } type Limiter struct { InboundInfo *sync.Map // Key: Tag, Value: *InboundInfo } func New() *Limiter { return &Limiter{ InboundInfo: new(sync.Map), } } func (l *Limiter) AddInboundLimiter(tag string, nodeSpeedLimit uint64, userList *[]api.UserInfo, globalLimit *GlobalDeviceLimitConfig) error { inboundInfo := &InboundInfo{ Tag: tag, NodeSpeedLimit: nodeSpeedLimit, BucketHub: new(sync.Map), UserOnlineIP: new(sync.Map), } if globalLimit != nil && globalLimit.Enable { inboundInfo.GlobalLimit.config = globalLimit // init local store gs := goCacheStore.NewGoCache(goCache.New(time.Duration(globalLimit.Expiry)*time.Second, 1*time.Minute)) // init redis store rs := redisStore.NewRedis(redis.NewClient( &redis.Options{ Addr: globalLimit.RedisAddr, Password: globalLimit.RedisPassword, DB: globalLimit.RedisDB, }), store.WithExpiration(time.Duration(globalLimit.Expiry)*time.Second)) // init chained cache. First use local go-cache, if go-cache is nil, then use redis cache cacheManager := cache.NewChain[any]( cache.New[any](gs), // go-cache is priority cache.New[any](rs), ) inboundInfo.GlobalLimit.globalOnlineIP = marshaler.New(cacheManager) } userMap := new(sync.Map) for _, u := range *userList { userMap.Store(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID), UserInfo{ UID: u.UID, SpeedLimit: u.SpeedLimit, DeviceLimit: u.DeviceLimit, }) } inboundInfo.UserInfo = userMap l.InboundInfo.Store(tag, inboundInfo) // Replace the old inbound info return nil } func (l *Limiter) UpdateInboundLimiter(tag string, updatedUserList *[]api.UserInfo) error { if value, ok := l.InboundInfo.Load(tag); ok { inboundInfo := value.(*InboundInfo) // Update User info for _, u := range *updatedUserList { inboundInfo.UserInfo.Store(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID), UserInfo{ UID: u.UID, SpeedLimit: u.SpeedLimit, DeviceLimit: u.DeviceLimit, }) // Update old limiter bucket limit := determineRate(inboundInfo.NodeSpeedLimit, u.SpeedLimit) if limit > 0 { if bucket, ok := inboundInfo.BucketHub.Load(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID)); ok { limiter := bucket.(*rate.Limiter) limiter.SetLimit(rate.Limit(limit)) limiter.SetBurst(int(limit)) } } else { inboundInfo.BucketHub.Delete(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID)) } } } else { return fmt.Errorf("no such inbound in limiter: %s", tag) } return nil } func (l *Limiter) DeleteInboundLimiter(tag string) error { l.InboundInfo.Delete(tag) return nil } func (l *Limiter) GetOnlineDevice(tag string) (*[]api.OnlineUser, error) { var onlineUser []api.OnlineUser if value, ok := l.InboundInfo.Load(tag); ok { inboundInfo := value.(*InboundInfo) // Clear Speed Limiter bucket for users who are not online inboundInfo.BucketHub.Range(func(key, value interface{}) bool { email := key.(string) if _, exists := inboundInfo.UserOnlineIP.Load(email); !exists { inboundInfo.BucketHub.Delete(email) } return true }) inboundInfo.UserOnlineIP.Range(func(key, value interface{}) bool { email := key.(string) ipMap := value.(*sync.Map) ipMap.Range(func(key, value interface{}) bool { uid := value.(int) ip := key.(string) onlineUser = append(onlineUser, api.OnlineUser{UID: uid, IP: ip}) return true }) inboundInfo.UserOnlineIP.Delete(email) // Reset online device return true }) } else { return nil, fmt.Errorf("no such inbound in limiter: %s", tag) } return &onlineUser, nil } func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *rate.Limiter, SpeedLimit bool, Reject bool) { if value, ok := l.InboundInfo.Load(tag); ok { var ( userLimit uint64 = 0 deviceLimit, uid int ) inboundInfo := value.(*InboundInfo) nodeLimit := inboundInfo.NodeSpeedLimit if v, ok := inboundInfo.UserInfo.Load(email); ok { u := v.(UserInfo) uid = u.UID userLimit = u.SpeedLimit deviceLimit = u.DeviceLimit } // Local device limit ipMap := new(sync.Map) ipMap.Store(ip, uid) // If any device is online if v, ok := inboundInfo.UserOnlineIP.LoadOrStore(email, ipMap); ok { ipMap := v.(*sync.Map) // If this is a new ip if _, ok := ipMap.LoadOrStore(ip, uid); !ok { counter := 0 ipMap.Range(func(key, value interface{}) bool { counter++ return true }) if counter > deviceLimit && deviceLimit > 0 { ipMap.Delete(ip) return nil, false, true } } } // GlobalLimit if inboundInfo.GlobalLimit.config != nil && inboundInfo.GlobalLimit.config.Enable { if reject := globalLimit(inboundInfo, email, uid, ip, deviceLimit); reject { return nil, false, true } } // Speed limit limit := determineRate(nodeLimit, userLimit) // Determine the speed limit rate if limit > 0 { limiter := rate.NewLimiter(rate.Limit(limit), int(limit)) // Byte/s if v, ok := inboundInfo.BucketHub.LoadOrStore(email, limiter); ok { bucket := v.(*rate.Limiter) return bucket, true, false } else { return limiter, true, false } } else { return nil, false, false } } else { newError("Get Inbound Limiter information failed").AtDebug().WriteToLog() return nil, false, false } } // Global device limit func globalLimit(inboundInfo *InboundInfo, email string, uid int, ip string, deviceLimit int) bool { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(inboundInfo.GlobalLimit.config.Timeout)*time.Second) defer cancel() // reformat email for unique key uniqueKey := strings.Replace(email, inboundInfo.Tag, strconv.Itoa(deviceLimit), 1) v, err := inboundInfo.GlobalLimit.globalOnlineIP.Get(ctx, uniqueKey, new(map[string]int)) if err != nil { if _, ok := err.(*store.NotFound); ok { // If the email is a new device go pushIP(inboundInfo, uniqueKey, &map[string]int{ip: uid}) } else { newError("cache service").Base(err).AtError().WriteToLog() } return false } ipMap := v.(*map[string]int) // Reject device reach limit directly if deviceLimit > 0 && len(*ipMap) > deviceLimit { return true } // If the ip is not in cache if _, ok := (*ipMap)[ip]; !ok { (*ipMap)[ip] = uid go pushIP(inboundInfo, uniqueKey, ipMap) } return false } // push the ip to cache func pushIP(inboundInfo *InboundInfo, uniqueKey string, ipMap *map[string]int) { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(inboundInfo.GlobalLimit.config.Timeout)*time.Second) defer cancel() if err := inboundInfo.GlobalLimit.globalOnlineIP.Set(ctx, uniqueKey, ipMap); err != nil { newError("cache service").Base(err).AtError().WriteToLog() } } // determineRate returns the minimum non-zero rate func determineRate(nodeLimit, userLimit uint64) (limit uint64) { if nodeLimit == 0 || userLimit == 0 { if nodeLimit > userLimit { return nodeLimit } else if nodeLimit < userLimit { return userLimit } else { return 0 } } else { if nodeLimit > userLimit { return userLimit } else if nodeLimit < userLimit { return nodeLimit } else { return nodeLimit } } }