Prometheus Active Conn Gauge

This commit is contained in:
Toby 2021-04-27 20:38:43 -07:00
parent 56f35d94cc
commit 8c5b701a4b
2 changed files with 18 additions and 6 deletions

View File

@ -34,6 +34,7 @@ type Server struct {
udpErrorFunc UDPErrorFunc udpErrorFunc UDPErrorFunc
upCounterVec, downCounterVec *prometheus.CounterVec upCounterVec, downCounterVec *prometheus.CounterVec
connGaugeVec *prometheus.GaugeVec
listener quic.Listener listener quic.Listener
} }
@ -93,7 +94,10 @@ func NewServer(addr string, tlsConfig *tls.Config, quicConfig *quic.Config, tran
s.downCounterVec = prometheus.NewCounterVec(prometheus.CounterOpts{ s.downCounterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "hysteria_traffic_downlink_bytes_total", Name: "hysteria_traffic_downlink_bytes_total",
}, []string{"auth"}) }, []string{"auth"})
promRegistry.MustRegister(s.upCounterVec, s.downCounterVec) s.connGaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "hysteria_active_conn",
}, []string{"auth"})
promRegistry.MustRegister(s.upCounterVec, s.downCounterVec, s.connGaugeVec)
} }
return s, nil return s, nil
} }
@ -133,7 +137,8 @@ func (s *Server) handleClient(cs quic.Session) {
} }
// Start accepting streams and messages // Start accepting streams and messages
sc := newServerClient(cs, s.transport, auth, s.disableUDP, s.aclEngine, sc := newServerClient(cs, s.transport, auth, s.disableUDP, s.aclEngine,
s.tcpRequestFunc, s.tcpErrorFunc, s.udpRequestFunc, s.udpErrorFunc, s.upCounterVec, s.downCounterVec) s.tcpRequestFunc, s.tcpErrorFunc, s.udpRequestFunc, s.udpErrorFunc,
s.upCounterVec, s.downCounterVec, s.connGaugeVec)
sc.Run() sc.Run()
_ = cs.CloseWithError(closeErrorCodeGeneric, "") _ = cs.CloseWithError(closeErrorCodeGeneric, "")
} }

View File

@ -30,6 +30,7 @@ type serverClient struct {
CUDPErrorFunc UDPErrorFunc CUDPErrorFunc UDPErrorFunc
UpCounter, DownCounter prometheus.Counter UpCounter, DownCounter prometheus.Counter
ConnGauge prometheus.Gauge
udpSessionMutex sync.RWMutex udpSessionMutex sync.RWMutex
udpSessionMap map[uint32]*net.UDPConn udpSessionMap map[uint32]*net.UDPConn
@ -39,7 +40,8 @@ type serverClient struct {
func newServerClient(cs quic.Session, transport transport.Transport, auth []byte, disableUDP bool, ACLEngine *acl.Engine, func newServerClient(cs quic.Session, transport transport.Transport, auth []byte, disableUDP bool, ACLEngine *acl.Engine,
CTCPRequestFunc TCPRequestFunc, CTCPErrorFunc TCPErrorFunc, CTCPRequestFunc TCPRequestFunc, CTCPErrorFunc TCPErrorFunc,
CUDPRequestFunc UDPRequestFunc, CUDPErrorFunc UDPErrorFunc, CUDPRequestFunc UDPRequestFunc, CUDPErrorFunc UDPErrorFunc,
UpCounterVec, DownCounterVec *prometheus.CounterVec) *serverClient { UpCounterVec, DownCounterVec *prometheus.CounterVec,
ConnGaugeVec *prometheus.GaugeVec) *serverClient {
sc := &serverClient{ sc := &serverClient{
CS: cs, CS: cs,
Transport: transport, Transport: transport,
@ -53,10 +55,11 @@ func newServerClient(cs quic.Session, transport transport.Transport, auth []byte
CUDPErrorFunc: CUDPErrorFunc, CUDPErrorFunc: CUDPErrorFunc,
udpSessionMap: make(map[uint32]*net.UDPConn), udpSessionMap: make(map[uint32]*net.UDPConn),
} }
if UpCounterVec != nil && DownCounterVec != nil { if UpCounterVec != nil && DownCounterVec != nil && ConnGaugeVec != nil {
authB64 := base64.StdEncoding.EncodeToString(auth) authB64 := base64.StdEncoding.EncodeToString(auth)
sc.UpCounter = UpCounterVec.WithLabelValues(authB64) sc.UpCounter = UpCounterVec.WithLabelValues(authB64)
sc.DownCounter = DownCounterVec.WithLabelValues(authB64) sc.DownCounter = DownCounterVec.WithLabelValues(authB64)
sc.ConnGauge = ConnGaugeVec.WithLabelValues(authB64)
} }
return sc return sc
} }
@ -78,12 +81,16 @@ func (c *serverClient) Run() {
if err != nil { if err != nil {
break break
} }
go c.handleStream(stream) c.ConnGauge.Inc()
go func() {
c.handleStream(stream)
_ = stream.Close()
c.ConnGauge.Dec()
}()
} }
} }
func (c *serverClient) handleStream(stream quic.Stream) { func (c *serverClient) handleStream(stream quic.Stream) {
defer stream.Close()
// Read request // Read request
var req clientRequest var req clientRequest
err := struc.Unpack(stream, &req) err := struc.Unpack(stream, &req)