diff --git a/cmd/client.go b/cmd/client.go index 8591f48..8d3dfe1 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -273,6 +273,35 @@ func client(config *clientConfig) { }() } + if len(config.UDPTProxy.Listen) > 0 { + go func() { + rl, err := tproxy.NewUDPTProxy(client, config.UDPTProxy.Listen, + time.Duration(config.UDPTProxy.Timeout)*time.Second, aclEngine, + func(addr net.Addr) { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + }).Debug("UDP TProxy request") + }, + func(addr net.Addr, err error) { + if err != tproxy.ErrTimeout { + logrus.WithFields(logrus.Fields{ + "error": err, + "src": addr.String(), + }).Info("UDP TProxy error") + } else { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + }).Debug("UDP TProxy session closed") + } + }) + if err != nil { + logrus.WithField("error", err).Fatal("Failed to initialize UDP TProxy") + } + logrus.WithField("addr", config.UDPTProxy.Listen).Info("UDP TProxy up and running") + errChan <- rl.ListenAndServe() + }() + } + err = <-errChan logrus.WithField("error", err).Fatal("Client shutdown") } diff --git a/pkg/relay/udp.go b/pkg/relay/udp.go index cc37830..d4d3f4a 100644 --- a/pkg/relay/udp.go +++ b/pkg/relay/udp.go @@ -43,7 +43,7 @@ func NewUDPRelay(hyClient *core.Client, listen, remote string, timeout time.Dura return r, nil } -type cmEntry struct { +type connEntry struct { HyConn core.UDPConn Deadline atomic.Value } @@ -55,7 +55,7 @@ func (r *UDPRelay) ListenAndServe() error { } defer conn.Close() // src <-> HyClient UDPConn - connMap := make(map[string]*cmEntry) + connMap := make(map[string]*connEntry) var connMapMutex sync.RWMutex // Read loop buf := make([]byte, udpBufferSize) @@ -77,7 +77,7 @@ func (r *UDPRelay) ListenAndServe() error { r.ErrorFunc(rAddr, err) } else { // Add it to the map - ent := &cmEntry{HyConn: hyConn} + ent := &connEntry{HyConn: hyConn} ent.Deadline.Store(time.Now().Add(r.Timeout)) connMapMutex.Lock() connMap[rAddr.String()] = ent diff --git a/pkg/tproxy/udp_linux.go b/pkg/tproxy/udp_linux.go new file mode 100644 index 0000000..99b1270 --- /dev/null +++ b/pkg/tproxy/udp_linux.go @@ -0,0 +1,124 @@ +package tproxy + +import ( + "errors" + "github.com/LiamHaworth/go-tproxy" + "github.com/tobyxdd/hysteria/pkg/acl" + "github.com/tobyxdd/hysteria/pkg/core" + "net" + "sync" + "sync/atomic" + "time" +) + +const udpBufferSize = 65535 + +var ErrTimeout = errors.New("inactivity timeout") + +type UDPTProxy struct { + HyClient *core.Client + ListenAddr *net.UDPAddr + Timeout time.Duration + ACLEngine *acl.Engine + + ConnFunc func(addr net.Addr) + ErrorFunc func(addr net.Addr, err error) +} + +func NewUDPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine, + connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*UDPTProxy, error) { + uAddr, err := net.ResolveUDPAddr("udp", listen) + if err != nil { + return nil, err + } + r := &UDPTProxy{ + HyClient: hyClient, + ListenAddr: uAddr, + Timeout: timeout, + ACLEngine: aclEngine, + ConnFunc: connFunc, + ErrorFunc: errorFunc, + } + if timeout == 0 { + r.Timeout = 1 * time.Minute + } + return r, nil +} + +type connEntry struct { + HyConn core.UDPConn + Deadline atomic.Value +} + +func (r *UDPTProxy) ListenAndServe() error { + conn, err := tproxy.ListenUDP("udp", r.ListenAddr) + if err != nil { + return err + } + defer conn.Close() + // src <-> HyClient UDPConn + connMap := make(map[string]*connEntry) + var connMapMutex sync.RWMutex + // Read loop + buf := make([]byte, udpBufferSize) + for { + n, srcAddr, dstAddr, err := tproxy.ReadFromUDP(conn, buf) + if n > 0 { + connMapMutex.RLock() + cme := connMap[srcAddr.String()] + connMapMutex.RUnlock() + if cme != nil { + // Existing conn + cme.Deadline.Store(time.Now().Add(r.Timeout)) + _ = cme.HyConn.WriteTo(buf[:n], dstAddr.String()) + } else { + // New + r.ConnFunc(srcAddr) + hyConn, err := r.HyClient.DialUDP() + if err != nil { + r.ErrorFunc(srcAddr, err) + } else { + // Add it to the map + ent := &connEntry{HyConn: hyConn} + ent.Deadline.Store(time.Now().Add(r.Timeout)) + connMapMutex.Lock() + connMap[srcAddr.String()] = ent + connMapMutex.Unlock() + // Start remote to local + go func() { + for { + bs, _, err := hyConn.ReadFrom() + if err != nil { + break + } + ent.Deadline.Store(time.Now().Add(r.Timeout)) + _, _ = conn.WriteToUDP(bs, srcAddr) + } + }() + // Timeout cleanup routine + go func() { + for { + ttl := ent.Deadline.Load().(time.Time).Sub(time.Now()) + if ttl <= 0 { + // Time to die + connMapMutex.Lock() + _ = hyConn.Close() + delete(connMap, srcAddr.String()) + connMapMutex.Unlock() + r.ErrorFunc(srcAddr, ErrTimeout) + return + } else { + time.Sleep(ttl) + } + } + }() + // Send the packet + _ = hyConn.WriteTo(buf[:n], dstAddr.String()) + } + } + } + if err != nil { + return err + } + } +} diff --git a/pkg/tproxy/udp_stub.go b/pkg/tproxy/udp_stub.go new file mode 100644 index 0000000..3788ce4 --- /dev/null +++ b/pkg/tproxy/udp_stub.go @@ -0,0 +1,24 @@ +// +build !linux + +package tproxy + +import ( + "errors" + "github.com/tobyxdd/hysteria/pkg/acl" + "github.com/tobyxdd/hysteria/pkg/core" + "net" + "time" +) + +var ErrTimeout = errors.New("inactivity timeout") + +type UDPTProxy struct{} + +func NewUDPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine, + connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*UDPTProxy, error) { + return nil, errors.New("not supported on the current system") +} + +func (r *UDPTProxy) ListenAndServe() error { + return nil +}