Skip to content

Commit 4fc8359

Browse files
authored
common platform (#547)
Signed-off-by: Alexandros Filios <[email protected]> Signed-off-by: Angelo De Caro <[email protected]>
1 parent cc10a45 commit 4fc8359

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1697
-3426
lines changed

integration/fabric/iou/views/approver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ func (i *ApproverView) Call(context view.Context) (interface{}, error) {
7878
var wg sync.WaitGroup
7979
wg.Add(2)
8080
committer := fabric.GetDefaultChannel(context).Committer()
81-
assert.NoError(err, committer.SubscribeTxStatusChanges(tx.ID(), NewTxStatusChangeListener(tx.ID(), &wg)), "failed to add committer listener")
82-
assert.NoError(err, committer.SubscribeTxStatusChanges("", NewTxStatusChangeListener(tx.ID(), &wg)), "failed to add committer listener")
81+
assert.NoError(err, committer.AddFinalityListener(tx.ID(), NewFinalityListener(tx.ID(), &wg)), "failed to add committer listener")
82+
assert.NoError(err, committer.AddFinalityListener("", NewFinalityListener(tx.ID(), &wg)), "failed to add committer listener")
8383

8484
// Finally, the approver waits that the transaction completes its lifecycle
8585
_, err = context.RunView(state.NewFinalityWithTimeoutView(tx, 1*time.Minute))

integration/fabric/iou/views/borrower.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ func (i *CreateIOUView) Call(context view.Context) (interface{}, error) {
7171
var wg sync.WaitGroup
7272
wg.Add(2)
7373
committer := fabric.GetDefaultChannel(context).Committer()
74-
assert.NoError(err, committer.SubscribeTxStatusChanges(tx.ID(), NewTxStatusChangeListener(tx.ID(), &wg)), "failed to add committer listener")
75-
assert.NoError(err, committer.SubscribeTxStatusChanges("", NewTxStatusChangeListener(tx.ID(), &wg)), "failed to add committer listener")
74+
assert.NoError(err, committer.AddFinalityListener(tx.ID(), NewFinalityListener(tx.ID(), &wg)), "failed to add committer listener")
75+
assert.NoError(err, committer.AddFinalityListener("", NewFinalityListener(tx.ID(), &wg)), "failed to add committer listener")
7676

7777
// At this point the borrower can send the transaction to the ordering service and wait for finality.
7878
_, err = context.RunView(state.NewOrderingAndFinalityWithTimeoutView(tx, 1*time.Minute))
@@ -138,8 +138,8 @@ func (u UpdateIOUView) Call(context view.Context) (interface{}, error) {
138138
var wg sync.WaitGroup
139139
wg.Add(2)
140140
committer := fabric.GetDefaultChannel(context).Committer()
141-
assert.NoError(err, committer.SubscribeTxStatusChanges(tx.ID(), NewTxStatusChangeListener(tx.ID(), &wg)), "failed to add committer listener")
142-
assert.NoError(err, committer.SubscribeTxStatusChanges("", NewTxStatusChangeListener(tx.ID(), &wg)), "failed to add committer listener")
141+
assert.NoError(err, committer.AddFinalityListener(tx.ID(), NewFinalityListener(tx.ID(), &wg)), "failed to add committer listener")
142+
assert.NoError(err, committer.AddFinalityListener("", NewFinalityListener(tx.ID(), &wg)), "failed to add committer listener")
143143

144144
// At this point the borrower can send the transaction to the ordering service and wait for finality.
145145
_, err = context.RunView(state.NewOrderingAndFinalityWithTimeoutView(tx, 1*time.Minute))

integration/fabric/iou/views/utils.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,23 @@ SPDX-License-Identifier: Apache-2.0
66

77
package views
88

9-
import "sync"
9+
import (
10+
"sync"
1011

11-
type TxStatusChangeListener struct {
12+
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
13+
)
14+
15+
type FinalityListener struct {
1216
ExpectedTxID string
13-
WG *sync.WaitGroup
17+
WaitGroup *sync.WaitGroup
1418
}
1519

16-
func NewTxStatusChangeListener(expectedTxID string, WG *sync.WaitGroup) *TxStatusChangeListener {
17-
return &TxStatusChangeListener{ExpectedTxID: expectedTxID, WG: WG}
20+
func NewFinalityListener(expectedTxID string, WG *sync.WaitGroup) *FinalityListener {
21+
return &FinalityListener{ExpectedTxID: expectedTxID, WaitGroup: WG}
1822
}
1923

20-
func (t *TxStatusChangeListener) OnStatusChange(txID string, status int, statusMessage string) error {
24+
func (t *FinalityListener) OnStatus(txID string, _ driver.ValidationCode, _ string) {
2125
if txID == t.ExpectedTxID {
22-
t.WG.Done()
26+
t.WaitGroup.Done()
2327
}
24-
return nil
2528
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package committer
8+
9+
import (
10+
"context"
11+
"runtime/debug"
12+
"sync"
13+
"time"
14+
15+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/core"
16+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
17+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
18+
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
19+
)
20+
21+
const (
22+
// How often to poll the vault for newly-committed transactions
23+
checkVaultFrequency = 1 * time.Second
24+
// Listeners that do not listen for a specific txID, but for all transactions
25+
allListenersKey = ""
26+
defaultEventQueueSize = 1000
27+
)
28+
29+
var logger = flogging.MustGetLogger("common-sdk.Committer")
30+
31+
// FinalityEvent contains information about the finality of a given transaction
32+
type FinalityEvent[V comparable] struct {
33+
TxID core.TxID
34+
ValidationCode V
35+
ValidationMessage string
36+
Block uint64
37+
IndexInBlock int
38+
Err error
39+
}
40+
41+
// FinalityListener is the interface that must be implemented to receive transaction status notifications
42+
type FinalityListener[V comparable] interface {
43+
// OnStatus is called when the status of a transaction changes, or it is valid or invalid
44+
OnStatus(txID core.TxID, status V, statusMessage string)
45+
}
46+
47+
type Vault[V comparable] interface {
48+
Statuses(ids ...string) ([]driver.TxValidationStatus[V], error)
49+
}
50+
51+
// FinalityManager manages events for the commit pipeline.
52+
// It consists of a central queue of events.
53+
// The queue is fed by multiple sources.
54+
// A single thread reads from this queue and invokes the listeners in a blocking way
55+
type FinalityManager[V comparable] struct {
56+
eventQueue chan FinalityEvent[V]
57+
vault Vault[V]
58+
postStatuses utils.Set[V]
59+
txIDListeners map[core.TxID][]FinalityListener[V]
60+
mutex sync.RWMutex
61+
}
62+
63+
func NewFinalityManager[V comparable](vault Vault[V], statuses ...V) *FinalityManager[V] {
64+
return &FinalityManager[V]{
65+
eventQueue: make(chan FinalityEvent[V], defaultEventQueueSize),
66+
vault: vault,
67+
postStatuses: utils.NewSet(statuses...),
68+
txIDListeners: map[string][]FinalityListener[V]{},
69+
}
70+
}
71+
72+
func (c *FinalityManager[V]) AddListener(txID core.TxID, toAdd FinalityListener[V]) {
73+
c.mutex.Lock()
74+
defer c.mutex.Unlock()
75+
76+
ls, ok := c.txIDListeners[txID]
77+
if !ok {
78+
ls = []FinalityListener[V]{}
79+
}
80+
c.txIDListeners[txID] = append(ls, toAdd)
81+
}
82+
83+
func (c *FinalityManager[V]) RemoveListener(txID core.TxID, toRemove FinalityListener[V]) {
84+
c.mutex.Lock()
85+
defer c.mutex.Unlock()
86+
87+
if ls, ok := utils.Remove(c.txIDListeners[txID], toRemove); ok {
88+
c.txIDListeners[txID] = ls
89+
if len(ls) == 0 {
90+
delete(c.txIDListeners, txID)
91+
}
92+
}
93+
}
94+
95+
func (c *FinalityManager[V]) Post(event FinalityEvent[V]) {
96+
c.eventQueue <- event
97+
}
98+
99+
func (c *FinalityManager[V]) Dispatch(event FinalityEvent[V]) {
100+
listeners := c.cloneListeners(event.TxID)
101+
for _, listener := range listeners {
102+
c.invokeListener(listener, event.TxID, event.ValidationCode, event.ValidationMessage)
103+
}
104+
}
105+
106+
func (c *FinalityManager[V]) Run(context context.Context) {
107+
go c.runEventQueue(context)
108+
go c.runStatusListener(context)
109+
}
110+
111+
func (c *FinalityManager[V]) invokeListener(l FinalityListener[V], txID core.TxID, status V, statusMessage string) {
112+
defer func() {
113+
if r := recover(); r != nil {
114+
logger.Errorf("caught panic while running dispatching event [%s:%d:%s]: [%s][%s]", txID, status, statusMessage, r, debug.Stack())
115+
}
116+
}()
117+
l.OnStatus(txID, status, statusMessage)
118+
}
119+
120+
func (c *FinalityManager[V]) runEventQueue(context context.Context) {
121+
for {
122+
select {
123+
case <-context.Done():
124+
return
125+
case event := <-c.eventQueue:
126+
c.Dispatch(event)
127+
}
128+
}
129+
}
130+
131+
func (c *FinalityManager[V]) runStatusListener(context context.Context) {
132+
ticker := time.NewTicker(checkVaultFrequency)
133+
defer ticker.Stop()
134+
for {
135+
select {
136+
case <-context.Done():
137+
return
138+
case <-ticker.C:
139+
statuses, err := c.vault.Statuses(c.txIDs()...)
140+
if err != nil {
141+
logger.Errorf("error fetching statuses: %w", err)
142+
continue
143+
}
144+
for _, status := range statuses {
145+
// check txID status, if it is valid or invalid, post an event
146+
logger.Debugf("check tx [%s]'s status", status.TxID)
147+
if c.postStatuses.Contains(status.ValidationCode) {
148+
// post the event
149+
c.Post(FinalityEvent[V]{
150+
TxID: status.TxID,
151+
ValidationCode: status.ValidationCode,
152+
ValidationMessage: status.Message,
153+
})
154+
}
155+
}
156+
}
157+
}
158+
}
159+
160+
func (c *FinalityManager[V]) cloneListeners(txID core.TxID) []FinalityListener[V] {
161+
c.mutex.RLock()
162+
defer c.mutex.RUnlock()
163+
164+
txListeners := c.txIDListeners[txID]
165+
allListeners := c.txIDListeners[allListenersKey]
166+
clone := make([]FinalityListener[V], len(txListeners)+len(allListeners))
167+
copy(clone[:len(txListeners)], txListeners)
168+
copy(clone[len(txListeners):], allListeners)
169+
delete(c.txIDListeners, txID)
170+
171+
return clone
172+
}
173+
174+
func (c *FinalityManager[V]) txIDs() []core.TxID {
175+
c.mutex.RLock()
176+
defer c.mutex.RUnlock()
177+
178+
keys, _ := utils.Remove(utils.Keys(c.txIDListeners), allListenersKey)
179+
return keys
180+
}

platform/fabric/core/generic/vault/inspector.go renamed to platform/common/core/generic/vault/inspector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package vault
88

99
import (
10-
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
10+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
1111
"github.com/pkg/errors"
1212
)
1313

0 commit comments

Comments
 (0)