diff --git a/cmd/client.go b/cmd/client.go index 533f1dd..d35f3b3 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -102,7 +102,7 @@ func client(config *clientConfig) { "protocol": config.Protocol, }).Fatal("Unsupported protocol") } - pktConnFunc := pktConnFuncFactory(config.Obfs) + pktConnFunc := pktConnFuncFactory(config.Obfs, time.Duration(config.HopInterval)*time.Second) // Resolve preference if len(config.ResolvePreference) > 0 { pref, err := transport.ResolvePreferenceFromString(config.ResolvePreference) diff --git a/cmd/config.go b/cmd/config.go index 78f1461..ad6f1b5 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -24,6 +24,8 @@ const ( ServerMaxIdleTimeoutSec = 60 DefaultClientIdleTimeoutSec = 20 + + DefaultClientHopIntervalSec = 10 ) var rateStringRegexp = regexp.MustCompile(`^(\d+)\s*([KMGT]?)([Bb])ps$`) @@ -168,6 +170,7 @@ type clientConfig struct { QuitOnDisconnect bool `json:"quit_on_disconnect"` HandshakeTimeout int `json:"handshake_timeout"` IdleTimeout int `json:"idle_timeout"` + HopInterval int `json:"hop_interval"` SOCKS5 struct { Listen string `json:"listen"` Timeout int `json:"timeout"` @@ -258,6 +261,9 @@ func (c *clientConfig) Check() error { if c.IdleTimeout != 0 && c.IdleTimeout < 4 { return errors.New("invalid idle timeout") } + if c.HopInterval != 0 && c.HopInterval < 8 { + return errors.New("invalid hop interval") + } if c.SOCKS5.Timeout != 0 && c.SOCKS5.Timeout < 4 { return errors.New("invalid SOCKS5 timeout") } @@ -333,6 +339,9 @@ func (c *clientConfig) Fill() { if c.IdleTimeout == 0 { c.IdleTimeout = DefaultClientIdleTimeoutSec } + if c.HopInterval == 0 { + c.HopInterval = DefaultClientHopIntervalSec + } } func (c *clientConfig) String() string { diff --git a/go.mod b/go.mod index b249b1e..b2d13e3 100644 --- a/go.mod +++ b/go.mod @@ -89,6 +89,6 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/lucas-clemente/quic-go => github.com/HyNetwork/quic-go v0.30.1-0.20221031062428-2bee5c4b1bf8 +replace github.com/lucas-clemente/quic-go => github.com/HyNetwork/quic-go v0.30.1-0.20221105180419-83715d7269a8 replace github.com/LiamHaworth/go-tproxy => github.com/HyNetwork/go-tproxy v0.0.0-20221025153553-ed04a2935f88 diff --git a/go.sum b/go.sum index 38e97ec..842463e 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,8 @@ github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3 github.com/Dreamacro/go-shadowsocks2 v0.1.7/go.mod h1:8p5G4cAj5ZlXwUR+Ww63gfSikr8kvw8uw3TDwLAJpUc= github.com/HyNetwork/go-tproxy v0.0.0-20221025153553-ed04a2935f88 h1:gZbrSMb8ojmSGxC0C4L3gFwgYFf1xvAnU2gCyDmsbSA= github.com/HyNetwork/go-tproxy v0.0.0-20221025153553-ed04a2935f88/go.mod h1:u7+cv3PYlgsz2jDM/qi/zl17zi03OO78Bhe5nlLOrVc= -github.com/HyNetwork/quic-go v0.30.1-0.20221031062428-2bee5c4b1bf8 h1:r9GrPFoBINn+vKK91P8TGg+Z1N7yhZtK68Kpkg4rdyQ= -github.com/HyNetwork/quic-go v0.30.1-0.20221031062428-2bee5c4b1bf8/go.mod h1:ssOrRsOmdxa768Wr78vnh2B8JozgLsMzG/g+0qEC7uk= +github.com/HyNetwork/quic-go v0.30.1-0.20221105180419-83715d7269a8 h1:FBo40lMrk1bZZzJRJx8U+bQUPhLDGTUJ/Q5NV5BbO4Q= +github.com/HyNetwork/quic-go v0.30.1-0.20221105180419-83715d7269a8/go.mod h1:ssOrRsOmdxa768Wr78vnh2B8JozgLsMzG/g+0qEC7uk= github.com/Microsoft/go-winio v0.4.16-0.20201130162521-d1ffc52c7331/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugXOPRXwdLnMv0= github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/hcsshim v0.8.14/go.mod h1:NtVKoYxQuTLx6gEq0L96c9Ju4JbRJ4nY2ow3VK6a9Lg= diff --git a/pkg/core/client.go b/pkg/core/client.go index 397bd7c..7b16440 100644 --- a/pkg/core/client.go +++ b/pkg/core/client.go @@ -26,7 +26,6 @@ var ErrClosed = errors.New("closed") type Client struct { serverAddr string - serverName string // QUIC SNI sendBPS, recvBPS uint64 auth []byte @@ -52,18 +51,8 @@ func NewClient(serverAddr string, auth []byte, tlsConfig *tls.Config, quicConfig pktConnFunc pktconns.ClientPacketConnFunc, sendBPS uint64, recvBPS uint64, quicReconnectFunc func(err error), ) (*Client, error) { quicConfig.DisablePathMTUDiscovery = quicConfig.DisablePathMTUDiscovery || pmtud.DisablePathMTUDiscovery - // QUIC wants server name, but our serverAddr is usually host:port, - // so we try to extract it from serverAddr. - serverName, _, err := net.SplitHostPort(serverAddr) - if err != nil { - // It's possible that we have some weird serverAddr combined with weird PacketConn implementation, - // that doesn't follow the standard host:port format. So it's ok if we run into error here. - // Server name should be set in tlsConfig in that case. - serverName = "" - } c := &Client{ serverAddr: serverAddr, - serverName: serverName, sendBPS: sendBPS, recvBPS: recvBPS, auth: auth, @@ -92,7 +81,7 @@ func (c *Client) connect() error { return err } // Dial QUIC - quicConn, err := quic.Dial(pktConn, sAddr, c.serverName, c.tlsConfig, c.quicConfig) + quicConn, err := quic.Dial(pktConn, sAddr, c.serverAddr, c.tlsConfig, c.quicConfig) if err != nil { _ = pktConn.Close() return err diff --git a/pkg/core/server_client.go b/pkg/core/server_client.go index 244d465..6f9639c 100644 --- a/pkg/core/server_client.go +++ b/pkg/core/server_client.go @@ -17,7 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const udpBufferSize = 65535 +const udpBufferSize = 4096 type serverClient struct { CC quic.Connection diff --git a/pkg/relay/udp.go b/pkg/relay/udp.go index c6b34ce..f02d227 100644 --- a/pkg/relay/udp.go +++ b/pkg/relay/udp.go @@ -10,7 +10,7 @@ import ( "github.com/HyNetwork/hysteria/pkg/core" ) -const udpBufferSize = 65535 +const udpBufferSize = 4096 var ErrTimeout = errors.New("inactivity timeout") diff --git a/pkg/socks5/server.go b/pkg/socks5/server.go index 4cbd4dd..95c8d10 100644 --- a/pkg/socks5/server.go +++ b/pkg/socks5/server.go @@ -19,7 +19,7 @@ import ( "github.com/txthinking/socks5" ) -const udpBufferSize = 65535 +const udpBufferSize = 4096 var ( ErrUnsupportedCmd = errors.New("unsupported command") diff --git a/pkg/tproxy/udp_linux.go b/pkg/tproxy/udp_linux.go index 0b684c8..1c07f49 100644 --- a/pkg/tproxy/udp_linux.go +++ b/pkg/tproxy/udp_linux.go @@ -8,7 +8,7 @@ import ( "github.com/LiamHaworth/go-tproxy" ) -const udpBufferSize = 65535 +const udpBufferSize = 4096 type UDPTProxy struct { HyClient *core.Client diff --git a/pkg/transport/pktconns/faketcp/obfs.go b/pkg/transport/pktconns/faketcp/obfs.go index 2b44ebf..608503e 100644 --- a/pkg/transport/pktconns/faketcp/obfs.go +++ b/pkg/transport/pktconns/faketcp/obfs.go @@ -9,7 +9,7 @@ import ( "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" ) -const udpBufferSize = 65535 +const udpBufferSize = 4096 type ObfsFakeTCPPacketConn struct { orig *TCPConn diff --git a/pkg/transport/pktconns/funcs.go b/pkg/transport/pktconns/funcs.go index a7b55bc..0a30597 100644 --- a/pkg/transport/pktconns/funcs.go +++ b/pkg/transport/pktconns/funcs.go @@ -2,6 +2,8 @@ package pktconns import ( "net" + "strings" + "time" "github.com/HyNetwork/hysteria/pkg/transport/pktconns/faketcp" "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" @@ -15,13 +17,16 @@ type ( ) type ( - ClientPacketConnFuncFactory func(obfsPassword string) ClientPacketConnFunc + ClientPacketConnFuncFactory func(obfsPassword string, hopInterval time.Duration) ClientPacketConnFunc ServerPacketConnFuncFactory func(obfsPassword string) ServerPacketConnFunc ) -func NewClientUDPConnFunc(obfsPassword string) ClientPacketConnFunc { +func NewClientUDPConnFunc(obfsPassword string, hopInterval time.Duration) ClientPacketConnFunc { if obfsPassword == "" { return func(server string) (net.PacketConn, net.Addr, error) { + if isMultiPortAddr(server) { + return udp.NewObfsUDPHopClientPacketConn(server, hopInterval, nil) + } sAddr, err := net.ResolveUDPAddr("udp", server) if err != nil { return nil, nil, err @@ -31,6 +36,10 @@ func NewClientUDPConnFunc(obfsPassword string) ClientPacketConnFunc { } } else { return func(server string) (net.PacketConn, net.Addr, error) { + if isMultiPortAddr(server) { + ob := obfs.NewXPlusObfuscator([]byte(obfsPassword)) + return udp.NewObfsUDPHopClientPacketConn(server, hopInterval, ob) + } sAddr, err := net.ResolveUDPAddr("udp", server) if err != nil { return nil, nil, err @@ -45,7 +54,7 @@ func NewClientUDPConnFunc(obfsPassword string) ClientPacketConnFunc { } } -func NewClientWeChatConnFunc(obfsPassword string) ClientPacketConnFunc { +func NewClientWeChatConnFunc(obfsPassword string, hopInterval time.Duration) ClientPacketConnFunc { if obfsPassword == "" { return func(server string) (net.PacketConn, net.Addr, error) { sAddr, err := net.ResolveUDPAddr("udp", server) @@ -74,7 +83,7 @@ func NewClientWeChatConnFunc(obfsPassword string) ClientPacketConnFunc { } } -func NewClientFakeTCPConnFunc(obfsPassword string) ClientPacketConnFunc { +func NewClientFakeTCPConnFunc(obfsPassword string, hopInterval time.Duration) ClientPacketConnFunc { if obfsPassword == "" { return func(server string) (net.PacketConn, net.Addr, error) { sAddr, err := net.ResolveTCPAddr("tcp", server) @@ -170,3 +179,11 @@ func NewServerFakeTCPConnFunc(obfsPassword string) ServerPacketConnFunc { } } } + +func isMultiPortAddr(addr string) bool { + _, portStr, err := net.SplitHostPort(addr) + if err == nil && (strings.Contains(portStr, ",") || strings.Contains(portStr, "-")) { + return true + } + return false +} diff --git a/pkg/transport/pktconns/udp/hop.go b/pkg/transport/pktconns/udp/hop.go new file mode 100644 index 0000000..7e3d0cd --- /dev/null +++ b/pkg/transport/pktconns/udp/hop.go @@ -0,0 +1,287 @@ +package udp + +import ( + "math/rand" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" +) + +const ( + packetQueueSize = 1024 +) + +// ObfsUDPHopClientPacketConn is the UDP port-hopping packet connection for client side. +// It hops to a different local & server port every once in a while. +type ObfsUDPHopClientPacketConn struct { + serverAddr net.Addr // Combined udpHopAddr + serverAddrs []net.Addr + hopInterval time.Duration + + obfs obfs.Obfuscator + + connMutex sync.RWMutex + prevConn net.PacketConn + currentConn net.PacketConn + addrIndex int + + recvQueue chan *udpPacket + closeChan chan struct{} + closed bool + + bufPool sync.Pool +} + +type udpHopAddr string + +func (a *udpHopAddr) Network() string { + return "udp-hop" +} + +func (a *udpHopAddr) String() string { + return string(*a) +} + +type udpPacket struct { + buf []byte + n int + addr net.Addr +} + +func NewObfsUDPHopClientPacketConn(server string, hopInterval time.Duration, obfs obfs.Obfuscator) (*ObfsUDPHopClientPacketConn, net.Addr, error) { + host, ports, err := parseAddr(server) + if err != nil { + return nil, nil, err + } + // Resolve the server IP address, then attach the ports to UDP addresses + ip, err := net.ResolveIPAddr("ip", host) + if err != nil { + return nil, nil, err + } + serverAddrs := make([]net.Addr, len(ports)) + for i, port := range ports { + serverAddrs[i] = &net.UDPAddr{ + IP: ip.IP, + Port: int(port), + } + } + hopAddr := udpHopAddr(server) + conn := &ObfsUDPHopClientPacketConn{ + serverAddr: &hopAddr, + serverAddrs: serverAddrs, + hopInterval: hopInterval, + obfs: obfs, + addrIndex: rand.Intn(len(serverAddrs)), + recvQueue: make(chan *udpPacket, packetQueueSize), + closeChan: make(chan struct{}), + bufPool: sync.Pool{ + New: func() interface{} { + return make([]byte, udpBufferSize) + }, + }, + } + curConn, err := net.ListenUDP("udp", nil) + if err != nil { + return nil, nil, err + } + if obfs != nil { + conn.currentConn = NewObfsUDPConn(curConn, obfs) + } else { + conn.currentConn = curConn + } + go conn.recvRoutine(conn.currentConn) + go conn.hopRoutine() + return conn, conn.serverAddr, nil +} + +func (c *ObfsUDPHopClientPacketConn) recvRoutine(conn net.PacketConn) { + for { + buf := c.bufPool.Get().([]byte) + n, addr, err := conn.ReadFrom(buf) + if err != nil { + return + } + select { + case c.recvQueue <- &udpPacket{buf, n, addr}: + default: + // Drop the packet if the queue is full + c.bufPool.Put(buf) + } + } +} + +func (c *ObfsUDPHopClientPacketConn) hopRoutine() { + ticker := time.NewTicker(c.hopInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.hop() + case <-c.closeChan: + return + } + } +} + +func (c *ObfsUDPHopClientPacketConn) hop() { + c.connMutex.Lock() + defer c.connMutex.Unlock() + if c.closed { + return + } + newConn, err := net.ListenUDP("udp", nil) + if err != nil { + // Skip this hop if failed to listen + return + } + // Close prevConn, + // prevConn <- currentConn + // currentConn <- newConn + // update addrIndex + // + // We need to keep receiving packets from the previous connection, + // because otherwise there will be packet loss due to the time gap + // between we hop to a new port and the server acknowledges this change. + if c.prevConn != nil { + _ = c.prevConn.Close() // recvRoutine will exit on error + } + c.prevConn = c.currentConn + if c.obfs != nil { + c.currentConn = NewObfsUDPConn(newConn, c.obfs) + } else { + c.currentConn = newConn + } + go c.recvRoutine(c.currentConn) + c.addrIndex = rand.Intn(len(c.serverAddrs)) +} + +func (c *ObfsUDPHopClientPacketConn) ReadFrom(b []byte) (int, net.Addr, error) { + for { + select { + case p := <-c.recvQueue: + /* + // Check if the packet is from one of the server addresses + for _, addr := range c.serverAddrs { + if addr.String() == p.addr.String() { + // Copy the packet to the buffer + n := copy(b, p.buf[:p.n]) + c.bufPool.Put(p.buf) + return n, c.serverAddr, nil + } + } + // Drop the packet, continue + c.bufPool.Put(p.buf) + */ + // The above code was causing performance issues when the range is large, + // so we skip the check for now. Should probably still check by using a map + // or something in the future. + n := copy(b, p.buf[:p.n]) + c.bufPool.Put(p.buf) + return n, c.serverAddr, nil + case <-c.closeChan: + return 0, nil, net.ErrClosed + } + // Ignore packets from other addresses + } +} + +func (c *ObfsUDPHopClientPacketConn) WriteTo(b []byte, addr net.Addr) (int, error) { + c.connMutex.RLock() + defer c.connMutex.RUnlock() + /* + // Check if the address is the server address + if addr.String() != c.serverAddr.String() { + return 0, net.ErrWriteToConnected + } + */ + // Skip the check for now, always write to the server + return c.currentConn.WriteTo(b, c.serverAddrs[c.addrIndex]) +} + +func (c *ObfsUDPHopClientPacketConn) Close() error { + c.connMutex.Lock() + defer c.connMutex.Unlock() + if c.closed { + return nil + } + // Close prevConn and currentConn + // Close closeChan to unblock ReadFrom & hopRoutine + // Set closed flag to true to prevent double close + if c.prevConn != nil { + _ = c.prevConn.Close() + } + err := c.currentConn.Close() + close(c.closeChan) + c.closed = true + return err +} + +func (c *ObfsUDPHopClientPacketConn) LocalAddr() net.Addr { + c.connMutex.RLock() + defer c.connMutex.RUnlock() + return c.currentConn.LocalAddr() +} + +func (c *ObfsUDPHopClientPacketConn) SetReadDeadline(t time.Time) error { + // Not supported + return nil +} + +func (c *ObfsUDPHopClientPacketConn) SetWriteDeadline(t time.Time) error { + // Not supported + return nil +} + +func (c *ObfsUDPHopClientPacketConn) SetDeadline(t time.Time) error { + err := c.SetReadDeadline(t) + if err != nil { + return err + } + return c.SetWriteDeadline(t) +} + +// parseAddr parses the multi-port server address and returns the host and ports. +// Supports both comma-separated single ports and dash-separated port ranges. +// Format: "host:port1,port2-port3,port4" +func parseAddr(addr string) (host string, ports []uint16, err error) { + host, portStr, err := net.SplitHostPort(addr) + if err != nil { + return "", nil, err + } + portStrs := strings.Split(portStr, ",") + for _, portStr := range portStrs { + if strings.Contains(portStr, "-") { + // Port range + portRange := strings.Split(portStr, "-") + if len(portRange) != 2 { + return "", nil, net.InvalidAddrError("invalid port range") + } + start, err := strconv.ParseUint(portRange[0], 10, 16) + if err != nil { + return "", nil, net.InvalidAddrError("invalid port range") + } + end, err := strconv.ParseUint(portRange[1], 10, 16) + if err != nil { + return "", nil, net.InvalidAddrError("invalid port range") + } + if start > end { + start, end = end, start + } + for i := start; i <= end; i++ { + ports = append(ports, uint16(i)) + } + } else { + // Single port + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + return "", nil, net.InvalidAddrError("invalid port") + } + ports = append(ports, uint16(port)) + } + } + return host, ports, nil +} diff --git a/pkg/transport/pktconns/udp/hop_test.go b/pkg/transport/pktconns/udp/hop_test.go new file mode 100644 index 0000000..cacf5a5 --- /dev/null +++ b/pkg/transport/pktconns/udp/hop_test.go @@ -0,0 +1,102 @@ +package udp + +import ( + "reflect" + "testing" +) + +func Test_parseAddr(t *testing.T) { + tests := []struct { + name string + addr string + wantHost string + wantPorts []uint16 + wantErr bool + }{ + { + name: "empty", + addr: "", + wantHost: "", + wantPorts: nil, + wantErr: true, + }, + { + name: "host only", + addr: "example.com", + wantHost: "", + wantPorts: nil, + wantErr: true, + }, + { + name: "single port", + addr: "example.com:1234", + wantHost: "example.com", + wantPorts: []uint16{1234}, + wantErr: false, + }, + { + name: "multi ports", + addr: "example.com:1234,5678,9999", + wantHost: "example.com", + wantPorts: []uint16{1234, 5678, 9999}, + wantErr: false, + }, + { + name: "multi ports with range", + addr: "example.com:1234,5678-5685,9999", + wantHost: "example.com", + wantPorts: []uint16{1234, 5678, 5679, 5680, 5681, 5682, 5683, 5684, 5685, 9999}, + wantErr: false, + }, + { + name: "range single port", + addr: "example.com:1234-1234", + wantHost: "example.com", + wantPorts: []uint16{1234}, + wantErr: false, + }, + { + name: "range reversed", + addr: "example.com:8003-8000", + wantHost: "example.com", + wantPorts: []uint16{8000, 8001, 8002, 8003}, + wantErr: false, + }, + { + name: "invalid port", + addr: "example.com:1234,5678,9999,invalid", + wantHost: "", + wantPorts: nil, + wantErr: true, + }, + { + name: "invalid port range", + addr: "example.com:1234,5678,9999,8000-8002-8004", + wantHost: "", + wantPorts: nil, + wantErr: true, + }, + { + name: "invalid port range 2", + addr: "example.com:1234,5678,9999,8000-woot", + wantHost: "", + wantPorts: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotHost, gotPorts, err := parseAddr(tt.addr) + if (err != nil) != tt.wantErr { + t.Errorf("parseAddr() error = %v, wantErr %v", err, tt.wantErr) + return + } + if gotHost != tt.wantHost { + t.Errorf("parseAddr() gotHost = %v, want %v", gotHost, tt.wantHost) + } + if !reflect.DeepEqual(gotPorts, tt.wantPorts) { + t.Errorf("parseAddr() gotPorts = %v, want %v", gotPorts, tt.wantPorts) + } + }) + } +} diff --git a/pkg/transport/pktconns/udp/obfs.go b/pkg/transport/pktconns/udp/obfs.go index 073f786..d985e40 100644 --- a/pkg/transport/pktconns/udp/obfs.go +++ b/pkg/transport/pktconns/udp/obfs.go @@ -10,7 +10,7 @@ import ( "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" ) -const udpBufferSize = 65535 +const udpBufferSize = 4096 type ObfsUDPPacketConn struct { orig *net.UDPConn diff --git a/pkg/transport/pktconns/wechat/obfs.go b/pkg/transport/pktconns/wechat/obfs.go index 6e13696..2714b20 100644 --- a/pkg/transport/pktconns/wechat/obfs.go +++ b/pkg/transport/pktconns/wechat/obfs.go @@ -12,7 +12,7 @@ import ( "github.com/HyNetwork/hysteria/pkg/transport/pktconns/obfs" ) -const udpBufferSize = 65535 +const udpBufferSize = 4096 // ObfsWeChatUDPPacketConn is still a UDP packet conn, but it adds WeChat video call header to each packet. // Obfs in this case can be nil diff --git a/pkg/tun/udp.go b/pkg/tun/udp.go index 8bfb030..f824699 100644 --- a/pkg/tun/udp.go +++ b/pkg/tun/udp.go @@ -13,7 +13,7 @@ import ( "github.com/xjasonlyu/tun2socks/v2/core/adapter" ) -const udpBufferSize = 65535 +const udpBufferSize = 4096 func (s *Server) HandleUDP(conn adapter.UDPConn) { go s.handleUDPConn(conn) diff --git a/pkg/utils/pipe.go b/pkg/utils/pipe.go index 828b205..55c95ee 100644 --- a/pkg/utils/pipe.go +++ b/pkg/utils/pipe.go @@ -6,7 +6,7 @@ import ( "time" ) -const PipeBufferSize = 65535 +const PipeBufferSize = 32 * 1024 func Pipe(src, dst io.ReadWriter, count func(int)) error { buf := make([]byte, PipeBufferSize)