Skip to content

Commit b8730a5

Browse files
committed
auto-adjust experiment
1 parent 80a149b commit b8730a5

File tree

4 files changed

+126
-11
lines changed

4 files changed

+126
-11
lines changed

bitswap/client/internal/messagequeue/messagequeue.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/filecoin-project/go-clock"
10+
"github.com/ipfs/boxo/bitswap/client/internal/messagequeue/ravg"
1011
bswl "github.com/ipfs/boxo/bitswap/client/wantlist"
1112
bsmsg "github.com/ipfs/boxo/bitswap/message"
1213
pb "github.com/ipfs/boxo/bitswap/message/pb"
@@ -43,8 +44,9 @@ const (
4344
// when we reach sendMessageCutoff wants/cancels, we'll send the message immediately.
4445
sendMessageCutoff = 256
4546
// wait this long before sending next message
46-
sendMessageDelay = 20 * time.Millisecond
47-
sendTimeout = 30 * time.Second
47+
minSendMessageDelay = 20 * time.Millisecond
48+
maxSendMessageDelay = 200 * time.Millisecond
49+
sendTimeout = 30 * time.Second
4850
)
4951

5052
// MessageNetwork is any network that can connect peers and generate a message
@@ -453,6 +455,8 @@ func (mq *MessageQueue) onShutdown() {
453455
func (mq *MessageQueue) runQueue() {
454456
const runRebroadcastsInterval = rebroadcastInterval / 2
455457

458+
avg := ravg.New[int](10)
459+
456460
defer mq.onShutdown()
457461

458462
// Create a timer for debouncing scheduled work.
@@ -481,9 +485,17 @@ func (mq *MessageQueue) runQueue() {
481485
if mq.events != nil {
482486
mq.events <- messageQueued
483487
}
488+
489+
pendingCount := mq.pendingWorkCount()
490+
avg.Put(pendingCount)
491+
delay := time.Duration(avg.Mean()) * time.Millisecond / 8
492+
delay = min(delay, maxSendMessageDelay)
493+
delay = max(delay, minSendMessageDelay)
494+
log.Errorf("Setting send delay", "delay", delay.String(), "count", pendingCount)
495+
484496
mq.sendMessage()
485497
hasWorkChan = nil
486-
scheduleWork.Reset(sendMessageDelay)
498+
scheduleWork.Reset(delay)
487499

488500
case <-scheduleWork.C:
489501
hasWorkChan = mq.outgoingWork

bitswap/client/internal/messagequeue/messagequeue_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ func TestWantlistRebroadcast(t *testing.T) {
423423
// Add some broadcast want-haves
424424
messageQueue.Startup()
425425
messageQueue.AddBroadcastWantHaves(bcstwh)
426-
clock.Add(sendMessageDelay)
426+
clock.Add(maxSendMessageDelay)
427427
expectEvent(t, events, messageQueued)
428428
message := <-messagesSent
429429
expectEvent(t, events, messageFinishedSending)
@@ -444,7 +444,7 @@ func TestWantlistRebroadcast(t *testing.T) {
444444

445445
// Send out some regular wants and collect them
446446
messageQueue.AddWants(wantBlocks, wantHaves)
447-
clock.Add(sendMessageDelay)
447+
clock.Add(maxSendMessageDelay)
448448
expectEvent(t, events, messageQueued)
449449
clock.Add(10 * time.Millisecond)
450450
message = <-messagesSent
@@ -474,7 +474,7 @@ func TestWantlistRebroadcast(t *testing.T) {
474474
// Cancel some of the wants
475475
cancels := append([]cid.Cid{bcstwh[0]}, wantHaves[0], wantBlocks[0])
476476
messageQueue.AddCancels(cancels)
477-
clock.Add(sendMessageDelay)
477+
clock.Add(maxSendMessageDelay)
478478
expectEvent(t, events, messageQueued)
479479
clock.Add(10 * time.Millisecond)
480480
message = <-messagesSent
@@ -637,7 +637,7 @@ func TestResponseReceived(t *testing.T) {
637637

638638
// Add some wants
639639
messageQueue.AddWants(cids[:5], nil)
640-
clock.Add(sendMessageDelay)
640+
clock.Add(maxSendMessageDelay)
641641
expectEvent(t, events, messageQueued)
642642
<-messagesSent
643643
expectEvent(t, events, messageFinishedSending)
@@ -647,7 +647,7 @@ func TestResponseReceived(t *testing.T) {
647647

648648
// Add some wants and wait another 10ms
649649
messageQueue.AddWants(cids[5:8], nil)
650-
clock.Add(sendMessageDelay)
650+
clock.Add(maxSendMessageDelay)
651651
expectEvent(t, events, messageQueued)
652652
clock.Add(10 * time.Millisecond)
653653
<-messagesSent
@@ -664,7 +664,7 @@ func TestResponseReceived(t *testing.T) {
664664
}
665665
// Elapsed time should be between when the first want was sent and the
666666
// response received (about 20ms)
667-
if upds[0] != sendMessageDelay+20*time.Millisecond {
667+
if upds[0] != maxSendMessageDelay+20*time.Millisecond {
668668
t.Fatalf("expected latency to be time since oldest message sent, was %s", upds[0].String())
669669
}
670670
}
@@ -732,7 +732,7 @@ func TestResponseReceivedDiscardsOutliers(t *testing.T) {
732732

733733
// Add some wants and wait 20ms
734734
messageQueue.AddWants(cids[:2], nil)
735-
clock.Add(sendMessageDelay)
735+
clock.Add(maxSendMessageDelay)
736736
expectEvent(t, events, messageQueued)
737737
<-messagesSent
738738
expectEvent(t, events, messageFinishedSending)
@@ -742,7 +742,7 @@ func TestResponseReceivedDiscardsOutliers(t *testing.T) {
742742
// Add some more wants and wait long enough that the first wants will be
743743
// outside the maximum valid latency, but the second wants will be inside
744744
messageQueue.AddWants(cids[2:], nil)
745-
clock.Add(sendMessageDelay)
745+
clock.Add(maxSendMessageDelay)
746746
expectEvent(t, events, messageQueued)
747747
<-messagesSent
748748
expectEvent(t, events, messageFinishedSending)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package ravg
2+
3+
import (
4+
"golang.org/x/exp/constraints"
5+
)
6+
7+
type Number interface {
8+
constraints.Integer | constraints.Float
9+
}
10+
11+
type RAvg[T Number] struct {
12+
samples []T
13+
next int
14+
full bool
15+
}
16+
17+
func New[T Number](size int) *RAvg[T] {
18+
return &RAvg[T]{
19+
samples: make([]T, size),
20+
}
21+
}
22+
23+
func (r *RAvg[T]) Len() int {
24+
return len(r.samples)
25+
}
26+
27+
func (r *RAvg[T]) Put(sample T) {
28+
r.samples[r.next] = sample
29+
r.next++
30+
if r.next == len(r.samples) {
31+
r.next = 0
32+
r.full = true
33+
}
34+
}
35+
36+
func (r *RAvg[T]) Mean() T {
37+
size, sum := r.mean()
38+
return sum / T(size)
39+
}
40+
41+
func (r *RAvg[T]) FMean() float64 {
42+
size, sum := r.mean()
43+
return float64(sum) / float64(size)
44+
}
45+
46+
func (r *RAvg[T]) mean() (int, T) {
47+
size := len(r.samples)
48+
if !r.full {
49+
size = r.next
50+
}
51+
var sum T
52+
for i := 0; i < size; i++ {
53+
sum += r.samples[i]
54+
}
55+
return size, sum
56+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package ravg
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestRAvgInt(t *testing.T) {
10+
avg := New[int](10)
11+
require.Equal(t, 10, avg.Len())
12+
13+
avg.Put(1)
14+
avg.Put(2)
15+
avg.Put(3)
16+
17+
require.Equal(t, 2, avg.Mean())
18+
19+
for i := 0; i < 10; i++ {
20+
for j := 1; j <= avg.Len(); j++ {
21+
avg.Put(j)
22+
}
23+
}
24+
require.Equal(t, 5, avg.Mean())
25+
26+
require.Equal(t, 5.5, avg.FMean())
27+
}
28+
29+
func TestRAvgFloat(t *testing.T) {
30+
avg := New[float64](10)
31+
require.Equal(t, 10, avg.Len())
32+
33+
avg.Put(1)
34+
avg.Put(2)
35+
avg.Put(3)
36+
37+
require.Equal(t, 2.0, avg.Mean())
38+
39+
for i := 0; i < 10; i++ {
40+
for j := 1; j <= avg.Len(); j++ {
41+
avg.Put(float64(j))
42+
}
43+
}
44+
require.Equal(t, 5.5, avg.Mean())
45+
46+
require.Equal(t, 5.5, avg.FMean())
47+
}

0 commit comments

Comments
 (0)