From 787ed14c4db778ce90af7a692a03f3d52ae44bb3 Mon Sep 17 00:00:00 2001 From: Toby Date: Sat, 24 Apr 2021 02:56:17 -0700 Subject: [PATCH 1/8] TCP TProxy implementation, no UDP or ACL support yet --- cmd/client.go | 33 +++++++++++++++++++++ cmd/config.go | 19 +++++++++++-- go.mod | 1 + go.sum | 2 ++ pkg/tproxy/tcp_linux.go | 63 +++++++++++++++++++++++++++++++++++++++++ pkg/tproxy/tcp_stub.go | 21 ++++++++++++++ pkg/utils/pipe.go | 2 +- 7 files changed, 138 insertions(+), 3 deletions(-) create mode 100644 pkg/tproxy/tcp_linux.go create mode 100644 pkg/tproxy/tcp_stub.go diff --git a/cmd/client.go b/cmd/client.go index d7b39e9..8284448 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -13,6 +13,7 @@ import ( "github.com/tobyxdd/hysteria/pkg/obfs" "github.com/tobyxdd/hysteria/pkg/relay" "github.com/tobyxdd/hysteria/pkg/socks5" + "github.com/tobyxdd/hysteria/pkg/tproxy" "io" "io/ioutil" "net" @@ -239,6 +240,38 @@ func client(config *clientConfig) { }() } + if len(config.TCPTProxy.Listen) > 0 { + go func() { + rl, err := tproxy.NewTCPTProxy(client, config.TCPTProxy.Listen, + time.Duration(config.TCPTProxy.Timeout)*time.Second, + func(addr, reqAddr net.Addr) { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "dst": reqAddr.String(), + }).Debug("TCP TProxy request") + }, + func(addr, reqAddr net.Addr, err error) { + if err != io.EOF { + logrus.WithFields(logrus.Fields{ + "error": err, + "src": addr.String(), + "dst": reqAddr.String(), + }).Info("TCP TProxy error") + } else { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "dst": reqAddr.String(), + }).Debug("TCP TProxy EOF") + } + }) + if err != nil { + logrus.WithField("error", err).Fatal("Failed to initialize TCP TProxy") + } + logrus.WithField("addr", config.TCPTProxy.Listen).Info("TCP TProxy up and running") + errChan <- rl.ListenAndServe() + }() + } + err = <-errChan logrus.WithField("error", err).Fatal("Client shutdown") } diff --git a/cmd/config.go b/cmd/config.go index dfceb63..328af07 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -90,6 +90,14 @@ type clientConfig struct { Remote string `json:"remote"` Timeout int `json:"timeout"` } `json:"relay_udp"` + TCPTProxy struct { + Listen string `json:"listen"` + Timeout int `json:"timeout"` + } `json:"tproxy_tcp"` + UDPTProxy struct { + Listen string `json:"listen"` + Timeout int `json:"timeout"` + } `json:"tproxy_udp"` ACL string `json:"acl"` Obfs string `json:"obfs"` Auth []byte `json:"auth"` @@ -102,8 +110,9 @@ type clientConfig struct { func (c *clientConfig) Check() error { if len(c.SOCKS5.Listen) == 0 && len(c.HTTP.Listen) == 0 && - len(c.TCPRelay.Listen) == 0 && len(c.UDPRelay.Listen) == 0 { - return errors.New("no SOCKS5, HTTP, TCP relay or UDP relay listen address") + len(c.TCPRelay.Listen) == 0 && len(c.UDPRelay.Listen) == 0 && + len(c.TCPTProxy.Listen) == 0 && len(c.UDPTProxy.Listen) == 0 { + return errors.New("no SOCKS5, HTTP, relay or TProxy listen address") } if len(c.TCPRelay.Listen) > 0 && len(c.TCPRelay.Remote) == 0 { return errors.New("no TCP relay remote address") @@ -123,6 +132,12 @@ func (c *clientConfig) Check() error { if c.UDPRelay.Timeout != 0 && c.UDPRelay.Timeout <= 4 { return errors.New("invalid UDP relay timeout") } + if c.TCPTProxy.Timeout != 0 && c.TCPTProxy.Timeout <= 4 { + return errors.New("invalid TCP TProxy timeout") + } + if c.UDPTProxy.Timeout != 0 && c.UDPTProxy.Timeout <= 4 { + return errors.New("invalid UDP TProxy timeout") + } if len(c.Server) == 0 { return errors.New("no server address") } diff --git a/go.mod b/go.mod index 59c6577..564bfed 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/tobyxdd/hysteria go 1.14 require ( + github.com/LiamHaworth/go-tproxy v0.0.0-20190726054950-ef7efd7f24ed github.com/elazarl/goproxy v0.0.0-20200426045556-49ad98f6dac1 github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 github.com/hashicorp/golang-lru v0.5.4 diff --git a/go.sum b/go.sum index c40f603..1d2c52e 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= +github.com/LiamHaworth/go-tproxy v0.0.0-20190726054950-ef7efd7f24ed h1:eqa6queieK8SvoszxCu0WwH7lSVeL4/N/f1JwOMw1G4= +github.com/LiamHaworth/go-tproxy v0.0.0-20190726054950-ef7efd7f24ed/go.mod h1:rA52xkgZwql9LRZXWb2arHEFP6qSR48KY2xOfWzEciQ= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= diff --git a/pkg/tproxy/tcp_linux.go b/pkg/tproxy/tcp_linux.go new file mode 100644 index 0000000..bf9163a --- /dev/null +++ b/pkg/tproxy/tcp_linux.go @@ -0,0 +1,63 @@ +package tproxy + +import ( + "github.com/LiamHaworth/go-tproxy" + "github.com/tobyxdd/hysteria/pkg/core" + "github.com/tobyxdd/hysteria/pkg/utils" + "net" + "time" +) + +type TCPTProxy struct { + HyClient *core.Client + ListenAddr *net.TCPAddr + Timeout time.Duration + + ConnFunc func(addr, reqAddr net.Addr) + ErrorFunc func(addr, reqAddr net.Addr, err error) +} + +func NewTCPTProxy(hyClient *core.Client, listen string, timeout time.Duration, + connFunc func(addr, reqAddr net.Addr), errorFunc func(addr, reqAddr net.Addr, err error)) (*TCPTProxy, error) { + tAddr, err := net.ResolveTCPAddr("tcp", listen) + if err != nil { + return nil, err + } + r := &TCPTProxy{ + HyClient: hyClient, + ListenAddr: tAddr, + Timeout: timeout, + ConnFunc: connFunc, + ErrorFunc: errorFunc, + } + return r, nil +} + +func (r *TCPTProxy) ListenAndServe() error { + listener, err := tproxy.ListenTCP("tcp", r.ListenAddr) + if err != nil { + return err + } + defer listener.Close() + for { + c, err := listener.Accept() + if err != nil { + return err + } + go func() { + defer c.Close() + // Under TPROXY mode, we are effectively acting as the remote server + // So our LocalAddr is actually the target to which the user is trying to connect + // and our RemoteAddr is the local address where the user initiates the connection + r.ConnFunc(c.RemoteAddr(), c.LocalAddr()) + rc, err := r.HyClient.DialTCP(c.LocalAddr().String()) + if err != nil { + r.ErrorFunc(c.RemoteAddr(), c.LocalAddr(), err) + return + } + defer rc.Close() + err = utils.PipePairWithTimeout(c, rc, r.Timeout) + r.ErrorFunc(c.RemoteAddr(), c.LocalAddr(), err) + }() + } +} diff --git a/pkg/tproxy/tcp_stub.go b/pkg/tproxy/tcp_stub.go new file mode 100644 index 0000000..d89057e --- /dev/null +++ b/pkg/tproxy/tcp_stub.go @@ -0,0 +1,21 @@ +// +build !linux + +package tproxy + +import ( + "errors" + "github.com/tobyxdd/hysteria/pkg/core" + "net" + "time" +) + +type TCPTProxy struct{} + +func NewTCPTProxy(hyClient *core.Client, listen string, timeout time.Duration, + connFunc func(addr, reqAddr net.Addr), errorFunc func(addr, reqAddr net.Addr, err error)) (*TCPTProxy, error) { + return nil, errors.New("not supported on the current system") +} + +func (r *TCPTProxy) ListenAndServe() error { + return nil +} diff --git a/pkg/utils/pipe.go b/pkg/utils/pipe.go index 14ec68e..caf053b 100644 --- a/pkg/utils/pipe.go +++ b/pkg/utils/pipe.go @@ -46,7 +46,7 @@ func Pipe2Way(rw1, rw2 io.ReadWriter, count func(int)) error { return <-errChan } -func PipePairWithTimeout(conn *net.TCPConn, stream io.ReadWriteCloser, timeout time.Duration) error { +func PipePairWithTimeout(conn net.Conn, stream io.ReadWriteCloser, timeout time.Duration) error { errChan := make(chan error, 2) // TCP to stream go func() { From 70fd2ffc0d61c5eecb2e77743d63d617268044d8 Mon Sep 17 00:00:00 2001 From: Toby Date: Sat, 24 Apr 2021 15:36:19 -0700 Subject: [PATCH 2/8] ACL for TCP TProxy --- cmd/client.go | 9 ++--- pkg/core/client.go | 16 ++------- pkg/http/server.go | 11 +++--- pkg/tproxy/tcp_linux.go | 76 ++++++++++++++++++++++++++++++++++++----- pkg/tproxy/tcp_stub.go | 6 ++-- pkg/utils/misc.go | 18 ++++++++++ 6 files changed, 100 insertions(+), 36 deletions(-) create mode 100644 pkg/utils/misc.go diff --git a/cmd/client.go b/cmd/client.go index 8284448..8591f48 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -243,11 +243,12 @@ func client(config *clientConfig) { if len(config.TCPTProxy.Listen) > 0 { go func() { rl, err := tproxy.NewTCPTProxy(client, config.TCPTProxy.Listen, - time.Duration(config.TCPTProxy.Timeout)*time.Second, - func(addr, reqAddr net.Addr) { + time.Duration(config.TCPTProxy.Timeout)*time.Second, aclEngine, + func(addr, reqAddr net.Addr, action acl.Action, arg string) { logrus.WithFields(logrus.Fields{ - "src": addr.String(), - "dst": reqAddr.String(), + "action": actionToString(action, arg), + "src": addr.String(), + "dst": reqAddr.String(), }).Debug("TCP TProxy request") }, func(addr, reqAddr net.Addr, err error) { diff --git a/pkg/core/client.go b/pkg/core/client.go index 1efaaaf..d5ae791 100644 --- a/pkg/core/client.go +++ b/pkg/core/client.go @@ -195,7 +195,7 @@ func (c *Client) openStreamWithReconnect() (quic.Session, quic.Stream, error) { } func (c *Client) DialTCP(addr string) (net.Conn, error) { - host, port, err := splitHostPort(addr) + host, port, err := utils.SplitHostPort(addr) if err != nil { return nil, err } @@ -366,7 +366,7 @@ func (c *quicPktConn) ReadFrom() ([]byte, string, error) { } func (c *quicPktConn) WriteTo(p []byte, addr string) error { - host, port, err := splitHostPort(addr) + host, port, err := utils.SplitHostPort(addr) if err != nil { return err } @@ -384,15 +384,3 @@ func (c *quicPktConn) Close() error { c.CloseFunc() return c.Stream.Close() } - -func splitHostPort(hostport string) (string, uint16, error) { - host, port, err := net.SplitHostPort(hostport) - if err != nil { - return "", 0, err - } - portUint, err := strconv.ParseUint(port, 10, 16) - if err != nil { - return "", 0, err - } - return host, uint16(portUint), err -} diff --git a/pkg/http/server.go b/pkg/http/server.go index f9c8dbe..81299ce 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -3,6 +3,7 @@ package http import ( "errors" "fmt" + "github.com/tobyxdd/hysteria/pkg/utils" "net" "net/http" "strconv" @@ -24,11 +25,7 @@ func NewProxyHTTPServer(hyClient *core.Client, idleTimeout time.Duration, aclEng proxy.Tr = &http.Transport{ Dial: func(network, addr string) (net.Conn, error) { // Parse addr string - host, port, err := net.SplitHostPort(addr) - if err != nil { - return nil, err - } - portUint, err := strconv.ParseUint(port, 10, 16) + host, port, err := utils.SplitHostPort(addr) if err != nil { return nil, err } @@ -49,7 +46,7 @@ func NewProxyHTTPServer(hyClient *core.Client, idleTimeout time.Duration, aclEng } return net.DialTCP(network, nil, &net.TCPAddr{ IP: ipAddr.IP, - Port: int(portUint), + Port: int(port), Zone: ipAddr.Zone, }) case acl.ActionProxy: @@ -57,7 +54,7 @@ func NewProxyHTTPServer(hyClient *core.Client, idleTimeout time.Duration, aclEng case acl.ActionBlock: return nil, errors.New("blocked by ACL") case acl.ActionHijack: - return net.Dial(network, net.JoinHostPort(arg, port)) + return net.Dial(network, net.JoinHostPort(arg, strconv.Itoa(int(port)))) default: return nil, fmt.Errorf("unknown action %d", action) } diff --git a/pkg/tproxy/tcp_linux.go b/pkg/tproxy/tcp_linux.go index bf9163a..b89f3de 100644 --- a/pkg/tproxy/tcp_linux.go +++ b/pkg/tproxy/tcp_linux.go @@ -1,10 +1,14 @@ package tproxy import ( + "errors" + "fmt" "github.com/LiamHaworth/go-tproxy" + "github.com/tobyxdd/hysteria/pkg/acl" "github.com/tobyxdd/hysteria/pkg/core" "github.com/tobyxdd/hysteria/pkg/utils" "net" + "strconv" "time" ) @@ -12,13 +16,15 @@ type TCPTProxy struct { HyClient *core.Client ListenAddr *net.TCPAddr Timeout time.Duration + ACLEngine *acl.Engine - ConnFunc func(addr, reqAddr net.Addr) + ConnFunc func(addr, reqAddr net.Addr, action acl.Action, arg string) ErrorFunc func(addr, reqAddr net.Addr, err error) } -func NewTCPTProxy(hyClient *core.Client, listen string, timeout time.Duration, - connFunc func(addr, reqAddr net.Addr), errorFunc func(addr, reqAddr net.Addr, err error)) (*TCPTProxy, error) { +func NewTCPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine, + connFunc func(addr, reqAddr net.Addr, action acl.Action, arg string), + errorFunc func(addr, reqAddr net.Addr, err error)) (*TCPTProxy, error) { tAddr, err := net.ResolveTCPAddr("tcp", listen) if err != nil { return nil, err @@ -27,6 +33,7 @@ func NewTCPTProxy(hyClient *core.Client, listen string, timeout time.Duration, HyClient: hyClient, ListenAddr: tAddr, Timeout: timeout, + ACLEngine: aclEngine, ConnFunc: connFunc, ErrorFunc: errorFunc, } @@ -49,15 +56,66 @@ func (r *TCPTProxy) ListenAndServe() error { // Under TPROXY mode, we are effectively acting as the remote server // So our LocalAddr is actually the target to which the user is trying to connect // and our RemoteAddr is the local address where the user initiates the connection - r.ConnFunc(c.RemoteAddr(), c.LocalAddr()) - rc, err := r.HyClient.DialTCP(c.LocalAddr().String()) + host, port, err := utils.SplitHostPort(c.LocalAddr().String()) if err != nil { - r.ErrorFunc(c.RemoteAddr(), c.LocalAddr(), err) return } - defer rc.Close() - err = utils.PipePairWithTimeout(c, rc, r.Timeout) - r.ErrorFunc(c.RemoteAddr(), c.LocalAddr(), err) + action, arg := acl.ActionProxy, "" + var ipAddr *net.IPAddr + var resErr error + if r.ACLEngine != nil { + action, arg, ipAddr, resErr = r.ACLEngine.ResolveAndMatch(host) + // Doesn't always matter if the resolution fails, as we may send it through HyClient + } + r.ConnFunc(c.RemoteAddr(), c.LocalAddr(), action, arg) + var closeErr error + defer func() { + r.ErrorFunc(c.RemoteAddr(), c.LocalAddr(), closeErr) + }() + // Handle according to the action + switch action { + case acl.ActionDirect: + if resErr != nil { + closeErr = resErr + return + } + rc, err := net.DialTCP("tcp", nil, &net.TCPAddr{ + IP: ipAddr.IP, + Port: int(port), + Zone: ipAddr.Zone, + }) + if err != nil { + closeErr = err + return + } + defer rc.Close() + closeErr = utils.PipePairWithTimeout(c, rc, r.Timeout) + return + case acl.ActionProxy: + rc, err := r.HyClient.DialTCP(c.LocalAddr().String()) + if err != nil { + closeErr = err + return + } + defer rc.Close() + closeErr = utils.PipePairWithTimeout(c, rc, r.Timeout) + return + case acl.ActionBlock: + closeErr = errors.New("blocked in ACL") + return + case acl.ActionHijack: + rc, err := net.Dial("tcp", net.JoinHostPort(arg, strconv.Itoa(int(port)))) + if err != nil { + closeErr = err + return + } + defer rc.Close() + closeErr = utils.PipePairWithTimeout(c, rc, r.Timeout) + return + default: + closeErr = fmt.Errorf("unknown action %d", action) + return + } }() } } diff --git a/pkg/tproxy/tcp_stub.go b/pkg/tproxy/tcp_stub.go index d89057e..d27706b 100644 --- a/pkg/tproxy/tcp_stub.go +++ b/pkg/tproxy/tcp_stub.go @@ -4,6 +4,7 @@ package tproxy import ( "errors" + "github.com/tobyxdd/hysteria/pkg/acl" "github.com/tobyxdd/hysteria/pkg/core" "net" "time" @@ -11,8 +12,9 @@ import ( type TCPTProxy struct{} -func NewTCPTProxy(hyClient *core.Client, listen string, timeout time.Duration, - connFunc func(addr, reqAddr net.Addr), errorFunc func(addr, reqAddr net.Addr, err error)) (*TCPTProxy, error) { +func NewTCPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine, + connFunc func(addr, reqAddr net.Addr, action acl.Action, arg string), + errorFunc func(addr, reqAddr net.Addr, err error)) (*TCPTProxy, error) { return nil, errors.New("not supported on the current system") } diff --git a/pkg/utils/misc.go b/pkg/utils/misc.go new file mode 100644 index 0000000..13db0e7 --- /dev/null +++ b/pkg/utils/misc.go @@ -0,0 +1,18 @@ +package utils + +import ( + "net" + "strconv" +) + +func SplitHostPort(hostport string) (string, uint16, error) { + host, port, err := net.SplitHostPort(hostport) + if err != nil { + return "", 0, err + } + portUint, err := strconv.ParseUint(port, 10, 16) + if err != nil { + return "", 0, err + } + return host, uint16(portUint), err +} From 3b7aefb10d8019f524078ad8f264efefed4e34b9 Mon Sep 17 00:00:00 2001 From: Toby Date: Sat, 24 Apr 2021 17:37:50 -0700 Subject: [PATCH 3/8] Improve UDP Relay timeout handling --- pkg/relay/udp.go | 55 ++++++++++++++++++++---------------------------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/pkg/relay/udp.go b/pkg/relay/udp.go index 6dfd589..4992aa3 100644 --- a/pkg/relay/udp.go +++ b/pkg/relay/udp.go @@ -48,9 +48,8 @@ func NewUDPRelay(hyClient *core.Client, listen, remote string, timeout time.Dura } type cmEntry struct { - HyConn core.UDPConn - Addr *net.UDPAddr - LastActiveTime atomic.Value + HyConn core.UDPConn + Deadline atomic.Value } func (r *UDPRelay) ListenAndServe() error { @@ -62,31 +61,6 @@ func (r *UDPRelay) ListenAndServe() error { // src <-> HyClient UDPConn connMap := make(map[string]*cmEntry) var connMapMutex sync.RWMutex - // Timeout cleanup routine - stopChan := make(chan bool) - defer close(stopChan) - go func() { - ticker := time.NewTicker(udpMinTimeout) - defer ticker.Stop() - for { - select { - case <-stopChan: - return - case t := <-ticker.C: - allowedLAT := t.Add(-r.Timeout) - connMapMutex.Lock() - for k, v := range connMap { - if v.LastActiveTime.Load().(time.Time).Before(allowedLAT) { - // Timeout - r.ErrorFunc(v.Addr, ErrTimeout) - _ = v.HyConn.Close() - delete(connMap, k) - } - } - connMapMutex.Unlock() - } - } - }() // Read loop buf := make([]byte, udpBufferSize) for { @@ -97,7 +71,7 @@ func (r *UDPRelay) ListenAndServe() error { connMapMutex.RUnlock() if cme != nil { // Existing conn - cme.LastActiveTime.Store(time.Now()) + cme.Deadline.Store(time.Now().Add(r.Timeout)) _ = cme.HyConn.WriteTo(buf[:n], r.Remote) } else { // New @@ -107,8 +81,8 @@ func (r *UDPRelay) ListenAndServe() error { r.ErrorFunc(rAddr, err) } else { // Add it to the map - ent := &cmEntry{HyConn: hyConn, Addr: rAddr} - ent.LastActiveTime.Store(time.Now()) + ent := &cmEntry{HyConn: hyConn} + ent.Deadline.Store(time.Now().Add(r.Timeout)) connMapMutex.Lock() connMap[rAddr.String()] = ent connMapMutex.Unlock() @@ -119,10 +93,27 @@ func (r *UDPRelay) ListenAndServe() error { if err != nil { break } - ent.LastActiveTime.Store(time.Now()) + ent.Deadline.Store(time.Now().Add(r.Timeout)) _, _ = conn.WriteToUDP(bs, rAddr) } }() + // Timeout cleanup routine + go func() { + for { + ttl := ent.Deadline.Load().(time.Time).Sub(time.Now()) + if ttl < 0 { + // Time to die + connMapMutex.Lock() + _ = hyConn.Close() + delete(connMap, rAddr.String()) + connMapMutex.Unlock() + r.ErrorFunc(rAddr, ErrTimeout) + return + } else { + time.Sleep(ttl) + } + } + }() // Send the packet _ = hyConn.WriteTo(buf[:n], r.Remote) } From 2b4e6603247b713ff80c47d5bd1c8a4960418f3f Mon Sep 17 00:00:00 2001 From: Toby Date: Sat, 24 Apr 2021 18:13:30 -0700 Subject: [PATCH 4/8] Remove pointless udpMinTimeout --- pkg/relay/udp.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/relay/udp.go b/pkg/relay/udp.go index 4992aa3..cc37830 100644 --- a/pkg/relay/udp.go +++ b/pkg/relay/udp.go @@ -11,8 +11,6 @@ import ( const udpBufferSize = 65535 -const udpMinTimeout = 4 * time.Second - var ErrTimeout = errors.New("inactivity timeout") type UDPRelay struct { @@ -41,8 +39,6 @@ func NewUDPRelay(hyClient *core.Client, listen, remote string, timeout time.Dura } if timeout == 0 { r.Timeout = 1 * time.Minute - } else if timeout < udpMinTimeout { - r.Timeout = udpMinTimeout } return r, nil } @@ -101,7 +97,7 @@ func (r *UDPRelay) ListenAndServe() error { go func() { for { ttl := ent.Deadline.Load().(time.Time).Sub(time.Now()) - if ttl < 0 { + if ttl <= 0 { // Time to die connMapMutex.Lock() _ = hyConn.Close() From 7b3e1a5b414069945ed9c8266a2aa1defeca6e3d Mon Sep 17 00:00:00 2001 From: Toby Date: Sun, 25 Apr 2021 00:38:07 -0700 Subject: [PATCH 5/8] UDP TProxy implementation, no ACL support yet --- cmd/client.go | 29 ++++++++++ pkg/relay/udp.go | 6 +- pkg/tproxy/udp_linux.go | 124 ++++++++++++++++++++++++++++++++++++++++ pkg/tproxy/udp_stub.go | 24 ++++++++ 4 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 pkg/tproxy/udp_linux.go create mode 100644 pkg/tproxy/udp_stub.go diff --git a/cmd/client.go b/cmd/client.go index 8591f48..8d3dfe1 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -273,6 +273,35 @@ func client(config *clientConfig) { }() } + if len(config.UDPTProxy.Listen) > 0 { + go func() { + rl, err := tproxy.NewUDPTProxy(client, config.UDPTProxy.Listen, + time.Duration(config.UDPTProxy.Timeout)*time.Second, aclEngine, + func(addr net.Addr) { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + }).Debug("UDP TProxy request") + }, + func(addr net.Addr, err error) { + if err != tproxy.ErrTimeout { + logrus.WithFields(logrus.Fields{ + "error": err, + "src": addr.String(), + }).Info("UDP TProxy error") + } else { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + }).Debug("UDP TProxy session closed") + } + }) + if err != nil { + logrus.WithField("error", err).Fatal("Failed to initialize UDP TProxy") + } + logrus.WithField("addr", config.UDPTProxy.Listen).Info("UDP TProxy up and running") + errChan <- rl.ListenAndServe() + }() + } + err = <-errChan logrus.WithField("error", err).Fatal("Client shutdown") } diff --git a/pkg/relay/udp.go b/pkg/relay/udp.go index cc37830..d4d3f4a 100644 --- a/pkg/relay/udp.go +++ b/pkg/relay/udp.go @@ -43,7 +43,7 @@ func NewUDPRelay(hyClient *core.Client, listen, remote string, timeout time.Dura return r, nil } -type cmEntry struct { +type connEntry struct { HyConn core.UDPConn Deadline atomic.Value } @@ -55,7 +55,7 @@ func (r *UDPRelay) ListenAndServe() error { } defer conn.Close() // src <-> HyClient UDPConn - connMap := make(map[string]*cmEntry) + connMap := make(map[string]*connEntry) var connMapMutex sync.RWMutex // Read loop buf := make([]byte, udpBufferSize) @@ -77,7 +77,7 @@ func (r *UDPRelay) ListenAndServe() error { r.ErrorFunc(rAddr, err) } else { // Add it to the map - ent := &cmEntry{HyConn: hyConn} + ent := &connEntry{HyConn: hyConn} ent.Deadline.Store(time.Now().Add(r.Timeout)) connMapMutex.Lock() connMap[rAddr.String()] = ent diff --git a/pkg/tproxy/udp_linux.go b/pkg/tproxy/udp_linux.go new file mode 100644 index 0000000..99b1270 --- /dev/null +++ b/pkg/tproxy/udp_linux.go @@ -0,0 +1,124 @@ +package tproxy + +import ( + "errors" + "github.com/LiamHaworth/go-tproxy" + "github.com/tobyxdd/hysteria/pkg/acl" + "github.com/tobyxdd/hysteria/pkg/core" + "net" + "sync" + "sync/atomic" + "time" +) + +const udpBufferSize = 65535 + +var ErrTimeout = errors.New("inactivity timeout") + +type UDPTProxy struct { + HyClient *core.Client + ListenAddr *net.UDPAddr + Timeout time.Duration + ACLEngine *acl.Engine + + ConnFunc func(addr net.Addr) + ErrorFunc func(addr net.Addr, err error) +} + +func NewUDPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine, + connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*UDPTProxy, error) { + uAddr, err := net.ResolveUDPAddr("udp", listen) + if err != nil { + return nil, err + } + r := &UDPTProxy{ + HyClient: hyClient, + ListenAddr: uAddr, + Timeout: timeout, + ACLEngine: aclEngine, + ConnFunc: connFunc, + ErrorFunc: errorFunc, + } + if timeout == 0 { + r.Timeout = 1 * time.Minute + } + return r, nil +} + +type connEntry struct { + HyConn core.UDPConn + Deadline atomic.Value +} + +func (r *UDPTProxy) ListenAndServe() error { + conn, err := tproxy.ListenUDP("udp", r.ListenAddr) + if err != nil { + return err + } + defer conn.Close() + // src <-> HyClient UDPConn + connMap := make(map[string]*connEntry) + var connMapMutex sync.RWMutex + // Read loop + buf := make([]byte, udpBufferSize) + for { + n, srcAddr, dstAddr, err := tproxy.ReadFromUDP(conn, buf) + if n > 0 { + connMapMutex.RLock() + cme := connMap[srcAddr.String()] + connMapMutex.RUnlock() + if cme != nil { + // Existing conn + cme.Deadline.Store(time.Now().Add(r.Timeout)) + _ = cme.HyConn.WriteTo(buf[:n], dstAddr.String()) + } else { + // New + r.ConnFunc(srcAddr) + hyConn, err := r.HyClient.DialUDP() + if err != nil { + r.ErrorFunc(srcAddr, err) + } else { + // Add it to the map + ent := &connEntry{HyConn: hyConn} + ent.Deadline.Store(time.Now().Add(r.Timeout)) + connMapMutex.Lock() + connMap[srcAddr.String()] = ent + connMapMutex.Unlock() + // Start remote to local + go func() { + for { + bs, _, err := hyConn.ReadFrom() + if err != nil { + break + } + ent.Deadline.Store(time.Now().Add(r.Timeout)) + _, _ = conn.WriteToUDP(bs, srcAddr) + } + }() + // Timeout cleanup routine + go func() { + for { + ttl := ent.Deadline.Load().(time.Time).Sub(time.Now()) + if ttl <= 0 { + // Time to die + connMapMutex.Lock() + _ = hyConn.Close() + delete(connMap, srcAddr.String()) + connMapMutex.Unlock() + r.ErrorFunc(srcAddr, ErrTimeout) + return + } else { + time.Sleep(ttl) + } + } + }() + // Send the packet + _ = hyConn.WriteTo(buf[:n], dstAddr.String()) + } + } + } + if err != nil { + return err + } + } +} diff --git a/pkg/tproxy/udp_stub.go b/pkg/tproxy/udp_stub.go new file mode 100644 index 0000000..3788ce4 --- /dev/null +++ b/pkg/tproxy/udp_stub.go @@ -0,0 +1,24 @@ +// +build !linux + +package tproxy + +import ( + "errors" + "github.com/tobyxdd/hysteria/pkg/acl" + "github.com/tobyxdd/hysteria/pkg/core" + "net" + "time" +) + +var ErrTimeout = errors.New("inactivity timeout") + +type UDPTProxy struct{} + +func NewUDPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine, + connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*UDPTProxy, error) { + return nil, errors.New("not supported on the current system") +} + +func (r *UDPTProxy) ListenAndServe() error { + return nil +} From a28d5a567716f30036d629c11a799a8bafb5deab Mon Sep 17 00:00:00 2001 From: Toby Date: Sun, 25 Apr 2021 03:08:30 -0700 Subject: [PATCH 6/8] Update READMEs --- README.md | 36 ++++++++++++++++++++++-------------- README.zh.md | 28 ++++++++++++++++++---------- 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 8178256..73a1712 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ [6]: https://t.me/hysteria_github -[中文 README](README.zh.md) +[中文](README.zh.md) Hysteria is a TCP/UDP relay & SOCKS5/HTTP proxy tool optimized for networks of poor quality (e.g. satellite connections, congested public Wi-Fi, connecting from China to servers abroad) powered by a custom version of QUIC protocol. @@ -86,21 +86,13 @@ Same as the server side, create a `config.json` under the root directory of the }, "http": { "listen": "127.0.0.1:8080" - }, - "relay_tcp": { - "listen": "127.0.0.1:2222", - "remote": "123.123.123.123:22" - }, - "relay_udp": { - "listen": "127.0.0.1:5333", - "remote": "8.8.8.8:53" } } ``` -This config enables a SOCKS5 proxy (with both TCP & UDP support), an HTTP proxy, a TCP relay to `123.123.123.123:22` and -a UDP relay to `8.8.8.8:53` -at the same time. Please modify or remove these entries according to your actual needs. +This config enables a SOCKS5 proxy (with both TCP & UDP support), and an HTTP proxy at the same time. There are many +other modes in Hysteria, be sure to check them out in [Advanced usage](#advanced-usage)! To enable or disable a mode, +simply add or remove its entry in the config file. If your server certificate is not issued by a trusted CA, you need to specify the CA used with `"ca": "/path/to/file.ca"` on the client or use `"insecure": true` to ignore all certificate errors (not @@ -223,15 +215,23 @@ hysteria_traffic_uplink_bytes_total{auth="aGFja2VyISE="} 37452 "key": "/home/ubuntu/my_key.crt" // Key file (HTTPS proxy) }, "relay_tcp": { - "listen": "127.0.0.1:2222", // TCP relay Listen address + "listen": "127.0.0.1:2222", // TCP relay listen address "remote": "123.123.123.123:22", // TCP relay remote address "timeout": 300 // TCP timeout in seconds }, "relay_udp": { - "listen": "127.0.0.1:5333", // UDP relay Listen address + "listen": "127.0.0.1:5333", // UDP relay listen address "remote": "8.8.8.8:53", // UDP relay remote address "timeout": 60 // UDP session timeout in seconds }, + "tproxy_tcp": { + "listen": "127.0.0.1:9000", // TCP TProxy listen address + "timeout": 300 // TCP timeout in seconds + }, + "tproxy_udp": { + "listen": "127.0.0.1:9000", // UDP TProxy listen address + "timeout": 60 // UDP session timeout in seconds + }, "acl": "my_list.acl", // See ACL below "obfs": "AMOGUS", // Obfuscation password "auth": "[BASE64]", // Authentication payload in Base64 @@ -243,6 +243,14 @@ hysteria_traffic_uplink_bytes_total{auth="aGFja2VyISE="} 37452 } ``` +#### Transparency proxy + +TPROXY modes (`tproxy_tcp` & `tproxy_udp`) are only available on Linux. + +References: +- https://www.kernel.org/doc/Documentation/networking/tproxy.txt +- https://powerdns.org/tproxydoc/tproxy.md.html + ## ACL [ACL File Format](ACL.md) diff --git a/README.zh.md b/README.zh.md index de5e122..67b2e31 100644 --- a/README.zh.md +++ b/README.zh.md @@ -79,20 +79,12 @@ Hysteria 是专门针对恶劣网络环境进行优化的 TCP/UDP 转发和代 }, "http": { "listen": "127.0.0.1:8080" - }, - "relay_tcp": { - "listen": "127.0.0.1:2222", - "remote": "123.123.123.123:22" - }, - "relay_udp": { - "listen": "127.0.0.1:5333", - "remote": "8.8.8.8:53" } } ``` -这个配置同时开了 SOCK5 (支持 TCP & UDP) 代理,HTTP 代理,到 `123.123.123.123:22` 的 TCP 转发和到 `8.8.8.8:53` 的 UDP 转发。 -请根据自己实际需要修改和删减。 +这个配置同时开了 SOCK5 (支持 TCP & UDP) 代理和 HTTP 代理。Hysteria 还有很多其他模式,请务必前往 [高级用法](#高级用法) 了解一下! +要启用/禁用一个模式,在配置文件中添加/移除对应条目即可。 如果你的服务端证书不是由受信任的 CA 签发的,需要用 `"ca": "/path/to/file.ca"` 指定使用的 CA 或者用 `"insecure": true` 忽略所有 证书错误(不推荐)。 @@ -220,6 +212,14 @@ hysteria_traffic_uplink_bytes_total{auth="aGFja2VyISE="} 37452 "remote": "8.8.8.8:53", // UDP 转发目标地址 "timeout": 60 // UDP 超时秒数 }, + "tproxy_tcp": { + "listen": "127.0.0.1:9000", // TCP 透明代理监听地址 + "timeout": 300 // TCP 超时秒数 + }, + "tproxy_udp": { + "listen": "127.0.0.1:9000", // UDP 透明代理监听地址 + "timeout": 60 // UDP 超时秒数 + }, "acl": "my_list.acl", // 见下文 ACL "obfs": "AMOGUS", // 混淆密码 "auth": "[BASE64]", // Base64 验证密钥 @@ -231,6 +231,14 @@ hysteria_traffic_uplink_bytes_total{auth="aGFja2VyISE="} 37452 } ``` +#### 透明代理 + +TPROXY 模式 (`tproxy_tcp` 和 `tproxy_udp`) 只在 Linux 下可用。 + +参考阅读: +- https://www.kernel.org/doc/Documentation/networking/tproxy.txt +- https://powerdns.org/tproxydoc/tproxy.md.html + ## 关于 ACL [ACL 文件格式](ACL.zh.md) From 7ba81612f821d731115b0db3acae81318241f936 Mon Sep 17 00:00:00 2001 From: Toby Date: Sun, 25 Apr 2021 20:41:39 -0700 Subject: [PATCH 7/8] bump DefaultMaxIncomingStreams to 4096 --- cmd/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/config.go b/cmd/config.go index 328af07..dd704a6 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -11,7 +11,7 @@ const ( DefaultMaxReceiveStreamFlowControlWindow = 33554432 DefaultMaxReceiveConnectionFlowControlWindow = 67108864 - DefaultMaxIncomingStreams = 1024 + DefaultMaxIncomingStreams = 4096 tlsProtocolName = "hysteria" ) From b3d149a72f34163b2366c411fc862a172bd41236 Mon Sep 17 00:00:00 2001 From: Toby Date: Sun, 25 Apr 2021 23:45:45 -0700 Subject: [PATCH 8/8] UDP TProxy ACL & Fix SOCKS5 hijack bug --- pkg/relay/udp.go | 18 ++--- pkg/socks5/server.go | 2 +- pkg/tproxy/udp_linux.go | 142 ++++++++++++++++++++++++++++++---------- 3 files changed, 116 insertions(+), 46 deletions(-) diff --git a/pkg/relay/udp.go b/pkg/relay/udp.go index d4d3f4a..c1181d7 100644 --- a/pkg/relay/udp.go +++ b/pkg/relay/udp.go @@ -63,12 +63,12 @@ func (r *UDPRelay) ListenAndServe() error { n, rAddr, err := conn.ReadFromUDP(buf) if n > 0 { connMapMutex.RLock() - cme := connMap[rAddr.String()] + entry := connMap[rAddr.String()] connMapMutex.RUnlock() - if cme != nil { + if entry != nil { // Existing conn - cme.Deadline.Store(time.Now().Add(r.Timeout)) - _ = cme.HyConn.WriteTo(buf[:n], r.Remote) + entry.Deadline.Store(time.Now().Add(r.Timeout)) + _ = entry.HyConn.WriteTo(buf[:n], r.Remote) } else { // New r.ConnFunc(rAddr) @@ -77,10 +77,10 @@ func (r *UDPRelay) ListenAndServe() error { r.ErrorFunc(rAddr, err) } else { // Add it to the map - ent := &connEntry{HyConn: hyConn} - ent.Deadline.Store(time.Now().Add(r.Timeout)) + entry := &connEntry{HyConn: hyConn} + entry.Deadline.Store(time.Now().Add(r.Timeout)) connMapMutex.Lock() - connMap[rAddr.String()] = ent + connMap[rAddr.String()] = entry connMapMutex.Unlock() // Start remote to local go func() { @@ -89,14 +89,14 @@ func (r *UDPRelay) ListenAndServe() error { if err != nil { break } - ent.Deadline.Store(time.Now().Add(r.Timeout)) + entry.Deadline.Store(time.Now().Add(r.Timeout)) _, _ = conn.WriteToUDP(bs, rAddr) } }() // Timeout cleanup routine go func() { for { - ttl := ent.Deadline.Load().(time.Time).Sub(time.Now()) + ttl := entry.Deadline.Load().(time.Time).Sub(time.Now()) if ttl <= 0 { // Time to die connMapMutex.Lock() diff --git a/pkg/socks5/server.go b/pkg/socks5/server.go index 202c903..996bfae 100644 --- a/pkg/socks5/server.go +++ b/pkg/socks5/server.go @@ -370,7 +370,7 @@ func (s *Server) udpServer(clientConn *net.UDPConn, localRelayConn *net.UDPConn, case acl.ActionBlock: // Do nothing case acl.ActionHijack: - hijackAddr := net.JoinHostPort(arg, net.JoinHostPort(arg, strconv.Itoa(int(port)))) + hijackAddr := net.JoinHostPort(arg, strconv.Itoa(int(port))) rAddr, err := net.ResolveUDPAddr("udp", hijackAddr) if err == nil { _, _ = localRelayConn.WriteToUDP(d.Data, rAddr) diff --git a/pkg/tproxy/udp_linux.go b/pkg/tproxy/udp_linux.go index 99b1270..c2b5ce8 100644 --- a/pkg/tproxy/udp_linux.go +++ b/pkg/tproxy/udp_linux.go @@ -5,7 +5,9 @@ import ( "github.com/LiamHaworth/go-tproxy" "github.com/tobyxdd/hysteria/pkg/acl" "github.com/tobyxdd/hysteria/pkg/core" + "github.com/tobyxdd/hysteria/pkg/utils" "net" + "strconv" "sync" "sync/atomic" "time" @@ -46,8 +48,52 @@ func NewUDPTProxy(hyClient *core.Client, listen string, timeout time.Duration, a } type connEntry struct { - HyConn core.UDPConn - Deadline atomic.Value + HyConn core.UDPConn + LocalConn *net.UDPConn + Deadline atomic.Value +} + +func (r *UDPTProxy) sendPacket(entry *connEntry, dstAddr *net.UDPAddr, data []byte) error { + entry.Deadline.Store(time.Now().Add(r.Timeout)) + host, port, err := utils.SplitHostPort(dstAddr.String()) + if err != nil { + return err + } + action, arg := acl.ActionProxy, "" + var ipAddr *net.IPAddr + var resErr error + if r.ACLEngine != nil && entry.LocalConn != nil { + action, arg, ipAddr, resErr = r.ACLEngine.ResolveAndMatch(host) + // Doesn't always matter if the resolution fails, as we may send it through HyClient + } + switch action { + case acl.ActionDirect: + if resErr != nil { + return resErr + } + _, err = entry.LocalConn.WriteToUDP(data, &net.UDPAddr{ + IP: ipAddr.IP, + Port: int(port), + Zone: ipAddr.Zone, + }) + return err + case acl.ActionProxy: + return entry.HyConn.WriteTo(data, dstAddr.String()) + case acl.ActionBlock: + // Do nothing + return nil + case acl.ActionHijack: + hijackAddr := net.JoinHostPort(arg, strconv.Itoa(int(port))) + rAddr, err := net.ResolveUDPAddr("udp", hijackAddr) + if err != nil { + return err + } + _, err = entry.LocalConn.WriteToUDP(data, rAddr) + return err + default: + // Do nothing + return nil + } } func (r *UDPTProxy) ListenAndServe() error { @@ -65,56 +111,80 @@ func (r *UDPTProxy) ListenAndServe() error { n, srcAddr, dstAddr, err := tproxy.ReadFromUDP(conn, buf) if n > 0 { connMapMutex.RLock() - cme := connMap[srcAddr.String()] + entry := connMap[srcAddr.String()] connMapMutex.RUnlock() - if cme != nil { + if entry != nil { // Existing conn - cme.Deadline.Store(time.Now().Add(r.Timeout)) - _ = cme.HyConn.WriteTo(buf[:n], dstAddr.String()) + _ = r.sendPacket(entry, dstAddr, buf[:n]) } else { // New r.ConnFunc(srcAddr) hyConn, err := r.HyClient.DialUDP() if err != nil { r.ErrorFunc(srcAddr, err) - } else { - // Add it to the map - ent := &connEntry{HyConn: hyConn} - ent.Deadline.Store(time.Now().Add(r.Timeout)) - connMapMutex.Lock() - connMap[srcAddr.String()] = ent - connMapMutex.Unlock() - // Start remote to local + continue + } + var localConn *net.UDPConn + if r.ACLEngine != nil { + localConn, err = net.ListenUDP("udp", nil) + if err != nil { + r.ErrorFunc(srcAddr, err) + continue + } + } + // Send + entry := &connEntry{HyConn: hyConn, LocalConn: localConn} + _ = r.sendPacket(entry, dstAddr, buf[:n]) + // Add it to the map + connMapMutex.Lock() + connMap[srcAddr.String()] = entry + connMapMutex.Unlock() + // Start remote to local + go func() { + for { + bs, _, err := hyConn.ReadFrom() + if err != nil { + break + } + entry.Deadline.Store(time.Now().Add(r.Timeout)) + _, _ = conn.WriteToUDP(bs, srcAddr) + } + }() + if localConn != nil { go func() { + buf := make([]byte, udpBufferSize) for { - bs, _, err := hyConn.ReadFrom() + n, _, err := localConn.ReadFrom(buf) + if n > 0 { + entry.Deadline.Store(time.Now().Add(r.Timeout)) + _, _ = conn.WriteToUDP(buf[:n], srcAddr) + } if err != nil { break } - ent.Deadline.Store(time.Now().Add(r.Timeout)) - _, _ = conn.WriteToUDP(bs, srcAddr) } }() - // Timeout cleanup routine - go func() { - for { - ttl := ent.Deadline.Load().(time.Time).Sub(time.Now()) - if ttl <= 0 { - // Time to die - connMapMutex.Lock() - _ = hyConn.Close() - delete(connMap, srcAddr.String()) - connMapMutex.Unlock() - r.ErrorFunc(srcAddr, ErrTimeout) - return - } else { - time.Sleep(ttl) - } - } - }() - // Send the packet - _ = hyConn.WriteTo(buf[:n], dstAddr.String()) } + // Timeout cleanup routine + go func() { + for { + ttl := entry.Deadline.Load().(time.Time).Sub(time.Now()) + if ttl <= 0 { + // Time to die + connMapMutex.Lock() + _ = hyConn.Close() + if localConn != nil { + _ = localConn.Close() + } + delete(connMap, srcAddr.String()) + connMapMutex.Unlock() + r.ErrorFunc(srcAddr, ErrTimeout) + return + } else { + time.Sleep(ttl) + } + } + }() } } if err != nil {