feat: dynamic update speedlimit bucket

This commit is contained in:
pocketW 2022-10-13 08:37:59 +11:00
parent 2f0461ddda
commit 79528d3e17
6 changed files with 35 additions and 44 deletions

View File

@ -1,20 +0,0 @@
name: Sync to Gitlab
on:
push:
delete:
workflow_dispatch:
jobs:
to_gitlab:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- uses: pixta-dev/repository-mirroring-action@v1
with:
target_repo_url:
git@gitlab.com:xrayr-project/XrayR.git
ssh_private_key:
${{ secrets.SSH_PRIVATEKEY }}

View File

@ -4,10 +4,9 @@ package limiter
import (
"fmt"
"sync"
"time"
"github.com/XrayR-project/XrayR/api"
"github.com/juju/ratelimit"
"golang.org/x/time/rate"
)
type UserInfo struct {
@ -20,7 +19,7 @@ type InboundInfo struct {
Tag string
NodeSpeedLimit uint64
UserInfo *sync.Map // Key: Email value: UserInfo
BucketHub *sync.Map // key: Email, value: *ratelimit.Bucket
BucketHub *sync.Map // key: Email, value: *rate.Limiter
UserOnlineIP *sync.Map // Key: Email Value: *sync.Map: Key: IP, Value: UID
}
@ -65,7 +64,17 @@ func (l *Limiter) UpdateInboundLimiter(tag string, updatedUserList *[]api.UserIn
SpeedLimit: u.SpeedLimit,
DeviceLimit: u.DeviceLimit,
})
inboundInfo.BucketHub.Delete(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID)) // Delete old limiter bucket
// Update old limiter bucket
limit := determineRate(inboundInfo.NodeSpeedLimit, u.SpeedLimit)
if limit > 0 {
if bucket, ok := inboundInfo.BucketHub.Load(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID)); ok {
limiter := bucket.(*rate.Limiter)
limiter.SetLimit(rate.Limit(limit))
limiter.SetBurst(int(limit))
}
} else {
inboundInfo.BucketHub.Delete(fmt.Sprintf("%s|%s|%d", tag, u.Email, u.UID))
}
}
} else {
return fmt.Errorf("no such inbound in limiter: %s", tag)
@ -108,7 +117,7 @@ func (l *Limiter) GetOnlineDevice(tag string) (*[]api.OnlineUser, error) {
return &onlineUser, nil
}
func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *ratelimit.Bucket, SpeedLimit bool, Reject bool) {
func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *rate.Limiter, SpeedLimit bool, Reject bool) {
if value, ok := l.InboundInfo.Load(tag); ok {
inboundInfo := value.(*InboundInfo)
nodeLimit := inboundInfo.NodeSpeedLimit
@ -142,9 +151,9 @@ func (l *Limiter) GetUserBucket(tag string, email string, ip string) (limiter *r
}
limit := determineRate(nodeLimit, userLimit) // If need the Speed limit
if limit > 0 {
limiter := ratelimit.NewBucketWithQuantum(time.Duration(int64(time.Second)), int64(limit), int64(limit)) // Byte/s
limiter := rate.NewLimiter(rate.Limit(limit), int(limit)) // Byte/s
if v, ok := inboundInfo.BucketHub.LoadOrStore(email, limiter); ok {
bucket := v.(*ratelimit.Bucket)
bucket := v.(*rate.Limiter)
return bucket, true, false
} else {
return limiter, true, false

View File

@ -3,18 +3,20 @@ package limiter
import (
"io"
"github.com/juju/ratelimit"
"context"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"golang.org/x/time/rate"
)
type Writer struct {
writer buf.Writer
limiter *ratelimit.Bucket
limiter *rate.Limiter
w io.Writer
}
func (l *Limiter) RateWriter(writer buf.Writer, limiter *ratelimit.Bucket) buf.Writer {
func (l *Limiter) RateWriter(writer buf.Writer, limiter *rate.Limiter) buf.Writer {
return &Writer{
writer: writer,
limiter: limiter,
@ -26,6 +28,7 @@ func (w *Writer) Close() error {
}
func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
w.limiter.Wait(int64(mb.Len()))
ctx := context.Background()
w.limiter.WaitN(ctx, int(mb.Len()))
return w.writer.WriteMultiBuffer(mb)
}

3
go.mod
View File

@ -9,7 +9,6 @@ require (
github.com/go-acme/lego/v4 v4.9.0
github.com/go-resty/resty/v2 v2.7.0
github.com/imdario/mergo v0.3.13
github.com/juju/ratelimit v1.0.2
github.com/r3labs/diff/v2 v2.15.1
github.com/shirou/gopsutil/v3 v3.22.9
github.com/spf13/viper v1.13.0
@ -17,6 +16,7 @@ require (
github.com/urfave/cli v1.22.10
github.com/xtls/xray-core v1.6.0
golang.org/x/net v0.0.0-20220909164309-bea034e7d591
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
google.golang.org/protobuf v1.28.1
)
@ -164,7 +164,6 @@ require (
golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1 // indirect
golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
golang.org/x/tools v0.1.12 // indirect
google.golang.org/api v0.84.0 // indirect
google.golang.org/appengine v1.6.7 // indirect

2
go.sum
View File

@ -434,8 +434,6 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI=
github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 h1:qGQQKEcAR99REcMpsXCp3lJ03zYT1PkRd3kQGPn9GVg=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=

View File

@ -20,8 +20,9 @@ import (
)
type LimitInfo struct {
end int64
originSpeedLimit uint64
end int64
currentSpeedLimit int
originSpeedLimit uint64
}
type Controller struct {
@ -425,11 +426,12 @@ func compareUserList(old, new *[]api.UserInfo) (deleted, added []api.UserInfo) {
func limitUser(c *Controller, user api.UserInfo, silentUsers *[]api.UserInfo) {
c.limitedUsers[user] = LimitInfo{
end: time.Now().Unix() + int64(c.config.AutoSpeedLimitConfig.LimitDuration*60),
originSpeedLimit: user.SpeedLimit,
end: time.Now().Unix() + int64(c.config.AutoSpeedLimitConfig.LimitDuration*60),
currentSpeedLimit: c.config.AutoSpeedLimitConfig.LimitSpeed,
originSpeedLimit: user.SpeedLimit,
}
log.Printf(" User: %s Speed: %d End: %s", user.Email, user.SpeedLimit, time.Unix(c.limitedUsers[user].end, 0).Format("01-02 15:04:05"))
user.SpeedLimit = uint64(c.config.AutoSpeedLimitConfig.LimitSpeed) * 1024 * 1024 / 8
log.Printf("Limit User: %s Speed: %d End: %s", c.buildUserTag(&user), c.config.AutoSpeedLimitConfig.LimitSpeed, time.Unix(c.limitedUsers[user].end, 0).Format("01-02 15:04:05"))
user.SpeedLimit = uint64((c.config.AutoSpeedLimitConfig.LimitSpeed * 1000000) / 8)
*silentUsers = append(*silentUsers, user)
}
@ -457,10 +459,10 @@ func (c *Controller) userInfoMonitor() (err error) {
if time.Now().Unix() > limitInfo.end {
user.SpeedLimit = limitInfo.originSpeedLimit
toReleaseUsers = append(toReleaseUsers, user)
log.Printf(" User: %s Speed: %d End: nil (Unlimit)", user.Email, user.SpeedLimit)
log.Printf("User: %s Speed: %d End: nil (Unlimit)", c.buildUserTag(&user), user.SpeedLimit)
delete(c.limitedUsers, user)
} else {
log.Printf(" User: %s Speed: %d End: %s", user.Email, user.SpeedLimit, time.Unix(c.limitedUsers[user].end, 0).Format("01-02 15:04:05"))
log.Printf("User: %s Speed: %d End: %s", c.buildUserTag(&user), limitInfo.currentSpeedLimit, time.Unix(c.limitedUsers[user].end, 0).Format("01-02 15:04:05"))
}
}
if len(toReleaseUsers) > 0 {
@ -482,7 +484,7 @@ func (c *Controller) userInfoMonitor() (err error) {
if up > 0 || down > 0 {
// Over speed users
if AutoSpeedLimit > 0 {
if down > AutoSpeedLimit*1024*1024*UpdatePeriodic/8 {
if down > AutoSpeedLimit*1000000*UpdatePeriodic/8 || up > AutoSpeedLimit*1000000*UpdatePeriodic/8 {
if _, ok := c.limitedUsers[user]; !ok {
if c.config.AutoSpeedLimitConfig.WarnTimes == 0 {
limitUser(c, user, &limitedUsers)