From 27460960ab0b2bc1416923a0228c49847f8998b6 Mon Sep 17 00:00:00 2001 From: Toby Date: Mon, 24 Jul 2023 20:12:48 -0700 Subject: [PATCH] feat(wip): udp rework client side tests & server tests update --- core/client/client.go | 4 ++ core/client/udp_test.go | 109 ++++++++++++++++++++++++++++++++++++++++ core/server/udp_test.go | 59 +++++++++++----------- 3 files changed, 144 insertions(+), 28 deletions(-) create mode 100644 core/client/udp_test.go diff --git a/core/client/client.go b/core/client/client.go index 153ffbd..3e9e259 100644 --- a/core/client/client.go +++ b/core/client/client.go @@ -149,6 +149,10 @@ func (c *clientImpl) openStream() (quic.Stream, error) { func (c *clientImpl) DialTCP(addr string) (net.Conn, error) { stream, err := c.openStream() if err != nil { + if netErr, ok := err.(net.Error); ok && !netErr.Temporary() { + // Connection is dead + return nil, coreErrs.ClosedError{} + } return nil, err } // Send request diff --git a/core/client/udp_test.go b/core/client/udp_test.go new file mode 100644 index 0000000..f40a5bc --- /dev/null +++ b/core/client/udp_test.go @@ -0,0 +1,109 @@ +package client + +import ( + "errors" + "fmt" + io2 "io" + "testing" + "time" + + coreErrs "github.com/apernet/hysteria/core/errors" + "github.com/apernet/hysteria/core/internal/protocol" + "go.uber.org/goleak" +) + +type udpEchoIO struct { + MsgCh chan *protocol.UDPMessage + CloseCh chan struct{} +} + +func (io *udpEchoIO) ReceiveMessage() (*protocol.UDPMessage, error) { + select { + case m := <-io.MsgCh: + return m, nil + case <-io.CloseCh: + return nil, errors.New("closed") + } +} + +func (io *udpEchoIO) SendMessage(buf []byte, msg *protocol.UDPMessage) error { + nMsg := *msg + nMsg.Data = make([]byte, len(msg.Data)) + copy(nMsg.Data, msg.Data) + io.MsgCh <- &nMsg + return nil +} + +func (io *udpEchoIO) Close() { + close(io.CloseCh) +} + +func TestUDPSessionManager(t *testing.T) { + io := &udpEchoIO{ + MsgCh: make(chan *protocol.UDPMessage, 10), + CloseCh: make(chan struct{}), + } + sm := newUDPSessionManager(io) + + rChan := make(chan error, 5) + + for i := 0; i < 5; i++ { + go func(id int) { + conn, err := sm.NewUDP() + if err != nil { + rChan <- err + return + } + defer conn.Close() + + addr := fmt.Sprintf("wow.com:%d", id) + for j := 0; j < 2; j++ { + s := fmt.Sprintf("hello %d %d", id, j) + err = conn.Send([]byte(s), addr) + if err != nil { + rChan <- err + return + } + bs, addr, err := conn.Receive() + if err != nil { + rChan <- err + return + } + if string(bs) != s || addr != addr { + rChan <- fmt.Errorf("unexpected message: %s %s", bs, addr) + return + } + } + rChan <- nil // Success + }(i) + } + + // Check the results + for i := 0; i < 5; i++ { + err := <-rChan + if err != nil { + t.Fatal(err) + } + } + + // Leak checks + // Create another UDP session + conn, err := sm.NewUDP() + if err != nil { + t.Fatal(err) + } + io.Close() + time.Sleep(1 * time.Second) // Give some time for the goroutines to exit + _, _, err = conn.Receive() + if err != io2.EOF { + t.Fatal("expected EOF after closing io") + } + _, err = sm.NewUDP() + if !errors.Is(err, coreErrs.ClosedError{}) { + t.Fatal("expected ClosedError after closing io") + } + if sm.Count() != 0 { + t.Error("session count should be 0") + } + goleak.VerifyNone(t) +} diff --git a/core/server/udp_test.go b/core/server/udp_test.go index 880bbad..a8e8296 100644 --- a/core/server/udp_test.go +++ b/core/server/udp_test.go @@ -166,26 +166,24 @@ func TestUDPSessionManager(t *testing.T) { for _, m := range ms { msgReceiveCh <- m } - // New event order should be consistent - newEvent := <-eventNewCh - if newEvent.SessionID != 1234 || newEvent.ReqAddr != "example.com:5353" { - t.Error("unexpected new event value") + // New event order should be consistent with message order + for i := 0; i < 2; i++ { + newEvent := <-eventNewCh + if newEvent.SessionID != ms[i].SessionID || newEvent.ReqAddr != ms[i].Addr { + t.Errorf("unexpected new event value: %d:%s", newEvent.SessionID, newEvent.ReqAddr) + } } - newEvent = <-eventNewCh - if newEvent.SessionID != 5678 || newEvent.ReqAddr != "example.com:9999" { - t.Error("unexpected new event value") - } - // Message order is not guaranteed + // Message order is not guaranteed so we use a map msgMap := make(map[string]bool) for i := 0; i < 4; i++ { msg := <-msgSendCh msgMap[fmt.Sprintf("%d:%s:%s", msg.SessionID, msg.Addr, string(msg.Data))] = true } - if !(msgMap["1234:example.com:5353:hello"] && - msgMap["5678:example.com:9999:goodbye"] && - msgMap["1234:example.com:5353: world"] && - msgMap["5678:example.com:9999: girl"]) { - t.Error("unexpected message value") + for _, m := range ms { + key := fmt.Sprintf("%d:%s:%s", m.SessionID, m.Addr, string(m.Data)) + if !msgMap[key] { + t.Errorf("missing message: %s", key) + } } // Timeout check startTime := time.Now() @@ -194,11 +192,14 @@ func TestUDPSessionManager(t *testing.T) { closeEvent := <-eventCloseCh closeMap[closeEvent.SessionID] = true } - if !(closeMap[1234] && closeMap[5678]) { - t.Error("unexpected close event value") + for i := 0; i < 2; i++ { + if !closeMap[ms[i].SessionID] { + t.Errorf("missing close event: %d", ms[i].SessionID) + } } - if time.Since(startTime) < 2*time.Second || time.Since(startTime) > 4*time.Second { - t.Error("unexpected timeout duration") + dur := time.Since(startTime) + if dur < 2*time.Second || dur > 4*time.Second { + t.Errorf("unexpected timeout duration: %s", dur) } }) @@ -206,7 +207,7 @@ func TestUDPSessionManager(t *testing.T) { // Close UDP connection immediately after creation io.UDPClose = true - msgReceiveCh <- &protocol.UDPMessage{ + m := &protocol.UDPMessage{ SessionID: 8888, PacketID: 0, FragID: 0, @@ -214,14 +215,15 @@ func TestUDPSessionManager(t *testing.T) { Addr: "mygod.org:1514", Data: []byte("goodnight"), } + msgReceiveCh <- m // Should have both new and close events immediately newEvent := <-eventNewCh - if newEvent.SessionID != 8888 || newEvent.ReqAddr != "mygod.org:1514" { - t.Error("unexpected new event value") + if newEvent.SessionID != m.SessionID || newEvent.ReqAddr != m.Addr { + t.Errorf("unexpected new event value: %d:%s", newEvent.SessionID, newEvent.ReqAddr) } closeEvent := <-eventCloseCh - if closeEvent.SessionID != 8888 || closeEvent.Err != errUDPClosed { - t.Error("unexpected close event value") + if closeEvent.SessionID != m.SessionID || closeEvent.Err != errUDPClosed { + t.Errorf("unexpected close event value: %d:%s", closeEvent.SessionID, closeEvent.Err) } }) @@ -229,7 +231,7 @@ func TestUDPSessionManager(t *testing.T) { // Block UDP connection creation io.BlockUDP = true - msgReceiveCh <- &protocol.UDPMessage{ + m := &protocol.UDPMessage{ SessionID: 9999, PacketID: 0, FragID: 0, @@ -237,14 +239,15 @@ func TestUDPSessionManager(t *testing.T) { Addr: "xxx.net:12450", Data: []byte("nope"), } + msgReceiveCh <- m // Should have both new and close events immediately newEvent := <-eventNewCh - if newEvent.SessionID != 9999 || newEvent.ReqAddr != "xxx.net:12450" { - t.Error("unexpected new event value") + if newEvent.SessionID != m.SessionID || newEvent.ReqAddr != m.Addr { + t.Errorf("unexpected new event value: %d:%s", newEvent.SessionID, newEvent.ReqAddr) } closeEvent := <-eventCloseCh - if closeEvent.SessionID != 9999 || closeEvent.Err != errUDPBlocked { - t.Error("unexpected close event value") + if closeEvent.SessionID != m.SessionID || closeEvent.Err != errUDPBlocked { + t.Errorf("unexpected close event value: %d:%s", closeEvent.SessionID, closeEvent.Err) } })