feat: provide a TrafficCounter interface on server, remove prometheus client dependency from core

This commit is contained in:
tobyxdd
2023-01-07 13:59:30 -08:00
parent 29459d768d
commit f7dffd027f
6 changed files with 105 additions and 490 deletions

View File

@@ -14,7 +14,6 @@ import (
"github.com/apernet/hysteria/core/utils"
"github.com/lucas-clemente/quic-go"
"github.com/lunixbochs/struc"
"github.com/prometheus/client_golang/prometheus"
)
const udpBufferSize = 4096
@@ -23,6 +22,7 @@ type serverClient struct {
CC quic.Connection
Transport *transport.ServerTransport
Auth []byte
AuthLabel string // Base64 encoded auth
DisableUDP bool
ACLEngine *acl.Engine
CTCPRequestFunc TCPRequestFunc
@@ -30,8 +30,7 @@ type serverClient struct {
CUDPRequestFunc UDPRequestFunc
CUDPErrorFunc UDPErrorFunc
UpCounter, DownCounter prometheus.Counter
ConnGauge prometheus.Gauge
TrafficCounter TrafficCounter
udpSessionMutex sync.RWMutex
udpSessionMap map[uint32]transport.STPacketConn
@@ -42,27 +41,22 @@ type serverClient struct {
func newServerClient(cc quic.Connection, tr *transport.ServerTransport, auth []byte, disableUDP bool, ACLEngine *acl.Engine,
CTCPRequestFunc TCPRequestFunc, CTCPErrorFunc TCPErrorFunc,
CUDPRequestFunc UDPRequestFunc, CUDPErrorFunc UDPErrorFunc,
UpCounterVec, DownCounterVec *prometheus.CounterVec,
ConnGaugeVec *prometheus.GaugeVec,
TrafficCounter TrafficCounter,
) *serverClient {
sc := &serverClient{
CC: cc,
Transport: tr,
Auth: auth,
AuthLabel: base64.StdEncoding.EncodeToString(auth),
DisableUDP: disableUDP,
ACLEngine: ACLEngine,
CTCPRequestFunc: CTCPRequestFunc,
CTCPErrorFunc: CTCPErrorFunc,
CUDPRequestFunc: CUDPRequestFunc,
CUDPErrorFunc: CUDPErrorFunc,
TrafficCounter: TrafficCounter,
udpSessionMap: make(map[uint32]transport.STPacketConn),
}
if UpCounterVec != nil && DownCounterVec != nil && ConnGaugeVec != nil {
authB64 := base64.StdEncoding.EncodeToString(auth)
sc.UpCounter = UpCounterVec.WithLabelValues(authB64)
sc.DownCounter = DownCounterVec.WithLabelValues(authB64)
sc.ConnGauge = ConnGaugeVec.WithLabelValues(authB64)
}
return sc
}
@@ -89,15 +83,15 @@ func (c *serverClient) Run() error {
if err != nil {
return err
}
if c.ConnGauge != nil {
c.ConnGauge.Inc()
if c.TrafficCounter != nil {
c.TrafficCounter.IncConn(c.AuthLabel)
}
go func() {
stream := &qStream{stream}
c.handleStream(stream)
_ = stream.Close()
if c.ConnGauge != nil {
c.ConnGauge.Dec()
if c.TrafficCounter != nil {
c.TrafficCounter.DecConn(c.AuthLabel)
}
}()
}
@@ -162,8 +156,8 @@ func (c *serverClient) handleMessage(msg []byte) {
addrEx.Domain = dfMsg.Host
}
_, _ = conn.WriteTo(dfMsg.Data, addrEx)
if c.UpCounter != nil {
c.UpCounter.Add(float64(len(dfMsg.Data)))
if c.TrafficCounter != nil {
c.TrafficCounter.Tx(c.AuthLabel, len(dfMsg.Data))
}
case acl.ActionBlock:
// Do nothing
@@ -178,8 +172,8 @@ func (c *serverClient) handleMessage(msg []byte) {
addrEx.Domain = arg
}
_, _ = conn.WriteTo(dfMsg.Data, addrEx)
if c.UpCounter != nil {
c.UpCounter.Add(float64(len(dfMsg.Data)))
if c.TrafficCounter != nil {
c.TrafficCounter.Tx(c.AuthLabel, len(dfMsg.Data))
}
}
default:
@@ -275,12 +269,12 @@ func (c *serverClient) handleTCP(stream quic.Stream, host string, port uint16) {
if err != nil {
return
}
if c.UpCounter != nil && c.DownCounter != nil {
if c.TrafficCounter != nil {
err = utils.Pipe2Way(stream, conn, func(i int) {
if i > 0 {
c.UpCounter.Add(float64(i))
c.TrafficCounter.Tx(c.AuthLabel, i)
} else {
c.DownCounter.Add(float64(-i))
c.TrafficCounter.Rx(c.AuthLabel, -i)
}
})
} else {
@@ -347,8 +341,8 @@ func (c *serverClient) handleUDP(stream quic.Stream) {
}
}
}
if c.DownCounter != nil {
c.DownCounter.Add(float64(n))
if c.TrafficCounter != nil {
c.TrafficCounter.Rx(c.AuthLabel, n)
}
}
if err != nil {