diff --git a/pkg/core/client.go b/pkg/core/client.go index 1419c4c..d94f83a 100644 --- a/pkg/core/client.go +++ b/pkg/core/client.go @@ -185,7 +185,7 @@ func (c *Client) openStreamWithReconnect() (quic.Connection, quic.Stream, error) stream, err := c.quicConn.OpenStream() if err == nil { // All good - return c.quicConn, &wrappedQUICStream{stream}, nil + return c.quicConn, &qStream{stream}, nil } // Something is wrong if nErr, ok := err.(net.Error); ok && nErr.Temporary() { @@ -200,7 +200,7 @@ func (c *Client) openStreamWithReconnect() (quic.Connection, quic.Stream, error) } // We are not going to try again even if it still fails the second time stream, err = c.quicConn.OpenStream() - return c.quicConn, &wrappedQUICStream{stream}, err + return c.quicConn, &qStream{stream}, err } func (c *Client) DialTCP(addr string) (net.Conn, error) { diff --git a/pkg/core/server_client.go b/pkg/core/server_client.go index cda0db9..1441395 100644 --- a/pkg/core/server_client.go +++ b/pkg/core/server_client.go @@ -89,7 +89,7 @@ func (c *serverClient) Run() error { c.ConnGauge.Inc() } go func() { - stream := &wrappedQUICStream{stream} + stream := &qStream{stream} c.handleStream(stream) _ = stream.Close() if c.ConnGauge != nil { diff --git a/pkg/core/stream.go b/pkg/core/stream.go index 7771036..65259e4 100644 --- a/pkg/core/stream.go +++ b/pkg/core/stream.go @@ -7,49 +7,52 @@ import ( "github.com/lucas-clemente/quic-go" ) -// Handle stream close properly -// Ref: https://github.com/libp2p/go-libp2p-quic-transport/blob/master/stream.go -type wrappedQUICStream struct { +// qStream is a wrapper of quic.Stream that handles Close() correctly. +// quic-go's quic.Stream.Close() only closes the write side of the stream, +// NOT the read side. This would cause the pipe(s) to hang at Read() even +// after the stream is supposedly "closed". +// Ref: https://github.com/libp2p/go-libp2p/blob/master/p2p/transport/quic/stream.go +type qStream struct { Stream quic.Stream } -func (s *wrappedQUICStream) StreamID() quic.StreamID { +func (s *qStream) StreamID() quic.StreamID { return s.Stream.StreamID() } -func (s *wrappedQUICStream) Read(p []byte) (n int, err error) { +func (s *qStream) Read(p []byte) (n int, err error) { return s.Stream.Read(p) } -func (s *wrappedQUICStream) CancelRead(code quic.StreamErrorCode) { +func (s *qStream) CancelRead(code quic.StreamErrorCode) { s.Stream.CancelRead(code) } -func (s *wrappedQUICStream) SetReadDeadline(t time.Time) error { +func (s *qStream) SetReadDeadline(t time.Time) error { return s.Stream.SetReadDeadline(t) } -func (s *wrappedQUICStream) Write(p []byte) (n int, err error) { +func (s *qStream) Write(p []byte) (n int, err error) { return s.Stream.Write(p) } -func (s *wrappedQUICStream) Close() error { +func (s *qStream) Close() error { s.Stream.CancelRead(0) return s.Stream.Close() } -func (s *wrappedQUICStream) CancelWrite(code quic.StreamErrorCode) { +func (s *qStream) CancelWrite(code quic.StreamErrorCode) { s.Stream.CancelWrite(code) } -func (s *wrappedQUICStream) Context() context.Context { +func (s *qStream) Context() context.Context { return s.Stream.Context() } -func (s *wrappedQUICStream) SetWriteDeadline(t time.Time) error { +func (s *qStream) SetWriteDeadline(t time.Time) error { return s.Stream.SetWriteDeadline(t) } -func (s *wrappedQUICStream) SetDeadline(t time.Time) error { +func (s *qStream) SetDeadline(t time.Time) error { return s.Stream.SetDeadline(t) }