From 565d6593388ae01269802057cb7cf8e7db8b1739 Mon Sep 17 00:00:00 2001 From: Toby Date: Fri, 5 Feb 2021 01:00:44 -0800 Subject: [PATCH] Relay & better logging --- cmd/client.go | 97 +++++++++++++++++++------------------ cmd/server.go | 50 +++++++++++++++---- pkg/core/server.go | 29 +++++++---- pkg/http/server.go | 2 +- pkg/relay/relay.go | 112 +++++++++++++++++++++++++++++++++++++++++++ pkg/socks5/server.go | 97 +++++++++++++++++++------------------ 6 files changed, 270 insertions(+), 117 deletions(-) create mode 100644 pkg/relay/relay.go diff --git a/cmd/client.go b/cmd/client.go index ff6afa7..8a14f7e 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -11,7 +11,9 @@ import ( "github.com/tobyxdd/hysteria/pkg/core" hyHTTP "github.com/tobyxdd/hysteria/pkg/http" "github.com/tobyxdd/hysteria/pkg/obfs" + "github.com/tobyxdd/hysteria/pkg/relay" "github.com/tobyxdd/hysteria/pkg/socks5" + "io" "io/ioutil" "net" "net/http" @@ -101,46 +103,28 @@ func client(config *clientConfig) { return config.SOCKS5.User == user && config.SOCKS5.Password == password } } - socks5server, err := socks5.NewServer(client, config.SOCKS5.Listen, authFunc, config.SOCKS5.Timeout, aclEngine, - config.SOCKS5.DisableUDP, + socks5server, err := socks5.NewServer(client, config.SOCKS5.Listen, authFunc, + time.Duration(config.SOCKS5.Timeout)*time.Second, aclEngine, config.SOCKS5.DisableUDP, func(addr net.Addr, reqAddr string, action acl.Action, arg string) { logrus.WithFields(logrus.Fields{ "action": actionToString(action, arg), "src": addr.String(), "dst": reqAddr, - }).Debug("New SOCKS5 TCP request") + }).Debug("SOCKS5 TCP request") }, func(addr net.Addr, reqAddr string, err error) { - logrus.WithFields(logrus.Fields{ - "error": err, - "src": addr.String(), - "dst": reqAddr, - }).Debug("SOCKS5 TCP request closed") - }, - func(addr net.Addr) { - logrus.WithFields(logrus.Fields{ - "src": addr.String(), - }).Debug("New SOCKS5 UDP associate request") - }, - func(addr net.Addr, err error) { - logrus.WithFields(logrus.Fields{ - "error": err, - "src": addr.String(), - }).Debug("SOCKS5 UDP associate request closed") - }, - func(addr net.Addr, reqAddr string, action acl.Action, arg string) { - logrus.WithFields(logrus.Fields{ - "action": actionToString(action, arg), - "src": addr.String(), - "dst": reqAddr, - }).Debug("New SOCKS5 UDP tunnel") - }, - func(addr net.Addr, reqAddr string, err error) { - logrus.WithFields(logrus.Fields{ - "error": err, - "src": addr.String(), - "dst": reqAddr, - }).Debug("SOCKS5 UDP tunnel closed") + if err != io.EOF { + logrus.WithFields(logrus.Fields{ + "error": err, + "src": addr.String(), + "dst": reqAddr, + }).Info("SOCKS5 TCP error") + } else { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "dst": reqAddr, + }).Debug("SOCKS5 TCP EOF") + } }) if err != nil { logrus.WithField("error", err).Fatal("Failed to initialize SOCKS5 server") @@ -163,7 +147,7 @@ func client(config *clientConfig) { logrus.WithFields(logrus.Fields{ "action": actionToString(action, arg), "dst": reqAddr, - }).Debug("New HTTP request") + }).Debug("HTTP request") }, authFunc) if err != nil { @@ -179,21 +163,36 @@ func client(config *clientConfig) { }() } + if len(config.Relay.Listen) > 0 { + go func() { + rl, err := relay.NewRelay(client, config.Relay.Listen, config.Relay.Remote, + time.Duration(config.Relay.Timeout)*time.Second, + func(addr net.Addr) { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + }).Debug("TCP relay request") + }, + func(addr net.Addr, err error) { + if err != io.EOF { + logrus.WithFields(logrus.Fields{ + "error": err, + "src": addr.String(), + }).Info("TCP relay error") + } else { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + }).Debug("TCP relay EOF") + } + + }) + if err != nil { + logrus.WithField("error", err).Fatal("Failed to initialize TCP relay") + } + logrus.WithField("addr", config.Relay.Listen).Info("TCP relay up and running") + errChan <- rl.ListenAndServe() + }() + } + err = <-errChan logrus.WithField("error", err).Fatal("Client shutdown") } - -func actionToString(action acl.Action, arg string) string { - switch action { - case acl.ActionDirect: - return "Direct" - case acl.ActionProxy: - return "Proxy" - case acl.ActionBlock: - return "Block" - case acl.ActionHijack: - return "Hijack to " + arg - default: - return "Unknown" - } -} diff --git a/cmd/server.go b/cmd/server.go index dab89a7..206f15f 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -9,6 +9,7 @@ import ( hyCongestion "github.com/tobyxdd/hysteria/pkg/congestion" "github.com/tobyxdd/hysteria/pkg/core" "github.com/tobyxdd/hysteria/pkg/obfs" + "io" "net" "strings" ) @@ -78,16 +79,7 @@ func server(config *serverConfig) { uint64(config.UpMbps)*mbpsToBps, uint64(config.DownMbps)*mbpsToBps, func(refBPS uint64) congestion.CongestionControl { return hyCongestion.NewBrutalSender(congestion.ByteCount(refBPS)) - }, aclEngine, obfuscator, authFunc, func(addr net.Addr, auth []byte, udp bool, reqAddr string) { - if !udp { - logrus.WithFields(logrus.Fields{ - "src": addr.String(), - "dst": reqAddr, - }).Debug("New TCP request") - } else { - // TODO - } - }) + }, aclEngine, obfuscator, authFunc, tcpRequestFunc, tcpErrorFunc) if err != nil { logrus.WithField("error", err).Fatal("Failed to initialize server") } @@ -97,3 +89,41 @@ func server(config *serverConfig) { err = server.Serve() logrus.WithField("error", err).Fatal("Server shutdown") } + +func tcpRequestFunc(addr net.Addr, auth []byte, reqAddr string, action acl.Action, arg string) { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "dst": reqAddr, + "action": actionToString(action, arg), + }).Debug("TCP request") +} + +func tcpErrorFunc(addr net.Addr, auth []byte, reqAddr string, err error) { + if err != io.EOF { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "dst": reqAddr, + "error": err, + }).Info("TCP error") + } else { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "dst": reqAddr, + }).Debug("TCP EOF") + } +} + +func actionToString(action acl.Action, arg string) string { + switch action { + case acl.ActionDirect: + return "Direct" + case acl.ActionProxy: + return "Proxy" + case acl.ActionBlock: + return "Block" + case acl.ActionHijack: + return "Hijack to " + arg + default: + return "Unknown" + } +} diff --git a/pkg/core/server.go b/pkg/core/server.go index da06b6f..49ed004 100644 --- a/pkg/core/server.go +++ b/pkg/core/server.go @@ -15,21 +15,24 @@ import ( const dialTimeout = 10 * time.Second type AuthFunc func(addr net.Addr, auth []byte, sSend uint64, sRecv uint64) (bool, string) -type RequestFunc func(addr net.Addr, auth []byte, udp bool, reqAddr string) +type TCPRequestFunc func(addr net.Addr, auth []byte, reqAddr string, action acl.Action, arg string) +type TCPErrorFunc func(addr net.Addr, auth []byte, reqAddr string, err error) type Server struct { sendBPS, recvBPS uint64 congestionFactory CongestionFactory - authFunc AuthFunc - requestFunc RequestFunc aclEngine *acl.Engine + authFunc AuthFunc + tcpRequestFunc TCPRequestFunc + tcpErrorFunc TCPErrorFunc + listener quic.Listener } func NewServer(addr string, tlsConfig *tls.Config, quicConfig *quic.Config, sendBPS uint64, recvBPS uint64, congestionFactory CongestionFactory, aclEngine *acl.Engine, - obfuscator Obfuscator, authFunc AuthFunc, requestFunc RequestFunc) (*Server, error) { + obfuscator Obfuscator, authFunc AuthFunc, tcpRequestFunc TCPRequestFunc, tcpErrorFunc TCPErrorFunc) (*Server, error) { packetConn, err := net.ListenPacket("udp", addr) if err != nil { return nil, err @@ -50,9 +53,10 @@ func NewServer(addr string, tlsConfig *tls.Config, quicConfig *quic.Config, sendBPS: sendBPS, recvBPS: recvBPS, congestionFactory: congestionFactory, - authFunc: authFunc, - requestFunc: requestFunc, aclEngine: aclEngine, + authFunc: authFunc, + tcpRequestFunc: tcpRequestFunc, + tcpErrorFunc: tcpErrorFunc, } return s, nil } @@ -148,23 +152,23 @@ func (s *Server) handleStream(remoteAddr net.Addr, auth []byte, stream quic.Stre if err != nil { return } - s.requestFunc(remoteAddr, auth, req.UDP, req.Address) if !req.UDP { // TCP connection - s.handleTCP(stream, req.Address) + s.handleTCP(remoteAddr, auth, stream, req.Address) } else { // UDP connection // TODO } } -func (s *Server) handleTCP(stream quic.Stream, reqAddr string) { +func (s *Server) handleTCP(remoteAddr net.Addr, auth []byte, stream quic.Stream, reqAddr string) { host, port, err := net.SplitHostPort(reqAddr) if err != nil { _ = struc.Pack(stream, &serverResponse{ OK: false, Message: "invalid address", }) + s.tcpErrorFunc(remoteAddr, auth, reqAddr, err) return } ip := net.ParseIP(host) @@ -176,6 +180,7 @@ func (s *Server) handleTCP(stream quic.Stream, reqAddr string) { if s.aclEngine != nil { action, arg = s.aclEngine.Lookup(host, ip) } + s.tcpRequestFunc(remoteAddr, auth, reqAddr, action, arg) var conn net.Conn // Connection to be piped switch action { @@ -186,6 +191,7 @@ func (s *Server) handleTCP(stream quic.Stream, reqAddr string) { OK: false, Message: err.Error(), }) + s.tcpErrorFunc(remoteAddr, auth, reqAddr, err) return } case acl.ActionBlock: @@ -202,6 +208,7 @@ func (s *Server) handleTCP(stream quic.Stream, reqAddr string) { OK: false, Message: err.Error(), }) + s.tcpErrorFunc(remoteAddr, auth, reqAddr, err) return } default: @@ -212,11 +219,13 @@ func (s *Server) handleTCP(stream quic.Stream, reqAddr string) { return } // So far so good if we reach here + defer conn.Close() err = struc.Pack(stream, &serverResponse{ OK: true, }) if err != nil { return } - _ = utils.Pipe2Way(stream, conn) + err = utils.Pipe2Way(stream, conn) + s.tcpErrorFunc(remoteAddr, auth, reqAddr, err) } diff --git a/pkg/http/server.go b/pkg/http/server.go index a9c0bbc..ae38639 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -52,7 +52,7 @@ func NewProxyHTTPServer(hyClient *core.Client, idleTimeout time.Duration, aclEng } }, IdleConnTimeout: idleTimeout, - // TODO: Disable HTTP2 support? ref: https://github.com/elazarl/goproxy/issues/361 + // Disable HTTP2 support? ref: https://github.com/elazarl/goproxy/issues/361 } proxy.ConnectDial = nil if basicAuthFunc != nil { diff --git a/pkg/relay/relay.go b/pkg/relay/relay.go new file mode 100644 index 0000000..ca89c28 --- /dev/null +++ b/pkg/relay/relay.go @@ -0,0 +1,112 @@ +package relay + +import ( + "github.com/tobyxdd/hysteria/pkg/core" + "github.com/tobyxdd/hysteria/pkg/utils" + "io" + "net" + "time" +) + +type Relay struct { + HyClient *core.Client + ListenAddr *net.TCPAddr + Remote string + Timeout time.Duration + + ConnFunc func(addr net.Addr) + ErrorFunc func(addr net.Addr, err error) + + tcpListener *net.TCPListener +} + +func NewRelay(hyClient *core.Client, listen, remote string, timeout time.Duration, + connFunc func(addr net.Addr), errorFunc func(addr net.Addr, err error)) (*Relay, error) { + tAddr, err := net.ResolveTCPAddr("tcp", listen) + if err != nil { + return nil, err + } + r := &Relay{ + HyClient: hyClient, + ListenAddr: tAddr, + Remote: remote, + Timeout: timeout, + ConnFunc: connFunc, + ErrorFunc: errorFunc, + } + return r, nil +} + +func (r *Relay) ListenAndServe() error { + var err error + r.tcpListener, err = net.ListenTCP("tcp", r.ListenAddr) + if err != nil { + return err + } + defer r.tcpListener.Close() + for { + c, err := r.tcpListener.AcceptTCP() + if err != nil { + return err + } + go func(c *net.TCPConn) { + defer c.Close() + r.ConnFunc(c.RemoteAddr()) + rc, err := r.HyClient.DialTCP(r.Remote) + if err != nil { + r.ErrorFunc(c.RemoteAddr(), err) + return + } + defer rc.Close() + err = pipePair(c, rc, r.Timeout) + r.ErrorFunc(c.RemoteAddr(), err) + }(c) + } +} + +func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, timeout time.Duration) error { + errChan := make(chan error, 2) + // TCP to stream + go func() { + buf := make([]byte, utils.PipeBufferSize) + for { + if timeout != 0 { + _ = conn.SetDeadline(time.Now().Add(timeout)) + } + rn, err := conn.Read(buf) + if rn > 0 { + _, err := stream.Write(buf[:rn]) + if err != nil { + errChan <- err + return + } + } + if err != nil { + errChan <- err + return + } + } + }() + // Stream to TCP + go func() { + buf := make([]byte, utils.PipeBufferSize) + for { + rn, err := stream.Read(buf) + if rn > 0 { + _, err := conn.Write(buf[:rn]) + if err != nil { + errChan <- err + return + } + if timeout != 0 { + _ = conn.SetDeadline(time.Now().Add(timeout)) + } + } + if err != nil { + errChan <- err + return + } + } + }() + return <-errChan +} diff --git a/pkg/socks5/server.go b/pkg/socks5/server.go index 603bdc4..094e766 100644 --- a/pkg/socks5/server.go +++ b/pkg/socks5/server.go @@ -23,34 +23,25 @@ var ( ) type Server struct { - HyClient *core.Client - AuthFunc func(username, password string) bool - Method byte - TCPAddr *net.TCPAddr - TCPDeadline int - ACLEngine *acl.Engine - DisableUDP bool + HyClient *core.Client + AuthFunc func(username, password string) bool + Method byte + TCPAddr *net.TCPAddr + TCPTimeout time.Duration + ACLEngine *acl.Engine + DisableUDP bool - NewRequestFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string) - RequestClosedFunc func(addr net.Addr, reqAddr string, err error) - NewUDPAssociateFunc func(addr net.Addr) - UDPAssociateClosedFunc func(addr net.Addr, err error) - NewUDPTunnelFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string) - UDPTunnelClosedFunc func(addr net.Addr, reqAddr string, err error) + TCPRequestFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string) + TCPErrorFunc func(addr net.Addr, reqAddr string, err error) tcpListener *net.TCPListener } -func NewServer(hyClient *core.Client, addr string, authFunc func(username, password string) bool, tcpDeadline int, +func NewServer(hyClient *core.Client, addr string, authFunc func(username, password string) bool, tcpTimeout time.Duration, aclEngine *acl.Engine, disableUDP bool, - newReqFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string), - reqClosedFunc func(addr net.Addr, reqAddr string, err error), - newUDPAssociateFunc func(addr net.Addr), - udpAssociateClosedFunc func(addr net.Addr, err error), - newUDPTunnelFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string), - udpTunnelClosedFunc func(addr net.Addr, reqAddr string, err error)) (*Server, error) { - - taddr, err := net.ResolveTCPAddr("tcp", addr) + tcpReqFunc func(addr net.Addr, reqAddr string, action acl.Action, arg string), + tcpErrorFunc func(addr net.Addr, reqAddr string, err error)) (*Server, error) { + tAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return nil, err } @@ -59,19 +50,15 @@ func NewServer(hyClient *core.Client, addr string, authFunc func(username, passw m = socks5.MethodUsernamePassword } s := &Server{ - HyClient: hyClient, - AuthFunc: authFunc, - Method: m, - TCPAddr: taddr, - TCPDeadline: tcpDeadline, - ACLEngine: aclEngine, - DisableUDP: disableUDP, - NewRequestFunc: newReqFunc, - RequestClosedFunc: reqClosedFunc, - NewUDPAssociateFunc: newUDPAssociateFunc, - UDPAssociateClosedFunc: udpAssociateClosedFunc, - NewUDPTunnelFunc: newUDPTunnelFunc, - UDPTunnelClosedFunc: udpTunnelClosedFunc, + HyClient: hyClient, + AuthFunc: authFunc, + Method: m, + TCPAddr: tAddr, + TCPTimeout: tcpTimeout, + ACLEngine: aclEngine, + DisableUDP: disableUDP, + TCPRequestFunc: tcpReqFunc, + TCPErrorFunc: tcpErrorFunc, } return s, nil } @@ -133,8 +120,8 @@ func (s *Server) ListenAndServe() error { } go func(c *net.TCPConn) { defer c.Close() - if s.TCPDeadline != 0 { - if err := c.SetDeadline(time.Now().Add(time.Duration(s.TCPDeadline) * time.Second)); err != nil { + if s.TCPTimeout != 0 { + if err := c.SetDeadline(time.Now().Add(s.TCPTimeout)); err != nil { return } } @@ -170,10 +157,10 @@ func (s *Server) handleTCP(c *net.TCPConn, r *socks5.Request) error { if s.ACLEngine != nil { action, arg = s.ACLEngine.Lookup(domain, ip) } - s.NewRequestFunc(c.RemoteAddr(), addr, action, arg) + s.TCPRequestFunc(c.RemoteAddr(), addr, action, arg) var closeErr error defer func() { - s.RequestClosedFunc(c.RemoteAddr(), addr, closeErr) + s.TCPErrorFunc(c.RemoteAddr(), addr, closeErr) }() // Handle according to the action switch action { @@ -186,7 +173,7 @@ func (s *Server) handleTCP(c *net.TCPConn, r *socks5.Request) error { } defer rc.Close() _ = sendReply(c, socks5.RepSuccess) - closeErr = pipePair(c, rc, s.TCPDeadline) + closeErr = pipePair(c, rc, s.TCPTimeout) return nil case acl.ActionProxy: rc, err := s.HyClient.DialTCP(addr) @@ -197,7 +184,7 @@ func (s *Server) handleTCP(c *net.TCPConn, r *socks5.Request) error { } defer rc.Close() _ = sendReply(c, socks5.RepSuccess) - closeErr = pipePair(c, rc, s.TCPDeadline) + closeErr = pipePair(c, rc, s.TCPTimeout) return nil case acl.ActionBlock: _ = sendReply(c, socks5.RepHostUnreachable) @@ -212,7 +199,7 @@ func (s *Server) handleTCP(c *net.TCPConn, r *socks5.Request) error { } defer rc.Close() _ = sendReply(c, socks5.RepSuccess) - closeErr = pipePair(c, rc, s.TCPDeadline) + closeErr = pipePair(c, rc, s.TCPTimeout) return nil default: _ = sendReply(c, socks5.RepServerFailure) @@ -237,15 +224,14 @@ func parseRequestAddress(r *socks5.Request) (domain string, ip net.IP, port stri } } -func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, deadline int) error { - deadlineDuration := time.Duration(deadline) * time.Second +func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, timeout time.Duration) error { errChan := make(chan error, 2) // TCP to stream go func() { buf := make([]byte, utils.PipeBufferSize) for { - if deadline != 0 { - _ = conn.SetDeadline(time.Now().Add(deadlineDuration)) + if timeout != 0 { + _ = conn.SetDeadline(time.Now().Add(timeout)) } rn, err := conn.Read(buf) if rn > 0 { @@ -263,7 +249,24 @@ func pipePair(conn *net.TCPConn, stream io.ReadWriteCloser, deadline int) error }() // Stream to TCP go func() { - errChan <- utils.Pipe(stream, conn) + buf := make([]byte, utils.PipeBufferSize) + for { + rn, err := stream.Read(buf) + if rn > 0 { + _, err := conn.Write(buf[:rn]) + if err != nil { + errChan <- err + return + } + if timeout != 0 { + _ = conn.SetDeadline(time.Now().Add(timeout)) + } + } + if err != nil { + errChan <- err + return + } + } }() return <-errChan }