From a4da230517768a46743f916d8d8ac1ff6c1d33bf Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 1 Nov 2022 16:09:38 -0700 Subject: [PATCH] wip feat: experimental udp port hopping support --- cmd/client.go | 1 + cmd/server.go | 1 + pkg/transport/pktconns/funcs.go | 28 ++++ pkg/transport/pktconns/udphop/client.go | 200 ++++++++++++++++++++++++ pkg/transport/pktconns/udphop/parse.go | 29 ++++ pkg/transport/pktconns/udphop/server.go | 188 ++++++++++++++++++++++ 6 files changed, 447 insertions(+) create mode 100644 pkg/transport/pktconns/udphop/client.go create mode 100644 pkg/transport/pktconns/udphop/parse.go create mode 100644 pkg/transport/pktconns/udphop/server.go diff --git a/cmd/client.go b/cmd/client.go index 67b23c2..d81635a 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -32,6 +32,7 @@ import ( var clientPacketConnFuncFactoryMap = map[string]pktconns.ClientPacketConnFuncFactory{ "": pktconns.NewClientUDPConnFunc, "udp": pktconns.NewClientUDPConnFunc, + "udp-hop": pktconns.NewClientUDPHopConnFunc, "wechat": pktconns.NewClientWeChatConnFunc, "wechat-video": pktconns.NewClientWeChatConnFunc, "faketcp": pktconns.NewClientFakeTCPConnFunc, diff --git a/cmd/server.go b/cmd/server.go index 9e3c51f..defbe50 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -26,6 +26,7 @@ import ( var serverPacketConnFuncFactoryMap = map[string]pktconns.ServerPacketConnFuncFactory{ "": pktconns.NewServerUDPConnFunc, "udp": pktconns.NewServerUDPConnFunc, + "udp-hop": pktconns.NewServerUDPHopConnFunc, "wechat": pktconns.NewServerWeChatConnFunc, "wechat-video": pktconns.NewServerWeChatConnFunc, "faketcp": pktconns.NewServerFakeTCPConnFunc, diff --git a/pkg/transport/pktconns/funcs.go b/pkg/transport/pktconns/funcs.go index a7b55bc..23355e5 100644 --- a/pkg/transport/pktconns/funcs.go +++ b/pkg/transport/pktconns/funcs.go @@ -3,6 +3,8 @@ package pktconns import ( "net" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/udphop" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/faketcp" "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" "github.com/HyNetwork/hysteria/pkg/transport/pktconns/udp" @@ -45,6 +47,19 @@ func NewClientUDPConnFunc(obfsPassword string) ClientPacketConnFunc { } } +func NewClientUDPHopConnFunc(obfsPassword string) ClientPacketConnFunc { + if obfsPassword == "" { + return func(server string) (net.PacketConn, net.Addr, error) { + return udphop.NewObfsUDPHopClientPacketConn(server, nil) + } + } else { + return func(server string) (net.PacketConn, net.Addr, error) { + ob := obfs.NewXPlusObfuscator([]byte(obfsPassword)) + return udphop.NewObfsUDPHopClientPacketConn(server, ob) + } + } +} + func NewClientWeChatConnFunc(obfsPassword string) ClientPacketConnFunc { if obfsPassword == "" { return func(server string) (net.PacketConn, net.Addr, error) { @@ -125,6 +140,19 @@ func NewServerUDPConnFunc(obfsPassword string) ServerPacketConnFunc { } } +func NewServerUDPHopConnFunc(obfsPassword string) ServerPacketConnFunc { + if obfsPassword == "" { + return func(listen string) (net.PacketConn, error) { + return udphop.NewObfsUDPHopServerPacketConn(listen, nil) + } + } else { + return func(listen string) (net.PacketConn, error) { + ob := obfs.NewXPlusObfuscator([]byte(obfsPassword)) + return udphop.NewObfsUDPHopServerPacketConn(listen, ob) + } + } +} + func NewServerWeChatConnFunc(obfsPassword string) ServerPacketConnFunc { if obfsPassword == "" { return func(listen string) (net.PacketConn, error) { diff --git a/pkg/transport/pktconns/udphop/client.go b/pkg/transport/pktconns/udphop/client.go new file mode 100644 index 0000000..ee048d1 --- /dev/null +++ b/pkg/transport/pktconns/udphop/client.go @@ -0,0 +1,200 @@ +package udphop + +import ( + "log" + "math/rand" + "net" + "sync" + "time" + + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/udp" +) + +const ( + portHoppingInterval = 30 * time.Second +) + +// ObfsUDPHopClientPacketConn is the UDP port-hopping packet connection for client side. +// It hops to a different local & server port every once in a while (portHoppingInterval). +type ObfsUDPHopClientPacketConn struct { + serverAddr net.Addr // Combined udpHopAddr + serverAddrs []net.Addr + + obfs obfs.Obfuscator + + connMutex sync.RWMutex + prevConn net.PacketConn + currentConn net.PacketConn + addrIndex int + + recvQueue chan *udpPacket + closeChan chan struct{} + + bufPool sync.Pool +} + +func NewObfsUDPHopClientPacketConn(server string, obfs obfs.Obfuscator) (*ObfsUDPHopClientPacketConn, net.Addr, error) { + host, ports, err := parseAddr(server) + if err != nil { + return nil, nil, err + } + // Resolve the server IP address, then attach the ports to UDP addresses + ip, err := net.ResolveIPAddr("ip", host) + if err != nil { + return nil, nil, err + } + serverAddrs := make([]net.Addr, len(ports)) + for i, port := range ports { + serverAddrs[i] = &net.UDPAddr{ + IP: ip.IP, + Port: int(port), + } + log.Printf("udphop: server address %s", serverAddrs[i]) + } + conn := &ObfsUDPHopClientPacketConn{ + serverAddr: &udpHopAddr{server}, + serverAddrs: serverAddrs, + obfs: obfs, + addrIndex: rand.Intn(len(serverAddrs)), + recvQueue: make(chan *udpPacket, packetQueueSize), + closeChan: make(chan struct{}), + bufPool: sync.Pool{ + New: func() interface{} { + return make([]byte, udpBufferSize) + }, + }, + } + curConn, err := net.ListenUDP("udp", nil) + if err != nil { + return nil, nil, err + } + if obfs != nil { + conn.currentConn = udp.NewObfsUDPConn(curConn, obfs) + } else { + conn.currentConn = curConn + } + go conn.recvRoutine(conn.currentConn) + go conn.hopRoutine() + return conn, conn.serverAddr, nil +} + +func (c *ObfsUDPHopClientPacketConn) recvRoutine(conn net.PacketConn) { + for { + buf := c.bufPool.Get().([]byte) + n, addr, err := conn.ReadFrom(buf) + if err != nil { + log.Printf("udphop: read error (local %s): %v", conn.LocalAddr(), err) + return + } + select { + case c.recvQueue <- &udpPacket{buf, n, addr}: + default: + log.Printf("udphop: recv queue full, dropping packet from %s", addr) + c.bufPool.Put(buf) + } + } +} + +func (c *ObfsUDPHopClientPacketConn) hopRoutine() { + ticker := time.NewTicker(portHoppingInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.hop() + case <-c.closeChan: + return + } + } +} + +func (c *ObfsUDPHopClientPacketConn) hop() { + c.connMutex.Lock() + defer c.connMutex.Unlock() + if c.prevConn != nil { + _ = c.prevConn.Close() // recvRoutine will exit on error + } + // We need to keep receiving packets from the previous connection, + // or there will be packet loss because there might be packets + // still in flight sent to the old port. + c.prevConn = c.currentConn + c.addrIndex = rand.Intn(len(c.serverAddrs)) + conn, err := net.ListenUDP("udp", nil) + if err != nil { + log.Printf("udphop: failed to listen on %s: %v", conn.LocalAddr(), err) + return + } + if c.obfs != nil { + c.currentConn = udp.NewObfsUDPConn(conn, c.obfs) + } else { + c.currentConn = conn + } + go c.recvRoutine(c.currentConn) + log.Printf("udphop: hopping to %s", c.serverAddrs[c.addrIndex]) +} + +func (c *ObfsUDPHopClientPacketConn) ReadFrom(b []byte) (int, net.Addr, error) { + for { + select { + case p := <-c.recvQueue: + // Check if the packet is from one of the server addresses + for _, addr := range c.serverAddrs { + if addr.String() == p.addr.String() { + // Copy the packet to the buffer + n := copy(b, p.buf[:p.n]) + c.bufPool.Put(p.buf) + return n, c.serverAddr, nil + } + } + // Drop the packet, continue + c.bufPool.Put(p.buf) + case <-c.closeChan: + return 0, nil, net.ErrClosed + } + // Ignore packets from other addresses + } +} + +func (c *ObfsUDPHopClientPacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { + c.connMutex.RLock() + defer c.connMutex.RUnlock() + // Check if the address is the server address + if addr.String() != c.serverAddr.String() { + log.Printf("udphop: invalid write address %s", addr) + return 0, net.ErrWriteToConnected + } + return c.currentConn.WriteTo(b, c.serverAddrs[c.addrIndex]) +} + +func (c *ObfsUDPHopClientPacketConn) Close() error { + c.connMutex.Lock() + defer c.connMutex.Unlock() + if c.prevConn != nil { + _ = c.prevConn.Close() + } + err := c.currentConn.Close() + close(c.closeChan) + return err +} + +func (c *ObfsUDPHopClientPacketConn) LocalAddr() net.Addr { + c.connMutex.RLock() + defer c.connMutex.RUnlock() + return c.currentConn.LocalAddr() +} + +func (c *ObfsUDPHopClientPacketConn) SetDeadline(t time.Time) error { + // Not implemented + return nil +} + +func (c *ObfsUDPHopClientPacketConn) SetReadDeadline(t time.Time) error { + // Not implemented + return nil +} + +func (c *ObfsUDPHopClientPacketConn) SetWriteDeadline(t time.Time) error { + // Not implemented + return nil +} diff --git a/pkg/transport/pktconns/udphop/parse.go b/pkg/transport/pktconns/udphop/parse.go new file mode 100644 index 0000000..386a4d5 --- /dev/null +++ b/pkg/transport/pktconns/udphop/parse.go @@ -0,0 +1,29 @@ +package udphop + +import ( + "net" + "strconv" + "strings" +) + +// parseAddr parses the listen address and returns the host and ports. +// Format: "host:port1,port2,port3,..." +func parseAddr(addr string) (host string, ports []uint16, err error) { + host, portStr, err := net.SplitHostPort(addr) + if err != nil { + return + } + portsStr := strings.Split(portStr, ",") + if len(portsStr) < 2 { + return "", nil, net.InvalidAddrError("at least two ports required") + } + ports = make([]uint16, len(portsStr)) + for i, p := range portsStr { + port, err := strconv.ParseUint(p, 10, 16) + if err != nil { + return "", nil, net.InvalidAddrError("invalid port: " + p) + } + ports[i] = uint16(port) + } + return +} diff --git a/pkg/transport/pktconns/udphop/server.go b/pkg/transport/pktconns/udphop/server.go new file mode 100644 index 0000000..11b95cf --- /dev/null +++ b/pkg/transport/pktconns/udphop/server.go @@ -0,0 +1,188 @@ +package udphop + +import ( + "log" + "net" + "strconv" + "sync" + "time" + + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/udp" +) + +const ( + packetQueueSize = 1024 + udpBufferSize = 2048 + addrMapEntryTTL = time.Minute +) + +// ObfsUDPHopServerPacketConn is the UDP port-hopping packet connection for server side. +// It listens on multiple UDP ports and replies to a client using the port it received packet from. +type ObfsUDPHopServerPacketConn struct { + localAddr net.Addr + conns []net.PacketConn + + recvQueue chan *udpPacket + closeChan chan struct{} + + addrMapMutex sync.RWMutex + addrMap map[string]addrMapEntry + + bufPool sync.Pool +} + +type udpPacket struct { + buf []byte + n int + addr net.Addr +} + +type addrMapEntry struct { + index int + last time.Time +} + +type udpHopAddr struct { + listen string +} + +func (a *udpHopAddr) Network() string { + return "udp-hop" +} + +func (a *udpHopAddr) String() string { + return a.listen +} + +func NewObfsUDPHopServerPacketConn(listen string, obfs obfs.Obfuscator) (*ObfsUDPHopServerPacketConn, error) { + host, ports, err := parseAddr(listen) + if err != nil { + return nil, err + } + conns := make([]net.PacketConn, len(ports)) + for i, port := range ports { + addr := net.JoinHostPort(host, strconv.FormatUint(uint64(port), 10)) + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return nil, err + } + if obfs != nil { + conns[i] = udp.NewObfsUDPConn(conn, obfs) + } else { + conns[i] = conn + } + } + c := &ObfsUDPHopServerPacketConn{ + localAddr: &udpHopAddr{listen}, + conns: conns, + recvQueue: make(chan *udpPacket, packetQueueSize), + closeChan: make(chan struct{}), + addrMap: make(map[string]addrMapEntry), + bufPool: sync.Pool{ + New: func() interface{} { + return make([]byte, udpBufferSize) + }, + }, + } + c.startRecvRoutines() + go c.addrMapCleanupRoutine() + return c, nil +} + +func (c *ObfsUDPHopServerPacketConn) startRecvRoutines() { + for i, conn := range c.conns { + go c.recvRoutine(i, conn) + } +} + +func (c *ObfsUDPHopServerPacketConn) recvRoutine(i int, conn net.PacketConn) { + log.Printf("udphop: receiving on %s", conn.LocalAddr()) + for { + buf := c.bufPool.Get().([]byte) + n, addr, err := conn.ReadFrom(buf) + if err != nil { + log.Printf("udphop: routine %d read error: %v", i, err) + return + } + select { + case c.recvQueue <- &udpPacket{buf, n, addr}: + // Update addrMap + c.addrMapMutex.Lock() + c.addrMap[addr.String()] = addrMapEntry{i, time.Now()} + c.addrMapMutex.Unlock() + default: + log.Printf("udphop: recv queue full, dropping packet from %s", addr) + c.bufPool.Put(buf) + } + } +} + +func (c *ObfsUDPHopServerPacketConn) addrMapCleanupRoutine() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.addrMapMutex.Lock() + for addr, entry := range c.addrMap { + if time.Since(entry.last) > addrMapEntryTTL { + delete(c.addrMap, addr) + } + } + c.addrMapMutex.Unlock() + case <-c.closeChan: + return + } + } +} + +func (c *ObfsUDPHopServerPacketConn) ReadFrom(b []byte) (int, net.Addr, error) { + select { + case p := <-c.recvQueue: + n := copy(b, p.buf[:p.n]) + c.bufPool.Put(p.buf) + return n, p.addr, nil + case <-c.closeChan: + return 0, nil, net.ErrClosed + } +} + +func (c *ObfsUDPHopServerPacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { + // Find index from addrMap + c.addrMapMutex.RLock() + entry := c.addrMap[addr.String()] + c.addrMapMutex.RUnlock() + return c.conns[entry.index].WriteTo(b, addr) +} + +func (c *ObfsUDPHopServerPacketConn) Close() error { + for _, conn := range c.conns { + _ = conn.Close() // recvRoutines will exit on error + } + close(c.closeChan) + return nil +} + +func (c *ObfsUDPHopServerPacketConn) LocalAddr() net.Addr { + return c.localAddr +} + +func (c *ObfsUDPHopServerPacketConn) SetDeadline(t time.Time) error { + // Not implemented + return nil +} + +func (c *ObfsUDPHopServerPacketConn) SetReadDeadline(t time.Time) error { + // Not implemented + return nil +} + +func (c *ObfsUDPHopServerPacketConn) SetWriteDeadline(t time.Time) error { + // Not implemented + return nil +}