Merge pull request #12 from tobyxdd/wip-http

HTTP proxy implementation
This commit is contained in:
Toby 2020-05-13 20:46:27 -07:00 committed by GitHub
commit d9d119d7ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 252 additions and 80 deletions

View File

@ -31,6 +31,8 @@ Client:
```
This will start a SOCKS5 proxy server on the client's localhost TCP 1080 available for use by other programs.
In addition to SOCKS5, it also supports HTTP proxy (`-http-addr` & `-http-timeout`). Both modes can be turned on simultaneously on different ports.
`-up-mbps 10 -down-mbps 50` tells the server that your bandwidth is 50 Mbps down, 10 Mbps up. Properly setting the client's upload and download speeds based on your network conditions is essential for it to work at optimal performance!
### Relay
@ -84,6 +86,8 @@ The command line program supports loading configurations from both JSON files an
| --- | --- | --- |
| SOCKS5 listen address | socks5_addr | -socks5-addr |
| SOCKS5 connection timeout in seconds | socks5_timeout | -socks5-timeout |
| HTTP listen address | http_addr | -http-addr |
| HTTP connection timeout in seconds | http_timeout | -http-timeout |
| Access control list | acl | -acl |
| Server address | server | -server |
| Authentication username | username | -username |

View File

@ -29,6 +29,8 @@ Hysteria 是专门针对恶劣网络环境(常见于在中国访问海外服
```
在客户端的本地 TCP 1080 上启动一个 SOCKS5 代理服务器供其他程序使用。
除了 SOCKS5 还支持 HTTP 代理 (`-http-addr` & `-http-timeout`)。两个模式可以同时开在不同端口。
`-up-mbps 10 -down-mbps 50` 是告诉服务端你的下行速度为 50 Mbps, 上行 10 Mbps。根据实际网络条件正确设置客户端的上传和下载速度十分重要
### 转发
@ -82,6 +84,8 @@ Hysteria 是专门针对恶劣网络环境(常见于在中国访问海外服
| --- | --- | --- |
| SOCKS5 监听地址 | socks5_addr | -socks5-addr |
| SOCKS5 超时时间(秒) | socks5_timeout | -socks5-timeout |
| HTTP 监听地址 | http_addr | -http-addr |
| HTTP 超时时间(秒) | http_timeout | -http-timeout |
| ACL 规则文件 | acl | -acl |
| 服务端地址 | server | -server |
| 验证用户名 | username | -username |

View File

@ -8,11 +8,14 @@ import (
"github.com/tobyxdd/hysteria/pkg/acl"
hyCongestion "github.com/tobyxdd/hysteria/pkg/congestion"
"github.com/tobyxdd/hysteria/pkg/core"
hyHTTP "github.com/tobyxdd/hysteria/pkg/http"
"github.com/tobyxdd/hysteria/pkg/obfs"
"github.com/tobyxdd/hysteria/pkg/socks5"
"io/ioutil"
"log"
"net"
"net/http"
"time"
)
func proxyClient(args []string) {
@ -79,31 +82,53 @@ func proxyClient(args []string) {
defer client.Close()
log.Println("Connected to", config.ServerAddr)
socks5server, err := socks5.NewServer(client, config.SOCKS5Addr, nil, config.SOCKS5Timeout, aclEngine,
func(addr net.Addr, reqAddr string, action acl.Action, arg string) {
log.Printf("[TCP] [%s] %s <-> %s\n", actionToString(action, arg), addr.String(), reqAddr)
},
func(addr net.Addr, reqAddr string, err error) {
log.Printf("Closed [TCP] %s <-> %s: %s\n", addr.String(), reqAddr, err.Error())
},
func(addr net.Addr) {
log.Printf("[UDP] Associate %s\n", addr.String())
},
func(addr net.Addr, err error) {
log.Printf("Closed [UDP] Associate %s: %s\n", addr.String(), err.Error())
},
func(addr net.Addr, reqAddr string, action acl.Action, arg string) {
log.Printf("[UDP] [%s] %s <-> %s\n", actionToString(action, arg), addr.String(), reqAddr)
},
func(addr net.Addr, reqAddr string, err error) {
log.Printf("Closed [UDP] %s <-> %s: %s\n", addr.String(), reqAddr, err.Error())
})
if err != nil {
log.Fatalln("SOCKS5 server initialization failed:", err)
}
log.Println("SOCKS5 server up and running on", config.SOCKS5Addr)
errChan := make(chan error)
if len(config.SOCKS5Addr) > 0 {
go func() {
socks5server, err := socks5.NewServer(client, config.SOCKS5Addr, nil, config.SOCKS5Timeout, aclEngine,
func(addr net.Addr, reqAddr string, action acl.Action, arg string) {
log.Printf("[TCP] [%s] %s <-> %s\n", actionToString(action, arg), addr.String(), reqAddr)
},
func(addr net.Addr, reqAddr string, err error) {
log.Printf("Closed [TCP] %s <-> %s: %s\n", addr.String(), reqAddr, err.Error())
},
func(addr net.Addr) {
log.Printf("[UDP] Associate %s\n", addr.String())
},
func(addr net.Addr, err error) {
log.Printf("Closed [UDP] Associate %s: %s\n", addr.String(), err.Error())
},
func(addr net.Addr, reqAddr string, action acl.Action, arg string) {
log.Printf("[UDP] [%s] %s <-> %s\n", actionToString(action, arg), addr.String(), reqAddr)
},
func(addr net.Addr, reqAddr string, err error) {
log.Printf("Closed [UDP] %s <-> %s: %s\n", addr.String(), reqAddr, err.Error())
})
if err != nil {
log.Fatalln("SOCKS5 server initialization failed:", err)
}
log.Println("SOCKS5 server up and running on", config.SOCKS5Addr)
errChan <- socks5server.ListenAndServe()
}()
}
if len(config.HTTPAddr) > 0 {
go func() {
proxy, err := hyHTTP.NewProxyHTTPServer(client, time.Duration(config.HTTPTimeout)*time.Second, aclEngine,
func(reqAddr string, action acl.Action, arg string) {
log.Printf("[HTTP] [%s] %s\n", actionToString(action, arg), reqAddr)
})
if err != nil {
log.Fatalln("HTTP server initialization failed:", err)
}
log.Println("HTTP server up and running on", config.HTTPAddr)
errChan <- http.ListenAndServe(config.HTTPAddr, proxy)
}()
}
log.Fatalln(<-errChan)
log.Fatalln(socks5server.ListenAndServe())
}
func actionToString(action acl.Action, arg string) string {

View File

@ -7,6 +7,8 @@ const proxyTLSProtocol = "hysteria-proxy"
type proxyClientConfig struct {
SOCKS5Addr string `json:"socks5_addr" desc:"SOCKS5 listen address"`
SOCKS5Timeout int `json:"socks5_timeout" desc:"SOCKS5 connection timeout in seconds"`
HTTPAddr string `json:"http_addr" desc:"HTTP listen address"`
HTTPTimeout int `json:"http_timeout" desc:"HTTP connection timeout in seconds"`
ACLFile string `json:"acl" desc:"Access control list"`
ServerAddr string `json:"server" desc:"Server address"`
Username string `json:"username" desc:"Authentication username"`
@ -21,12 +23,15 @@ type proxyClientConfig struct {
}
func (c *proxyClientConfig) Check() error {
if len(c.SOCKS5Addr) == 0 {
return errors.New("no SOCKS5 listen address")
if len(c.SOCKS5Addr) == 0 && len(c.HTTPAddr) == 0 {
return errors.New("no SOCKS5 or HTTP listen address")
}
if c.SOCKS5Timeout != 0 && c.SOCKS5Timeout <= 4 {
return errors.New("invalid SOCKS5 timeout")
}
if c.HTTPTimeout != 0 && c.HTTPTimeout <= 4 {
return errors.New("invalid HTTP timeout")
}
if len(c.ServerAddr) == 0 {
return errors.New("no server address")
}

1
go.mod
View File

@ -5,6 +5,7 @@ go 1.14
require github.com/golang/protobuf v1.3.1
require (
github.com/elazarl/goproxy v0.0.0-20200426045556-49ad98f6dac1
github.com/hashicorp/golang-lru v0.5.4
github.com/lucas-clemente/quic-go v0.15.2
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect

5
go.sum
View File

@ -21,6 +21,10 @@ github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/elazarl/goproxy v0.0.0-20200426045556-49ad98f6dac1 h1:TEmChtx8+IeOghiySC8kQIr0JZOdKUmRmmkuRDuYs3E=
github.com/elazarl/goproxy v0.0.0-20200426045556-49ad98f6dac1/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2 h1:dWB6v3RcOy03t/bUadywsbyrQwCqZeNIEX6M1OtSZOM=
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
@ -90,6 +94,7 @@ github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY=

View File

@ -7,7 +7,6 @@ import (
"fmt"
"github.com/lucas-clemente/quic-go"
"github.com/tobyxdd/hysteria/internal/utils"
"io"
"net"
"sync"
"sync/atomic"
@ -51,8 +50,8 @@ func NewClient(serverAddr string, username string, password string, tlsConfig *t
return c, nil
}
func (c *Client) Dial(packet bool, addr string) (io.ReadWriteCloser, error) {
stream, err := c.openStreamWithReconnect()
func (c *Client) Dial(packet bool, addr string) (net.Conn, error) {
stream, localAddr, remoteAddr, err := c.openStreamWithReconnect()
if err != nil {
return nil, err
}
@ -79,10 +78,15 @@ func (c *Client) Dial(packet bool, addr string) (io.ReadWriteCloser, error) {
return nil, fmt.Errorf("server rejected the connection %s (msg: %s)",
resp.Result.String(), resp.Message)
}
connWrap := &utils.QUICStreamWrapperConn{
Orig: stream,
PseudoLocalAddr: localAddr,
PseudoRemoteAddr: remoteAddr,
}
if packet {
return &utils.PacketReadWriteCloser{Orig: stream}, nil
return &utils.PacketWrapperConn{Orig: connWrap}, nil
} else {
return stream, nil
return connWrap, nil
}
}
@ -166,27 +170,28 @@ func (c *Client) handleControlStream(qs quic.Session, stream quic.Stream) (AuthR
return resp.Result, resp.Message, nil
}
func (c *Client) openStreamWithReconnect() (quic.Stream, error) {
func (c *Client) openStreamWithReconnect() (quic.Stream, net.Addr, net.Addr, error) {
c.reconnectMutex.Lock()
defer c.reconnectMutex.Unlock()
if c.closed {
return nil, ErrClosed
return nil, nil, nil, ErrClosed
}
stream, err := c.quicSession.OpenStream()
if err == nil {
// All good
return stream, nil
return stream, c.quicSession.LocalAddr(), c.quicSession.RemoteAddr(), nil
}
// Something is wrong
if nErr, ok := err.(net.Error); ok && nErr.Temporary() {
// Temporary error, just return
return nil, err
return nil, nil, nil, err
}
// Permanent error, need to reconnect
if err := c.connectToServer(); err != nil {
// Still error, oops
return nil, err
return nil, nil, nil, err
}
// We are not going to try again even if it still fails the second time
return c.quicSession.OpenStream()
stream, err = c.quicSession.OpenStream()
return stream, c.quicSession.LocalAddr(), c.quicSession.RemoteAddr(), err
}

View File

@ -110,7 +110,7 @@ func (s *Server) handleClient(cs quic.Session) {
closeErr = err
break
}
go s.handleStream(cs.RemoteAddr(), username, stream)
go s.handleStream(cs.LocalAddr(), cs.RemoteAddr(), username, stream)
}
s.clientDisconnectedFunc(cs.RemoteAddr(), username, closeErr)
_ = cs.CloseWithError(closeErrorCodeGeneric, "generic")
@ -158,7 +158,7 @@ func (s *Server) handleControlStream(cs quic.Session, stream quic.Stream) (strin
return req.Credential.Username, authResult == AuthResult_AUTH_SUCCESS, nil
}
func (s *Server) handleStream(addr net.Addr, username string, stream quic.Stream) {
func (s *Server) handleStream(localAddr net.Addr, remoteAddr net.Addr, username string, stream quic.Stream) {
defer stream.Close()
// Read request
req, err := readClientConnectRequest(stream)
@ -166,7 +166,7 @@ func (s *Server) handleStream(addr net.Addr, username string, stream quic.Stream
return
}
// Create connection with the handler
result, msg, conn := s.handleRequestFunc(addr, username, int(stream.StreamID()), req.Type, req.Address)
result, msg, conn := s.handleRequestFunc(remoteAddr, username, int(stream.StreamID()), req.Type, req.Address)
defer func() {
if conn != nil {
_ = conn.Close()
@ -178,11 +178,11 @@ func (s *Server) handleStream(addr net.Addr, username string, stream quic.Stream
Message: msg,
})
if err != nil {
s.requestClosedFunc(addr, username, int(stream.StreamID()), req.Type, req.Address, err)
s.requestClosedFunc(remoteAddr, username, int(stream.StreamID()), req.Type, req.Address, err)
return
}
if result != ConnectResult_CONN_SUCCESS {
s.requestClosedFunc(addr, username, int(stream.StreamID()), req.Type, req.Address,
s.requestClosedFunc(remoteAddr, username, int(stream.StreamID()), req.Type, req.Address,
fmt.Errorf("handler returned an unsuccessful state %s (msg: %s)", result.String(), msg))
return
}
@ -190,9 +190,13 @@ func (s *Server) handleStream(addr net.Addr, username string, stream quic.Stream
case ConnectionType_Stream:
err = utils.PipePair(stream, conn, &s.outboundBytes, &s.inboundBytes)
case ConnectionType_Packet:
err = utils.PipePair(&utils.PacketReadWriteCloser{Orig: stream}, conn, &s.outboundBytes, &s.inboundBytes)
err = utils.PipePair(&utils.PacketWrapperConn{Orig: &utils.QUICStreamWrapperConn{
Orig: stream,
PseudoLocalAddr: localAddr,
PseudoRemoteAddr: remoteAddr,
}}, conn, &s.outboundBytes, &s.inboundBytes)
default:
err = fmt.Errorf("unsupported connection type %s", req.Type.String())
}
s.requestClosedFunc(addr, username, int(stream.StreamID()), req.Type, req.Address, err)
s.requestClosedFunc(remoteAddr, username, int(stream.StreamID()), req.Type, req.Address, err)
}

View File

@ -0,0 +1,96 @@
package utils
import (
"encoding/binary"
"fmt"
"github.com/lucas-clemente/quic-go"
"io"
"net"
"time"
)
type PacketWrapperConn struct {
Orig net.Conn
}
func (w *PacketWrapperConn) Read(b []byte) (n int, err error) {
var sz uint32
if err := binary.Read(w.Orig, binary.BigEndian, &sz); err != nil {
return 0, err
}
if int(sz) <= len(b) {
return io.ReadFull(w.Orig, b[:sz])
} else {
return 0, fmt.Errorf("the buffer is too small to hold %d bytes of packet data", sz)
}
}
func (w *PacketWrapperConn) Write(b []byte) (n int, err error) {
sz := uint32(len(b))
if err := binary.Write(w.Orig, binary.BigEndian, &sz); err != nil {
return 0, err
}
return w.Orig.Write(b)
}
func (w *PacketWrapperConn) Close() error {
return w.Orig.Close()
}
func (w *PacketWrapperConn) LocalAddr() net.Addr {
return w.Orig.LocalAddr()
}
func (w *PacketWrapperConn) RemoteAddr() net.Addr {
return w.Orig.RemoteAddr()
}
func (w *PacketWrapperConn) SetDeadline(t time.Time) error {
return w.Orig.SetDeadline(t)
}
func (w *PacketWrapperConn) SetReadDeadline(t time.Time) error {
return w.Orig.SetReadDeadline(t)
}
func (w *PacketWrapperConn) SetWriteDeadline(t time.Time) error {
return w.Orig.SetWriteDeadline(t)
}
type QUICStreamWrapperConn struct {
Orig quic.Stream
PseudoLocalAddr net.Addr
PseudoRemoteAddr net.Addr
}
func (w *QUICStreamWrapperConn) Read(b []byte) (n int, err error) {
return w.Orig.Read(b)
}
func (w *QUICStreamWrapperConn) Write(b []byte) (n int, err error) {
return w.Orig.Write(b)
}
func (w *QUICStreamWrapperConn) Close() error {
return w.Orig.Close()
}
func (w *QUICStreamWrapperConn) LocalAddr() net.Addr {
return w.PseudoLocalAddr
}
func (w *QUICStreamWrapperConn) RemoteAddr() net.Addr {
return w.PseudoRemoteAddr
}
func (w *QUICStreamWrapperConn) SetDeadline(t time.Time) error {
return w.Orig.SetDeadline(t)
}
func (w *QUICStreamWrapperConn) SetReadDeadline(t time.Time) error {
return w.Orig.SetReadDeadline(t)
}
func (w *QUICStreamWrapperConn) SetWriteDeadline(t time.Time) error {
return w.Orig.SetWriteDeadline(t)
}

View File

@ -1,35 +0,0 @@
package utils
import (
"encoding/binary"
"fmt"
"io"
)
type PacketReadWriteCloser struct {
Orig io.ReadWriteCloser
}
func (rw *PacketReadWriteCloser) Read(p []byte) (n int, err error) {
var sz uint32
if err := binary.Read(rw.Orig, binary.BigEndian, &sz); err != nil {
return 0, err
}
if int(sz) <= len(p) {
return io.ReadFull(rw.Orig, p[:sz])
} else {
return 0, fmt.Errorf("the buffer is too small to hold %d bytes of packet data", sz)
}
}
func (rw *PacketReadWriteCloser) Write(p []byte) (n int, err error) {
sz := uint32(len(p))
if err := binary.Write(rw.Orig, binary.BigEndian, &sz); err != nil {
return 0, err
}
return rw.Orig.Write(p)
}
func (rw *PacketReadWriteCloser) Close() error {
return rw.Orig.Close()
}

View File

@ -61,7 +61,7 @@ func NewServer(addr string, tlsConfig *tls.Config, quicConfig *quic.Config,
}
type Client interface {
Dial(packet bool, addr string) (io.ReadWriteCloser, error)
Dial(packet bool, addr string) (net.Conn, error)
Stats() (inbound uint64, outbound uint64)
Close() error
}

58
pkg/http/server.go Normal file
View File

@ -0,0 +1,58 @@
package http
import (
"errors"
"fmt"
"github.com/elazarl/goproxy"
"github.com/tobyxdd/hysteria/pkg/acl"
"github.com/tobyxdd/hysteria/pkg/core"
"net"
"net/http"
"time"
)
func NewProxyHTTPServer(hyClient core.Client, idleTimeout time.Duration, aclEngine *acl.Engine,
newDialFunc func(reqAddr string, action acl.Action, arg string)) (*goproxy.ProxyHttpServer, error) {
proxy := goproxy.NewProxyHttpServer()
proxy.Logger = &nopLogger{}
proxy.NonproxyHandler = http.NotFoundHandler()
proxy.Tr = &http.Transport{
Dial: func(network, addr string) (net.Conn, error) {
// Parse addr string
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
ip := net.ParseIP(host)
if ip != nil {
host = ""
}
// ACL
action, arg := acl.ActionProxy, ""
if aclEngine != nil {
action, arg = aclEngine.Lookup(host, ip)
}
newDialFunc(addr, action, arg)
// Handle according to the action
switch action {
case acl.ActionDirect:
return net.Dial(network, addr)
case acl.ActionProxy:
return hyClient.Dial(false, addr)
case acl.ActionBlock:
return nil, errors.New("blocked in ACL")
case acl.ActionHijack:
return net.Dial(network, net.JoinHostPort(arg, port))
default:
return nil, fmt.Errorf("unknown action %d", action)
}
},
IdleConnTimeout: idleTimeout,
}
proxy.ConnectDial = nil
return proxy, nil
}
type nopLogger struct{}
func (n *nopLogger) Printf(format string, v ...interface{}) {}