UDP TProxy implementation, no ACL support yet

This commit is contained in:
Toby 2021-04-25 00:38:07 -07:00
parent 2b4e660324
commit 7b3e1a5b41
4 changed files with 180 additions and 3 deletions

View File

@ -273,6 +273,35 @@ func client(config *clientConfig) {
}()
}
if len(config.UDPTProxy.Listen) > 0 {
go func() {
rl, err := tproxy.NewUDPTProxy(client, config.UDPTProxy.Listen,
time.Duration(config.UDPTProxy.Timeout)*time.Second, aclEngine,
func(addr net.Addr) {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
}).Debug("UDP TProxy request")
},
func(addr net.Addr, err error) {
if err != tproxy.ErrTimeout {
logrus.WithFields(logrus.Fields{
"error": err,
"src": addr.String(),
}).Info("UDP TProxy error")
} else {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
}).Debug("UDP TProxy session closed")
}
})
if err != nil {
logrus.WithField("error", err).Fatal("Failed to initialize UDP TProxy")
}
logrus.WithField("addr", config.UDPTProxy.Listen).Info("UDP TProxy up and running")
errChan <- rl.ListenAndServe()
}()
}
err = <-errChan
logrus.WithField("error", err).Fatal("Client shutdown")
}

View File

@ -43,7 +43,7 @@ func NewUDPRelay(hyClient *core.Client, listen, remote string, timeout time.Dura
return r, nil
}
type cmEntry struct {
type connEntry struct {
HyConn core.UDPConn
Deadline atomic.Value
}
@ -55,7 +55,7 @@ func (r *UDPRelay) ListenAndServe() error {
}
defer conn.Close()
// src <-> HyClient UDPConn
connMap := make(map[string]*cmEntry)
connMap := make(map[string]*connEntry)
var connMapMutex sync.RWMutex
// Read loop
buf := make([]byte, udpBufferSize)
@ -77,7 +77,7 @@ func (r *UDPRelay) ListenAndServe() error {
r.ErrorFunc(rAddr, err)
} else {
// Add it to the map
ent := &cmEntry{HyConn: hyConn}
ent := &connEntry{HyConn: hyConn}
ent.Deadline.Store(time.Now().Add(r.Timeout))
connMapMutex.Lock()
connMap[rAddr.String()] = ent

124
pkg/tproxy/udp_linux.go Normal file
View File

@ -0,0 +1,124 @@
package tproxy
import (
"errors"
"github.com/LiamHaworth/go-tproxy"
"github.com/tobyxdd/hysteria/pkg/acl"
"github.com/tobyxdd/hysteria/pkg/core"
"net"
"sync"
"sync/atomic"
"time"
)
const udpBufferSize = 65535
var ErrTimeout = errors.New("inactivity timeout")
type UDPTProxy struct {
HyClient *core.Client
ListenAddr *net.UDPAddr
Timeout time.Duration
ACLEngine *acl.Engine
ConnFunc func(addr net.Addr)
ErrorFunc func(addr net.Addr, err error)
}
func NewUDPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine,
connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*UDPTProxy, error) {
uAddr, err := net.ResolveUDPAddr("udp", listen)
if err != nil {
return nil, err
}
r := &UDPTProxy{
HyClient: hyClient,
ListenAddr: uAddr,
Timeout: timeout,
ACLEngine: aclEngine,
ConnFunc: connFunc,
ErrorFunc: errorFunc,
}
if timeout == 0 {
r.Timeout = 1 * time.Minute
}
return r, nil
}
type connEntry struct {
HyConn core.UDPConn
Deadline atomic.Value
}
func (r *UDPTProxy) ListenAndServe() error {
conn, err := tproxy.ListenUDP("udp", r.ListenAddr)
if err != nil {
return err
}
defer conn.Close()
// src <-> HyClient UDPConn
connMap := make(map[string]*connEntry)
var connMapMutex sync.RWMutex
// Read loop
buf := make([]byte, udpBufferSize)
for {
n, srcAddr, dstAddr, err := tproxy.ReadFromUDP(conn, buf)
if n > 0 {
connMapMutex.RLock()
cme := connMap[srcAddr.String()]
connMapMutex.RUnlock()
if cme != nil {
// Existing conn
cme.Deadline.Store(time.Now().Add(r.Timeout))
_ = cme.HyConn.WriteTo(buf[:n], dstAddr.String())
} else {
// New
r.ConnFunc(srcAddr)
hyConn, err := r.HyClient.DialUDP()
if err != nil {
r.ErrorFunc(srcAddr, err)
} else {
// Add it to the map
ent := &connEntry{HyConn: hyConn}
ent.Deadline.Store(time.Now().Add(r.Timeout))
connMapMutex.Lock()
connMap[srcAddr.String()] = ent
connMapMutex.Unlock()
// Start remote to local
go func() {
for {
bs, _, err := hyConn.ReadFrom()
if err != nil {
break
}
ent.Deadline.Store(time.Now().Add(r.Timeout))
_, _ = conn.WriteToUDP(bs, srcAddr)
}
}()
// Timeout cleanup routine
go func() {
for {
ttl := ent.Deadline.Load().(time.Time).Sub(time.Now())
if ttl <= 0 {
// Time to die
connMapMutex.Lock()
_ = hyConn.Close()
delete(connMap, srcAddr.String())
connMapMutex.Unlock()
r.ErrorFunc(srcAddr, ErrTimeout)
return
} else {
time.Sleep(ttl)
}
}
}()
// Send the packet
_ = hyConn.WriteTo(buf[:n], dstAddr.String())
}
}
}
if err != nil {
return err
}
}
}

24
pkg/tproxy/udp_stub.go Normal file
View File

@ -0,0 +1,24 @@
// +build !linux
package tproxy
import (
"errors"
"github.com/tobyxdd/hysteria/pkg/acl"
"github.com/tobyxdd/hysteria/pkg/core"
"net"
"time"
)
var ErrTimeout = errors.New("inactivity timeout")
type UDPTProxy struct{}
func NewUDPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine,
connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*UDPTProxy, error) {
return nil, errors.New("not supported on the current system")
}
func (r *UDPTProxy) ListenAndServe() error {
return nil
}