Skip to content

Commit 9f347aa

Browse files
committed
Merge branch 'txn-id-caching' of https://github.com/jameinel/mgo into jameinel-txn-id-caching
# Conflicts: # txn/sim_test.go # txn/tarjan_test.go
2 parents a34a648 + 9428095 commit 9f347aa

File tree

2 files changed

+153
-10
lines changed

2 files changed

+153
-10
lines changed

txn/flusher.go

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func flush(r *Runner, t *transaction) error {
1313
Runner: r,
1414
goal: t,
1515
goalKeys: make(map[docKey]bool),
16-
queue: make(map[docKey][]token),
16+
queue: make(map[docKey][]tokenAndId),
1717
debugId: debugPrefix(),
1818
}
1919
for _, dkey := range f.goal.docKeys() {
@@ -26,10 +26,36 @@ type flusher struct {
2626
*Runner
2727
goal *transaction
2828
goalKeys map[docKey]bool
29-
queue map[docKey][]token
29+
queue map[docKey][]tokenAndId
3030
debugId string
3131
}
3232

33+
type tokenAndId struct {
34+
tt token
35+
bid bson.ObjectId
36+
}
37+
38+
func (ti tokenAndId) id() bson.ObjectId {
39+
return ti.bid
40+
}
41+
42+
func (ti tokenAndId) nonce() string {
43+
return ti.tt.nonce()
44+
}
45+
46+
func (ti tokenAndId) String() string {
47+
return string(ti.tt)
48+
}
49+
50+
func tokensWithIds(q []token) []tokenAndId {
51+
out := make([]tokenAndId, len(q))
52+
for i, tt := range q {
53+
out[i].tt = tt
54+
out[i].bid = tt.id()
55+
}
56+
return out
57+
}
58+
3359
func (f *flusher) run() (err error) {
3460
if chaosEnabled {
3561
defer f.handleChaos(&err)
@@ -248,7 +274,7 @@ NextDoc:
248274
if info.Remove == "" {
249275
// Fast path, unless workload is insert/remove heavy.
250276
revno[dkey] = info.Revno
251-
f.queue[dkey] = info.Queue
277+
f.queue[dkey] = tokensWithIds(info.Queue)
252278
f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
253279
continue NextDoc
254280
} else {
@@ -310,7 +336,7 @@ NextDoc:
310336
f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
311337
}
312338
revno[dkey] = info.Revno
313-
f.queue[dkey] = info.Queue
339+
f.queue[dkey] = tokensWithIds(info.Queue)
314340
continue NextDoc
315341
}
316342
}
@@ -452,7 +478,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error)
452478
break
453479
}
454480
}
455-
f.queue[dkey] = info.Queue
481+
f.queue[dkey] = tokensWithIds(info.Queue)
456482
if !found {
457483
// Rescanned transaction id was not in the queue. This could mean one
458484
// of three things:
@@ -516,12 +542,13 @@ func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 {
516542

517543
func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) {
518544
found = true
545+
ttId := tt.id()
519546
NextDoc:
520547
for _, dkey := range dkeys {
521548
for _, dtt := range f.queue[dkey] {
522-
if dtt == tt {
549+
if dtt.tt == tt {
523550
continue NextDoc
524-
} else if dtt.id() != tt.id() {
551+
} else if dtt.id() != ttId {
525552
prereqs = true
526553
}
527554
}
@@ -909,17 +936,17 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
909936
return nil
910937
}
911938

912-
func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token {
939+
func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token {
913940
var result []token
914941
for j := len(dqueue) - 1; j >= 0; j-- {
915942
dtt := dqueue[j]
916-
if dtt == dontPull {
943+
if dtt.tt == dontPull {
917944
continue
918945
}
919946
if _, ok := pull[dtt.id()]; ok {
920947
// It was handled before and this is a leftover invalid
921948
// nonce in the queue. Cherry-pick it out.
922-
result = append(result, dtt)
949+
result = append(result, dtt.tt)
923950
}
924951
}
925952
return result

txn/txn_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,8 @@ func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
703703
}
704704

705705
var flaky = flag.Bool("flaky", false, "Include flaky tests")
706+
var txnQueueLength = flag.Int("qlength", 100, "txn-queue length for tests")
707+
706708

707709
func (s *S) TestTxnQueueStressTest(c *C) {
708710
// This fails about 20% of the time on Mongo 3.2 (I haven't tried
@@ -776,3 +778,117 @@ func (s *S) TestTxnQueueStressTest(c *C) {
776778
}
777779
}
778780
}
781+
782+
type txnQueue struct {
783+
Queue []string `bson:"txn-queue"`
784+
}
785+
786+
func (s *S) TestTxnQueueAssertionGrowth(c *C) {
787+
txn.SetDebug(false) // too much spam
788+
err := s.accounts.Insert(M{"_id": 0, "balance": 0})
789+
c.Assert(err, IsNil)
790+
// Create many assertion only transactions.
791+
t := time.Now()
792+
ops := []txn.Op{{
793+
C: "accounts",
794+
Id: 0,
795+
Assert: M{"balance": 0},
796+
}}
797+
for n := 0; n < *txnQueueLength; n++ {
798+
err = s.runner.Run(ops, "", nil)
799+
c.Assert(err, IsNil)
800+
}
801+
var qdoc txnQueue
802+
err = s.accounts.FindId(0).One(&qdoc)
803+
c.Assert(err, IsNil)
804+
c.Check(len(qdoc.Queue), Equals, *txnQueueLength)
805+
c.Logf("%8.3fs to set up %d assertions", time.Since(t).Seconds(), *txnQueueLength)
806+
t = time.Now()
807+
txn.SetChaos(txn.Chaos{})
808+
ops = []txn.Op{{
809+
C: "accounts",
810+
Id: 0,
811+
Update: M{"$inc": M{"balance": 100}},
812+
}}
813+
err = s.runner.Run(ops, "", nil)
814+
c.Logf("%8.3fs to clear N=%d assertions and add one more txn",
815+
time.Since(t).Seconds(), *txnQueueLength)
816+
err = s.accounts.FindId(0).One(&qdoc)
817+
c.Assert(err, IsNil)
818+
c.Check(len(qdoc.Queue), Equals, 1)
819+
}
820+
821+
func (s *S) TestTxnQueueBrokenPrepared(c *C) {
822+
txn.SetDebug(false) // too much spam
823+
badTxnToken := "123456789012345678901234_deadbeef"
824+
err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{badTxnToken}})
825+
c.Assert(err, IsNil)
826+
t := time.Now()
827+
ops := []txn.Op{{
828+
C: "accounts",
829+
Id: 0,
830+
Update: M{"$set": M{"balance": 0}},
831+
}}
832+
errString := `cannot find transaction ObjectIdHex("123456789012345678901234")`
833+
for n := 0; n < *txnQueueLength; n++ {
834+
err = s.runner.Run(ops, "", nil)
835+
c.Assert(err.Error(), Equals, errString)
836+
}
837+
var qdoc txnQueue
838+
err = s.accounts.FindId(0).One(&qdoc)
839+
c.Assert(err, IsNil)
840+
c.Check(len(qdoc.Queue), Equals, *txnQueueLength+1)
841+
c.Logf("%8.3fs to set up %d 'prepared' txns", time.Since(t).Seconds(), *txnQueueLength)
842+
t = time.Now()
843+
s.accounts.UpdateId(0, bson.M{"$pullAll": bson.M{"txn-queue": []string{badTxnToken}}})
844+
ops = []txn.Op{{
845+
C: "accounts",
846+
Id: 0,
847+
Update: M{"$inc": M{"balance": 100}},
848+
}}
849+
err = s.runner.ResumeAll()
850+
c.Assert(err, IsNil)
851+
c.Logf("%8.3fs to ResumeAll N=%d 'prepared' txns",
852+
time.Since(t).Seconds(), *txnQueueLength)
853+
err = s.accounts.FindId(0).One(&qdoc)
854+
c.Assert(err, IsNil)
855+
c.Check(len(qdoc.Queue), Equals, 1)
856+
}
857+
858+
func (s *S) TestTxnQueuePreparing(c *C) {
859+
txn.SetDebug(false) // too much spam
860+
err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{}})
861+
c.Assert(err, IsNil)
862+
t := time.Now()
863+
txn.SetChaos(txn.Chaos{
864+
KillChance: 1.0,
865+
Breakpoint: "set-prepared",
866+
})
867+
ops := []txn.Op{{
868+
C: "accounts",
869+
Id: 0,
870+
Update: M{"$set": M{"balance": 0}},
871+
}}
872+
for n := 0; n < *txnQueueLength; n++ {
873+
err = s.runner.Run(ops, "", nil)
874+
c.Assert(err, Equals, txn.ErrChaos)
875+
}
876+
var qdoc txnQueue
877+
err = s.accounts.FindId(0).One(&qdoc)
878+
c.Assert(err, IsNil)
879+
c.Check(len(qdoc.Queue), Equals, *txnQueueLength)
880+
c.Logf("%8.3fs to set up %d 'preparing' txns", time.Since(t).Seconds(), *txnQueueLength)
881+
txn.SetChaos(txn.Chaos{})
882+
t = time.Now()
883+
err = s.runner.ResumeAll()
884+
c.Logf("%8.3fs to ResumeAll N=%d 'preparing' txns",
885+
time.Since(t).Seconds(), *txnQueueLength)
886+
err = s.accounts.FindId(0).One(&qdoc)
887+
c.Assert(err, IsNil)
888+
expectedCount := 100
889+
if *txnQueueLength <= expectedCount {
890+
expectedCount = *txnQueueLength - 1
891+
}
892+
c.Check(len(qdoc.Queue), Equals, expectedCount)
893+
}
894+

0 commit comments

Comments
 (0)