Merge pull request #56 from HyNetwork/wip-tproxy

TProxy
This commit is contained in:
Toby 2021-04-27 20:23:52 -07:00 committed by GitHub
commit bbfbb3d6c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 541 additions and 92 deletions

View File

@ -14,7 +14,7 @@
[6]: https://t.me/hysteria_github
[中文 README](README.zh.md)
[中文](README.zh.md)
Hysteria is a TCP/UDP relay & SOCKS5/HTTP proxy tool optimized for networks of poor quality (e.g. satellite connections,
congested public Wi-Fi, connecting from China to servers abroad) powered by a custom version of QUIC protocol.
@ -86,21 +86,13 @@ Same as the server side, create a `config.json` under the root directory of the
},
"http": {
"listen": "127.0.0.1:8080"
},
"relay_tcp": {
"listen": "127.0.0.1:2222",
"remote": "123.123.123.123:22"
},
"relay_udp": {
"listen": "127.0.0.1:5333",
"remote": "8.8.8.8:53"
}
}
```
This config enables a SOCKS5 proxy (with both TCP & UDP support), an HTTP proxy, a TCP relay to `123.123.123.123:22` and
a UDP relay to `8.8.8.8:53`
at the same time. Please modify or remove these entries according to your actual needs.
This config enables a SOCKS5 proxy (with both TCP & UDP support), and an HTTP proxy at the same time. There are many
other modes in Hysteria, be sure to check them out in [Advanced usage](#advanced-usage)! To enable or disable a mode,
simply add or remove its entry in the config file.
If your server certificate is not issued by a trusted CA, you need to specify the CA used
with `"ca": "/path/to/file.ca"` on the client or use `"insecure": true` to ignore all certificate errors (not
@ -223,15 +215,23 @@ hysteria_traffic_uplink_bytes_total{auth="aGFja2VyISE="} 37452
"key": "/home/ubuntu/my_key.crt" // Key file (HTTPS proxy)
},
"relay_tcp": {
"listen": "127.0.0.1:2222", // TCP relay Listen address
"listen": "127.0.0.1:2222", // TCP relay listen address
"remote": "123.123.123.123:22", // TCP relay remote address
"timeout": 300 // TCP timeout in seconds
},
"relay_udp": {
"listen": "127.0.0.1:5333", // UDP relay Listen address
"listen": "127.0.0.1:5333", // UDP relay listen address
"remote": "8.8.8.8:53", // UDP relay remote address
"timeout": 60 // UDP session timeout in seconds
},
"tproxy_tcp": {
"listen": "127.0.0.1:9000", // TCP TProxy listen address
"timeout": 300 // TCP timeout in seconds
},
"tproxy_udp": {
"listen": "127.0.0.1:9000", // UDP TProxy listen address
"timeout": 60 // UDP session timeout in seconds
},
"acl": "my_list.acl", // See ACL below
"obfs": "AMOGUS", // Obfuscation password
"auth": "[BASE64]", // Authentication payload in Base64
@ -243,6 +243,14 @@ hysteria_traffic_uplink_bytes_total{auth="aGFja2VyISE="} 37452
}
```
#### Transparency proxy
TPROXY modes (`tproxy_tcp` & `tproxy_udp`) are only available on Linux.
References:
- https://www.kernel.org/doc/Documentation/networking/tproxy.txt
- https://powerdns.org/tproxydoc/tproxy.md.html
## ACL
[ACL File Format](ACL.md)

View File

@ -79,20 +79,12 @@ Hysteria 是专门针对恶劣网络环境进行优化的 TCP/UDP 转发和代
},
"http": {
"listen": "127.0.0.1:8080"
},
"relay_tcp": {
"listen": "127.0.0.1:2222",
"remote": "123.123.123.123:22"
},
"relay_udp": {
"listen": "127.0.0.1:5333",
"remote": "8.8.8.8:53"
}
}
```
这个配置同时开了 SOCK5 (支持 TCP & UDP) 代理HTTP 代理,到 `123.123.123.123:22` 的 TCP 转发和到 `8.8.8.8:53` 的 UDP 转发。
请根据自己实际需要修改和删减
这个配置同时开了 SOCK5 (支持 TCP & UDP) 代理和 HTTP 代理。Hysteria 还有很多其他模式,请务必前往 [高级用法](#高级用法) 了解一下!
要启用/禁用一个模式,在配置文件中添加/移除对应条目即可。
如果你的服务端证书不是由受信任的 CA 签发的,需要用 `"ca": "/path/to/file.ca"` 指定使用的 CA 或者用 `"insecure": true` 忽略所有
证书错误(不推荐)。
@ -220,6 +212,14 @@ hysteria_traffic_uplink_bytes_total{auth="aGFja2VyISE="} 37452
"remote": "8.8.8.8:53", // UDP 转发目标地址
"timeout": 60 // UDP 超时秒数
},
"tproxy_tcp": {
"listen": "127.0.0.1:9000", // TCP 透明代理监听地址
"timeout": 300 // TCP 超时秒数
},
"tproxy_udp": {
"listen": "127.0.0.1:9000", // UDP 透明代理监听地址
"timeout": 60 // UDP 超时秒数
},
"acl": "my_list.acl", // 见下文 ACL
"obfs": "AMOGUS", // 混淆密码
"auth": "[BASE64]", // Base64 验证密钥
@ -231,6 +231,14 @@ hysteria_traffic_uplink_bytes_total{auth="aGFja2VyISE="} 37452
}
```
#### 透明代理
TPROXY 模式 (`tproxy_tcp``tproxy_udp`) 只在 Linux 下可用。
参考阅读:
- https://www.kernel.org/doc/Documentation/networking/tproxy.txt
- https://powerdns.org/tproxydoc/tproxy.md.html
## 关于 ACL
[ACL 文件格式](ACL.zh.md)

View File

@ -13,6 +13,7 @@ import (
"github.com/tobyxdd/hysteria/pkg/obfs"
"github.com/tobyxdd/hysteria/pkg/relay"
"github.com/tobyxdd/hysteria/pkg/socks5"
"github.com/tobyxdd/hysteria/pkg/tproxy"
"io"
"io/ioutil"
"net"
@ -239,6 +240,68 @@ func client(config *clientConfig) {
}()
}
if len(config.TCPTProxy.Listen) > 0 {
go func() {
rl, err := tproxy.NewTCPTProxy(client, config.TCPTProxy.Listen,
time.Duration(config.TCPTProxy.Timeout)*time.Second, aclEngine,
func(addr, reqAddr net.Addr, action acl.Action, arg string) {
logrus.WithFields(logrus.Fields{
"action": actionToString(action, arg),
"src": addr.String(),
"dst": reqAddr.String(),
}).Debug("TCP TProxy request")
},
func(addr, reqAddr net.Addr, err error) {
if err != io.EOF {
logrus.WithFields(logrus.Fields{
"error": err,
"src": addr.String(),
"dst": reqAddr.String(),
}).Info("TCP TProxy error")
} else {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
"dst": reqAddr.String(),
}).Debug("TCP TProxy EOF")
}
})
if err != nil {
logrus.WithField("error", err).Fatal("Failed to initialize TCP TProxy")
}
logrus.WithField("addr", config.TCPTProxy.Listen).Info("TCP TProxy up and running")
errChan <- rl.ListenAndServe()
}()
}
if len(config.UDPTProxy.Listen) > 0 {
go func() {
rl, err := tproxy.NewUDPTProxy(client, config.UDPTProxy.Listen,
time.Duration(config.UDPTProxy.Timeout)*time.Second, aclEngine,
func(addr net.Addr) {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
}).Debug("UDP TProxy request")
},
func(addr net.Addr, err error) {
if err != tproxy.ErrTimeout {
logrus.WithFields(logrus.Fields{
"error": err,
"src": addr.String(),
}).Info("UDP TProxy error")
} else {
logrus.WithFields(logrus.Fields{
"src": addr.String(),
}).Debug("UDP TProxy session closed")
}
})
if err != nil {
logrus.WithField("error", err).Fatal("Failed to initialize UDP TProxy")
}
logrus.WithField("addr", config.UDPTProxy.Listen).Info("UDP TProxy up and running")
errChan <- rl.ListenAndServe()
}()
}
err = <-errChan
logrus.WithField("error", err).Fatal("Client shutdown")
}

View File

@ -11,7 +11,7 @@ const (
DefaultMaxReceiveStreamFlowControlWindow = 33554432
DefaultMaxReceiveConnectionFlowControlWindow = 67108864
DefaultMaxIncomingStreams = 1024
DefaultMaxIncomingStreams = 4096
tlsProtocolName = "hysteria"
)
@ -90,6 +90,14 @@ type clientConfig struct {
Remote string `json:"remote"`
Timeout int `json:"timeout"`
} `json:"relay_udp"`
TCPTProxy struct {
Listen string `json:"listen"`
Timeout int `json:"timeout"`
} `json:"tproxy_tcp"`
UDPTProxy struct {
Listen string `json:"listen"`
Timeout int `json:"timeout"`
} `json:"tproxy_udp"`
ACL string `json:"acl"`
Obfs string `json:"obfs"`
Auth []byte `json:"auth"`
@ -102,8 +110,9 @@ type clientConfig struct {
func (c *clientConfig) Check() error {
if len(c.SOCKS5.Listen) == 0 && len(c.HTTP.Listen) == 0 &&
len(c.TCPRelay.Listen) == 0 && len(c.UDPRelay.Listen) == 0 {
return errors.New("no SOCKS5, HTTP, TCP relay or UDP relay listen address")
len(c.TCPRelay.Listen) == 0 && len(c.UDPRelay.Listen) == 0 &&
len(c.TCPTProxy.Listen) == 0 && len(c.UDPTProxy.Listen) == 0 {
return errors.New("no SOCKS5, HTTP, relay or TProxy listen address")
}
if len(c.TCPRelay.Listen) > 0 && len(c.TCPRelay.Remote) == 0 {
return errors.New("no TCP relay remote address")
@ -123,6 +132,12 @@ func (c *clientConfig) Check() error {
if c.UDPRelay.Timeout != 0 && c.UDPRelay.Timeout <= 4 {
return errors.New("invalid UDP relay timeout")
}
if c.TCPTProxy.Timeout != 0 && c.TCPTProxy.Timeout <= 4 {
return errors.New("invalid TCP TProxy timeout")
}
if c.UDPTProxy.Timeout != 0 && c.UDPTProxy.Timeout <= 4 {
return errors.New("invalid UDP TProxy timeout")
}
if len(c.Server) == 0 {
return errors.New("no server address")
}

1
go.mod
View File

@ -3,6 +3,7 @@ module github.com/tobyxdd/hysteria
go 1.14
require (
github.com/LiamHaworth/go-tproxy v0.0.0-20190726054950-ef7efd7f24ed
github.com/elazarl/goproxy v0.0.0-20200426045556-49ad98f6dac1
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2
github.com/hashicorp/golang-lru v0.5.4

2
go.sum
View File

@ -9,6 +9,8 @@ dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/LiamHaworth/go-tproxy v0.0.0-20190726054950-ef7efd7f24ed h1:eqa6queieK8SvoszxCu0WwH7lSVeL4/N/f1JwOMw1G4=
github.com/LiamHaworth/go-tproxy v0.0.0-20190726054950-ef7efd7f24ed/go.mod h1:rA52xkgZwql9LRZXWb2arHEFP6qSR48KY2xOfWzEciQ=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=

View File

@ -195,7 +195,7 @@ func (c *Client) openStreamWithReconnect() (quic.Session, quic.Stream, error) {
}
func (c *Client) DialTCP(addr string) (net.Conn, error) {
host, port, err := splitHostPort(addr)
host, port, err := utils.SplitHostPort(addr)
if err != nil {
return nil, err
}
@ -366,7 +366,7 @@ func (c *quicPktConn) ReadFrom() ([]byte, string, error) {
}
func (c *quicPktConn) WriteTo(p []byte, addr string) error {
host, port, err := splitHostPort(addr)
host, port, err := utils.SplitHostPort(addr)
if err != nil {
return err
}
@ -384,15 +384,3 @@ func (c *quicPktConn) Close() error {
c.CloseFunc()
return c.Stream.Close()
}
func splitHostPort(hostport string) (string, uint16, error) {
host, port, err := net.SplitHostPort(hostport)
if err != nil {
return "", 0, err
}
portUint, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return "", 0, err
}
return host, uint16(portUint), err
}

View File

@ -3,6 +3,7 @@ package http
import (
"errors"
"fmt"
"github.com/tobyxdd/hysteria/pkg/utils"
"net"
"net/http"
"strconv"
@ -24,11 +25,7 @@ func NewProxyHTTPServer(hyClient *core.Client, idleTimeout time.Duration, aclEng
proxy.Tr = &http.Transport{
Dial: func(network, addr string) (net.Conn, error) {
// Parse addr string
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
portUint, err := strconv.ParseUint(port, 10, 16)
host, port, err := utils.SplitHostPort(addr)
if err != nil {
return nil, err
}
@ -49,7 +46,7 @@ func NewProxyHTTPServer(hyClient *core.Client, idleTimeout time.Duration, aclEng
}
return net.DialTCP(network, nil, &net.TCPAddr{
IP: ipAddr.IP,
Port: int(portUint),
Port: int(port),
Zone: ipAddr.Zone,
})
case acl.ActionProxy:
@ -57,7 +54,7 @@ func NewProxyHTTPServer(hyClient *core.Client, idleTimeout time.Duration, aclEng
case acl.ActionBlock:
return nil, errors.New("blocked by ACL")
case acl.ActionHijack:
return net.Dial(network, net.JoinHostPort(arg, port))
return net.Dial(network, net.JoinHostPort(arg, strconv.Itoa(int(port))))
default:
return nil, fmt.Errorf("unknown action %d", action)
}

View File

@ -11,8 +11,6 @@ import (
const udpBufferSize = 65535
const udpMinTimeout = 4 * time.Second
var ErrTimeout = errors.New("inactivity timeout")
type UDPRelay struct {
@ -41,16 +39,13 @@ func NewUDPRelay(hyClient *core.Client, listen, remote string, timeout time.Dura
}
if timeout == 0 {
r.Timeout = 1 * time.Minute
} else if timeout < udpMinTimeout {
r.Timeout = udpMinTimeout
}
return r, nil
}
type cmEntry struct {
HyConn core.UDPConn
Addr *net.UDPAddr
LastActiveTime atomic.Value
type connEntry struct {
HyConn core.UDPConn
Deadline atomic.Value
}
func (r *UDPRelay) ListenAndServe() error {
@ -60,45 +55,20 @@ func (r *UDPRelay) ListenAndServe() error {
}
defer conn.Close()
// src <-> HyClient UDPConn
connMap := make(map[string]*cmEntry)
connMap := make(map[string]*connEntry)
var connMapMutex sync.RWMutex
// Timeout cleanup routine
stopChan := make(chan bool)
defer close(stopChan)
go func() {
ticker := time.NewTicker(udpMinTimeout)
defer ticker.Stop()
for {
select {
case <-stopChan:
return
case t := <-ticker.C:
allowedLAT := t.Add(-r.Timeout)
connMapMutex.Lock()
for k, v := range connMap {
if v.LastActiveTime.Load().(time.Time).Before(allowedLAT) {
// Timeout
r.ErrorFunc(v.Addr, ErrTimeout)
_ = v.HyConn.Close()
delete(connMap, k)
}
}
connMapMutex.Unlock()
}
}
}()
// Read loop
buf := make([]byte, udpBufferSize)
for {
n, rAddr, err := conn.ReadFromUDP(buf)
if n > 0 {
connMapMutex.RLock()
cme := connMap[rAddr.String()]
entry := connMap[rAddr.String()]
connMapMutex.RUnlock()
if cme != nil {
if entry != nil {
// Existing conn
cme.LastActiveTime.Store(time.Now())
_ = cme.HyConn.WriteTo(buf[:n], r.Remote)
entry.Deadline.Store(time.Now().Add(r.Timeout))
_ = entry.HyConn.WriteTo(buf[:n], r.Remote)
} else {
// New
r.ConnFunc(rAddr)
@ -107,10 +77,10 @@ func (r *UDPRelay) ListenAndServe() error {
r.ErrorFunc(rAddr, err)
} else {
// Add it to the map
ent := &cmEntry{HyConn: hyConn, Addr: rAddr}
ent.LastActiveTime.Store(time.Now())
entry := &connEntry{HyConn: hyConn}
entry.Deadline.Store(time.Now().Add(r.Timeout))
connMapMutex.Lock()
connMap[rAddr.String()] = ent
connMap[rAddr.String()] = entry
connMapMutex.Unlock()
// Start remote to local
go func() {
@ -119,10 +89,27 @@ func (r *UDPRelay) ListenAndServe() error {
if err != nil {
break
}
ent.LastActiveTime.Store(time.Now())
entry.Deadline.Store(time.Now().Add(r.Timeout))
_, _ = conn.WriteToUDP(bs, rAddr)
}
}()
// Timeout cleanup routine
go func() {
for {
ttl := entry.Deadline.Load().(time.Time).Sub(time.Now())
if ttl <= 0 {
// Time to die
connMapMutex.Lock()
_ = hyConn.Close()
delete(connMap, rAddr.String())
connMapMutex.Unlock()
r.ErrorFunc(rAddr, ErrTimeout)
return
} else {
time.Sleep(ttl)
}
}
}()
// Send the packet
_ = hyConn.WriteTo(buf[:n], r.Remote)
}

View File

@ -370,7 +370,7 @@ func (s *Server) udpServer(clientConn *net.UDPConn, localRelayConn *net.UDPConn,
case acl.ActionBlock:
// Do nothing
case acl.ActionHijack:
hijackAddr := net.JoinHostPort(arg, net.JoinHostPort(arg, strconv.Itoa(int(port))))
hijackAddr := net.JoinHostPort(arg, strconv.Itoa(int(port)))
rAddr, err := net.ResolveUDPAddr("udp", hijackAddr)
if err == nil {
_, _ = localRelayConn.WriteToUDP(d.Data, rAddr)

121
pkg/tproxy/tcp_linux.go Normal file
View File

@ -0,0 +1,121 @@
package tproxy
import (
"errors"
"fmt"
"github.com/LiamHaworth/go-tproxy"
"github.com/tobyxdd/hysteria/pkg/acl"
"github.com/tobyxdd/hysteria/pkg/core"
"github.com/tobyxdd/hysteria/pkg/utils"
"net"
"strconv"
"time"
)
type TCPTProxy struct {
HyClient *core.Client
ListenAddr *net.TCPAddr
Timeout time.Duration
ACLEngine *acl.Engine
ConnFunc func(addr, reqAddr net.Addr, action acl.Action, arg string)
ErrorFunc func(addr, reqAddr net.Addr, err error)
}
func NewTCPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine,
connFunc func(addr, reqAddr net.Addr, action acl.Action, arg string),
errorFunc func(addr, reqAddr net.Addr, err error)) (*TCPTProxy, error) {
tAddr, err := net.ResolveTCPAddr("tcp", listen)
if err != nil {
return nil, err
}
r := &TCPTProxy{
HyClient: hyClient,
ListenAddr: tAddr,
Timeout: timeout,
ACLEngine: aclEngine,
ConnFunc: connFunc,
ErrorFunc: errorFunc,
}
return r, nil
}
func (r *TCPTProxy) ListenAndServe() error {
listener, err := tproxy.ListenTCP("tcp", r.ListenAddr)
if err != nil {
return err
}
defer listener.Close()
for {
c, err := listener.Accept()
if err != nil {
return err
}
go func() {
defer c.Close()
// Under TPROXY mode, we are effectively acting as the remote server
// So our LocalAddr is actually the target to which the user is trying to connect
// and our RemoteAddr is the local address where the user initiates the connection
host, port, err := utils.SplitHostPort(c.LocalAddr().String())
if err != nil {
return
}
action, arg := acl.ActionProxy, ""
var ipAddr *net.IPAddr
var resErr error
if r.ACLEngine != nil {
action, arg, ipAddr, resErr = r.ACLEngine.ResolveAndMatch(host)
// Doesn't always matter if the resolution fails, as we may send it through HyClient
}
r.ConnFunc(c.RemoteAddr(), c.LocalAddr(), action, arg)
var closeErr error
defer func() {
r.ErrorFunc(c.RemoteAddr(), c.LocalAddr(), closeErr)
}()
// Handle according to the action
switch action {
case acl.ActionDirect:
if resErr != nil {
closeErr = resErr
return
}
rc, err := net.DialTCP("tcp", nil, &net.TCPAddr{
IP: ipAddr.IP,
Port: int(port),
Zone: ipAddr.Zone,
})
if err != nil {
closeErr = err
return
}
defer rc.Close()
closeErr = utils.PipePairWithTimeout(c, rc, r.Timeout)
return
case acl.ActionProxy:
rc, err := r.HyClient.DialTCP(c.LocalAddr().String())
if err != nil {
closeErr = err
return
}
defer rc.Close()
closeErr = utils.PipePairWithTimeout(c, rc, r.Timeout)
return
case acl.ActionBlock:
closeErr = errors.New("blocked in ACL")
return
case acl.ActionHijack:
rc, err := net.Dial("tcp", net.JoinHostPort(arg, strconv.Itoa(int(port))))
if err != nil {
closeErr = err
return
}
defer rc.Close()
closeErr = utils.PipePairWithTimeout(c, rc, r.Timeout)
return
default:
closeErr = fmt.Errorf("unknown action %d", action)
return
}
}()
}
}

23
pkg/tproxy/tcp_stub.go Normal file
View File

@ -0,0 +1,23 @@
// +build !linux
package tproxy
import (
"errors"
"github.com/tobyxdd/hysteria/pkg/acl"
"github.com/tobyxdd/hysteria/pkg/core"
"net"
"time"
)
type TCPTProxy struct{}
func NewTCPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine,
connFunc func(addr, reqAddr net.Addr, action acl.Action, arg string),
errorFunc func(addr, reqAddr net.Addr, err error)) (*TCPTProxy, error) {
return nil, errors.New("not supported on the current system")
}
func (r *TCPTProxy) ListenAndServe() error {
return nil
}

194
pkg/tproxy/udp_linux.go Normal file
View File

@ -0,0 +1,194 @@
package tproxy
import (
"errors"
"github.com/LiamHaworth/go-tproxy"
"github.com/tobyxdd/hysteria/pkg/acl"
"github.com/tobyxdd/hysteria/pkg/core"
"github.com/tobyxdd/hysteria/pkg/utils"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
)
const udpBufferSize = 65535
var ErrTimeout = errors.New("inactivity timeout")
type UDPTProxy struct {
HyClient *core.Client
ListenAddr *net.UDPAddr
Timeout time.Duration
ACLEngine *acl.Engine
ConnFunc func(addr net.Addr)
ErrorFunc func(addr net.Addr, err error)
}
func NewUDPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine,
connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*UDPTProxy, error) {
uAddr, err := net.ResolveUDPAddr("udp", listen)
if err != nil {
return nil, err
}
r := &UDPTProxy{
HyClient: hyClient,
ListenAddr: uAddr,
Timeout: timeout,
ACLEngine: aclEngine,
ConnFunc: connFunc,
ErrorFunc: errorFunc,
}
if timeout == 0 {
r.Timeout = 1 * time.Minute
}
return r, nil
}
type connEntry struct {
HyConn core.UDPConn
LocalConn *net.UDPConn
Deadline atomic.Value
}
func (r *UDPTProxy) sendPacket(entry *connEntry, dstAddr *net.UDPAddr, data []byte) error {
entry.Deadline.Store(time.Now().Add(r.Timeout))
host, port, err := utils.SplitHostPort(dstAddr.String())
if err != nil {
return err
}
action, arg := acl.ActionProxy, ""
var ipAddr *net.IPAddr
var resErr error
if r.ACLEngine != nil && entry.LocalConn != nil {
action, arg, ipAddr, resErr = r.ACLEngine.ResolveAndMatch(host)
// Doesn't always matter if the resolution fails, as we may send it through HyClient
}
switch action {
case acl.ActionDirect:
if resErr != nil {
return resErr
}
_, err = entry.LocalConn.WriteToUDP(data, &net.UDPAddr{
IP: ipAddr.IP,
Port: int(port),
Zone: ipAddr.Zone,
})
return err
case acl.ActionProxy:
return entry.HyConn.WriteTo(data, dstAddr.String())
case acl.ActionBlock:
// Do nothing
return nil
case acl.ActionHijack:
hijackAddr := net.JoinHostPort(arg, strconv.Itoa(int(port)))
rAddr, err := net.ResolveUDPAddr("udp", hijackAddr)
if err != nil {
return err
}
_, err = entry.LocalConn.WriteToUDP(data, rAddr)
return err
default:
// Do nothing
return nil
}
}
func (r *UDPTProxy) ListenAndServe() error {
conn, err := tproxy.ListenUDP("udp", r.ListenAddr)
if err != nil {
return err
}
defer conn.Close()
// src <-> HyClient UDPConn
connMap := make(map[string]*connEntry)
var connMapMutex sync.RWMutex
// Read loop
buf := make([]byte, udpBufferSize)
for {
n, srcAddr, dstAddr, err := tproxy.ReadFromUDP(conn, buf)
if n > 0 {
connMapMutex.RLock()
entry := connMap[srcAddr.String()]
connMapMutex.RUnlock()
if entry != nil {
// Existing conn
_ = r.sendPacket(entry, dstAddr, buf[:n])
} else {
// New
r.ConnFunc(srcAddr)
hyConn, err := r.HyClient.DialUDP()
if err != nil {
r.ErrorFunc(srcAddr, err)
continue
}
var localConn *net.UDPConn
if r.ACLEngine != nil {
localConn, err = net.ListenUDP("udp", nil)
if err != nil {
r.ErrorFunc(srcAddr, err)
continue
}
}
// Send
entry := &connEntry{HyConn: hyConn, LocalConn: localConn}
_ = r.sendPacket(entry, dstAddr, buf[:n])
// Add it to the map
connMapMutex.Lock()
connMap[srcAddr.String()] = entry
connMapMutex.Unlock()
// Start remote to local
go func() {
for {
bs, _, err := hyConn.ReadFrom()
if err != nil {
break
}
entry.Deadline.Store(time.Now().Add(r.Timeout))
_, _ = conn.WriteToUDP(bs, srcAddr)
}
}()
if localConn != nil {
go func() {
buf := make([]byte, udpBufferSize)
for {
n, _, err := localConn.ReadFrom(buf)
if n > 0 {
entry.Deadline.Store(time.Now().Add(r.Timeout))
_, _ = conn.WriteToUDP(buf[:n], srcAddr)
}
if err != nil {
break
}
}
}()
}
// Timeout cleanup routine
go func() {
for {
ttl := entry.Deadline.Load().(time.Time).Sub(time.Now())
if ttl <= 0 {
// Time to die
connMapMutex.Lock()
_ = hyConn.Close()
if localConn != nil {
_ = localConn.Close()
}
delete(connMap, srcAddr.String())
connMapMutex.Unlock()
r.ErrorFunc(srcAddr, ErrTimeout)
return
} else {
time.Sleep(ttl)
}
}
}()
}
}
if err != nil {
return err
}
}
}

24
pkg/tproxy/udp_stub.go Normal file
View File

@ -0,0 +1,24 @@
// +build !linux
package tproxy
import (
"errors"
"github.com/tobyxdd/hysteria/pkg/acl"
"github.com/tobyxdd/hysteria/pkg/core"
"net"
"time"
)
var ErrTimeout = errors.New("inactivity timeout")
type UDPTProxy struct{}
func NewUDPTProxy(hyClient *core.Client, listen string, timeout time.Duration, aclEngine *acl.Engine,
connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*UDPTProxy, error) {
return nil, errors.New("not supported on the current system")
}
func (r *UDPTProxy) ListenAndServe() error {
return nil
}

18
pkg/utils/misc.go Normal file
View File

@ -0,0 +1,18 @@
package utils
import (
"net"
"strconv"
)
func SplitHostPort(hostport string) (string, uint16, error) {
host, port, err := net.SplitHostPort(hostport)
if err != nil {
return "", 0, err
}
portUint, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return "", 0, err
}
return host, uint16(portUint), err
}

View File

@ -46,7 +46,7 @@ func Pipe2Way(rw1, rw2 io.ReadWriter, count func(int)) error {
return <-errChan
}
func PipePairWithTimeout(conn *net.TCPConn, stream io.ReadWriteCloser, timeout time.Duration) error {
func PipePairWithTimeout(conn net.Conn, stream io.ReadWriteCloser, timeout time.Duration) error {
errChan := make(chan error, 2)
// TCP to stream
go func() {