Skip to content

Commit 6dfa071

Browse files
authored
stress test update + bug fixes (#682)
Signed-off-by: Angelo De Caro <[email protected]>
1 parent 478124c commit 6dfa071

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+1568
-670
lines changed

integration/nwo/token/common/ne.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (p *NetworkHandler) DBPath(root string, tms *topology2.TMS) string {
7171
filepath.Join(
7272
root,
7373
fmt.Sprintf("%s_%s_%s", tms.Network, tms.Channel, tms.Namespace)+"_db.sqlite",
74-
) + "?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)"
74+
) + "?_pragma=journal_mode(WAL)&_pragma=busy_timeout(20000)"
7575
}
7676

7777
func (p *NetworkHandler) FSCNodeKVSDir(uniqueName string) string {

integration/nwo/token/fabric/template.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ token:
4343
opts:
4444
createSchema: true
4545
driver: {{ SQLDriver }}
46-
maxOpenConns: 10
46+
maxOpenConns: 200
4747
dataSource: {{ SQLDataSource }}
4848
{{ if not OnlyUnity }}
4949
tokendb:
@@ -53,7 +53,7 @@ token:
5353
createSchema: true
5454
tablePrefix: tokens
5555
driver: {{ TokensSQLDriver }}
56-
maxOpenConns: 10
56+
maxOpenConns: 200
5757
dataSource: {{ TokensSQLDataSource }}
5858
{{ end }}
5959
{{ if Wallets }}

integration/nwo/token/orion/orion.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,6 @@ func (p *NetworkHandler) GenerateExtension(tms *topology2.TMS, node *sfcnode.Nod
179179
Expect(os.MkdirAll(p.IdentityDBSQLDataSourceDir(uniqueName), 0775)).ToNot(HaveOccurred(), "failed to create [%s]", p.IdentityDBSQLDataSourceDir(uniqueName))
180180

181181
t, err := template.New("peer").Funcs(template.FuncMap{
182-
"TMSID": func() string { return tms.ID() },
183-
"TMS": func() *topology2.TMS { return tms },
184-
"Wallets": func() *generators.Wallets { return p.GetEntry(tms).Wallets[node.Name] },
185182
"IsCustodian": func() bool {
186183
custodianNode, ok := tms.BackendParams[Custodian]
187184
if !ok {
@@ -192,10 +189,14 @@ func (p *NetworkHandler) GenerateExtension(tms *topology2.TMS, node *sfcnode.Nod
192189
"CustodianID": func() string {
193190
return tms.BackendParams[Custodian].(string)
194191
},
192+
"TMSID": func() string { return tms.ID() },
193+
"TMS": func() *topology2.TMS { return tms },
194+
"Wallets": func() *generators.Wallets { return p.GetEntry(tms).Wallets[node.Name] },
195195
"SQLDriver": func() string { return GetTokenPersistenceDriver(node.Options) },
196196
"SQLDataSource": func() string { return p.GetSQLDataSource(node.Options, uniqueName, tms) },
197197
"TokensSQLDriver": func() string { return GetTokenPersistenceDriver(node.Options) },
198198
"TokensSQLDataSource": func() string { return p.GetTokensSQLDataSource(node.Options, uniqueName, tms) },
199+
"OnlyUnity": func() bool { return common2.IsOnlyUnity(tms) },
199200
}).Parse(Extension)
200201
Expect(err).NotTo(HaveOccurred())
201202

integration/nwo/token/orion/template.go

Lines changed: 66 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,73 +6,118 @@ SPDX-License-Identifier: Apache-2.0
66

77
package orion
88

9-
const Extension = `
9+
const (
10+
Extension = `
1011
token:
11-
tms:
12+
# TMS stands for Token Management Service. A TMS is uniquely identified by a network, channel, and
13+
# namespace identifiers. The network identifier should refer to a configure network (Fabric, Orion, and so on).
14+
# The meaning of channel and namespace are network dependant. For Fabric, the meaning is clear.
15+
# For Orion, channel is empty and namespace is the DB name to use.
16+
tms:
1217
{{ TMSID }}:
18+
# Network identifier this TMS refers to
1319
network: {{ TMS.Network }}
20+
# Channel identifier within the specified network
1421
channel: {{ TMS.Channel }}
22+
# Namespace identifier within the specified channel
1523
namespace: {{ TMS.Namespace }}
1624
orion:
1725
custodian:
1826
id: {{ CustodianID }}
1927
enabled: {{ IsCustodian }}
2028
certification:
21-
interactive:
29+
{{ if TMS.Certifiers }} interactive:
2230
ids: {{ range TMS.Certifiers }}
23-
- {{ . }}{{ end }}
31+
- {{ . }}{{ end }}{{ end }}
2432
db:
2533
persistence:
34+
type: unity
2635
opts:
2736
createSchema: true
2837
driver: {{ SQLDriver }}
29-
maxOpenConns: 10
38+
maxOpenConns: 200
3039
dataSource: {{ SQLDataSource }}
31-
identitydb:
32-
persistence:
33-
type: unity
34-
ttxdb:
35-
persistence:
36-
type: unity
37-
auditdb:
38-
persistence:
39-
type: unity
40-
tokenlockdb:
41-
persistence:
42-
type: unity
40+
{{ if not OnlyUnity }}
4341
tokendb:
4442
persistence:
4543
type: sql
4644
opts:
4745
createSchema: true
4846
tablePrefix: tokens
4947
driver: {{ TokensSQLDriver }}
50-
maxOpenConns: 10
48+
maxOpenConns: 200
5149
dataSource: {{ TokensSQLDataSource }}
52-
{{ if Wallets }}wallets:{{ if Wallets.Certifiers }}
50+
{{ end }}
51+
{{ if Wallets }}
52+
# Wallets associated with this TMS
53+
wallets:
54+
defaultCacheSize: 200
55+
{{ if Wallets.Certifiers }}
56+
# Certifiers wallets are used to certify tokens
5357
certifiers: {{ range Wallets.Certifiers }}
5458
- id: {{ .ID }}
5559
default: {{ .Default }}
5660
path: {{ .Path }}
5761
{{ end }}
58-
{{ end }}{{ if Wallets.Issuers }}
62+
{{ end }}{{ if Wallets.Issuers }}
63+
# Issuers wallets are used to issue tokens
5964
issuers: {{ range Wallets.Issuers }}
6065
- id: {{ .ID }}
6166
default: {{ .Default }}
6267
path: {{ .Path }}
68+
opts:
69+
BCCSP:
70+
Default: {{ .Opts.Default }}
71+
# Settings for the SW crypto provider (i.e. when DEFAULT: SW)
72+
SW:
73+
Hash: {{ .Opts.SW.Hash }}
74+
Security: {{ .Opts.SW.Security }}
75+
# Settings for the PKCS#11 crypto provider (i.e. when DEFAULT: PKCS11)
76+
PKCS11:
77+
# Location of the PKCS11 module library
78+
Library: {{ .Opts.PKCS11.Library }}
79+
# Token Label
80+
Label: {{ .Opts.PKCS11.Label }}
81+
# User PIN
82+
Pin: {{ .Opts.PKCS11.Pin }}
83+
Hash: {{ .Opts.PKCS11.Hash }}
84+
Security: {{ .Opts.PKCS11.Security }}
6385
{{ end }}
6486
{{ end }}{{ if Wallets.Owners }}
87+
# Owners wallets are used to own tokens
6588
owners: {{ range Wallets.Owners }}
6689
- id: {{ .ID }}
6790
default: {{ .Default }}
6891
path: {{ .Path }}
92+
{{ if .Type }}
93+
type: {{ .Type }}
94+
{{ end }}
6995
{{ end }}
7096
{{ end }}{{ if Wallets.Auditors }}
97+
# Auditors wallets are used to audit tokens
7198
auditors: {{ range Wallets.Auditors }}
7299
- id: {{ .ID }}
73100
default: {{ .Default }}
74101
path: {{ .Path }}
102+
opts:
103+
BCCSP:
104+
Default: {{ .Opts.Default }}
105+
# Settings for the SW crypto provider (i.e. when DEFAULT: SW)
106+
SW:
107+
Hash: {{ .Opts.SW.Hash }}
108+
Security: {{ .Opts.SW.Security }}
109+
# Settings for the PKCS#11 crypto provider (i.e. when DEFAULT: PKCS11)
110+
PKCS11:
111+
# Location of the PKCS11 module library
112+
Library: {{ .Opts.PKCS11.Library }}
113+
# Token Label
114+
Label: {{ .Opts.PKCS11.Label }}
115+
# User PIN
116+
Pin: {{ .Opts.PKCS11.Pin }}
117+
Hash: {{ .Opts.PKCS11.Hash }}
118+
Security: {{ .Opts.PKCS11.Security }}
75119
{{ end }}
76120
{{ end }}
77121
{{ end }}
78-
`
122+
`
123+
)

integration/ports.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ var (
4444
ReplicationFactor: token.None,
4545
}
4646

47-
WebSocketNoReplicationOnly = []*InfrastructureType{WebSocketNoReplication}
47+
WebSocketNoReplicationOnly = []*InfrastructureType{
48+
WebSocketNoReplication,
49+
}
4850

4951
AllTestTypes = []*InfrastructureType{
5052
WebSocketNoReplication,

integration/token/common/views/finality.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package views
88

99
import (
1010
"encoding/json"
11-
"fmt"
1211

1312
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert"
1413
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
@@ -69,6 +68,6 @@ type finalityListener struct {
6968
}
7069

7170
func (l *finalityListener) OnStatus(txID string, status int, message string, tokenRequestHash []byte) {
72-
fmt.Printf("Received finality from network for TX [%s][%d]", txID, status)
71+
//fmt.Printf("Received finality from network for TX [%s][%d]", txID, status)
7372
l.errs <- nil
7473
}

integration/token/fungible/dlogstress/dlog_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ package dlog
99
import (
1010
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/fsc"
1111
"github.com/hyperledger-labs/fabric-smart-client/pkg/api"
12-
token2 "github.com/hyperledger-labs/fabric-token-sdk/integration/token"
12+
"github.com/hyperledger-labs/fabric-token-sdk/integration/token"
1313
"github.com/hyperledger-labs/fabric-token-sdk/integration/token/common"
1414
"github.com/hyperledger-labs/fabric-token-sdk/integration/token/common/sdk/fdlog"
1515
"github.com/hyperledger-labs/fabric-token-sdk/integration/token/common/sdk/odlog"
@@ -19,7 +19,10 @@ import (
1919
)
2020

2121
var _ = Describe("Stress EndToEnd", func() {
22-
for _, backend := range []string{"fabric", "orion"} {
22+
for _, backend := range []string{
23+
"fabric",
24+
"orion",
25+
} {
2326
Describe("Stress test", Label(backend), func() {
2427
ts, selector := newTestSuite(backend)
2528
AfterEach(ts.TearDown)
@@ -35,16 +38,17 @@ var sdks = map[string]api.SDK{
3538
"orion": &odlog.SDK{},
3639
}
3740

38-
func newTestSuite(backend string) (*token2.TestSuite, *token2.ReplicaSelector) {
39-
opts, selector := token2.NewReplicationOptions(token2.None)
40-
ts := token2.NewTestSuite(opts.SQLConfigs, StartPortDlog, topology.Topology(
41+
func newTestSuite(backend string) (*token.TestSuite, *token.ReplicaSelector) {
42+
//opts, selector := token.NewReplicationOptions(token.None)
43+
opts, selector := token.NewReplicationOptions(1, "alice", "bob", "charlie", "issuer", "auditor")
44+
ts := token.NewTestSuite(opts.SQLConfigs, StartPortDlog, topology.Topology(
4145
common.Opts{
4246
Backend: backend,
4347
TokenSDKDriver: "dlog",
4448
Aries: true,
4549
ReplicationOpts: opts,
46-
CommType: fsc.WebSocket,
47-
//FSCLogSpec: "token-sdk=debug:fabric-sdk=debug:info",
50+
CommType: fsc.LibP2P,
51+
//FSCLogSpec: "token-sdk=debug:orion-sdk=debug:info",
4852
SDKs: []api.SDK{sdks[backend]},
4953
Monitoring: true,
5054
},

integration/token/fungible/dlogstress/support/worker.go

Lines changed: 86 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,32 +7,87 @@ SPDX-License-Identifier: Apache-2.0
77
package support
88

99
import (
10+
"context"
11+
"fmt"
1012
"sync"
13+
"sync/atomic"
14+
"time"
1115
)
1216

1317
type Task func()
1418

1519
type Pool struct {
1620
taskQueue chan Task
1721
wg sync.WaitGroup
18-
shutdown chan struct{}
22+
ctx context.Context
23+
cancel context.CancelFunc
24+
label string
25+
stop atomic.Bool
1926
}
2027

21-
func NewPool(numWorkers int) *Pool {
28+
func NewPool(label string, numWorkers int) *Pool {
29+
ctx, cancel := context.WithCancel(context.Background())
2230
pool := &Pool{
2331
taskQueue: make(chan Task, numWorkers),
2432
wg: sync.WaitGroup{},
33+
ctx: ctx,
34+
cancel: cancel,
35+
label: label,
2536
}
2637

2738
pool.wg.Add(numWorkers)
39+
pool.stop.Store(false)
2840
for i := 0; i < numWorkers; i++ {
2941
go func() {
30-
defer pool.wg.Done()
42+
counter := atomic.Int64{}
43+
var taskDurations []time.Duration
44+
workerStart := time.Now()
45+
46+
defer func() {
47+
fmt.Printf("Context done for [%s], shutdown after, computing statistics\n", pool.label)
48+
workerEnd := time.Since(workerStart)
49+
var sum int64
50+
numTasks := counter.Load()
51+
for _, duration := range taskDurations {
52+
sum += duration.Milliseconds()
53+
}
54+
avgDuration := sum / numTasks
55+
throughput := workerEnd.Milliseconds() / numTasks
56+
57+
fmt.Printf(
58+
"Context done for [%s], shutdown after # task [%d], avg duration [%d], throughput (%d,%d): [%v] \n",
59+
pool.label,
60+
numTasks,
61+
avgDuration,
62+
throughput,
63+
workerEnd.Milliseconds(),
64+
taskDurations,
65+
)
66+
pool.wg.Done()
67+
}()
3168
for {
32-
if task, ok := <-pool.taskQueue; ok {
33-
task()
34-
} else {
69+
select {
70+
case <-pool.ctx.Done():
71+
fmt.Printf("Context done for [%s], shutdown after # %d workers\n", pool.label, numWorkers)
72+
pool.stop.Store(true)
3573
return
74+
case task := <-pool.taskQueue:
75+
if pool.stop.Load() {
76+
fmt.Printf("No more tasks for [%s] to run...\n", pool.label)
77+
return
78+
}
79+
if task == nil {
80+
fmt.Printf("Got nil task for [%s], shutdown...\n", pool.label)
81+
return
82+
}
83+
fmt.Printf("Schedule new task for [%s]: [%d]\n", pool.label, counter.Add(1))
84+
start := time.Now()
85+
task()
86+
end := time.Since(start)
87+
taskDurations = append(taskDurations, end)
88+
fmt.Printf("Task for [%s][%d], took [%v]\n", pool.label, counter.Load(), end.Milliseconds())
89+
default:
90+
fmt.Printf("Nothing to do for [%s]\n", pool.label)
3691
}
3792
}
3893
}()
@@ -41,17 +96,33 @@ func NewPool(numWorkers int) *Pool {
4196
}
4297

4398
func (p *Pool) ScheduleTask(task Task) {
44-
for {
45-
select {
46-
case <-p.shutdown:
47-
return
48-
default:
49-
p.taskQueue <- task
99+
go func() {
100+
for {
101+
select {
102+
case <-p.ctx.Done():
103+
fmt.Printf("Context done for [%s], shutdown scheduler\n", p.label)
104+
p.stop.Store(true)
105+
return
106+
default:
107+
if p.stop.Load() {
108+
fmt.Printf("Stop for [%s], shutdown scheduler\n", p.label)
109+
return
110+
}
111+
p.taskQueue <- task
112+
}
50113
}
51-
}
114+
}()
115+
}
116+
117+
func (p *Pool) Stop() {
118+
fmt.Printf("Shutting down workers for [%s]\n", p.label)
119+
p.stop.Store(true)
120+
fmt.Printf("Shutting down workers for [%s], cancel context...\n", p.label)
121+
p.cancel()
52122
}
53123

54-
func (p *Pool) Shutdown() {
55-
close(p.taskQueue)
124+
func (p *Pool) Wait() {
125+
fmt.Printf("Shutting down workers for [%s], wait workers...\n", p.label)
56126
p.wg.Wait()
127+
fmt.Printf("All workers shut down for [%s]\n", p.label)
57128
}

0 commit comments

Comments
 (0)