diff --git a/app/cmd/client.go b/app/cmd/client.go index 2bbd5ea..e1ce48f 100644 --- a/app/cmd/client.go +++ b/app/cmd/client.go @@ -185,7 +185,8 @@ func (c *clientConfig) fillQUICConfig(hyConfig *client.Config) error { func (c *clientConfig) fillBandwidthConfig(hyConfig *client.Config) error { if c.Bandwidth.Up == "" || c.Bandwidth.Down == "" { - return configError{Field: "bandwidth", Err: errors.New("both up and down bandwidth must be set")} + // New core now allows users to omit bandwidth values and use built-in congestion control + return nil } var err error hyConfig.BandwidthConfig.MaxTx, err = convBandwidth(c.Bandwidth.Up) diff --git a/app/go.mod b/app/go.mod index b3fce18..2e75f43 100644 --- a/app/go.mod +++ b/app/go.mod @@ -42,6 +42,7 @@ require ( github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.4.2 // indirect github.com/txthinking/runnergroup v0.0.0-20210608031112-152c7c4432bf // indirect + github.com/zhangyunhao116/fastrand v0.3.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.11.0 // indirect diff --git a/app/go.sum b/app/go.sum index c37cf65..fcb0a14 100644 --- a/app/go.sum +++ b/app/go.sum @@ -224,6 +224,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/zhangyunhao116/fastrand v0.3.0 h1:7bwe124xcckPulX6fxtr2lFdO2KQqaefdtbk+mqO/Ig= +github.com/zhangyunhao116/fastrand v0.3.0/go.mod h1:0v5KgHho0VE6HU192HnY15de/oDS8UrbBChIFjIhBtc= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod 
h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/core/client/client.go b/core/client/client.go index 7ed6a78..24c3be8 100644 --- a/core/client/client.go +++ b/core/client/client.go @@ -9,7 +9,9 @@ import ( "time" coreErrs "github.com/apernet/hysteria/core/errors" - "github.com/apernet/hysteria/core/internal/congestion" + "github.com/apernet/hysteria/core/internal/congestion/bbr" + "github.com/apernet/hysteria/core/internal/congestion/brutal" + "github.com/apernet/hysteria/core/internal/congestion/common" "github.com/apernet/hysteria/core/internal/protocol" "github.com/apernet/hysteria/core/internal/utils" @@ -123,9 +125,16 @@ func (c *clientImpl) connect() error { if actualTx == 0 || actualTx > c.config.BandwidthConfig.MaxTx { actualTx = c.config.BandwidthConfig.MaxTx } - // Set congestion control when applicable + // Use Brutal CC if actualTx > 0, otherwise use BBR if actualTx > 0 { - conn.SetCongestionControl(congestion.NewBrutalSender(actualTx)) + conn.SetCongestionControl(brutal.NewBrutalSender(actualTx)) + } else { + conn.SetCongestionControl(bbr.NewBBRSender( + bbr.DefaultClock{}, + bbr.GetInitialPacketSize(conn.RemoteAddr()), + 32*common.InitMaxDatagramSize, + bbr.DefaultBBRMaxCongestionWindow*common.InitMaxDatagramSize, + )) } _ = resp.Body.Close() diff --git a/core/go.mod b/core/go.mod index 79b8790..4a0616d 100644 --- a/core/go.mod +++ b/core/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/apernet/quic-go v0.37.3-0.20230804221709-80e23a7cabb6 github.com/stretchr/testify v1.8.4 + github.com/zhangyunhao116/fastrand v0.3.0 go.uber.org/goleak v1.2.1 golang.org/x/time v0.3.0 ) diff --git a/core/go.sum b/core/go.sum index 49e5944..00b88a4 100644 --- a/core/go.sum +++ b/core/go.sum @@ -44,6 +44,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod 
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/zhangyunhao116/fastrand v0.3.0 h1:7bwe124xcckPulX6fxtr2lFdO2KQqaefdtbk+mqO/Ig= +github.com/zhangyunhao116/fastrand v0.3.0/go.mod h1:0v5KgHho0VE6HU192HnY15de/oDS8UrbBChIFjIhBtc= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/core/internal/congestion/bbr/bandwidth.go b/core/internal/congestion/bbr/bandwidth.go new file mode 100644 index 0000000..4f0b75f --- /dev/null +++ b/core/internal/congestion/bbr/bandwidth.go @@ -0,0 +1,25 @@ +package bbr + +import ( + "math" + "time" + + "github.com/apernet/quic-go/congestion" +) + +// Bandwidth of a connection +type Bandwidth uint64 + +const infBandwidth Bandwidth = math.MaxUint64 + +const ( + // BitsPerSecond is 1 bit per second + BitsPerSecond Bandwidth = 1 + // BytesPerSecond is 1 byte per second + BytesPerSecond = 8 * BitsPerSecond +) + +// BandwidthFromDelta calculates the bandwidth from a number of bytes and a time delta +func BandwidthFromDelta(bytes congestion.ByteCount, delta time.Duration) Bandwidth { + return Bandwidth(bytes) * Bandwidth(time.Second) / Bandwidth(delta) * BytesPerSecond +} diff --git a/core/internal/congestion/bbr/bandwidth_sampler.go b/core/internal/congestion/bbr/bandwidth_sampler.go new file mode 100644 index 0000000..d116e5f --- /dev/null +++ b/core/internal/congestion/bbr/bandwidth_sampler.go @@ -0,0 +1,374 @@ +package bbr + +import ( + "math" + "time" + + "github.com/apernet/quic-go/congestion" +) + +var InfiniteBandwidth = Bandwidth(math.MaxUint64) + +// SendTimeState is a subset of ConnectionStateOnSentPacket which is returned +// to the caller when the packet is acked or lost. 
+type SendTimeState struct { + // Whether other states in this object is valid. + isValid bool + // Whether the sender is app limited at the time the packet was sent. + // App limited bandwidth sample might be artificially low because the sender + // did not have enough data to send in order to saturate the link. + isAppLimited bool + // Total number of sent bytes at the time the packet was sent. + // Includes the packet itself. + totalBytesSent congestion.ByteCount + // Total number of acked bytes at the time the packet was sent. + totalBytesAcked congestion.ByteCount + // Total number of lost bytes at the time the packet was sent. + totalBytesLost congestion.ByteCount +} + +// ConnectionStateOnSentPacket represents the information about a sent packet +// and the state of the connection at the moment the packet was sent, +// specifically the information about the most recently acknowledged packet at +// that moment. +type ConnectionStateOnSentPacket struct { + packetNumber congestion.PacketNumber + // Time at which the packet is sent. + sendTime time.Time + // Size of the packet. + size congestion.ByteCount + // The value of |totalBytesSentAtLastAckedPacket| at the time the + // packet was sent. + totalBytesSentAtLastAckedPacket congestion.ByteCount + // The value of |lastAckedPacketSentTime| at the time the packet was + // sent. + lastAckedPacketSentTime time.Time + // The value of |lastAckedPacketAckTime| at the time the packet was + // sent. + lastAckedPacketAckTime time.Time + // Send time states that are returned to the congestion controller when the + // packet is acked or lost. + sendTimeState SendTimeState +} + +// BandwidthSample +type BandwidthSample struct { + // The bandwidth at that particular sample. Zero if no valid bandwidth sample + // is available. + bandwidth Bandwidth + // The RTT measurement at this particular sample. Zero if no RTT sample is + // available. Does not correct for delayed ack time. 
+ rtt time.Duration + // States captured when the packet was sent. + stateAtSend SendTimeState +} + +func NewBandwidthSample() *BandwidthSample { + return &BandwidthSample{ + // FIXME: the default value of original code is zero. + rtt: InfiniteRTT, + } +} + +// BandwidthSampler keeps track of sent and acknowledged packets and outputs a +// bandwidth sample for every packet acknowledged. The samples are taken for +// individual packets, and are not filtered; the consumer has to filter the +// bandwidth samples itself. In certain cases, the sampler will locally severely +// underestimate the bandwidth, hence a maximum filter with a size of at least +// one RTT is recommended. +// +// This class bases its samples on the slope of two curves: the number of bytes +// sent over time, and the number of bytes acknowledged as received over time. +// It produces a sample of both slopes for every packet that gets acknowledged, +// based on a slope between two points on each of the corresponding curves. Note +// that due to the packet loss, the number of bytes on each curve might get +// further and further away from each other, meaning that it is not feasible to +// compare byte values coming from different curves with each other. +// +// The obvious points for measuring slope sample are the ones corresponding to +// the packet that was just acknowledged. Let us denote them as S_1 (point at +// which the current packet was sent) and A_1 (point at which the current packet +// was acknowledged). However, taking a slope requires two points on each line, +// so estimating bandwidth requires picking a packet in the past with respect to +// which the slope is measured. +// +// For that purpose, BandwidthSampler always keeps track of the most recently +// acknowledged packet, and records it together with every outgoing packet. 
+// When a packet gets acknowledged (A_1), it has not only information about when +// it itself was sent (S_1), but also the information about the latest +// acknowledged packet right before it was sent (S_0 and A_0). +// +// Based on that data, send and ack rate are estimated as: +// +// send_rate = (bytes(S_1) - bytes(S_0)) / (time(S_1) - time(S_0)) +// ack_rate = (bytes(A_1) - bytes(A_0)) / (time(A_1) - time(A_0)) +// +// Here, the ack rate is intuitively the rate we want to treat as bandwidth. +// However, in certain cases (e.g. ack compression) the ack rate at a point may +// end up higher than the rate at which the data was originally sent, which is +// not indicative of the real bandwidth. Hence, we use the send rate as an upper +// bound, and the sample value is +// +// rate_sample = min(send_rate, ack_rate) +// +// An important edge case handled by the sampler is tracking the app-limited +// samples. There are multiple meaning of "app-limited" used interchangeably, +// hence it is important to understand and to be able to distinguish between +// them. +// +// Meaning 1: connection state. The connection is said to be app-limited when +// there is no outstanding data to send. This means that certain bandwidth +// samples in the future would not be an accurate indication of the link +// capacity, and it is important to inform consumer about that. Whenever +// connection becomes app-limited, the sampler is notified via OnAppLimited() +// method. +// +// Meaning 2: a phase in the bandwidth sampler. As soon as the bandwidth +// sampler becomes notified about the connection being app-limited, it enters +// app-limited phase. In that phase, all *sent* packets are marked as +// app-limited. Note that the connection itself does not have to be +// app-limited during the app-limited phase, and in fact it will not be +// (otherwise how would it send packets?). The boolean flag below indicates +// whether the sampler is in that phase. 
+// +// Meaning 3: a flag on the sent packet and on the sample. If a sent packet is +// sent during the app-limited phase, the resulting sample related to the +// packet will be marked as app-limited. +// +// With the terminology issue out of the way, let us consider the question of +// what kind of situation it addresses. +// +// Consider a scenario where we first send packets 1 to 20 at a regular +// bandwidth, and then immediately run out of data. After a few seconds, we send +// packets 21 to 60, and only receive ack for 21 between sending packets 40 and +// 41. In this case, when we sample bandwidth for packets 21 to 40, the S_0/A_0 +// we use to compute the slope is going to be packet 20, a few seconds apart +// from the current packet, hence the resulting estimate would be extremely low +// and not indicative of anything. Only at packet 41 the S_0/A_0 will become 21, +// meaning that the bandwidth sample would exclude the quiescence. +// +// Based on the analysis of that scenario, we implement the following rule: once +// OnAppLimited() is called, all sent packets will produce app-limited samples +// up until an ack for a packet that was sent after OnAppLimited() was called. +// Note that while the scenario above is not the only scenario when the +// connection is app-limited, the approach works in other cases too. +type BandwidthSampler struct { + // The total number of congestion controlled bytes sent during the connection. + totalBytesSent congestion.ByteCount + // The total number of congestion controlled bytes which were acknowledged. + totalBytesAcked congestion.ByteCount + // The total number of congestion controlled bytes which were lost. + totalBytesLost congestion.ByteCount + // The value of |totalBytesSent| at the time the last acknowledged packet + // was sent. Valid only when |lastAckedPacketSentTime| is valid. + totalBytesSentAtLastAckedPacket congestion.ByteCount + // The time at which the last acknowledged packet was sent. 
Set to + // QuicTime::Zero() if no valid timestamp is available. + lastAckedPacketSentTime time.Time + // The time at which the most recent packet was acknowledged. + lastAckedPacketAckTime time.Time + // The most recently sent packet. + lastSendPacket congestion.PacketNumber + // Indicates whether the bandwidth sampler is currently in an app-limited + // phase. + isAppLimited bool + // The packet that will be acknowledged after this one will cause the sampler + // to exit the app-limited phase. + endOfAppLimitedPhase congestion.PacketNumber + // Record of the connection state at the point where each packet in flight was + // sent, indexed by the packet number. + connectionStats *ConnectionStates +} + +func NewBandwidthSampler() *BandwidthSampler { + return &BandwidthSampler{ + connectionStats: &ConnectionStates{ + stats: make(map[congestion.PacketNumber]*ConnectionStateOnSentPacket), + }, + } +} + +// OnPacketSent Inputs the sent packet information into the sampler. Assumes that all +// packets are sent in order. The information about the packet will not be +// released from the sampler until the packet is either acknowledged or +// declared lost. +func (s *BandwidthSampler) OnPacketSent(sentTime time.Time, lastSentPacket congestion.PacketNumber, sentBytes, bytesInFlight congestion.ByteCount, hasRetransmittableData bool) { + s.lastSendPacket = lastSentPacket + + if !hasRetransmittableData { + return + } + + s.totalBytesSent += sentBytes + + // If there are no packets in flight, the time at which the new transmission + // opens can be treated as the A_0 point for the purpose of bandwidth + // sampling. This underestimates bandwidth to some extent, and produces some + // artificially low samples for most packets in flight, but it provides with + // samples at important points where we would not have them otherwise, most + // importantly at the beginning of the connection. 
+ if bytesInFlight == 0 { + s.lastAckedPacketAckTime = sentTime + s.totalBytesSentAtLastAckedPacket = s.totalBytesSent + + // In this situation ack compression is not a concern, set send rate to + // effectively infinite. + s.lastAckedPacketSentTime = sentTime + } + + s.connectionStats.Insert(lastSentPacket, sentTime, sentBytes, s) +} + +// OnPacketAcked Notifies the sampler that the |lastAckedPacket| is acknowledged. Returns a +// bandwidth sample. If no bandwidth sample is available, +// QuicBandwidth::Zero() is returned. +func (s *BandwidthSampler) OnPacketAcked(ackTime time.Time, lastAckedPacket congestion.PacketNumber) *BandwidthSample { + sentPacketState := s.connectionStats.Get(lastAckedPacket) + if sentPacketState == nil { + return NewBandwidthSample() + } + + sample := s.onPacketAckedInner(ackTime, lastAckedPacket, sentPacketState) + s.connectionStats.Remove(lastAckedPacket) + + return sample +} + +// onPacketAckedInner Handles the actual bandwidth calculations, whereas the outer method handles +// retrieving and removing |sentPacket|. +func (s *BandwidthSampler) onPacketAckedInner(ackTime time.Time, lastAckedPacket congestion.PacketNumber, sentPacket *ConnectionStateOnSentPacket) *BandwidthSample { + s.totalBytesAcked += sentPacket.size + + s.totalBytesSentAtLastAckedPacket = sentPacket.sendTimeState.totalBytesSent + s.lastAckedPacketSentTime = sentPacket.sendTime + s.lastAckedPacketAckTime = ackTime + + // Exit app-limited phase once a packet that was sent while the connection is + // not app-limited is acknowledged. + if s.isAppLimited && lastAckedPacket > s.endOfAppLimitedPhase { + s.isAppLimited = false + } + + // There might have been no packets acknowledged at the moment when the + // current packet was sent. In that case, there is no bandwidth sample to + // make. 
+ if sentPacket.lastAckedPacketSentTime.IsZero() { + return NewBandwidthSample() + } + + // Infinite rate indicates that the sampler is supposed to discard the + // current send rate sample and use only the ack rate. + sendRate := InfiniteBandwidth + if sentPacket.sendTime.After(sentPacket.lastAckedPacketSentTime) { + sendRate = BandwidthFromDelta(sentPacket.sendTimeState.totalBytesSent-sentPacket.totalBytesSentAtLastAckedPacket, sentPacket.sendTime.Sub(sentPacket.lastAckedPacketSentTime)) + } + + // During the slope calculation, ensure that ack time of the current packet is + // always larger than the time of the previous packet, otherwise division by + // zero or integer underflow can occur. + if !ackTime.After(sentPacket.lastAckedPacketAckTime) { + // TODO(wub): Compare this code count before and after fixing clock jitter + // issue. + // if sentPacket.lastAckedPacketAckTime.Equal(sentPacket.sendTime) { + // This is the 1st packet after quiescence. + // QUIC_CODE_COUNT_N(quic_prev_ack_time_larger_than_current_ack_time, 1, 2); + // } else { + // QUIC_CODE_COUNT_N(quic_prev_ack_time_larger_than_current_ack_time, 2, 2); + // } + + return NewBandwidthSample() + } + + ackRate := BandwidthFromDelta(s.totalBytesAcked-sentPacket.sendTimeState.totalBytesAcked, + ackTime.Sub(sentPacket.lastAckedPacketAckTime)) + + // Note: this sample does not account for delayed acknowledgement time. This + // means that the RTT measurements here can be artificially high, especially + // on low bandwidth connections. + sample := &BandwidthSample{ + bandwidth: minBandwidth(sendRate, ackRate), + rtt: ackTime.Sub(sentPacket.sendTime), + } + + SentPacketToSendTimeState(sentPacket, &sample.stateAtSend) + return sample +} + +// OnPacketLost Informs the sampler that a packet is considered lost and it should no +// longer keep track of it. 
+func (s *BandwidthSampler) OnPacketLost(packetNumber congestion.PacketNumber) SendTimeState { + ok, sentPacket := s.connectionStats.Remove(packetNumber) + sendTimeState := SendTimeState{ + isValid: ok, + } + if sentPacket != nil { + s.totalBytesLost += sentPacket.size + SentPacketToSendTimeState(sentPacket, &sendTimeState) + } + + return sendTimeState +} + +// OnAppLimited Informs the sampler that the connection is currently app-limited, causing +// the sampler to enter the app-limited phase. The phase will expire by +// itself. +func (s *BandwidthSampler) OnAppLimited() { + s.isAppLimited = true + s.endOfAppLimitedPhase = s.lastSendPacket +} + +// SentPacketToSendTimeState Copy a subset of the (private) ConnectionStateOnSentPacket to the (public) +// SendTimeState. Always set send_time_state->is_valid to true. +func SentPacketToSendTimeState(sentPacket *ConnectionStateOnSentPacket, sendTimeState *SendTimeState) { + sendTimeState.isAppLimited = sentPacket.sendTimeState.isAppLimited + sendTimeState.totalBytesSent = sentPacket.sendTimeState.totalBytesSent + sendTimeState.totalBytesAcked = sentPacket.sendTimeState.totalBytesAcked + sendTimeState.totalBytesLost = sentPacket.sendTimeState.totalBytesLost + sendTimeState.isValid = true +} + +// ConnectionStates Record of the connection state at the point where each packet in flight was +// sent, indexed by the packet number. +// FIXME: using LinkedList replace map to fast remove all the packets lower than the specified packet number. 
+type ConnectionStates struct { + stats map[congestion.PacketNumber]*ConnectionStateOnSentPacket +} + +func (s *ConnectionStates) Insert(packetNumber congestion.PacketNumber, sentTime time.Time, bytes congestion.ByteCount, sampler *BandwidthSampler) bool { + if _, ok := s.stats[packetNumber]; ok { + return false + } + + s.stats[packetNumber] = NewConnectionStateOnSentPacket(packetNumber, sentTime, bytes, sampler) + return true +} + +func (s *ConnectionStates) Get(packetNumber congestion.PacketNumber) *ConnectionStateOnSentPacket { + return s.stats[packetNumber] +} + +func (s *ConnectionStates) Remove(packetNumber congestion.PacketNumber) (bool, *ConnectionStateOnSentPacket) { + state, ok := s.stats[packetNumber] + if ok { + delete(s.stats, packetNumber) + } + return ok, state +} + +func NewConnectionStateOnSentPacket(packetNumber congestion.PacketNumber, sentTime time.Time, bytes congestion.ByteCount, sampler *BandwidthSampler) *ConnectionStateOnSentPacket { + return &ConnectionStateOnSentPacket{ + packetNumber: packetNumber, + sendTime: sentTime, + size: bytes, + lastAckedPacketSentTime: sampler.lastAckedPacketSentTime, + lastAckedPacketAckTime: sampler.lastAckedPacketAckTime, + totalBytesSentAtLastAckedPacket: sampler.totalBytesSentAtLastAckedPacket, + sendTimeState: SendTimeState{ + isValid: true, + isAppLimited: sampler.isAppLimited, + totalBytesSent: sampler.totalBytesSent, + totalBytesAcked: sampler.totalBytesAcked, + totalBytesLost: sampler.totalBytesLost, + }, + } +} diff --git a/core/internal/congestion/bbr/bbr_sender.go b/core/internal/congestion/bbr/bbr_sender.go new file mode 100644 index 0000000..3c21f06 --- /dev/null +++ b/core/internal/congestion/bbr/bbr_sender.go @@ -0,0 +1,1001 @@ +package bbr + +// src from https://quiche.googlesource.com/quiche.git/+/66dea072431f94095dfc3dd2743cb94ef365f7ef/quic/core/congestion_control/bbr_sender.cc + +import ( + "fmt" + "math" + "net" + "time" + + "github.com/apernet/hysteria/core/internal/congestion/common" + 
"github.com/apernet/quic-go/congestion" + "github.com/zhangyunhao116/fastrand" +) + +const ( + InitialPacketSizeIPv4 = 1252 + InitialPacketSizeIPv6 = 1232 + InitialCongestionWindow = 32 + DefaultBBRMaxCongestionWindow = 10000 +) + +func GetInitialPacketSize(addr net.Addr) congestion.ByteCount { + maxSize := congestion.ByteCount(1200) + // If this is not a UDP address, we don't know anything about the MTU. + // Use the minimum size of an Initial packet as the max packet size. + if udpAddr, ok := addr.(*net.UDPAddr); ok { + if udpAddr.IP.To4() != nil { + maxSize = InitialPacketSizeIPv4 + } else { + maxSize = InitialPacketSizeIPv6 + } + } + return congestion.ByteCount(maxSize) +} + +var ( + + // Default initial rtt used before any samples are received. + InitialRtt = 100 * time.Millisecond + + // The gain used for the STARTUP, equal to 4*ln(2). + DefaultHighGain = 2.77 + + // The gain used in STARTUP after loss has been detected. + // 1.5 is enough to allow for 25% exogenous loss and still observe a 25% growth + // in measured bandwidth. + StartupAfterLossGain = 1.5 + + // The cycle of gains used during the PROBE_BW stage. + PacingGain = []float64{1.25, 0.75, 1, 1, 1, 1, 1, 1} + + // The length of the gain cycle. + GainCycleLength = len(PacingGain) + + // The size of the bandwidth filter window, in round-trips. + BandwidthWindowSize = GainCycleLength + 2 + + // The time after which the current min_rtt value expires. + MinRttExpiry = 10 * time.Second + + // The minimum time the connection can spend in PROBE_RTT mode. + ProbeRttTime = time.Millisecond * 200 + + // If the bandwidth does not increase by the factor of |kStartupGrowthTarget| + // within |kRoundTripsWithoutGrowthBeforeExitingStartup| rounds, the connection + // will exit the STARTUP mode. + StartupGrowthTarget = 1.25 + RoundTripsWithoutGrowthBeforeExitingStartup = int64(3) + + // Coefficient of target congestion window to use when basing PROBE_RTT on BDP. 
+ ModerateProbeRttMultiplier = 0.75 + + // Coefficient to determine if a new RTT is sufficiently similar to min_rtt that + // we don't need to enter PROBE_RTT. + SimilarMinRttThreshold = 1.125 + + // Congestion window gain for QUIC BBR during PROBE_BW phase. + DefaultCongestionWindowGainConst = 2.0 +) + +type bbrMode int + +const ( + // Startup phase of the connection. + STARTUP = iota + // After achieving the highest possible bandwidth during the startup, lower + // the pacing rate in order to drain the queue. + DRAIN + // Cruising mode. + PROBE_BW + // Temporarily slow down sending in order to empty the buffer and measure + // the real minimum RTT. + PROBE_RTT +) + +type bbrRecoveryState int + +const ( + // Do not limit. + NOT_IN_RECOVERY = iota + + // Allow an extra outstanding byte for each byte acknowledged. + CONSERVATION + + // Allow two extra outstanding bytes for each byte acknowledged (slow + // start). + GROWTH +) + +type bbrSender struct { + mode bbrMode + clock Clock + rttStats congestion.RTTStatsProvider + bytesInFlight congestion.ByteCount + // return total bytes of unacked packets. + // GetBytesInFlight func() congestion.ByteCount + // Bandwidth sampler provides BBR with the bandwidth measurements at + // individual points. + sampler *BandwidthSampler + // The number of the round trips that have occurred during the connection. + roundTripCount int64 + // The packet number of the most recently sent packet. + lastSendPacket congestion.PacketNumber + // Acknowledgement of any packet after |current_round_trip_end_| will cause + // the round trip counter to advance. + currentRoundTripEnd congestion.PacketNumber + // The filter that tracks the maximum bandwidth over the multiple recent + // round-trips. + maxBandwidth *WindowedFilter + // Tracks the maximum number of bytes acked faster than the sending rate. + maxAckHeight *WindowedFilter + // The time this aggregation started and the number of bytes acked during it. 
+ aggregationEpochStartTime time.Time + aggregationEpochBytes congestion.ByteCount + // Minimum RTT estimate. Automatically expires within 10 seconds (and + // triggers PROBE_RTT mode) if no new value is sampled during that period. + minRtt time.Duration + // The time at which the current value of |min_rtt_| was assigned. + minRttTimestamp time.Time + // The maximum allowed number of bytes in flight. + congestionWindow congestion.ByteCount + // The initial value of the |congestion_window_|. + initialCongestionWindow congestion.ByteCount + // The largest value the |congestion_window_| can achieve. + initialMaxCongestionWindow congestion.ByteCount + // The smallest value the |congestion_window_| can achieve. + // minCongestionWindow congestion.ByteCount + // The pacing gain applied during the STARTUP phase. + highGain float64 + // The CWND gain applied during the STARTUP phase. + highCwndGain float64 + // The pacing gain applied during the DRAIN phase. + drainGain float64 + // The current pacing rate of the connection. + pacingRate Bandwidth + // The gain currently applied to the pacing rate. + pacingGain float64 + // The gain currently applied to the congestion window. + congestionWindowGain float64 + // The gain used for the congestion window during PROBE_BW. Latched from + // quic_bbr_cwnd_gain flag. + congestionWindowGainConst float64 + // The number of RTTs to stay in STARTUP mode. Defaults to 3. + numStartupRtts int64 + // If true, exit startup if 1RTT has passed with no bandwidth increase and + // the connection is in recovery. + exitStartupOnLoss bool + // Number of round-trips in PROBE_BW mode, used for determining the current + // pacing gain cycle. + cycleCurrentOffset int + // The time at which the last pacing gain cycle was started. + lastCycleStart time.Time + // Indicates whether the connection has reached the full bandwidth mode. + isAtFullBandwidth bool + // Number of rounds during which there was no significant bandwidth increase. 
+ roundsWithoutBandwidthGain int64 + // The bandwidth compared to which the increase is measured. + bandwidthAtLastRound Bandwidth + // Set to true upon exiting quiescence. + exitingQuiescence bool + // Time at which PROBE_RTT has to be exited. Setting it to zero indicates + // that the time is yet unknown as the number of packets in flight has not + // reached the required value. + exitProbeRttAt time.Time + // Indicates whether a round-trip has passed since PROBE_RTT became active. + probeRttRoundPassed bool + // Indicates whether the most recent bandwidth sample was marked as + // app-limited. + lastSampleIsAppLimited bool + // Indicates whether any non app-limited samples have been recorded. + hasNoAppLimitedSample bool + // Indicates app-limited calls should be ignored as long as there's + // enough data inflight to see more bandwidth when necessary. + flexibleAppLimited bool + // Current state of recovery. + recoveryState bbrRecoveryState + // Receiving acknowledgement of a packet after |end_recovery_at_| will cause + // BBR to exit the recovery mode. A value above zero indicates at least one + // loss has been detected, so it must not be set back to zero. + endRecoveryAt congestion.PacketNumber + // A window used to limit the number of bytes in flight during loss recovery. + recoveryWindow congestion.ByteCount + // If true, consider all samples in recovery app-limited. + isAppLimitedRecovery bool + // When true, pace at 1.5x and disable packet conservation in STARTUP. + slowerStartup bool + // When true, disables packet conservation in STARTUP. + rateBasedStartup bool + // When non-zero, decreases the rate in STARTUP by the total number of bytes + // lost in STARTUP divided by CWND. + startupRateReductionMultiplier int64 + // Sum of bytes lost in STARTUP. + startupBytesLost congestion.ByteCount + // When true, add the most recent ack aggregation measurement during STARTUP. 
+ enableAckAggregationDuringStartup bool + // When true, expire the windowed ack aggregation values in STARTUP when + // bandwidth increases more than 25%. + expireAckAggregationInStartup bool + // If true, will not exit low gain mode until bytes_in_flight drops below BDP + // or it's time for high gain mode. + drainToTarget bool + // If true, use a CWND of 0.75*BDP during probe_rtt instead of 4 packets. + probeRttBasedOnBdp bool + // If true, skip probe_rtt and update the timestamp of the existing min_rtt to + // now if min_rtt over the last cycle is within 12.5% of the current min_rtt. + // Even if the min_rtt is 12.5% too low, the 25% gain cycling and 2x CWND gain + // should overcome an overly small min_rtt. + probeRttSkippedIfSimilarRtt bool + // If true, disable PROBE_RTT entirely as long as the connection was recently + // app limited. + probeRttDisabledIfAppLimited bool + appLimitedSinceLastProbeRtt bool + minRttSinceLastProbeRtt time.Duration + // Latched value of --quic_always_get_bw_sample_when_acked. 
+ alwaysGetBwSampleWhenAcked bool + + pacer *common.Pacer + + maxDatagramSize congestion.ByteCount +} + +func NewBBRSender( + clock Clock, + initialMaxDatagramSize, + initialCongestionWindow, + initialMaxCongestionWindow congestion.ByteCount, +) *bbrSender { + b := &bbrSender{ + mode: STARTUP, + clock: clock, + sampler: NewBandwidthSampler(), + maxBandwidth: NewWindowedFilter(int64(BandwidthWindowSize), MaxFilter), + maxAckHeight: NewWindowedFilter(int64(BandwidthWindowSize), MaxFilter), + congestionWindow: initialCongestionWindow, + initialCongestionWindow: initialCongestionWindow, + highGain: DefaultHighGain, + highCwndGain: DefaultHighGain, + drainGain: 1.0 / DefaultHighGain, + pacingGain: 1.0, + congestionWindowGain: 1.0, + congestionWindowGainConst: DefaultCongestionWindowGainConst, + numStartupRtts: RoundTripsWithoutGrowthBeforeExitingStartup, + recoveryState: NOT_IN_RECOVERY, + recoveryWindow: initialMaxCongestionWindow, + minRttSinceLastProbeRtt: InfiniteRTT, + maxDatagramSize: initialMaxDatagramSize, + } + b.pacer = common.NewPacer(func() congestion.ByteCount { + return congestion.ByteCount(b.BandwidthEstimate() / BytesPerSecond) + }) + return b +} + +func (b *bbrSender) maxCongestionWindow() congestion.ByteCount { + return b.maxDatagramSize * DefaultBBRMaxCongestionWindow +} + +func (b *bbrSender) minCongestionWindow() congestion.ByteCount { + return b.maxDatagramSize * b.initialCongestionWindow +} + +func (b *bbrSender) SetRTTStatsProvider(provider congestion.RTTStatsProvider) { + b.rttStats = provider +} + +func (b *bbrSender) GetBytesInFlight() congestion.ByteCount { + return b.bytesInFlight +} + +// TimeUntilSend returns when the next packet should be sent. 
+func (b *bbrSender) TimeUntilSend(bytesInFlight congestion.ByteCount) time.Time { + b.bytesInFlight = bytesInFlight + return b.pacer.TimeUntilSend() +} + +func (b *bbrSender) HasPacingBudget(now time.Time) bool { + return b.pacer.Budget(now) >= b.maxDatagramSize +} + +func (b *bbrSender) SetMaxDatagramSize(s congestion.ByteCount) { + if s < b.maxDatagramSize { + panic(fmt.Sprintf("congestion BUG: decreased max datagram size from %d to %d", b.maxDatagramSize, s)) + } + cwndIsMinCwnd := b.congestionWindow == b.minCongestionWindow() + b.maxDatagramSize = s + if cwndIsMinCwnd { + b.congestionWindow = b.minCongestionWindow() + } + b.pacer.SetMaxDatagramSize(s) +} + +func (b *bbrSender) OnPacketSent(sentTime time.Time, bytesInFlight congestion.ByteCount, packetNumber congestion.PacketNumber, bytes congestion.ByteCount, isRetransmittable bool) { + b.pacer.SentPacket(sentTime, bytes) + b.lastSendPacket = packetNumber + + b.bytesInFlight = bytesInFlight + if bytesInFlight == 0 && b.sampler.isAppLimited { + b.exitingQuiescence = true + } + + if b.aggregationEpochStartTime.IsZero() { + b.aggregationEpochStartTime = sentTime + } + + b.sampler.OnPacketSent(sentTime, packetNumber, bytes, bytesInFlight, isRetransmittable) +} + +func (b *bbrSender) CanSend(bytesInFlight congestion.ByteCount) bool { + b.bytesInFlight = bytesInFlight + return bytesInFlight < b.GetCongestionWindow() +} + +func (b *bbrSender) GetCongestionWindow() congestion.ByteCount { + if b.mode == PROBE_RTT { + return b.ProbeRttCongestionWindow() + } + + if b.InRecovery() && !(b.rateBasedStartup && b.mode == STARTUP) { + return minByteCount(b.congestionWindow, b.recoveryWindow) + } + + return b.congestionWindow +} + +func (b *bbrSender) MaybeExitSlowStart() { +} + +func (b *bbrSender) OnPacketAcked(number congestion.PacketNumber, ackedBytes, priorInFlight congestion.ByteCount, eventTime time.Time) { + totalBytesAckedBefore := b.sampler.totalBytesAcked + isRoundStart, minRttExpired := false, false + 
lastAckedPacket := number + + isRoundStart = b.UpdateRoundTripCounter(lastAckedPacket) + minRttExpired = b.UpdateBandwidthAndMinRtt(eventTime, number, ackedBytes) + b.UpdateRecoveryState(false, isRoundStart) + bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore + excessAcked := b.UpdateAckAggregationBytes(eventTime, bytesAcked) + + // Handle logic specific to STARTUP and DRAIN modes. + if isRoundStart && !b.isAtFullBandwidth { + b.CheckIfFullBandwidthReached() + } + b.MaybeExitStartupOrDrain(eventTime) + + // Handle logic specific to PROBE_RTT. + b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired) + + // After the model is updated, recalculate the pacing rate and congestion + // window. + b.CalculatePacingRate() + b.CalculateCongestionWindow(bytesAcked, excessAcked) + b.CalculateRecoveryWindow(bytesAcked, congestion.ByteCount(0)) +} + +func (b *bbrSender) OnPacketLost(number congestion.PacketNumber, lostBytes, priorInFlight congestion.ByteCount) { + eventTime := time.Now() + totalBytesAckedBefore := b.sampler.totalBytesAcked + isRoundStart, minRttExpired := false, false + + b.DiscardLostPackets(number, lostBytes) + + // Input the new data into the BBR model of the connection. + var excessAcked congestion.ByteCount + + // Handle logic specific to PROBE_BW mode. + if b.mode == PROBE_BW { + b.UpdateGainCyclePhase(time.Now(), priorInFlight, true) + } + + // Handle logic specific to STARTUP and DRAIN modes. + b.MaybeExitStartupOrDrain(eventTime) + + // Handle logic specific to PROBE_RTT. + b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired) + + // Calculate number of packets acked and lost. + bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore + bytesLost := lostBytes + + // After the model is updated, recalculate the pacing rate and congestion + // window. 
+ b.CalculatePacingRate() + b.CalculateCongestionWindow(bytesAcked, excessAcked) + b.CalculateRecoveryWindow(bytesAcked, bytesLost) +} + +//func (b *bbrSender) OnCongestionEvent(priorInFlight congestion.ByteCount, eventTime time.Time, ackedPackets, lostPackets []*congestion.Packet) { +// totalBytesAckedBefore := b.sampler.totalBytesAcked +// isRoundStart, minRttExpired := false, false +// +// if lostPackets != nil { +// b.DiscardLostPackets(lostPackets) +// } +// +// // Input the new data into the BBR model of the connection. +// var excessAcked congestion.ByteCount +// if len(ackedPackets) > 0 { +// lastAckedPacket := ackedPackets[len(ackedPackets)-1].PacketNumber +// isRoundStart = b.UpdateRoundTripCounter(lastAckedPacket) +// minRttExpired = b.UpdateBandwidthAndMinRtt(eventTime, ackedPackets) +// b.UpdateRecoveryState(lastAckedPacket, len(lostPackets) > 0, isRoundStart) +// bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore +// excessAcked = b.UpdateAckAggregationBytes(eventTime, bytesAcked) +// } +// +// // Handle logic specific to PROBE_BW mode. +// if b.mode == PROBE_BW { +// b.UpdateGainCyclePhase(eventTime, priorInFlight, len(lostPackets) > 0) +// } +// +// // Handle logic specific to STARTUP and DRAIN modes. +// if isRoundStart && !b.isAtFullBandwidth { +// b.CheckIfFullBandwidthReached() +// } +// b.MaybeExitStartupOrDrain(eventTime) +// +// // Handle logic specific to PROBE_RTT. +// b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired) +// +// // Calculate number of packets acked and lost. +// bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore +// bytesLost := congestion.ByteCount(0) +// for _, packet := range lostPackets { +// bytesLost += packet.Length +// } +// +// // After the model is updated, recalculate the pacing rate and congestion +// // window. 
+// b.CalculatePacingRate() +// b.CalculateCongestionWindow(bytesAcked, excessAcked) +// b.CalculateRecoveryWindow(bytesAcked, bytesLost) +//} + +//func (b *bbrSender) SetNumEmulatedConnections(n int) { +// +//} + +func (b *bbrSender) OnRetransmissionTimeout(packetsRetransmitted bool) { +} + +//func (b *bbrSender) OnConnectionMigration() { +// +//} + +//// Experiments +//func (b *bbrSender) SetSlowStartLargeReduction(enabled bool) { +// +//} + +//func (b *bbrSender) BandwidthEstimate() Bandwidth { +// return Bandwidth(b.maxBandwidth.GetBest()) +//} + +// BandwidthEstimate returns the current bandwidth estimate +func (b *bbrSender) BandwidthEstimate() Bandwidth { + if b.rttStats == nil { + return infBandwidth + } + srtt := b.rttStats.SmoothedRTT() + if srtt == 0 { + // If we haven't measured an rtt, the bandwidth estimate is unknown. + return infBandwidth + } + return BandwidthFromDelta(b.GetCongestionWindow(), srtt) +} + +//func (b *bbrSender) HybridSlowStart() *HybridSlowStart { +// return nil +//} + +//func (b *bbrSender) SlowstartThreshold() congestion.ByteCount { +// return 0 +//} + +//func (b *bbrSender) RenoBeta() float32 { +// return 0.0 +//} + +func (b *bbrSender) InRecovery() bool { + return b.recoveryState != NOT_IN_RECOVERY +} + +func (b *bbrSender) InSlowStart() bool { + return b.mode == STARTUP +} + +//func (b *bbrSender) ShouldSendProbingPacket() bool { +// if b.pacingGain <= 1 { +// return false +// } +// // TODO(b/77975811): If the pipe is highly under-utilized, consider not +// // sending a probing transmission, because the extra bandwidth is not needed. +// // If flexible_app_limited is enabled, check if the pipe is sufficiently full. +// if b.flexibleAppLimited { +// return !b.IsPipeSufficientlyFull() +// } else { +// return true +// } +//} + +//func (b *bbrSender) IsPipeSufficientlyFull() bool { +// // See if we need more bytes in flight to see more bandwidth. 
+// if b.mode == STARTUP { +// // STARTUP exits if it doesn't observe a 25% bandwidth increase, so the CWND +// // must be more than 25% above the target. +// return b.GetBytesInFlight() >= b.GetTargetCongestionWindow(1.5) +// } +// if b.pacingGain > 1 { +// // Super-unity PROBE_BW doesn't exit until 1.25 * BDP is achieved. +// return b.GetBytesInFlight() >= b.GetTargetCongestionWindow(b.pacingGain) +// } +// // If bytes_in_flight are above the target congestion window, it should be +// // possible to observe the same or more bandwidth if it's available. +// return b.GetBytesInFlight() >= b.GetTargetCongestionWindow(1.1) +//} + +//func (b *bbrSender) SetFromConfig() { +// // TODO: not impl. +//} + +func (b *bbrSender) UpdateRoundTripCounter(lastAckedPacket congestion.PacketNumber) bool { + if b.currentRoundTripEnd == 0 || lastAckedPacket > b.currentRoundTripEnd { + b.currentRoundTripEnd = lastAckedPacket + b.roundTripCount++ + // if b.rttStats != nil && b.InSlowStart() { + // TODO: ++stats_->slowstart_num_rtts; + // } + return true + } + return false +} + +func (b *bbrSender) UpdateBandwidthAndMinRtt(now time.Time, number congestion.PacketNumber, ackedBytes congestion.ByteCount) bool { + sampleMinRtt := InfiniteRTT + + if !b.alwaysGetBwSampleWhenAcked && ackedBytes == 0 { + // Skip acked packets with 0 in flight bytes when updating bandwidth. + return false + } + bandwidthSample := b.sampler.OnPacketAcked(now, number) + if b.alwaysGetBwSampleWhenAcked && !bandwidthSample.stateAtSend.isValid { + // From the sampler's perspective, the packet has never been sent, or the + // packet has been acked or marked as lost previously. 
+ return false + } + b.lastSampleIsAppLimited = bandwidthSample.stateAtSend.isAppLimited + // has_non_app_limited_sample_ |= + // !bandwidth_sample.state_at_send.is_app_limited; + if !bandwidthSample.stateAtSend.isAppLimited { + b.hasNoAppLimitedSample = true + } + if bandwidthSample.rtt > 0 { + sampleMinRtt = minRtt(sampleMinRtt, bandwidthSample.rtt) + } + if !bandwidthSample.stateAtSend.isAppLimited || bandwidthSample.bandwidth > b.BandwidthEstimate() { + b.maxBandwidth.Update(int64(bandwidthSample.bandwidth), b.roundTripCount) + } + + // If none of the RTT samples are valid, return immediately. + if sampleMinRtt == InfiniteRTT { + return false + } + + b.minRttSinceLastProbeRtt = minRtt(b.minRttSinceLastProbeRtt, sampleMinRtt) + // Do not expire min_rtt if none was ever available. + minRttExpired := b.minRtt > 0 && (now.After(b.minRttTimestamp.Add(MinRttExpiry))) + if minRttExpired || sampleMinRtt < b.minRtt || b.minRtt == 0 { + if minRttExpired && b.ShouldExtendMinRttExpiry() { + minRttExpired = false + } else { + b.minRtt = sampleMinRtt + } + b.minRttTimestamp = now + // Reset since_last_probe_rtt fields. + b.minRttSinceLastProbeRtt = InfiniteRTT + b.appLimitedSinceLastProbeRtt = false + } + + return minRttExpired +} + +func (b *bbrSender) ShouldExtendMinRttExpiry() bool { + if b.probeRttDisabledIfAppLimited && b.appLimitedSinceLastProbeRtt { + // Extend the current min_rtt if we've been app limited recently. + return true + } + + minRttIncreasedSinceLastProbe := b.minRttSinceLastProbeRtt > time.Duration(float64(b.minRtt)*SimilarMinRttThreshold) + if b.probeRttSkippedIfSimilarRtt && b.appLimitedSinceLastProbeRtt && !minRttIncreasedSinceLastProbe { + // Extend the current min_rtt if we've been app limited recently and an rtt + // has been measured in that time that's less than 12.5% more than the + // current min_rtt. 
+ return true + } + + return false +} + +func (b *bbrSender) DiscardLostPackets(number congestion.PacketNumber, lostBytes congestion.ByteCount) { + b.sampler.OnPacketLost(number) + if b.mode == STARTUP { + // if b.rttStats != nil { + // TODO: slow start. + // } + if b.startupRateReductionMultiplier != 0 { + b.startupBytesLost += lostBytes + } + } +} + +func (b *bbrSender) UpdateRecoveryState(hasLosses, isRoundStart bool) { + // Exit recovery when there are no losses for a round. + if !hasLosses { + b.endRecoveryAt = b.lastSendPacket + } + switch b.recoveryState { + case NOT_IN_RECOVERY: + // Enter conservation on the first loss. + if hasLosses { + b.recoveryState = CONSERVATION + // This will cause the |recovery_window_| to be set to the correct + // value in CalculateRecoveryWindow(). + b.recoveryWindow = 0 + // Since the conservation phase is meant to be lasting for a whole + // round, extend the current round as if it were started right now. + b.currentRoundTripEnd = b.lastSendPacket + if false && b.lastSampleIsAppLimited { + b.isAppLimitedRecovery = true + } + } + case CONSERVATION: + if isRoundStart { + b.recoveryState = GROWTH + } + fallthrough + case GROWTH: + // Exit recovery if appropriate. + if !hasLosses && b.lastSendPacket > b.endRecoveryAt { + b.recoveryState = NOT_IN_RECOVERY + b.isAppLimitedRecovery = false + } + } + + if b.recoveryState != NOT_IN_RECOVERY && b.isAppLimitedRecovery { + b.sampler.OnAppLimited() + } +} + +func (b *bbrSender) UpdateAckAggregationBytes(ackTime time.Time, ackedBytes congestion.ByteCount) congestion.ByteCount { + // Compute how many bytes are expected to be delivered, assuming max bandwidth + // is correct. + expectedAckedBytes := congestion.ByteCount(b.maxBandwidth.GetBest()) * + congestion.ByteCount((ackTime.Sub(b.aggregationEpochStartTime))) + // Reset the current aggregation epoch as soon as the ack arrival rate is less + // than or equal to the max bandwidth. 
+ if b.aggregationEpochBytes <= expectedAckedBytes { + // Reset to start measuring a new aggregation epoch. + b.aggregationEpochBytes = ackedBytes + b.aggregationEpochStartTime = ackTime + return 0 + } + // Compute how many extra bytes were delivered vs max bandwidth. + // Include the bytes most recently acknowledged to account for stretch acks. + b.aggregationEpochBytes += ackedBytes + b.maxAckHeight.Update(int64(b.aggregationEpochBytes-expectedAckedBytes), b.roundTripCount) + return b.aggregationEpochBytes - expectedAckedBytes +} + +func (b *bbrSender) UpdateGainCyclePhase(now time.Time, priorInFlight congestion.ByteCount, hasLosses bool) { + bytesInFlight := b.GetBytesInFlight() + // In most cases, the cycle is advanced after an RTT passes. + shouldAdvanceGainCycling := now.Sub(b.lastCycleStart) > b.GetMinRtt() + + // If the pacing gain is above 1.0, the connection is trying to probe the + // bandwidth by increasing the number of bytes in flight to at least + // pacing_gain * BDP. Make sure that it actually reaches the target, as long + // as there are no losses suggesting that the buffers are not able to hold + // that much. + if b.pacingGain > 1.0 && !hasLosses && priorInFlight < b.GetTargetCongestionWindow(b.pacingGain) { + shouldAdvanceGainCycling = false + } + // If pacing gain is below 1.0, the connection is trying to drain the extra + // queue which could have been incurred by probing prior to it. If the number + // of bytes in flight falls down to the estimated BDP value earlier, conclude + // that the queue has been successfully drained and exit this cycle early. + if b.pacingGain < 1.0 && bytesInFlight <= b.GetTargetCongestionWindow(1.0) { + shouldAdvanceGainCycling = true + } + + if shouldAdvanceGainCycling { + b.cycleCurrentOffset = (b.cycleCurrentOffset + 1) % GainCycleLength + b.lastCycleStart = now + // Stay in low gain mode until the target BDP is hit. + // Low gain mode will be exited immediately when the target BDP is achieved. 
+ if b.drainToTarget && b.pacingGain < 1.0 && PacingGain[b.cycleCurrentOffset] == 1.0 && + bytesInFlight > b.GetTargetCongestionWindow(1.0) { + return + } + b.pacingGain = PacingGain[b.cycleCurrentOffset] + } +} + +func (b *bbrSender) GetTargetCongestionWindow(gain float64) congestion.ByteCount { + bdp := congestion.ByteCount(b.GetMinRtt()) * congestion.ByteCount(b.BandwidthEstimate()) + congestionWindow := congestion.ByteCount(gain * float64(bdp)) + + // BDP estimate will be zero if no bandwidth samples are available yet. + if congestionWindow == 0 { + congestionWindow = congestion.ByteCount(gain * float64(b.initialCongestionWindow)) + } + + return maxByteCount(congestionWindow, b.minCongestionWindow()) +} + +func (b *bbrSender) CheckIfFullBandwidthReached() { + if b.lastSampleIsAppLimited { + return + } + + target := Bandwidth(float64(b.bandwidthAtLastRound) * StartupGrowthTarget) + if b.BandwidthEstimate() >= target { + b.bandwidthAtLastRound = b.BandwidthEstimate() + b.roundsWithoutBandwidthGain = 0 + if b.expireAckAggregationInStartup { + // Expire old excess delivery measurements now that bandwidth increased. + b.maxAckHeight.Reset(0, b.roundTripCount) + } + return + } + b.roundsWithoutBandwidthGain++ + if b.roundsWithoutBandwidthGain >= b.numStartupRtts || (b.exitStartupOnLoss && b.InRecovery()) { + b.isAtFullBandwidth = true + } +} + +func (b *bbrSender) MaybeExitStartupOrDrain(now time.Time) { + if b.mode == STARTUP && b.isAtFullBandwidth { + b.OnExitStartup(now) + b.mode = DRAIN + b.pacingGain = b.drainGain + b.congestionWindowGain = b.highCwndGain + } + if b.mode == DRAIN && b.GetBytesInFlight() <= b.GetTargetCongestionWindow(1) { + b.EnterProbeBandwidthMode(now) + } +} + +func (b *bbrSender) EnterProbeBandwidthMode(now time.Time) { + b.mode = PROBE_BW + b.congestionWindowGain = b.congestionWindowGainConst + + // Pick a random offset for the gain cycle out of {0, 2..7} range. 
1 is + // excluded because in that case increased gain and decreased gain would not + // follow each other. + b.cycleCurrentOffset = fastrand.Int() % (GainCycleLength - 1) + if b.cycleCurrentOffset >= 1 { + b.cycleCurrentOffset += 1 + } + + b.lastCycleStart = now + b.pacingGain = PacingGain[b.cycleCurrentOffset] +} + +func (b *bbrSender) MaybeEnterOrExitProbeRtt(now time.Time, isRoundStart, minRttExpired bool) { + if minRttExpired && !b.exitingQuiescence && b.mode != PROBE_RTT { + if b.InSlowStart() { + b.OnExitStartup(now) + } + b.mode = PROBE_RTT + b.pacingGain = 1.0 + // Do not decide on the time to exit PROBE_RTT until the |bytes_in_flight| + // is at the target small value. + b.exitProbeRttAt = time.Time{} + } + + if b.mode == PROBE_RTT { + b.sampler.OnAppLimited() + if b.exitProbeRttAt.IsZero() { + // If the window has reached the appropriate size, schedule exiting + // PROBE_RTT. The CWND during PROBE_RTT is kMinimumCongestionWindow, but + // we allow an extra packet since QUIC checks CWND before sending a + // packet. + if b.GetBytesInFlight() < b.ProbeRttCongestionWindow()+b.maxDatagramSize { + b.exitProbeRttAt = now.Add(ProbeRttTime) + b.probeRttRoundPassed = false + } + } else { + if isRoundStart { + b.probeRttRoundPassed = true + } + if !now.Before(b.exitProbeRttAt) && b.probeRttRoundPassed { + b.minRttTimestamp = now + if !b.isAtFullBandwidth { + b.EnterStartupMode(now) + } else { + b.EnterProbeBandwidthMode(now) + } + } + } + } + b.exitingQuiescence = false +} + +func (b *bbrSender) ProbeRttCongestionWindow() congestion.ByteCount { + if b.probeRttBasedOnBdp { + return b.GetTargetCongestionWindow(ModerateProbeRttMultiplier) + } else { + return b.minCongestionWindow() + } +} + +func (b *bbrSender) EnterStartupMode(now time.Time) { + // if b.rttStats != nil { + // TODO: slow start. 
+ // } + b.mode = STARTUP + b.pacingGain = b.highGain + b.congestionWindowGain = b.highCwndGain +} + +func (b *bbrSender) OnExitStartup(now time.Time) { + if b.rttStats == nil { + return + } + // TODO: slow start. +} + +func (b *bbrSender) CalculatePacingRate() { + if b.BandwidthEstimate() == 0 { + return + } + + targetRate := Bandwidth(b.pacingGain * float64(b.BandwidthEstimate())) + if b.isAtFullBandwidth { + b.pacingRate = targetRate + return + } + + // Pace at the rate of initial_window / RTT as soon as RTT measurements are + // available. + if b.pacingRate == 0 && b.rttStats.MinRTT() > 0 { + b.pacingRate = BandwidthFromDelta(b.initialCongestionWindow, b.rttStats.MinRTT()) + return + } + // Slow the pacing rate in STARTUP once loss has ever been detected. + hasEverDetectedLoss := b.endRecoveryAt > 0 + if b.slowerStartup && hasEverDetectedLoss && b.hasNoAppLimitedSample { + b.pacingRate = Bandwidth(StartupAfterLossGain * float64(b.BandwidthEstimate())) + return + } + + // Slow the pacing rate in STARTUP by the bytes_lost / CWND. + if b.startupRateReductionMultiplier != 0 && hasEverDetectedLoss && b.hasNoAppLimitedSample { + b.pacingRate = Bandwidth((1.0 - (float64(b.startupBytesLost) * float64(b.startupRateReductionMultiplier) / float64(b.congestionWindow))) * float64(targetRate)) + // Ensure the pacing rate doesn't drop below the startup growth target times + // the bandwidth estimate. + b.pacingRate = maxBandwidth(b.pacingRate, Bandwidth(StartupGrowthTarget*float64(b.BandwidthEstimate()))) + return + } + + // Do not decrease the pacing rate during startup. + b.pacingRate = maxBandwidth(b.pacingRate, targetRate) +} + +func (b *bbrSender) CalculateCongestionWindow(ackedBytes, excessAcked congestion.ByteCount) { + if b.mode == PROBE_RTT { + return + } + + targetWindow := b.GetTargetCongestionWindow(b.congestionWindowGain) + if b.isAtFullBandwidth { + // Add the max recently measured ack aggregation to CWND. 
+ targetWindow += congestion.ByteCount(b.maxAckHeight.GetBest()) + } else if b.enableAckAggregationDuringStartup { + // Add the most recent excess acked. Because CWND never decreases in + // STARTUP, this will automatically create a very localized max filter. + targetWindow += excessAcked + } + + // Instead of immediately setting the target CWND as the new one, BBR grows + // the CWND towards |target_window| by only increasing it |bytes_acked| at a + // time. + addBytesAcked := true || !b.InRecovery() + if b.isAtFullBandwidth { + b.congestionWindow = minByteCount(targetWindow, b.congestionWindow+ackedBytes) + } else if addBytesAcked && (b.congestionWindow < targetWindow || b.sampler.totalBytesAcked < b.initialCongestionWindow) { + // If the connection is not yet out of startup phase, do not decrease the + // window. + b.congestionWindow += ackedBytes + } + + // Enforce the limits on the congestion window. + b.congestionWindow = maxByteCount(b.congestionWindow, b.minCongestionWindow()) + b.congestionWindow = minByteCount(b.congestionWindow, b.maxCongestionWindow()) +} + +func (b *bbrSender) CalculateRecoveryWindow(ackedBytes, lostBytes congestion.ByteCount) { + if b.rateBasedStartup && b.mode == STARTUP { + return + } + + if b.recoveryState == NOT_IN_RECOVERY { + return + } + + // Set up the initial recovery window. + if b.recoveryWindow == 0 { + b.recoveryWindow = maxByteCount(b.GetBytesInFlight()+ackedBytes, b.minCongestionWindow()) + return + } + + // Remove losses from the recovery window, while accounting for a potential + // integer underflow. + if b.recoveryWindow >= lostBytes { + b.recoveryWindow -= lostBytes + } else { + b.recoveryWindow = congestion.ByteCount(b.maxDatagramSize) + } + // In CONSERVATION mode, just subtracting losses is sufficient. In GROWTH, + // release additional |bytes_acked| to achieve a slow-start-like behavior. + if b.recoveryState == GROWTH { + b.recoveryWindow += ackedBytes + } + // Sanity checks. 
Ensure that we always allow to send at least an MSS or + // |bytes_acked| in response, whichever is larger. + b.recoveryWindow = maxByteCount(b.recoveryWindow, b.GetBytesInFlight()+ackedBytes) + b.recoveryWindow = maxByteCount(b.recoveryWindow, b.minCongestionWindow()) +} + +var _ congestion.CongestionControl = (*bbrSender)(nil) + +func (b *bbrSender) GetMinRtt() time.Duration { + if b.minRtt > 0 { + return b.minRtt + } else { + return InitialRtt + } +} + +func minRtt(a, b time.Duration) time.Duration { + if a < b { + return a + } else { + return b + } +} + +func minBandwidth(a, b Bandwidth) Bandwidth { + if a < b { + return a + } else { + return b + } +} + +func maxBandwidth(a, b Bandwidth) Bandwidth { + if a > b { + return a + } else { + return b + } +} + +func maxByteCount(a, b congestion.ByteCount) congestion.ByteCount { + if a > b { + return a + } else { + return b + } +} + +func minByteCount(a, b congestion.ByteCount) congestion.ByteCount { + if a < b { + return a + } else { + return b + } +} + +var InfiniteRTT = time.Duration(math.MaxInt64) diff --git a/core/internal/congestion/bbr/clock.go b/core/internal/congestion/bbr/clock.go new file mode 100644 index 0000000..a66344f --- /dev/null +++ b/core/internal/congestion/bbr/clock.go @@ -0,0 +1,18 @@ +package bbr + +import "time" + +// A Clock returns the current time +type Clock interface { + Now() time.Time +} + +// DefaultClock implements the Clock interface using the Go stdlib clock. +type DefaultClock struct{} + +var _ Clock = DefaultClock{} + +// Now gets the current time +func (DefaultClock) Now() time.Time { + return time.Now() +} diff --git a/core/internal/congestion/bbr/windowed_filter.go b/core/internal/congestion/bbr/windowed_filter.go new file mode 100644 index 0000000..92a41aa --- /dev/null +++ b/core/internal/congestion/bbr/windowed_filter.go @@ -0,0 +1,132 @@ +package bbr + +// WindowedFilter Use the following to construct a windowed filter object of type T. 
+// For example, a min filter using QuicTime as the time type: +// +// WindowedFilter, QuicTime, QuicTime::Delta> ObjectName; +// +// A max filter using 64-bit integers as the time type: +// +// WindowedFilter, uint64_t, int64_t> ObjectName; +// +// Specifically, this template takes four arguments: +// 1. T -- type of the measurement that is being filtered. +// 2. Compare -- MinFilter or MaxFilter, depending on the type of filter +// desired. +// 3. TimeT -- the type used to represent timestamps. +// 4. TimeDeltaT -- the type used to represent continuous time intervals between +// two timestamps. Has to be the type of (a - b) if both |a| and |b| are +// of type TimeT. +type WindowedFilter struct { + // Time length of window. + windowLength int64 + estimates []Sample + comparator func(int64, int64) bool +} + +type Sample struct { + sample int64 + time int64 +} + +// Compares two values and returns true if the first is greater than or equal +// to the second. +func MaxFilter(a, b int64) bool { + return a >= b +} + +// Compares two values and returns true if the first is less than or equal +// to the second. +func MinFilter(a, b int64) bool { + return a <= b +} + +func NewWindowedFilter(windowLength int64, comparator func(int64, int64) bool) *WindowedFilter { + return &WindowedFilter{ + windowLength: windowLength, + estimates: make([]Sample, 3), + comparator: comparator, + } +} + +// Changes the window length. Does not update any current samples. 
+func (f *WindowedFilter) SetWindowLength(windowLength int64) { + f.windowLength = windowLength +} + +func (f *WindowedFilter) GetBest() int64 { + return f.estimates[0].sample +} + +func (f *WindowedFilter) GetSecondBest() int64 { + return f.estimates[1].sample +} + +func (f *WindowedFilter) GetThirdBest() int64 { + return f.estimates[2].sample +} + +func (f *WindowedFilter) Update(sample, time int64) { + if f.estimates[0].time == 0 || f.comparator(sample, f.estimates[0].sample) || (time-f.estimates[2].time) > f.windowLength { + f.Reset(sample, time) + return + } + + if f.comparator(sample, f.estimates[1].sample) { + f.estimates[1].sample = sample + f.estimates[1].time = time + f.estimates[2].sample = sample + f.estimates[2].time = time + } else if f.comparator(sample, f.estimates[2].sample) { + f.estimates[2].sample = sample + f.estimates[2].time = time + } + + // Expire and update estimates as necessary. + if time-f.estimates[0].time > f.windowLength { + // The best estimate hasn't been updated for an entire window, so promote + // second and third best estimates. + f.estimates[0].sample = f.estimates[1].sample + f.estimates[0].time = f.estimates[1].time + f.estimates[1].sample = f.estimates[2].sample + f.estimates[1].time = f.estimates[2].time + f.estimates[2].sample = sample + f.estimates[2].time = time + // Need to iterate one more time. Check if the new best estimate is + // outside the window as well, since it may also have been recorded a + // long time ago. Don't need to iterate once more since we cover that + // case at the beginning of the method. 
+ if time-f.estimates[0].time > f.windowLength { + f.estimates[0].sample = f.estimates[1].sample + f.estimates[0].time = f.estimates[1].time + f.estimates[1].sample = f.estimates[2].sample + f.estimates[1].time = f.estimates[2].time + } + return + } + if f.estimates[1].sample == f.estimates[0].sample && time-f.estimates[1].time > f.windowLength>>2 { + // A quarter of the window has passed without a better sample, so the + // second-best estimate is taken from the second quarter of the window. + f.estimates[1].sample = sample + f.estimates[1].time = time + f.estimates[2].sample = sample + f.estimates[2].time = time + return + } + + if f.estimates[2].sample == f.estimates[1].sample && time-f.estimates[2].time > f.windowLength>>1 { + // We've passed a half of the window without a better estimate, so take + // a third-best estimate from the second half of the window. + f.estimates[2].sample = sample + f.estimates[2].time = time + } +} + +func (f *WindowedFilter) Reset(newSample, newTime int64) { + f.estimates[0].sample = newSample + f.estimates[0].time = newTime + f.estimates[1].sample = newSample + f.estimates[1].time = newTime + f.estimates[2].sample = newSample + f.estimates[2].time = newTime +} diff --git a/core/internal/congestion/brutal.go b/core/internal/congestion/brutal/brutal.go similarity index 93% rename from core/internal/congestion/brutal.go rename to core/internal/congestion/brutal/brutal.go index c5aba50..0f441b3 100644 --- a/core/internal/congestion/brutal.go +++ b/core/internal/congestion/brutal/brutal.go @@ -1,14 +1,14 @@ -package congestion +package brutal import ( "time" + "github.com/apernet/hysteria/core/internal/congestion/common" + "github.com/apernet/quic-go/congestion" ) const ( - initMaxDatagramSize = 1252 - pktInfoSlotCount = 4 minSampleCount = 50 minAckRate = 0.8 @@ -20,7 +20,7 @@ type BrutalSender struct { rttStats congestion.RTTStatsProvider bps congestion.ByteCount maxDatagramSize congestion.ByteCount - pacer *pacer + pacer 
*common.Pacer pktInfoSlots [pktInfoSlotCount]pktInfo ackRate float64 @@ -35,10 +35,10 @@ type pktInfo struct { func NewBrutalSender(bps uint64) *BrutalSender { bs := &BrutalSender{ bps: congestion.ByteCount(bps), - maxDatagramSize: initMaxDatagramSize, + maxDatagramSize: common.InitMaxDatagramSize, ackRate: 1, } - bs.pacer = newPacer(func() congestion.ByteCount { + bs.pacer = common.NewPacer(func() congestion.ByteCount { return congestion.ByteCount(float64(bs.bps) / bs.ackRate) }) return bs @@ -142,10 +142,3 @@ func (b *BrutalSender) InRecovery() bool { func (b *BrutalSender) MaybeExitSlowStart() {} func (b *BrutalSender) OnRetransmissionTimeout(packetsRetransmitted bool) {} - -func maxDuration(a, b time.Duration) time.Duration { - if a > b { - return a - } - return b -} diff --git a/core/internal/congestion/pacer.go b/core/internal/congestion/common/pacer.go similarity index 69% rename from core/internal/congestion/pacer.go rename to core/internal/congestion/common/pacer.go index 80e3675..5b5079c 100644 --- a/core/internal/congestion/pacer.go +++ b/core/internal/congestion/common/pacer.go @@ -1,4 +1,4 @@ -package congestion +package common import ( "math" @@ -8,28 +8,30 @@ import ( ) const ( + InitMaxDatagramSize = 1252 + maxBurstPackets = 10 minPacingDelay = time.Millisecond ) -// The pacer implements a token bucket pacing algorithm. -type pacer struct { +// Pacer implements a token bucket pacing algorithm. 
+type Pacer struct { budgetAtLastSent congestion.ByteCount maxDatagramSize congestion.ByteCount lastSentTime time.Time getBandwidth func() congestion.ByteCount // in bytes/s } -func newPacer(getBandwidth func() congestion.ByteCount) *pacer { - p := &pacer{ - budgetAtLastSent: maxBurstPackets * initMaxDatagramSize, - maxDatagramSize: initMaxDatagramSize, +func NewPacer(getBandwidth func() congestion.ByteCount) *Pacer { + p := &Pacer{ + budgetAtLastSent: maxBurstPackets * InitMaxDatagramSize, + maxDatagramSize: InitMaxDatagramSize, getBandwidth: getBandwidth, } return p } -func (p *pacer) SentPacket(sendTime time.Time, size congestion.ByteCount) { +func (p *Pacer) SentPacket(sendTime time.Time, size congestion.ByteCount) { budget := p.Budget(sendTime) if size > budget { p.budgetAtLastSent = 0 @@ -39,7 +41,7 @@ func (p *pacer) SentPacket(sendTime time.Time, size congestion.ByteCount) { p.lastSentTime = sendTime } -func (p *pacer) Budget(now time.Time) congestion.ByteCount { +func (p *Pacer) Budget(now time.Time) congestion.ByteCount { if p.lastSentTime.IsZero() { return p.maxBurstSize() } @@ -47,7 +49,7 @@ func (p *pacer) Budget(now time.Time) congestion.ByteCount { return minByteCount(p.maxBurstSize(), budget) } -func (p *pacer) maxBurstSize() congestion.ByteCount { +func (p *Pacer) maxBurstSize() congestion.ByteCount { return maxByteCount( congestion.ByteCount((minPacingDelay+time.Millisecond).Nanoseconds())*p.getBandwidth()/1e9, maxBurstPackets*p.maxDatagramSize, @@ -56,7 +58,7 @@ func (p *pacer) maxBurstSize() congestion.ByteCount { // TimeUntilSend returns when the next packet should be sent. // It returns the zero value of time.Time if a packet can be sent immediately. 
-func (p *pacer) TimeUntilSend() time.Time { +func (p *Pacer) TimeUntilSend() time.Time { if p.budgetAtLastSent >= p.maxDatagramSize { return time.Time{} } @@ -67,7 +69,7 @@ func (p *pacer) TimeUntilSend() time.Time { )) } -func (p *pacer) SetMaxDatagramSize(s congestion.ByteCount) { +func (p *Pacer) SetMaxDatagramSize(s congestion.ByteCount) { p.maxDatagramSize = s } @@ -84,3 +86,10 @@ func minByteCount(a, b congestion.ByteCount) congestion.ByteCount { } return b } + +func maxDuration(a, b time.Duration) time.Duration { + if a > b { + return a + } + return b +} diff --git a/core/server/server.go b/core/server/server.go index 202892e..82e2168 100644 --- a/core/server/server.go +++ b/core/server/server.go @@ -9,7 +9,9 @@ import ( "github.com/apernet/quic-go" "github.com/apernet/quic-go/http3" - "github.com/apernet/hysteria/core/internal/congestion" + "github.com/apernet/hysteria/core/internal/congestion/bbr" + "github.com/apernet/hysteria/core/internal/congestion/brutal" + "github.com/apernet/hysteria/core/internal/congestion/common" "github.com/apernet/hysteria/core/internal/protocol" "github.com/apernet/hysteria/core/internal/utils" ) @@ -126,9 +128,16 @@ func (h *h3sHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Set authenticated flag h.authenticated = true h.authID = id - // Update congestion control when applicable + // Use Brutal CC if actualTx > 0, otherwise use BBR if actualTx > 0 { - h.conn.SetCongestionControl(congestion.NewBrutalSender(actualTx)) + h.conn.SetCongestionControl(brutal.NewBrutalSender(actualTx)) + } else { + h.conn.SetCongestionControl(bbr.NewBBRSender( + bbr.DefaultClock{}, + bbr.GetInitialPacketSize(h.conn.RemoteAddr()), + 32*common.InitMaxDatagramSize, + bbr.DefaultBBRMaxCongestionWindow*common.InitMaxDatagramSize, + )) } // Auth OK, send response protocol.AuthResponseDataToHeader(w.Header(), !h.config.DisableUDP, h.config.BandwidthConfig.MaxRx) diff --git a/extras/go.mod b/extras/go.mod index 45d7689..2808707 100644 --- 
a/extras/go.mod +++ b/extras/go.mod @@ -20,6 +20,7 @@ require ( github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/qtls-go1-20 v0.3.1 // indirect github.com/stretchr/objx v0.5.0 // indirect + github.com/zhangyunhao116/fastrand v0.3.0 // indirect golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.12.0 // indirect diff --git a/extras/go.sum b/extras/go.sum index ba406d1..06846ae 100644 --- a/extras/go.sum +++ b/extras/go.sum @@ -39,6 +39,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/zhangyunhao116/fastrand v0.3.0 h1:7bwe124xcckPulX6fxtr2lFdO2KQqaefdtbk+mqO/Ig= +github.com/zhangyunhao116/fastrand v0.3.0/go.mod h1:0v5KgHho0VE6HU192HnY15de/oDS8UrbBChIFjIhBtc= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=