Skip to content

Commit 09d223a

Browse files
authored
Merge pull request #183 from calloway-jacob/fix-confirm-receipt-early
Fix race condition on confirms
2 parents e4711f3 + 65674cf commit 09d223a

File tree

3 files changed

+37
-12
lines changed

3 files changed

+37
-12
lines changed

channel.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,6 +1435,11 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
14351435
ch.m.Lock()
14361436
defer ch.m.Unlock()
14371437

1438+
var dc *DeferredConfirmation
1439+
if ch.confirming {
1440+
dc = ch.confirms.publish()
1441+
}
1442+
14381443
if err := ch.send(&basicPublish{
14391444
Exchange: exchange,
14401445
RoutingKey: key,
@@ -1457,14 +1462,13 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
14571462
AppId: msg.AppId,
14581463
},
14591464
}); err != nil {
1465+
if ch.confirming {
1466+
ch.confirms.unpublish()
1467+
}
14601468
return nil, err
14611469
}
14621470

1463-
if ch.confirming {
1464-
return ch.confirms.Publish(), nil
1465-
}
1466-
1467-
return nil, nil
1471+
return dc, nil
14681472
}
14691473

14701474
/*

confirms.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,23 @@ func (c *confirms) Listen(l chan Confirmation) {
3939
}
4040

4141
// Publish increments the publishing counter
42-
func (c *confirms) Publish() *DeferredConfirmation {
42+
func (c *confirms) publish() *DeferredConfirmation {
4343
c.publishedMut.Lock()
4444
defer c.publishedMut.Unlock()
4545

4646
c.published++
4747
return c.deferredConfirmations.Add(c.published)
4848
}
4949

50+
// unpublish decrements the publishing counter and removes the
51+
// DeferredConfirmation. It must be called immediately after a publish fails.
52+
func (c *confirms) unpublish() {
53+
c.publishedMut.Lock()
54+
defer c.publishedMut.Unlock()
55+
c.deferredConfirmations.remove(c.published)
56+
c.published--
57+
}
58+
5059
// confirm confirms one publishing, increments the expecting delivery tag, and
5160
// removes bookkeeping for that delivery tag.
5261
func (c *confirms) confirm(confirmation Confirmation) {
@@ -135,6 +144,18 @@ func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation {
135144
return dc
136145
}
137146

147+
// remove is only used to drop a tag whose publish failed
148+
func (d *deferredConfirmations) remove(tag uint64) {
149+
d.m.Lock()
150+
defer d.m.Unlock()
151+
dc, found := d.confirmations[tag]
152+
if !found {
153+
return
154+
}
155+
close(dc.done)
156+
delete(d.confirmations, tag)
157+
}
158+
138159
func (d *deferredConfirmations) Confirm(confirmation Confirmation) {
139160
d.m.Lock()
140161
defer d.m.Unlock()

confirms_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestConfirmOneResequences(t *testing.T) {
2626
c.Listen(l)
2727

2828
for i := range fixtures {
29-
if want, got := uint64(i+1), c.Publish(); want != got.DeliveryTag {
29+
if want, got := uint64(i+1), c.publish(); want != got.DeliveryTag {
3030
t.Fatalf("expected publish to return the 1 based delivery tag published, want: %d, got: %d", want, got.DeliveryTag)
3131
}
3232
}
@@ -64,7 +64,7 @@ func TestConfirmAndPublishDoNotDeadlock(t *testing.T) {
6464
}()
6565

6666
for i := 0; i < iterations; i++ {
67-
c.Publish()
67+
c.publish()
6868
<-l
6969
}
7070
}
@@ -82,7 +82,7 @@ func TestConfirmMixedResequences(t *testing.T) {
8282
c.Listen(l)
8383

8484
for range fixtures {
85-
c.Publish()
85+
c.publish()
8686
}
8787

8888
c.One(fixtures[0])
@@ -117,7 +117,7 @@ func TestConfirmMultipleResequences(t *testing.T) {
117117
c.Listen(l)
118118

119119
for range fixtures {
120-
c.Publish()
120+
c.publish()
121121
}
122122

123123
c.Multiple(fixtures[len(fixtures)-1])
@@ -141,7 +141,7 @@ func BenchmarkSequentialBufferedConfirms(t *testing.B) {
141141
if i > cap(l)-1 {
142142
<-l
143143
}
144-
c.One(Confirmation{c.Publish().DeliveryTag, true})
144+
c.One(Confirmation{c.publish().DeliveryTag, true})
145145
}
146146
}
147147

@@ -159,7 +159,7 @@ func TestConfirmsIsThreadSafe(t *testing.T) {
159159
c.Listen(l)
160160

161161
for i := 0; i < count; i++ {
162-
go func() { pub <- Confirmation{c.Publish().DeliveryTag, true} }()
162+
go func() { pub <- Confirmation{c.publish().DeliveryTag, true} }()
163163
}
164164

165165
for i := 0; i < count; i++ {

0 commit comments

Comments
 (0)