Skip to content

Commit a1bc7c2

Browse files
Add context to simpleKeyDataStore #935
Signed-off-by: Said Altury <[email protected]>
1 parent e8ded19 commit a1bc7c2

File tree

25 files changed

+180
-161
lines changed

25 files changed

+180
-161
lines changed

platform/common/driver/kvs.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,19 @@ type BindingStore interface {
3434
}
3535

3636
type MetadataStore[K any, M any] interface {
37-
GetMetadata(key K) (M, error)
38-
ExistMetadata(key K) (bool, error)
39-
PutMetadata(key K, transientMap M) error
37+
GetMetadata(ctx context.Context, key K) (M, error)
38+
ExistMetadata(ctx context.Context, key K) (bool, error)
39+
PutMetadata(ctx context.Context, key K, transientMap M) error
4040
}
4141

4242
type EnvelopeStore[K any] interface {
43-
GetEnvelope(key K) ([]byte, error)
44-
ExistsEnvelope(key K) (bool, error)
45-
PutEnvelope(key K, env []byte) error
43+
GetEnvelope(ctx context.Context, key K) ([]byte, error)
44+
ExistsEnvelope(ctx context.Context, key K) (bool, error)
45+
PutEnvelope(ctx context.Context, key K, env []byte) error
4646
}
4747

4848
type EndorseTxStore[K any] interface {
49-
GetEndorseTx(key K) ([]byte, error)
50-
ExistsEndorseTx(key K) (bool, error)
51-
PutEndorseTx(key K, etx []byte) error
49+
GetEndorseTx(ctx context.Context, key K) ([]byte, error)
50+
ExistsEndorseTx(ctx context.Context, key K) (bool, error)
51+
PutEndorseTx(ctx context.Context, key K, etx []byte) error
5252
}

platform/fabric/core/generic/committer/committer.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func (c *Committer) Status(ctx context.Context, txID driver2.TxID) (driver.Valid
206206
}
207207
if vc == driver.Unknown {
208208
// give it a second chance
209-
if c.EnvelopeService.Exists(txID) {
209+
if c.EnvelopeService.Exists(ctx, txID) {
210210
if err := c.extractStoredEnvelopeToVault(ctx, txID); err != nil {
211211
return driver.Unknown, "", errors.WithMessagef(err, "failed to extract stored enveloper for [%s]", txID)
212212
}
@@ -236,7 +236,7 @@ func (c *Committer) DiscardTx(ctx context.Context, txID string, message string)
236236
}
237237
if vc == driver.Unknown {
238238
// give it a second chance
239-
if c.EnvelopeService.Exists(txID) {
239+
if c.EnvelopeService.Exists(ctx, txID) {
240240
if err := c.extractStoredEnvelopeToVault(ctx, txID); err != nil {
241241
return errors.WithMessagef(err, "failed to extract stored enveloper for [%s]", txID)
242242
}
@@ -758,7 +758,7 @@ func (c *Committer) commit(ctx context.Context, txID string, block uint64, index
758758
return err
759759
}
760760
if headerType == int32(common.HeaderType_ENDORSER_TRANSACTION) {
761-
if !c.Vault.RWSExists(ctx, txID) && c.EnvelopeService.Exists(txID) {
761+
if !c.Vault.RWSExists(ctx, txID) && c.EnvelopeService.Exists(ctx, txID) {
762762
// Then match rwsets
763763
span.AddEvent("extract_stored_env_to_vault")
764764
if err := c.extractStoredEnvelopeToVault(ctx, txID); err != nil {
@@ -776,7 +776,7 @@ func (c *Committer) commit(ctx context.Context, txID string, block uint64, index
776776
return errors.WithMessagef(err, "failed to store unknown envelope for [%s]", txID)
777777
}
778778
span.AddEvent("store_env")
779-
if err := c.EnvelopeService.StoreEnvelope(txID, envelopeRaw); err != nil {
779+
if err := c.EnvelopeService.StoreEnvelope(ctx, txID, envelopeRaw); err != nil {
780780
return errors.WithMessagef(err, "failed to store unknown envelope for [%s]", txID)
781781
}
782782
span.AddEvent("get_rwset_from_evn")
@@ -811,7 +811,7 @@ func (c *Committer) commit(ctx context.Context, txID string, block uint64, index
811811

812812
func (c *Committer) commitUnknown(ctx context.Context, txID string, block uint64, indexInBlock uint64, envelope *common.Envelope) error {
813813
// if an envelope exists for the passed txID, then commit it
814-
if c.EnvelopeService.Exists(txID) {
814+
if c.EnvelopeService.Exists(ctx, txID) {
815815
return c.commitStoredEnvelope(ctx, txID, block, indexInBlock)
816816
}
817817

@@ -837,7 +837,7 @@ func (c *Committer) commitUnknown(ctx context.Context, txID string, block uint64
837837
return nil
838838
}
839839

840-
if err := c.EnvelopeService.StoreEnvelope(txID, envelopeRaw); err != nil {
840+
if err := c.EnvelopeService.StoreEnvelope(ctx, txID, envelopeRaw); err != nil {
841841
return errors.WithMessagef(err, "failed to store unknown envelope for [%s]", txID)
842842
}
843843
rws, _, err := c.RWSetLoaderService.GetRWSetFromEvn(ctx, txID)

platform/fabric/core/generic/committer/endorsertx.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func (c *Committer) DiscardEndorserTransaction(ctx context.Context, txID string,
152152
}
153153
if ok {
154154
// so, we must remember that this transaction was discarded
155-
if err := c.EnvelopeService.StoreEnvelope(txID, envRaw); err != nil {
155+
if err := c.EnvelopeService.StoreEnvelope(ctx, txID, envRaw); err != nil {
156156
return errors.WithMessagef(err, "failed to store unknown envelope for [%s]", txID)
157157
}
158158
rws, _, err := c.RWSetLoaderService.GetRWSetFromEvn(ctx, txID)

platform/fabric/core/generic/rwset/loader.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,11 @@ func (c *Loader) GetRWSetFromEvn(ctx context.Context, txID driver2.TxID) (driver
6161
span.AddEvent("start_get_rwset_from_evn")
6262
defer span.AddEvent("end_get_rwset_from_evn")
6363

64-
if !c.EnvelopeService.Exists(txID) {
64+
if !c.EnvelopeService.Exists(ctx, txID) {
6565
return nil, nil, errors.Errorf("envelope does not exists for [%s]", txID)
6666
}
6767

68-
rawEnv, err := c.EnvelopeService.LoadEnvelope(txID)
68+
rawEnv, err := c.EnvelopeService.LoadEnvelope(ctx, txID)
6969
if err != nil {
7070
return nil, nil, errors.Wrapf(err, "cannot load envelope [%s]", txID)
7171
}
@@ -98,11 +98,11 @@ func (c *Loader) GetRWSetFromETx(ctx context.Context, txID driver2.TxID) (driver
9898
span.AddEvent("start_get_rwset_from_etx")
9999
defer span.AddEvent("end_get_rwset_from_etx")
100100

101-
if !c.TransactionService.Exists(txID) {
101+
if !c.TransactionService.Exists(ctx, txID) {
102102
return nil, nil, errors.Errorf("transaction does not exists for [%s]", txID)
103103
}
104104

105-
raw, err := c.TransactionService.LoadTransaction(txID)
105+
raw, err := c.TransactionService.LoadTransaction(ctx, txID)
106106
if err != nil {
107107
return nil, nil, errors.Wrapf(err, "cannot load etx [%s]", txID)
108108
}

platform/fabric/core/generic/rwset/processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ func (r *processorManager) ProcessByID(ctx context.Context, channel string, txID
7070
var rws driver.RWSet
7171
var tx driver.ProcessTransaction
7272
switch {
73-
case ch.EnvelopeService().Exists(txID):
73+
case ch.EnvelopeService().Exists(ctx, txID):
7474
rws, tx, err = ch.RWSetLoader().GetRWSetFromEvn(ctx, txID)
75-
case ch.TransactionService().Exists(txID):
75+
case ch.TransactionService().Exists(ctx, txID):
7676
rws, tx, err = ch.RWSetLoader().GetRWSetFromETx(ctx, txID)
7777
default:
7878
logger.Debugf("no entry found for [%s,%s]", channel, txID)

platform/fabric/core/generic/transaction/services.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
77
package transaction
88

99
import (
10+
"context"
11+
1012
"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/proto"
1113
driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
1214
"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
@@ -26,17 +28,17 @@ func NewMetadataService(metadataKVS driver.MetadataStore, network string, channe
2628
return &mds{metadataKVS: metadataKVS, key: keyMapper(network, channel)}
2729
}
2830

29-
func (s *mds) Exists(txid string) bool {
30-
ok, _ := s.metadataKVS.ExistMetadata(s.key(txid))
31+
func (s *mds) Exists(ctx context.Context, txid string) bool {
32+
ok, _ := s.metadataKVS.ExistMetadata(ctx, s.key(txid))
3133
return ok
3234
}
3335

34-
func (s *mds) StoreTransient(txid string, transientMap driver.TransientMap) error {
35-
return s.metadataKVS.PutMetadata(s.key(txid), transientMap)
36+
func (s *mds) StoreTransient(ctx context.Context, txid string, transientMap driver.TransientMap) error {
37+
return s.metadataKVS.PutMetadata(ctx, s.key(txid), transientMap)
3638
}
3739

38-
func (s *mds) LoadTransient(txid string) (driver.TransientMap, error) {
39-
return s.metadataKVS.GetMetadata(s.key(txid))
40+
func (s *mds) LoadTransient(ctx context.Context, txid string) (driver.TransientMap, error) {
41+
return s.metadataKVS.GetMetadata(ctx, s.key(txid))
4042
}
4143

4244
type envs struct {
@@ -48,28 +50,28 @@ func NewEnvelopeService(envelopeKVS driver.EnvelopeStore, network string, channe
4850
return &envs{envelopeKVS: envelopeKVS, key: keyMapper(network, channel)}
4951
}
5052

51-
func (s *envs) Exists(txid string) bool {
52-
ok, _ := s.envelopeKVS.ExistsEnvelope(s.key(txid))
53+
func (s *envs) Exists(ctx context.Context, txid string) bool {
54+
ok, _ := s.envelopeKVS.ExistsEnvelope(ctx, s.key(txid))
5355
return ok
5456
}
5557

56-
func (s *envs) StoreEnvelope(txID string, env interface{}) error {
58+
func (s *envs) StoreEnvelope(ctx context.Context, txID string, env interface{}) error {
5759
switch e := env.(type) {
5860
case []byte:
59-
return s.envelopeKVS.PutEnvelope(s.key(txID), e)
61+
return s.envelopeKVS.PutEnvelope(ctx, s.key(txID), e)
6062
case *common.Envelope:
6163
envBytes, err := proto.Marshal(e)
6264
if err != nil {
6365
return errors.WithMessagef(err, "failed marshalling envelop for tx [%s]", txID)
6466
}
65-
return s.envelopeKVS.PutEnvelope(s.key(txID), envBytes)
67+
return s.envelopeKVS.PutEnvelope(ctx, s.key(txID), envBytes)
6668
default:
6769
return errors.Errorf("invalid env, expected []byte or *common.Envelope, got [%T]", env)
6870
}
6971
}
7072

71-
func (s *envs) LoadEnvelope(txid string) ([]byte, error) {
72-
return s.envelopeKVS.GetEnvelope(s.key(txid))
73+
func (s *envs) LoadEnvelope(ctx context.Context, txid string) ([]byte, error) {
74+
return s.envelopeKVS.GetEnvelope(ctx, s.key(txid))
7375
}
7476

7577
type ets struct {
@@ -81,17 +83,17 @@ func NewEndorseTransactionService(endorseTxKVS driver.EndorseTxStore, network st
8183
return &ets{endorseTxKVS: endorseTxKVS, key: keyMapper(network, channel)}
8284
}
8385

84-
func (s *ets) Exists(txid string) bool {
85-
ok, _ := s.endorseTxKVS.ExistsEndorseTx(s.key(txid))
86+
func (s *ets) Exists(ctx context.Context, txid string) bool {
87+
ok, _ := s.endorseTxKVS.ExistsEndorseTx(ctx, s.key(txid))
8688
return ok
8789
}
8890

89-
func (s *ets) StoreTransaction(txid string, env []byte) error {
90-
return s.endorseTxKVS.PutEndorseTx(s.key(txid), env)
91+
func (s *ets) StoreTransaction(ctx context.Context, txid string, env []byte) error {
92+
return s.endorseTxKVS.PutEndorseTx(ctx, s.key(txid), env)
9193
}
9294

93-
func (s *ets) LoadTransaction(txid string) ([]byte, error) {
94-
return s.endorseTxKVS.GetEndorseTx(s.key(txid))
95+
func (s *ets) LoadTransaction(ctx context.Context, txid string) ([]byte, error) {
96+
return s.endorseTxKVS.GetEndorseTx(ctx, s.key(txid))
9597
}
9698

9799
func keyMapper(network, channel string) func(txID driver2.TxID) driver.Key {

platform/fabric/core/generic/transaction/transasction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ func (t *Transaction) ProposalHasBeenEndorsedBy(party view.Identity) error {
548548

549549
func (t *Transaction) StoreTransient() error {
550550
logger.Debugf("Storing transient for [%s]", t.ID())
551-
return t.channel.MetadataService().StoreTransient(t.ID(), t.TTransient)
551+
return t.channel.MetadataService().StoreTransient(t.ctx, t.ID(), t.TTransient)
552552
}
553553

554554
func (t *Transaction) ProposalResponses() ([]driver.ProposalResponse, error) {

platform/fabric/driver/transaction.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,21 @@ type Proposal interface {
4848
type TransientMap map[string][]byte
4949

5050
type MetadataService interface {
51-
Exists(txid string) bool
52-
StoreTransient(txid string, transientMap TransientMap) error
53-
LoadTransient(txid string) (TransientMap, error)
51+
Exists(ctx context.Context, txid string) bool
52+
StoreTransient(ctx context.Context, txid string, transientMap TransientMap) error
53+
LoadTransient(ctx context.Context, txid string) (TransientMap, error)
5454
}
5555

5656
type EnvelopeService interface {
57-
Exists(txid string) bool
58-
StoreEnvelope(txid string, env interface{}) error
59-
LoadEnvelope(txid string) ([]byte, error)
57+
Exists(ctx context.Context, txid string) bool
58+
StoreEnvelope(ctx context.Context, txid string, env interface{}) error
59+
LoadEnvelope(ctx context.Context, txid string) ([]byte, error)
6060
}
6161

6262
type EndorserTransactionService interface {
63-
Exists(txid string) bool
64-
StoreTransaction(txid string, raw []byte) error
65-
LoadTransaction(txid string) ([]byte, error)
63+
Exists(ctx context.Context, txid string) bool
64+
StoreTransaction(ctx context.Context, txid string, raw []byte) error
65+
LoadTransaction(ctx context.Context, txid string) ([]byte, error)
6666
}
6767

6868
type TransactionFactory interface {

platform/fabric/services/state/rwsetextractor.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
77
package state
88

99
import (
10+
"context"
11+
1012
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
1113
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
1214
"github.com/pkg/errors"
@@ -32,7 +34,7 @@ func (r *RWSetProcessor) Process(req fabric.Request, tx fabric.ProcessTransactio
3234
return errors.Wrapf(err, "failed getting channel [%s]", tx.Channel())
3335
}
3436

35-
if !ch.MetadataService().Exists(txID) {
37+
if !ch.MetadataService().Exists(context.Background(), txID) {
3638
logger.Debugf("transaction [%s] is not known to this node, no need to extract state information", txID)
3739
return nil
3840
}
@@ -56,7 +58,7 @@ func (r *RWSetProcessor) Process(req fabric.Request, tx fabric.ProcessTransactio
5658
}
5759

5860
// extrate state info from metadata service
59-
transientMap, err := ch.MetadataService().LoadTransient(txID)
61+
transientMap, err := ch.MetadataService().LoadTransient(context.Background(), txID)
6062
if err != nil {
6163
return err
6264
}

platform/fabric/transaction.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -464,16 +464,16 @@ type MetadataService struct {
464464
ms driver.MetadataService
465465
}
466466

467-
func (m *MetadataService) Exists(txid string) bool {
468-
return m.ms.Exists(txid)
467+
func (m *MetadataService) Exists(ctx context.Context, txid string) bool {
468+
return m.ms.Exists(ctx, txid)
469469
}
470470

471-
func (m *MetadataService) StoreTransient(txid string, transientMap TransientMap) error {
472-
return m.ms.StoreTransient(txid, driver.TransientMap(transientMap))
471+
func (m *MetadataService) StoreTransient(ctx context.Context, txid string, transientMap TransientMap) error {
472+
return m.ms.StoreTransient(ctx, txid, driver.TransientMap(transientMap))
473473
}
474474

475-
func (m *MetadataService) LoadTransient(txid string) (TransientMap, error) {
476-
res, err := m.ms.LoadTransient(txid)
475+
func (m *MetadataService) LoadTransient(ctx context.Context, txid string) (TransientMap, error) {
476+
res, err := m.ms.LoadTransient(ctx, txid)
477477
if err != nil {
478478
return nil, err
479479
}
@@ -484,6 +484,6 @@ type EnvelopeService struct {
484484
ms driver.EnvelopeService
485485
}
486486

487-
func (m *EnvelopeService) Exists(txid string) bool {
488-
return m.ms.Exists(txid)
487+
func (m *EnvelopeService) Exists(ctx context.Context, txid string) bool {
488+
return m.ms.Exists(ctx, txid)
489489
}

platform/fabric/vault.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,15 +165,15 @@ func (c *Vault) InspectRWSet(ctx context.Context, rwset []byte, namespaces ...dr
165165
}
166166

167167
func (c *Vault) StoreEnvelope(ctx context.Context, id driver.TxID, env []byte) error {
168-
return c.envelopeService.StoreEnvelope(id, env)
168+
return c.envelopeService.StoreEnvelope(ctx, id, env)
169169
}
170170

171171
func (c *Vault) StoreTransaction(ctx context.Context, id driver.TxID, raw []byte) error {
172-
return c.transactionService.StoreTransaction(id, raw)
172+
return c.transactionService.StoreTransaction(ctx, id, raw)
173173
}
174174

175175
func (c *Vault) StoreTransient(ctx context.Context, id driver.TxID, tm TransientMap) error {
176-
return c.metadataService.StoreTransient(id, fdriver.TransientMap(tm))
176+
return c.metadataService.StoreTransient(ctx, id, fdriver.TransientMap(tm))
177177
}
178178

179179
// DiscardTx discards the transaction with the given transaction id.

platform/orion/core/generic/committer/committer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (c *committer) CommitTX(ctx context.Context, txID driver2.TxID, bn driver.B
189189
logger.Debugf("tx %s is already invalid", txID)
190190
return errors.Errorf("tx %s is already invalid but it is marked as valid by orion", txID)
191191
case driver.Unknown:
192-
if !c.em.Exists(txID) {
192+
if !c.em.Exists(ctx, txID) {
193193
logger.Debugf("tx %s is unknown, check the transaction filters...", txID)
194194
return c.commitWithFilter(ctx, txID)
195195
}
@@ -260,7 +260,7 @@ func (c *committer) IsFinal(ctx context.Context, txID string) error {
260260
case driver.Busy:
261261
logger.Debugf("Tx [%s] is known", txID)
262262
case driver.Unknown:
263-
if c.em.Exists(txID) {
263+
if c.em.Exists(ctx, txID) {
264264
logger.Debugf("found an envelope for [%s], consider it as known", txID)
265265
skipLoop = true
266266
break

platform/orion/core/generic/rwset/processor.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ func (r *processorManager) ProcessByID(ctx context.Context, txID driver2.TxID) e
6262
var tx driver.ProcessTransaction
6363
var err error
6464
switch {
65-
case r.network.EnvelopeService().Exists(txID):
66-
rws, tx, err = r.getTxFromEvn(txID)
65+
case r.network.EnvelopeService().Exists(ctx, txID):
66+
rws, tx, err = r.getTxFromEvn(ctx, txID)
6767
if err != nil {
6868
return errors.Wrapf(err, "failed extraction from envelope [%s]", txID)
6969
}
70-
case r.network.TransactionService().Exists(txID):
70+
case r.network.TransactionService().Exists(ctx, txID):
7171
rws, tx, err = r.getTxFromETx(txID)
7272
if err != nil {
7373
return errors.Wrapf(err, "failed extraction from transaction [%s]", txID)
@@ -111,8 +111,8 @@ func (r *processorManager) SetDefaultProcessor(processor driver.Processor) error
111111
return nil
112112
}
113113

114-
func (r *processorManager) getTxFromEvn(txid string) (driver.RWSet, driver.ProcessTransaction, error) {
115-
rawEnv, err := r.network.EnvelopeService().LoadEnvelope(txid)
114+
func (r *processorManager) getTxFromEvn(ctx context.Context, txid string) (driver.RWSet, driver.ProcessTransaction, error) {
115+
rawEnv, err := r.network.EnvelopeService().LoadEnvelope(ctx, txid)
116116
if err != nil {
117117
return nil, nil, errors.Wrapf(err, "cannot load envelope [%s]", txid)
118118
}

0 commit comments

Comments
 (0)