From e3c308859623bc719cd4fad7f7c7179f812dd727 Mon Sep 17 00:00:00 2001 From: Toby Date: Sat, 22 Oct 2022 11:45:46 -0700 Subject: [PATCH] wip: core client & server rework --- cmd/client.go | 27 +++- cmd/server.go | 26 +++- pkg/core/client.go | 86 +++++++---- pkg/core/server.go | 21 ++- pkg/obfs/dummy.go | 18 --- pkg/obfs/obfs.go | 6 - pkg/obfs/xplus.go | 52 ------- pkg/transport/client.go | 63 -------- .../pktconns}/faketcp/LICENSE | 0 .../pktconns}/faketcp/obfs.go | 28 ++-- .../pktconns}/faketcp/tcp_linux.go | 0 .../pktconns}/faketcp/tcp_stub.go | 0 .../pktconns}/faketcp/tcp_test.go | 0 pkg/transport/pktconns/func.go | 146 ++++++++++++++++++ pkg/transport/pktconns/obfs/obfs.go | 58 +++++++ .../pktconns/obfs/obfs_test.go} | 0 pkg/{conns => transport/pktconns}/udp/obfs.go | 30 ++-- .../pktconns}/wechat/obfs.go | 46 ++++-- pkg/transport/server.go | 65 -------- 19 files changed, 368 insertions(+), 304 deletions(-) delete mode 100644 pkg/obfs/dummy.go delete mode 100644 pkg/obfs/obfs.go delete mode 100644 pkg/obfs/xplus.go rename pkg/{conns => transport/pktconns}/faketcp/LICENSE (100%) rename pkg/{conns => transport/pktconns}/faketcp/obfs.go (60%) rename pkg/{conns => transport/pktconns}/faketcp/tcp_linux.go (100%) rename pkg/{conns => transport/pktconns}/faketcp/tcp_stub.go (100%) rename pkg/{conns => transport/pktconns}/faketcp/tcp_test.go (100%) create mode 100644 pkg/transport/pktconns/func.go create mode 100644 pkg/transport/pktconns/obfs/obfs.go rename pkg/{obfs/xplus_test.go => transport/pktconns/obfs/obfs_test.go} (100%) rename pkg/{conns => transport/pktconns}/udp/obfs.go (60%) rename pkg/{conns => transport/pktconns}/wechat/obfs.go (54%) diff --git a/cmd/client.go b/cmd/client.go index f36e8c5..6ec5b8c 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -11,6 +11,8 @@ import ( "os" "time" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns" + "github.com/HyNetwork/hysteria/pkg/pmtud_fix" "github.com/HyNetwork/hysteria/pkg/redirect" "github.com/oschwald/geoip2-golang" @@ -19,7 +21,6 @@ import ( "github.com/HyNetwork/hysteria/pkg/acl" "github.com/HyNetwork/hysteria/pkg/core" hyHTTP "github.com/HyNetwork/hysteria/pkg/http" - "github.com/HyNetwork/hysteria/pkg/obfs" "github.com/HyNetwork/hysteria/pkg/relay" "github.com/HyNetwork/hysteria/pkg/socks5" "github.com/HyNetwork/hysteria/pkg/tproxy" @@ -28,6 +29,14 @@ import ( "github.com/sirupsen/logrus" ) +var clientPacketConnFuncFactoryMap = map[string]pktconns.ClientPacketConnFuncFactory{ + "": pktconns.NewClientUDPConnFunc, + "udp": pktconns.NewClientUDPConnFunc, + "wechat": pktconns.NewClientWeChatConnFunc, + "wechat-video": pktconns.NewClientWeChatConnFunc, + "faketcp": pktconns.NewClientFakeTCPConnFunc, +} + func client(config *clientConfig) { logrus.WithField("config", config.String()).Info("Client configuration loaded") // Resolver @@ -102,11 +111,14 @@ func client(config *clientConfig) { } else { auth = []byte(config.AuthString) } - // Obfuscator - var obfuscator obfs.Obfuscator - if len(config.Obfs) > 0 { - obfuscator = obfs.NewXPlusObfuscator([]byte(config.Obfs)) + // Packet conn + pktConnFuncFactory := clientPacketConnFuncFactoryMap[config.Protocol] + if pktConnFuncFactory == nil { + logrus.WithFields(logrus.Fields{ + "protocol": config.Protocol, + }).Fatal("Unsupported protocol") } + pktConnFunc := pktConnFuncFactory(config.Obfs) // Resolve preference if len(config.ResolvePreference) > 0 { pref, err := transport.ResolvePreferenceFromString(config.ResolvePreference) @@ -142,8 +154,7 @@ func client(config *clientConfig) { up, down, _ := config.Speed() for { try += 1 - c, err := core.NewClient(config.Server, config.Protocol, auth, tlsConfig, quicConfig, - transport.DefaultClientTransport, up, down, obfuscator, + c, err := core.NewClient(config.Server, auth, tlsConfig, quicConfig, pktConnFunc, up, down, func(err error) { if config.QuitOnDisconnect { logrus.WithFields(logrus.Fields{ @@ -154,7 +165,7 @@ func client(config *clientConfig) { logrus.WithFields(logrus.Fields{ "addr": config.Server, "error": err, - }).Info("Connection to server lost, reconnecting...") + }).Error("Connection to server lost, reconnecting...") } }) if err != nil { diff --git a/cmd/server.go b/cmd/server.go index 47041e5..de25ea4 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -8,10 +8,11 @@ import ( "net/http" "time" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns" + "github.com/HyNetwork/hysteria/cmd/auth" "github.com/HyNetwork/hysteria/pkg/acl" "github.com/HyNetwork/hysteria/pkg/core" - "github.com/HyNetwork/hysteria/pkg/obfs" "github.com/HyNetwork/hysteria/pkg/pmtud_fix" "github.com/HyNetwork/hysteria/pkg/sockopt" "github.com/HyNetwork/hysteria/pkg/transport" @@ -23,6 +24,14 @@ import ( "github.com/yosuke-furukawa/json5/encoding/json5" ) +var serverPacketConnFuncFactoryMap = map[string]pktconns.ServerPacketConnFuncFactory{ + "": pktconns.NewServerUDPConnFunc, + "udp": pktconns.NewServerUDPConnFunc, + "wechat": pktconns.NewServerWeChatConnFunc, + "wechat-video": pktconns.NewServerWeChatConnFunc, + "faketcp": pktconns.NewServerFakeTCPConnFunc, +} + func server(config *serverConfig) { logrus.WithField("config", config.String()).Info("Server configuration loaded") // Resolver @@ -141,11 +150,12 @@ func server(config *serverConfig) { } return ok, msg } - // Obfuscator - var obfuscator obfs.Obfuscator - if len(config.Obfs) > 0 { - obfuscator = obfs.NewXPlusObfuscator([]byte(config.Obfs)) + // Packet conn + pktConnFuncFactory := serverPacketConnFuncFactoryMap[config.Protocol] + if pktConnFuncFactory == nil { + logrus.WithField("protocol", config.Protocol).Fatal("Unsupported protocol") } + pktConnFunc := pktConnFuncFactory(config.Obfs) // Resolve preference if len(config.ResolvePreference) > 0 { pref, err := transport.ResolvePreferenceFromString(config.ResolvePreference) @@ -221,9 +231,9 @@ func server(config *serverConfig) { }() } up, down, _ := config.Speed() - server, err := core.NewServer(config.Listen, config.Protocol, tlsConfig, quicConfig, transport.DefaultServerTransport, - up, down, config.DisableUDP, aclEngine, obfuscator, connectFunc, disconnectFunc, - tcpRequestFunc, tcpErrorFunc, udpRequestFunc, udpErrorFunc, promReg) + server, err := core.NewServer(config.Listen, tlsConfig, quicConfig, pktConnFunc, + transport.DefaultServerTransport, up, down, config.DisableUDP, aclEngine, + connectFunc, disconnectFunc, tcpRequestFunc, tcpErrorFunc, udpRequestFunc, udpErrorFunc, promReg) if err != nil { logrus.WithField("error", err).Fatal("Failed to initialize server") } diff --git a/pkg/core/client.go b/pkg/core/client.go index 3a2fa86..e413a58 100644 --- a/pkg/core/client.go +++ b/pkg/core/client.go @@ -12,11 +12,11 @@ import ( "sync" "time" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns" + "github.com/HyNetwork/hysteria/pkg/congestion" - "github.com/HyNetwork/hysteria/pkg/obfs" "github.com/HyNetwork/hysteria/pkg/pmtud_fix" - "github.com/HyNetwork/hysteria/pkg/transport" "github.com/HyNetwork/hysteria/pkg/utils" "github.com/lucas-clemente/quic-go" "github.com/lunixbochs/struc" @@ -25,18 +25,18 @@ import ( var ErrClosed = errors.New("closed") type Client struct { - transport *transport.ClientTransport serverAddr string - protocol string sendBPS, recvBPS uint64 auth []byte - obfuscator obfs.Obfuscator tlsConfig *tls.Config quicConfig *quic.Config - quicSession quic.Connection + pktConnFunc pktconns.ClientPacketConnFunc + reconnectMutex sync.Mutex + pktConn net.PacketConn + quicConn quic.Connection closed bool udpSessionMutex sync.RWMutex @@ -46,59 +46,78 @@ type Client struct { quicReconnectFunc func(err error) } -func NewClient(serverAddr string, protocol string, auth []byte, tlsConfig *tls.Config, quicConfig *quic.Config, - transport *transport.ClientTransport, sendBPS uint64, recvBPS uint64, obfuscator obfs.Obfuscator, - quicReconnectFunc func(err error), +func NewClient(serverAddr string, auth []byte, tlsConfig *tls.Config, quicConfig *quic.Config, + pktConnFunc pktconns.ClientPacketConnFunc, sendBPS uint64, recvBPS uint64, quicReconnectFunc func(err error), ) (*Client, error) { quicConfig.DisablePathMTUDiscovery = quicConfig.DisablePathMTUDiscovery || pmtud_fix.DisablePathMTUDiscovery c := &Client{ - transport: transport, serverAddr: serverAddr, - protocol: protocol, sendBPS: sendBPS, recvBPS: recvBPS, auth: auth, - obfuscator: obfuscator, tlsConfig: tlsConfig, quicConfig: quicConfig, + pktConnFunc: pktConnFunc, quicReconnectFunc: quicReconnectFunc, } - if err := c.connectToServer(); err != nil { + if err := c.connect(); err != nil { return nil, err } return c, nil } -func (c *Client) connectToServer() error { - qs, err := c.transport.QUICDial(c.protocol, c.serverAddr, c.tlsConfig, c.quicConfig, c.obfuscator) +func (c *Client) connect() error { + // Clear previous connection + if c.quicConn != nil { + _ = c.quicConn.CloseWithError(0, "") + } + if c.pktConn != nil { + _ = c.pktConn.Close() + } + // New connection + pktConn, err := c.pktConnFunc(c.serverAddr) if err != nil { return err } + serverUDPAddr, err := net.ResolveUDPAddr("udp", c.serverAddr) + if err != nil { + _ = pktConn.Close() + return err + } + quicConn, err := quic.Dial(pktConn, serverUDPAddr, c.serverAddr, c.tlsConfig, c.quicConfig) + if err != nil { + _ = pktConn.Close() + return err + } // Control stream ctx, ctxCancel := context.WithTimeout(context.Background(), protocolTimeout) - stream, err := qs.OpenStreamSync(ctx) + stream, err := quicConn.OpenStreamSync(ctx) ctxCancel() if err != nil { - _ = qs.CloseWithError(closeErrorCodeProtocol, "protocol error") + _ = quicConn.CloseWithError(closeErrorCodeProtocol, "protocol error") + _ = pktConn.Close() return err } - ok, msg, err := c.handleControlStream(qs, stream) + ok, msg, err := c.handleControlStream(quicConn, stream) if err != nil { - _ = qs.CloseWithError(closeErrorCodeProtocol, "protocol error") + _ = quicConn.CloseWithError(closeErrorCodeProtocol, "protocol error") + _ = pktConn.Close() return err } if !ok { - _ = qs.CloseWithError(closeErrorCodeAuth, "auth error") + _ = quicConn.CloseWithError(closeErrorCodeAuth, "auth error") + _ = pktConn.Close() return fmt.Errorf("auth error: %s", msg) } // All good c.udpSessionMap = make(map[uint32]chan *udpMessage) - go c.handleMessage(qs) - c.quicSession = qs + go c.handleMessage(quicConn) + c.pktConn = pktConn + c.quicConn = quicConn return nil } -func (c *Client) handleControlStream(qs quic.Connection, stream quic.Stream) (bool, string, error) { +func (c *Client) handleControlStream(qc quic.Connection, stream quic.Stream) (bool, string, error) { // Send protocol version _, err := stream.Write([]byte{protocolVersion}) if err != nil { @@ -123,14 +142,14 @@ func (c *Client) handleControlStream(qs quic.Connection, stream quic.Stream) (bo } // Set the congestion accordingly if sh.OK { - qs.SetCongestionControl(congestion.NewBrutalSender(sh.Rate.RecvBPS)) + qc.SetCongestionControl(congestion.NewBrutalSender(sh.Rate.RecvBPS)) } return sh.OK, sh.Message, nil } -func (c *Client) handleMessage(qs quic.Connection) { +func (c *Client) handleMessage(qc quic.Connection) { for { - msg, err := qs.ReceiveMessage() + msg, err := qc.ReceiveMessage() if err != nil { break } @@ -163,10 +182,10 @@ func (c *Client) openStreamWithReconnect() (quic.Connection, quic.Stream, error) if c.closed { return nil, nil, ErrClosed } - stream, err := c.quicSession.OpenStream() + stream, err := c.quicConn.OpenStream() if err == nil { // All good - return c.quicSession, &wrappedQUICStream{stream}, nil + return c.quicConn, &wrappedQUICStream{stream}, nil } // Something is wrong if nErr, ok := err.(net.Error); ok && nErr.Temporary() { @@ -175,13 +194,13 @@ func (c *Client) openStreamWithReconnect() (quic.Connection, quic.Stream, error) } c.quicReconnectFunc(err) // Permanent error, need to reconnect - if err := c.connectToServer(); err != nil { + if err := c.connect(); err != nil { // Still error, oops return nil, nil, err } // We are not going to try again even if it still fails the second time - stream, err = c.quicSession.OpenStream() - return c.quicSession, &wrappedQUICStream{stream}, err + stream, err = c.quicConn.OpenStream() + return c.quicConn, &wrappedQUICStream{stream}, err } func (c *Client) DialTCP(addr string) (net.Conn, error) { @@ -250,7 +269,7 @@ func (c *Client) DialUDP() (UDPConn, error) { c.udpSessionMutex.Lock() nCh := make(chan *udpMessage, 1024) // Store the current session map for CloseFunc below - // to ensures that we are adding and removing sessions on the same map, + // to ensure that we are adding and removing sessions on the same map, // as reconnecting will reassign the map sessionMap := c.udpSessionMap sessionMap[sr.UDPSessionID] = nCh @@ -277,7 +296,8 @@ func (c *Client) DialUDP() (UDPConn, error) { func (c *Client) Close() error { c.reconnectMutex.Lock() defer c.reconnectMutex.Unlock() - err := c.quicSession.CloseWithError(closeErrorCodeGeneric, "") + err := c.quicConn.CloseWithError(closeErrorCodeGeneric, "") + _ = c.pktConn.Close() c.closed = true return err } diff --git a/pkg/core/server.go b/pkg/core/server.go index 043d21f..8fb9930 100644 --- a/pkg/core/server.go +++ b/pkg/core/server.go @@ -7,10 +7,11 @@ import ( "fmt" "net" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns" + "github.com/HyNetwork/hysteria/pkg/congestion" "github.com/HyNetwork/hysteria/pkg/acl" - "github.com/HyNetwork/hysteria/pkg/obfs" "github.com/HyNetwork/hysteria/pkg/pmtud_fix" "github.com/HyNetwork/hysteria/pkg/transport" "github.com/lucas-clemente/quic-go" @@ -43,21 +44,29 @@ type Server struct { upCounterVec, downCounterVec *prometheus.CounterVec connGaugeVec *prometheus.GaugeVec + pktConn net.PacketConn listener quic.Listener } -func NewServer(addr string, protocol string, tlsConfig *tls.Config, quicConfig *quic.Config, transport *transport.ServerTransport, +func NewServer(addr string, tlsConfig *tls.Config, quicConfig *quic.Config, + pktConnFunc pktconns.ServerPacketConnFunc, transport *transport.ServerTransport, sendBPS uint64, recvBPS uint64, disableUDP bool, aclEngine *acl.Engine, - obfuscator obfs.Obfuscator, connectFunc ConnectFunc, disconnectFunc DisconnectFunc, + connectFunc ConnectFunc, disconnectFunc DisconnectFunc, tcpRequestFunc TCPRequestFunc, tcpErrorFunc TCPErrorFunc, udpRequestFunc UDPRequestFunc, udpErrorFunc UDPErrorFunc, promRegistry *prometheus.Registry, ) (*Server, error) { quicConfig.DisablePathMTUDiscovery = quicConfig.DisablePathMTUDiscovery || pmtud_fix.DisablePathMTUDiscovery - listener, err := transport.QUICListen(protocol, addr, tlsConfig, quicConfig, obfuscator) + pktConn, err := pktConnFunc(addr) if err != nil { return nil, err } + listener, err := quic.Listen(pktConn, tlsConfig, quicConfig) + if err != nil { + _ = pktConn.Close() + return nil, err + } s := &Server{ + pktConn: pktConn, listener: listener, transport: transport, sendBPS: sendBPS, @@ -97,7 +106,9 @@ func (s *Server) Serve() error { } func (s *Server) Close() error { - return s.listener.Close() + err := s.listener.Close() + _ = s.pktConn.Close() + return err } func (s *Server) handleClient(cs quic.Connection) { diff --git a/pkg/obfs/dummy.go b/pkg/obfs/dummy.go deleted file mode 100644 index 3edff10..0000000 --- a/pkg/obfs/dummy.go +++ /dev/null @@ -1,18 +0,0 @@ -package obfs - -type DummyObfuscator struct{} - -func NewDummyObfuscator() *DummyObfuscator { - return &DummyObfuscator{} -} - -func (x *DummyObfuscator) Deobfuscate(in []byte, out []byte) int { - if len(out) < len(in) { - return 0 - } - return copy(out, in) -} - -func (x *DummyObfuscator) Obfuscate(in []byte, out []byte) int { - return copy(out, in) -} diff --git a/pkg/obfs/obfs.go b/pkg/obfs/obfs.go deleted file mode 100644 index cb108a3..0000000 --- a/pkg/obfs/obfs.go +++ /dev/null @@ -1,6 +0,0 @@ -package obfs - -type Obfuscator interface { - Deobfuscate(in []byte, out []byte) int - Obfuscate(in []byte, out []byte) int -} diff --git a/pkg/obfs/xplus.go b/pkg/obfs/xplus.go deleted file mode 100644 index dd63645..0000000 --- a/pkg/obfs/xplus.go +++ /dev/null @@ -1,52 +0,0 @@ -package obfs - -import ( - "crypto/sha256" - "math/rand" - "sync" - "time" -) - -// [salt][obfuscated payload] - -const saltLen = 16 - -type XPlusObfuscator struct { - Key []byte - RandSrc *rand.Rand - - lk sync.Mutex -} - -func NewXPlusObfuscator(key []byte) *XPlusObfuscator { - return &XPlusObfuscator{ - Key: key, - RandSrc: rand.New(rand.NewSource(time.Now().UnixNano())), - } -} - -func (x *XPlusObfuscator) Deobfuscate(in []byte, out []byte) int { - pLen := len(in) - saltLen - if pLen <= 0 || len(out) < pLen { - // Invalid - return 0 - } - key := sha256.Sum256(append(x.Key, in[:saltLen]...)) - // Deobfuscate the payload - for i, c := range in[saltLen:] { - out[i] = c ^ key[i%sha256.Size] - } - return pLen -} - -func (x *XPlusObfuscator) Obfuscate(in []byte, out []byte) int { - x.lk.Lock() - _, _ = x.RandSrc.Read(out[:saltLen]) // salt - x.lk.Unlock() - // Obfuscate the payload - key := sha256.Sum256(append(x.Key, out[:saltLen]...)) - for i, c := range in { - out[i+saltLen] = c ^ key[i%sha256.Size] - } - return len(in) + saltLen -} diff --git a/pkg/transport/client.go b/pkg/transport/client.go index bfd0340..49ab83e 100644 --- a/pkg/transport/client.go +++ b/pkg/transport/client.go @@ -1,16 +1,8 @@ package transport import ( - "crypto/tls" - "fmt" "net" "time" - - "github.com/HyNetwork/hysteria/pkg/conns/faketcp" - "github.com/HyNetwork/hysteria/pkg/conns/udp" - "github.com/HyNetwork/hysteria/pkg/conns/wechat" - obfsPkg "github.com/HyNetwork/hysteria/pkg/obfs" - "github.com/lucas-clemente/quic-go" ) type ClientTransport struct { @@ -25,61 +17,6 @@ var DefaultClientTransport = &ClientTransport{ ResolvePreference: ResolvePreferenceDefault, } -func (ct *ClientTransport) quicPacketConn(proto string, server string, obfs obfsPkg.Obfuscator) (net.PacketConn, error) { - if len(proto) == 0 || proto == "udp" { - conn, err := net.ListenUDP("udp", nil) - if err != nil { - return nil, err - } - if obfs != nil { - oc := udp.NewObfsUDPConn(conn, obfs) - return oc, nil - } else { - return conn, nil - } - } else if proto == "wechat-video" { - conn, err := net.ListenUDP("udp", nil) - if err != nil { - return nil, err - } - if obfs == nil { - obfs = obfsPkg.NewDummyObfuscator() - } - return wechat.NewObfsWeChatUDPConn(conn, obfs), nil - } else if proto == "faketcp" { - var conn *faketcp.TCPConn - conn, err := faketcp.Dial("tcp", server) - if err != nil { - return nil, err - } - if obfs != nil { - oc := faketcp.NewObfsFakeTCPConn(conn, obfs) - return oc, nil - } else { - return conn, nil - } - } else { - return nil, fmt.Errorf("unsupported protocol: %s", proto) - } -} - -func (ct *ClientTransport) QUICDial(proto string, server string, tlsConfig *tls.Config, quicConfig *quic.Config, obfs obfsPkg.Obfuscator) (quic.Connection, error) { - serverUDPAddr, err := net.ResolveUDPAddr("udp", server) - if err != nil { - return nil, err - } - pktConn, err := ct.quicPacketConn(proto, server, obfs) - if err != nil { - return nil, err - } - qs, err := quic.Dial(pktConn, serverUDPAddr, server, tlsConfig, quicConfig) - if err != nil { - _ = pktConn.Close() - return nil, err - } - return qs, nil -} - func (ct *ClientTransport) ResolveIPAddr(address string) (*net.IPAddr, error) { return resolveIPAddrWithPreference(address, ct.ResolvePreference) } diff --git a/pkg/conns/faketcp/LICENSE b/pkg/transport/pktconns/faketcp/LICENSE similarity index 100% rename from pkg/conns/faketcp/LICENSE rename to pkg/transport/pktconns/faketcp/LICENSE diff --git a/pkg/conns/faketcp/obfs.go b/pkg/transport/pktconns/faketcp/obfs.go similarity index 60% rename from pkg/conns/faketcp/obfs.go rename to pkg/transport/pktconns/faketcp/obfs.go index cc5cbf8..2b44ebf 100644 --- a/pkg/conns/faketcp/obfs.go +++ b/pkg/transport/pktconns/faketcp/obfs.go @@ -6,12 +6,12 @@ import ( "syscall" "time" - "github.com/HyNetwork/hysteria/pkg/obfs" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" ) const udpBufferSize = 65535 -type ObfsFakeTCPConn struct { +type ObfsFakeTCPPacketConn struct { orig *TCPConn obfs obfs.Obfuscator @@ -21,8 +21,8 @@ type ObfsFakeTCPConn struct { writeMutex sync.Mutex } -func NewObfsFakeTCPConn(orig *TCPConn, obfs obfs.Obfuscator) *ObfsFakeTCPConn { - return &ObfsFakeTCPConn{ +func NewObfsFakeTCPConn(orig *TCPConn, obfs obfs.Obfuscator) *ObfsFakeTCPPacketConn { + return &ObfsFakeTCPPacketConn{ orig: orig, obfs: obfs, readBuf: make([]byte, udpBufferSize), @@ -30,7 +30,7 @@ func NewObfsFakeTCPConn(orig *TCPConn, obfs obfs.Obfuscator) *ObfsFakeTCPConn { } } -func (c *ObfsFakeTCPConn) ReadFrom(p []byte) (int, net.Addr, error) { +func (c *ObfsFakeTCPPacketConn) ReadFrom(p []byte) (int, net.Addr, error) { for { c.readMutex.Lock() n, addr, err := c.orig.ReadFrom(c.readBuf) @@ -50,7 +50,7 @@ func (c *ObfsFakeTCPConn) ReadFrom(p []byte) (int, net.Addr, error) { } } -func (c *ObfsFakeTCPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { +func (c *ObfsFakeTCPPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { c.writeMutex.Lock() bn := c.obfs.Obfuscate(p, c.writeBuf) _, err = c.orig.WriteTo(c.writeBuf[:bn], addr) @@ -62,34 +62,34 @@ func (c *ObfsFakeTCPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { } } -func (c *ObfsFakeTCPConn) Close() error { +func (c *ObfsFakeTCPPacketConn) Close() error { return c.orig.Close() } -func (c *ObfsFakeTCPConn) LocalAddr() net.Addr { +func (c *ObfsFakeTCPPacketConn) LocalAddr() net.Addr { return c.orig.LocalAddr() } -func (c *ObfsFakeTCPConn) SetDeadline(t time.Time) error { +func (c *ObfsFakeTCPPacketConn) SetDeadline(t time.Time) error { return c.orig.SetDeadline(t) } -func (c *ObfsFakeTCPConn) SetReadDeadline(t time.Time) error { +func (c *ObfsFakeTCPPacketConn) SetReadDeadline(t time.Time) error { return c.orig.SetReadDeadline(t) } -func (c *ObfsFakeTCPConn) SetWriteDeadline(t time.Time) error { +func (c *ObfsFakeTCPPacketConn) SetWriteDeadline(t time.Time) error { return c.orig.SetWriteDeadline(t) } -func (c *ObfsFakeTCPConn) SetReadBuffer(bytes int) error { +func (c *ObfsFakeTCPPacketConn) SetReadBuffer(bytes int) error { return c.orig.SetReadBuffer(bytes) } -func (c *ObfsFakeTCPConn) SetWriteBuffer(bytes int) error { +func (c *ObfsFakeTCPPacketConn) SetWriteBuffer(bytes int) error { return c.orig.SetWriteBuffer(bytes) } -func (c *ObfsFakeTCPConn) SyscallConn() (syscall.RawConn, error) { +func (c *ObfsFakeTCPPacketConn) SyscallConn() (syscall.RawConn, error) { return c.orig.SyscallConn() } diff --git a/pkg/conns/faketcp/tcp_linux.go b/pkg/transport/pktconns/faketcp/tcp_linux.go similarity index 100% rename from pkg/conns/faketcp/tcp_linux.go rename to pkg/transport/pktconns/faketcp/tcp_linux.go diff --git a/pkg/conns/faketcp/tcp_stub.go b/pkg/transport/pktconns/faketcp/tcp_stub.go similarity index 100% rename from pkg/conns/faketcp/tcp_stub.go rename to pkg/transport/pktconns/faketcp/tcp_stub.go diff --git a/pkg/conns/faketcp/tcp_test.go b/pkg/transport/pktconns/faketcp/tcp_test.go similarity index 100% rename from pkg/conns/faketcp/tcp_test.go rename to pkg/transport/pktconns/faketcp/tcp_test.go diff --git a/pkg/transport/pktconns/func.go b/pkg/transport/pktconns/func.go new file mode 100644 index 0000000..5b9d07c --- /dev/null +++ b/pkg/transport/pktconns/func.go @@ -0,0 +1,146 @@ +package pktconns + +import ( + "net" + + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/faketcp" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/udp" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/wechat" +) + +type ( + ClientPacketConnFunc func(server string) (net.PacketConn, error) + ServerPacketConnFunc func(listen string) (net.PacketConn, error) +) + +type ( + ClientPacketConnFuncFactory func(obfsPassword string) ClientPacketConnFunc + ServerPacketConnFuncFactory func(obfsPassword string) ServerPacketConnFunc +) + +func NewClientUDPConnFunc(obfsPassword string) ClientPacketConnFunc { + if obfsPassword == "" { + return func(server string) (net.PacketConn, error) { + return net.ListenUDP("udp", nil) + } + } else { + return func(server string) (net.PacketConn, error) { + obfs := obfs.NewXPlusObfuscator([]byte(obfsPassword)) + udpConn, err := net.ListenUDP("udp", nil) + if err != nil { + return nil, err + } + return udp.NewObfsUDPConn(udpConn, obfs), nil + } + } +} + +func NewClientWeChatConnFunc(obfsPassword string) ClientPacketConnFunc { + if obfsPassword == "" { + return func(server string) (net.PacketConn, error) { + udpConn, err := net.ListenUDP("udp", nil) + if err != nil { + return nil, err + } + return wechat.NewObfsWeChatUDPConn(udpConn, nil), nil + } + } else { + return func(server string) (net.PacketConn, error) { + obfs := obfs.NewXPlusObfuscator([]byte(obfsPassword)) + udpConn, err := net.ListenUDP("udp", nil) + if err != nil { + return nil, err + } + return wechat.NewObfsWeChatUDPConn(udpConn, obfs), nil + } + } +} + +func NewClientFakeTCPConnFunc(obfsPassword string) ClientPacketConnFunc { + if obfsPassword == "" { + return func(server string) (net.PacketConn, error) { + return faketcp.Dial("tcp", server) + } + } else { + return func(server string) (net.PacketConn, error) { + obfs := obfs.NewXPlusObfuscator([]byte(obfsPassword)) + fakeTCPConn, err := faketcp.Dial("tcp", server) + if err != nil { + return nil, err + } + return faketcp.NewObfsFakeTCPConn(fakeTCPConn, obfs), nil + } + } +} + +func NewServerUDPConnFunc(obfsPassword string) ServerPacketConnFunc { + if obfsPassword == "" { + return func(listen string) (net.PacketConn, error) { + laddrU, err := net.ResolveUDPAddr("udp", listen) + if err != nil { + return nil, err + } + return net.ListenUDP("udp", laddrU) + } + } else { + return func(listen string) (net.PacketConn, error) { + obfs := obfs.NewXPlusObfuscator([]byte(obfsPassword)) + laddrU, err := net.ResolveUDPAddr("udp", listen) + if err != nil { + return nil, err + } + udpConn, err := net.ListenUDP("udp", laddrU) + if err != nil { + return nil, err + } + return udp.NewObfsUDPConn(udpConn, obfs), nil + } + } +} + +func NewServerWeChatConnFunc(obfsPassword string) ServerPacketConnFunc { + if obfsPassword == "" { + return func(listen string) (net.PacketConn, error) { + laddrU, err := net.ResolveUDPAddr("udp", listen) + if err != nil { + return nil, err + } + udpConn, err := net.ListenUDP("udp", laddrU) + if err != nil { + return nil, err + } + return wechat.NewObfsWeChatUDPConn(udpConn, nil), nil + } + } else { + return func(listen string) (net.PacketConn, error) { + obfs := obfs.NewXPlusObfuscator([]byte(obfsPassword)) + laddrU, err := net.ResolveUDPAddr("udp", listen) + if err != nil { + return nil, err + } + udpConn, err := net.ListenUDP("udp", laddrU) + if err != nil { + return nil, err + } + return wechat.NewObfsWeChatUDPConn(udpConn, obfs), nil + } + } +} + +func NewServerFakeTCPConnFunc(obfsPassword string) ServerPacketConnFunc { + if obfsPassword == "" { + return func(listen string) (net.PacketConn, error) { + return faketcp.Listen("tcp", listen) + } + } else { + return func(listen string) (net.PacketConn, error) { + obfs := obfs.NewXPlusObfuscator([]byte(obfsPassword)) + fakeTCPListener, err := faketcp.Listen("tcp", listen) + if err != nil { + return nil, err + } + return faketcp.NewObfsFakeTCPConn(fakeTCPListener, obfs), nil + } + } +} diff --git a/pkg/transport/pktconns/obfs/obfs.go b/pkg/transport/pktconns/obfs/obfs.go new file mode 100644 index 0000000..2829560 --- /dev/null +++ b/pkg/transport/pktconns/obfs/obfs.go @@ -0,0 +1,58 @@ +package obfs + +import ( + "crypto/sha256" + "math/rand" + "sync" + "time" +) + +type Obfuscator interface { + Deobfuscate(in []byte, out []byte) int + Obfuscate(in []byte, out []byte) int +} + +const xpSaltLen = 16 + +// XPlusObfuscator obfuscates payload using one-time keys generated from hashing a pre-shared key and random salt. +// Packet format: [salt][obfuscated payload] +type XPlusObfuscator struct { + Key []byte + RandSrc *rand.Rand + + lk sync.Mutex +} + +func NewXPlusObfuscator(key []byte) *XPlusObfuscator { + return &XPlusObfuscator{ + Key: key, + RandSrc: rand.New(rand.NewSource(time.Now().UnixNano())), + } +} + +func (x *XPlusObfuscator) Deobfuscate(in []byte, out []byte) int { + outLen := len(in) - xpSaltLen + if outLen <= 0 || len(out) < outLen { + return 0 + } + key := sha256.Sum256(append(x.Key, in[:xpSaltLen]...)) + for i, c := range in[xpSaltLen:] { + out[i] = c ^ key[i%sha256.Size] + } + return outLen +} + +func (x *XPlusObfuscator) Obfuscate(in []byte, out []byte) int { + outLen := len(in) + xpSaltLen + if len(out) < outLen { + return 0 + } + x.lk.Lock() + _, _ = x.RandSrc.Read(out[:xpSaltLen]) + x.lk.Unlock() + key := sha256.Sum256(append(x.Key, out[:xpSaltLen]...)) + for i, c := range in { + out[i+xpSaltLen] = c ^ key[i%sha256.Size] + } + return outLen +} diff --git a/pkg/obfs/xplus_test.go b/pkg/transport/pktconns/obfs/obfs_test.go similarity index 100% rename from pkg/obfs/xplus_test.go rename to pkg/transport/pktconns/obfs/obfs_test.go diff --git a/pkg/conns/udp/obfs.go b/pkg/transport/pktconns/udp/obfs.go similarity index 60% rename from pkg/conns/udp/obfs.go rename to pkg/transport/pktconns/udp/obfs.go index 03bfc2c..073f786 100644 --- a/pkg/conns/udp/obfs.go +++ b/pkg/transport/pktconns/udp/obfs.go @@ -7,12 +7,12 @@ import ( "syscall" "time" - "github.com/HyNetwork/hysteria/pkg/obfs" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" ) const udpBufferSize = 65535 -type ObfsUDPConn struct { +type ObfsUDPPacketConn struct { orig *net.UDPConn obfs obfs.Obfuscator @@ -22,8 +22,8 @@ type ObfsUDPConn struct { writeMutex sync.Mutex } -func NewObfsUDPConn(orig *net.UDPConn, obfs obfs.Obfuscator) *ObfsUDPConn { - return &ObfsUDPConn{ +func NewObfsUDPConn(orig *net.UDPConn, obfs obfs.Obfuscator) *ObfsUDPPacketConn { + return &ObfsUDPPacketConn{ orig: orig, obfs: obfs, readBuf: make([]byte, udpBufferSize), @@ -31,7 +31,7 @@ func NewObfsUDPConn(orig *net.UDPConn, obfs obfs.Obfuscator) *ObfsUDPConn { } } -func (c *ObfsUDPConn) ReadFrom(p []byte) (int, net.Addr, error) { +func (c *ObfsUDPPacketConn) ReadFrom(p []byte) (int, net.Addr, error) { for { c.readMutex.Lock() n, addr, err := c.orig.ReadFrom(c.readBuf) @@ -51,7 +51,7 @@ func (c *ObfsUDPConn) ReadFrom(p []byte) (int, net.Addr, error) { } } -func (c *ObfsUDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { +func (c *ObfsUDPPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { c.writeMutex.Lock() bn := c.obfs.Obfuscate(p, c.writeBuf) _, err = c.orig.WriteTo(c.writeBuf[:bn], addr) @@ -63,38 +63,38 @@ func (c *ObfsUDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { } } -func (c *ObfsUDPConn) Close() error { +func (c *ObfsUDPPacketConn) Close() error { return c.orig.Close() } -func (c *ObfsUDPConn) LocalAddr() net.Addr { +func (c *ObfsUDPPacketConn) LocalAddr() net.Addr { return c.orig.LocalAddr() } -func (c *ObfsUDPConn) SetDeadline(t time.Time) error { +func (c *ObfsUDPPacketConn) SetDeadline(t time.Time) error { return c.orig.SetDeadline(t) } -func (c *ObfsUDPConn) SetReadDeadline(t time.Time) error { +func (c *ObfsUDPPacketConn) SetReadDeadline(t time.Time) error { return c.orig.SetReadDeadline(t) } -func (c *ObfsUDPConn) SetWriteDeadline(t time.Time) error { +func (c *ObfsUDPPacketConn) SetWriteDeadline(t time.Time) error { return c.orig.SetWriteDeadline(t) } -func (c *ObfsUDPConn) SetReadBuffer(bytes int) error { +func (c *ObfsUDPPacketConn) SetReadBuffer(bytes int) error { return c.orig.SetReadBuffer(bytes) } -func (c *ObfsUDPConn) SetWriteBuffer(bytes int) error { +func (c *ObfsUDPPacketConn) SetWriteBuffer(bytes int) error { return c.orig.SetWriteBuffer(bytes) } -func (c *ObfsUDPConn) SyscallConn() (syscall.RawConn, error) { +func (c *ObfsUDPPacketConn) SyscallConn() (syscall.RawConn, error) { return c.orig.SyscallConn() } -func (c *ObfsUDPConn) File() (f *os.File, err error) { +func (c *ObfsUDPPacketConn) File() (f *os.File, err error) { return c.orig.File() } diff --git a/pkg/conns/wechat/obfs.go b/pkg/transport/pktconns/wechat/obfs.go similarity index 54% rename from pkg/conns/wechat/obfs.go rename to pkg/transport/pktconns/wechat/obfs.go index 99aa4a8..6e13696 100644 --- a/pkg/conns/wechat/obfs.go +++ b/pkg/transport/pktconns/wechat/obfs.go @@ -9,12 +9,14 @@ import ( "syscall" "time" - "github.com/HyNetwork/hysteria/pkg/obfs" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" ) const udpBufferSize = 65535 -type ObfsWeChatUDPConn struct { +// ObfsWeChatUDPPacketConn is still a UDP packet conn, but it adds WeChat video call header to each packet. +// Obfs in this case can be nil +type ObfsWeChatUDPPacketConn struct { orig *net.UDPConn obfs obfs.Obfuscator @@ -25,8 +27,8 @@ type ObfsWeChatUDPConn struct { sn uint32 } -func NewObfsWeChatUDPConn(orig *net.UDPConn, obfs obfs.Obfuscator) *ObfsWeChatUDPConn { - return &ObfsWeChatUDPConn{ +func NewObfsWeChatUDPConn(orig *net.UDPConn, obfs obfs.Obfuscator) *ObfsWeChatUDPPacketConn { + return &ObfsWeChatUDPPacketConn{ orig: orig, obfs: obfs, readBuf: make([]byte, udpBufferSize), @@ -35,7 +37,7 @@ func NewObfsWeChatUDPConn(orig *net.UDPConn, obfs obfs.Obfuscator) *ObfsWeChatUD } } -func (c *ObfsWeChatUDPConn) ReadFrom(p []byte) (int, net.Addr, error) { +func (c *ObfsWeChatUDPPacketConn) ReadFrom(p []byte) (int, net.Addr, error) { for { c.readMutex.Lock() n, addr, err := c.orig.ReadFrom(c.readBuf) @@ -43,7 +45,12 @@ func (c *ObfsWeChatUDPConn) ReadFrom(p []byte) (int, net.Addr, error) { c.readMutex.Unlock() return 0, addr, err } - newN := c.obfs.Deobfuscate(c.readBuf[13:n], p) + var newN int + if c.obfs != nil { + newN = c.obfs.Deobfuscate(c.readBuf[13:n], p) + } else { + newN = copy(p, c.readBuf[13:n]) + } c.readMutex.Unlock() if newN > 0 { // Valid packet @@ -55,7 +62,7 @@ func (c *ObfsWeChatUDPConn) ReadFrom(p []byte) (int, net.Addr, error) { } } -func (c *ObfsWeChatUDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { +func (c *ObfsWeChatUDPPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { c.writeMutex.Lock() c.writeBuf[0] = 0xa1 c.writeBuf[1] = 0x08 @@ -68,7 +75,12 @@ func (c *ObfsWeChatUDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) c.writeBuf[10] = 0x30 c.writeBuf[11] = 0x22 c.writeBuf[12] = 0x30 - bn := c.obfs.Obfuscate(p, c.writeBuf[13:]) + var bn int + if c.obfs != nil { + bn = c.obfs.Obfuscate(p, c.writeBuf[13:]) + } else { + bn = copy(c.writeBuf[13:], p) + } _, err = c.orig.WriteTo(c.writeBuf[:13+bn], addr) c.writeMutex.Unlock() if err != nil { @@ -78,38 +90,38 @@ func (c *ObfsWeChatUDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) } } -func (c *ObfsWeChatUDPConn) Close() error { +func (c *ObfsWeChatUDPPacketConn) Close() error { return c.orig.Close() } -func (c *ObfsWeChatUDPConn) LocalAddr() net.Addr { +func (c *ObfsWeChatUDPPacketConn) LocalAddr() net.Addr { return c.orig.LocalAddr() } -func (c *ObfsWeChatUDPConn) SetDeadline(t time.Time) error { +func (c *ObfsWeChatUDPPacketConn) SetDeadline(t time.Time) error { return c.orig.SetDeadline(t) } -func (c *ObfsWeChatUDPConn) SetReadDeadline(t time.Time) error { +func (c *ObfsWeChatUDPPacketConn) SetReadDeadline(t time.Time) error { return c.orig.SetReadDeadline(t) } -func (c *ObfsWeChatUDPConn) SetWriteDeadline(t time.Time) error { +func (c *ObfsWeChatUDPPacketConn) SetWriteDeadline(t time.Time) error { return c.orig.SetWriteDeadline(t) } -func (c *ObfsWeChatUDPConn) SetReadBuffer(bytes int) error { +func (c *ObfsWeChatUDPPacketConn) SetReadBuffer(bytes int) error { return c.orig.SetReadBuffer(bytes) } -func (c *ObfsWeChatUDPConn) SetWriteBuffer(bytes int) error { +func (c *ObfsWeChatUDPPacketConn) SetWriteBuffer(bytes int) error { return c.orig.SetWriteBuffer(bytes) } -func (c *ObfsWeChatUDPConn) SyscallConn() (syscall.RawConn, error) { +func (c *ObfsWeChatUDPPacketConn) SyscallConn() (syscall.RawConn, error) { return c.orig.SyscallConn() } -func (c *ObfsWeChatUDPConn) File() (f *os.File, err error) { +func (c *ObfsWeChatUDPPacketConn) File() (f *os.File, err error) { return c.orig.File() } diff --git a/pkg/transport/server.go b/pkg/transport/server.go index 24b3ffa..f0ffe00 100644 --- a/pkg/transport/server.go +++ b/pkg/transport/server.go @@ -1,19 +1,12 @@ package transport import ( - "crypto/tls" - "fmt" "net" "strconv" "time" - "github.com/HyNetwork/hysteria/pkg/conns/faketcp" - "github.com/HyNetwork/hysteria/pkg/conns/udp" - "github.com/HyNetwork/hysteria/pkg/conns/wechat" - obfsPkg "github.com/HyNetwork/hysteria/pkg/obfs" "github.com/HyNetwork/hysteria/pkg/sockopt" "github.com/HyNetwork/hysteria/pkg/utils" - "github.com/lucas-clemente/quic-go" ) type ServerTransport struct { @@ -76,64 +69,6 @@ var DefaultServerTransport = &ServerTransport{ ResolvePreference: ResolvePreferenceDefault, } -func (st *ServerTransport) quicPacketConn(proto string, laddr string, obfs obfsPkg.Obfuscator) (net.PacketConn, error) { - if len(proto) == 0 || proto == "udp" { - laddrU, err := net.ResolveUDPAddr("udp", laddr) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP("udp", laddrU) - if err != nil { - return nil, err - } - if obfs != nil { - oc := udp.NewObfsUDPConn(conn, obfs) - return oc, nil - } else { - return conn, nil - } - } else if proto == "wechat-video" { - laddrU, err := net.ResolveUDPAddr("udp", laddr) - if err != nil { - return nil, err - } - conn, err := net.ListenUDP("udp", laddrU) - if err != nil { - return nil, err - } - if obfs == nil { - obfs = obfsPkg.NewDummyObfuscator() - } - return wechat.NewObfsWeChatUDPConn(conn, obfs), nil - } else if proto == "faketcp" { - conn, err := faketcp.Listen("tcp", laddr) - if err != nil { - return nil, err - } - if obfs != nil { - oc := faketcp.NewObfsFakeTCPConn(conn, obfs) - return oc, nil - } else { - return conn, nil - } - } else { - return nil, fmt.Errorf("unsupported protocol: %s", proto) - } -} - -func (st *ServerTransport) QUICListen(proto string, listen string, tlsConfig *tls.Config, quicConfig *quic.Config, obfs obfsPkg.Obfuscator) (quic.Listener, error) { - pktConn, err := st.quicPacketConn(proto, listen, obfs) - if err != nil { - return nil, err - } - l, err := quic.Listen(pktConn, tlsConfig, quicConfig) - if err != nil { - _ = pktConn.Close() - return nil, err - } - return l, nil -} - func (st *ServerTransport) ResolveIPAddr(address string) (*net.IPAddr, bool, error) { ip, zone := utils.ParseIPZone(address) if ip != nil {