Skip to content

Commit 9655e82

Browse files
Fix nil exception (#718)
* Fix nil exception when accessing cached tx status. Introduced service that fetches the tx status to distinguish various cases where different inputs are needed. Signed-off-by: Alexandros Filios <[email protected]>
1 parent ef076e2 commit 9655e82

File tree

5 files changed

+206
-154
lines changed

5 files changed

+206
-154
lines changed

token/services/db/retry.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,33 @@ package db
99
import (
1010
"errors"
1111
"time"
12+
13+
logging2 "github.com/hyperledger-labs/fabric-token-sdk/token/services/logging"
1214
)
1315

1416
// RetryRunner receives a function that potentially fails and retries according to the specified strategy
1517
type RetryRunner interface {
1618
Run(func() error) error
1719
}
1820

21+
var ErrMaxRetriesExceeded = errors.New("maximum number of retries exceeded")
22+
1923
const Infinitely = -1
2024

2125
func NewRetryRunner(maxTimes int, delay time.Duration, expBackoff bool) *retryRunner {
2226
return &retryRunner{
2327
delay: delay,
2428
expBackoff: expBackoff,
2529
maxTimes: maxTimes,
30+
logger: logging2.MustGetLogger("retry-runner"),
2631
}
2732
}
2833

2934
type retryRunner struct {
3035
delay time.Duration
3136
expBackoff bool
3237
maxTimes int
38+
logger logging2.Logger
3339
}
3440

3541
func (f *retryRunner) nextDelay() time.Duration {
@@ -40,14 +46,30 @@ func (f *retryRunner) nextDelay() time.Duration {
4046
}
4147

4248
func (f *retryRunner) Run(runner func() error) error {
49+
return f.RunWithErrors(func() (bool, error) {
50+
err := runner()
51+
return err == nil, err
52+
})
53+
}
54+
55+
// RunWithErrors will retry until runner() returns true or until it returns maxTimes false.
56+
// If it returns true, then the error or nil will be returned.
57+
// If it returns maxTimes false, then it will always return an error: either a join of all errors it encountered or a ErrMaxRetriesExceeded.
58+
func (f *retryRunner) RunWithErrors(runner func() (bool, error)) error {
4359
errs := make([]error, 0)
4460
for i := 0; f.maxTimes < 0 || i < f.maxTimes; i++ {
45-
if err := runner(); err != nil {
61+
terminate, err := runner()
62+
if terminate {
63+
return err
64+
}
65+
if err != nil {
4666
errs = append(errs, err)
47-
time.Sleep(f.nextDelay())
48-
} else {
49-
return nil
5067
}
68+
f.logger.Debugf("Will retry iteration [%d] after delay. %d errors returned so far", i+1, len(errs))
69+
time.Sleep(f.nextDelay())
70+
}
71+
if len(errs) == 0 {
72+
return ErrMaxRetriesExceeded
5173
}
5274
return errors.Join(errs...)
5375
}

token/services/network/orion/approval.go

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/hyperledger-labs/fabric-token-sdk/token"
1717
"github.com/hyperledger-labs/fabric-token-sdk/token/core/common"
1818
"github.com/hyperledger-labs/fabric-token-sdk/token/driver"
19+
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db"
1920
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network"
2021
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/common/rws/translator"
2122
driver2 "github.com/hyperledger-labs/fabric-token-sdk/token/services/network/driver"
@@ -24,6 +25,7 @@ import (
2425

2526
type TxStatusResponseCache interface {
2627
Get(key string) (*TxStatusResponse, bool)
28+
GetOrLoad(key string, loader func() (*TxStatusResponse, error)) (*TxStatusResponse, bool, error)
2729
Add(key string, value *TxStatusResponse)
2830
}
2931

@@ -153,46 +155,49 @@ func (r *RequestApprovalResponderView) process(context view.Context, request *Ap
153155

154156
// commit
155157
txStatusFetcher := &RequestTxStatusResponderView{dbManager: r.dbManager, statusCache: r.statusCache}
156-
numRetries := 5
157-
sleepDuration := 1 * time.Second
158-
for i := 0; i < numRetries; i++ {
158+
159+
runner := db.NewRetryRunner(5, time.Second, true)
160+
161+
var envelopeRaw []byte
162+
validateErr := runner.RunWithErrors(func() (bool, error) {
159163
span.AddEvent("try_validate")
160-
envelopeRaw, retry, err := r.validate(context, request, validator)
164+
var retry bool
165+
envelopeRaw, retry, err = r.validate(context, request, validator)
166+
if err == nil {
167+
return true, nil
168+
}
169+
170+
if !retry {
171+
logger.Errorf("failed to commit transaction [%s], no more retry after this", err)
172+
return true, errors.Wrapf(err, "failed to commit transaction [%s]", request.TxID)
173+
}
174+
logger.Errorf("failed to commit transaction [%s], retry", err)
175+
// was the transaction committed, by any chance?
176+
span.AddEvent("fetch_tx_status")
177+
status, err := txStatusFetcher.process(context, &TxStatusRequest{
178+
Network: request.Network,
179+
Namespace: request.Namespace,
180+
TxID: request.TxID,
181+
})
161182
if err != nil {
162-
if !retry {
163-
logger.Errorf("failed to commit transaction [%s], no more retry after this [%d]", err, i)
164-
return nil, errors.Wrapf(err, "failed to commit transaction [%s]", request.TxID)
165-
}
166-
logger.Errorf("failed to commit transaction [%s], retry [%d]", err, i)
167-
// was the transaction committed, by any chance?
168-
span.AddEvent("fetch_tx_status")
169-
status, err := txStatusFetcher.process(context, &TxStatusRequest{
170-
Network: request.Network,
171-
Namespace: request.Namespace,
172-
TxID: request.TxID,
173-
})
174-
if err != nil {
175-
logger.Errorf("failed to ask transaction status [%s], retry [%d]", err, i)
176-
}
177-
if status != nil {
178-
if status.Status == network.Valid {
179-
return nil, nil
180-
}
181-
if status.Status == network.Invalid {
182-
break
183-
}
184-
logger.Debugf("transaction [%s] status [%s], retry [%d], wait a bit and resubmit", request.TxID, status, i)
185-
} else {
186-
logger.Errorf("failed to ask transaction status [%s], got a nil answert, retry [%d]", request.TxID, i)
187-
}
188-
time.Sleep(sleepDuration)
189-
sleepDuration = sleepDuration * 2
190-
continue
183+
logger.Errorf("failed to ask transaction status [%s], retry", err)
184+
return false, nil
191185
}
192-
return envelopeRaw, nil
193-
}
194186

195-
return nil, errors.Wrapf(err, "failed to commit transaction [%s]", request.TxID)
187+
if status.Status == network.Valid {
188+
return true, nil
189+
}
190+
if status.Status == network.Invalid {
191+
return true, errors.New("invalid transaction status")
192+
}
193+
logger.Debugf("transaction [%s] status [%s], retry, wait a bit and resubmit", request.TxID, status)
194+
return false, nil
195+
})
196+
197+
if validateErr != nil {
198+
return nil, errors.Wrapf(err, "failed to commit transaction [%s]", request.TxID)
199+
}
200+
return envelopeRaw, nil
196201
}
197202

198203
func (r *RequestApprovalResponderView) validate(context view.Context, request *ApprovalRequest, validator driver.Validator) ([]byte, bool, error) {

token/services/network/orion/broadcast.go

Lines changed: 45 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
view2 "github.com/hyperledger-labs/fabric-smart-client/platform/view"
1515
session2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/session"
1616
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
17+
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db"
1718
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network"
1819
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/driver"
1920
"github.com/pkg/errors"
@@ -107,73 +108,66 @@ func (r *BroadcastResponderView) Call(context view.Context) (interface{}, error)
107108
return nil, errors.Wrapf(err, "failed to get session manager for [%s]", request.Network)
108109
}
109110

110-
done := false
111-
err = nil
112-
txStatusFetcher := &RequestTxStatusResponderView{dbManager: r.dbManager, statusCache: r.statusCache}
113-
numRetries := 5
114-
sleepDuration := 1 * time.Second
111+
txStatusFetcher := NewStatusFetcher(r.dbManager)
112+
113+
runner := db.NewRetryRunner(5, 1*time.Second, true)
114+
115115
var txID string
116-
for i := 0; i < numRetries; i++ {
116+
broadcastErr := runner.RunWithErrors(func() (bool, error) {
117117
span.AddEvent("try_broadcast")
118-
var err2 error
119-
if _, txID, err2 = r.broadcast(context, sm, request); err2 != nil {
120-
span.RecordError(err2)
121-
logger.Errorf("failed to broadcast to [%s], txID [%s] with err [%s], retry [%d]", sm.CustodianID, txID, err2, i)
122-
if strings.Contains(err2.Error(), "is not valid") {
123-
err = err2
124-
break
125-
}
126-
if len(txID) != 0 {
127-
// was the transaction committed, by any chance?
128-
logger.Errorf("check transaction [%s] status on [%s], retry [%d]", txID, sm.CustodianID, i)
129-
span.AddEvent("fetch_tx_status")
130-
status, err := txStatusFetcher.process(context, &TxStatusRequest{
131-
Network: request.Network,
132-
TxID: txID,
133-
})
134-
if err != nil {
135-
logger.Errorf("failed to ask transaction status [%s][%s], retry [%d]", txID, err, i)
136-
}
137-
if status != nil {
138-
if status.Status == network.Valid {
139-
done = true
140-
break
141-
}
142-
if status.Status == network.Invalid {
143-
break
144-
}
145-
logger.Debugf("transaction [%s] status [%d], retry [%d], wait a bit and resubmit", txID, status, i)
146-
} else {
147-
logger.Errorf("failed to ask transaction status [%s], got a nil answert, retry [%d]", txID, i)
148-
}
149-
}
150-
time.Sleep(sleepDuration)
151-
sleepDuration = sleepDuration * 2
152-
continue
118+
var err error
119+
_, txID, err = r.broadcast(context, sm, request)
120+
if err == nil {
121+
return true, nil
153122
}
154-
done = true
155-
break
156-
}
157-
var broadcastError string
158-
if !done {
159-
broadcastError = fmt.Sprintf("failed to broadcast to [%s] with err [%s]", sm.CustodianID, err)
160-
}
123+
124+
span.RecordError(err)
125+
logger.Errorf("failed to broadcast to [%s], txID [%s] with err [%s], retry", sm.CustodianID, txID, err)
126+
if strings.Contains(err.Error(), "is not valid") {
127+
return true, err
128+
}
129+
if len(txID) == 0 {
130+
return false, nil
131+
}
132+
133+
// was the transaction committed, by any chance?
134+
logger.Errorf("check transaction [%s] status on [%s], retry", txID, sm.CustodianID)
135+
span.AddEvent("fetch_tx_status")
136+
137+
status, err := txStatusFetcher.FetchCode(request.Network, txID)
138+
if err != nil {
139+
logger.Errorf("failed to ask transaction status [%s][%s]", txID, err)
140+
return false, nil
141+
}
142+
if status == network.Valid {
143+
return true, nil
144+
}
145+
if status == network.Invalid {
146+
return true, errors.New("invalid transaction status")
147+
}
148+
logger.Debugf("transaction [%s] status [%d], retry, wait a bit and resubmit", txID, status)
149+
return false, nil
150+
})
161151

162152
// update cache
163153
if len(txID) != 0 {
164154
response, ok := r.statusCache.Get(txID)
165155
if ok {
166-
if len(broadcastError) == 0 {
156+
if broadcastErr == nil {
167157
response.Status = driver.Valid
168158
} else {
169159
response.Status = driver.Invalid
170160
}
161+
r.statusCache.Add(txID, response)
171162
}
172-
r.statusCache.Add(txID, response)
173163
}
174164

175165
// send back answer
176-
if err := session.SendWithContext(context.Context(), &BroadcastResponse{Err: broadcastError}); err != nil {
166+
broadcastResponse := &BroadcastResponse{}
167+
if broadcastErr != nil {
168+
broadcastResponse.Err = fmt.Sprintf("failed to broadcast to [%s] with err [%s]", sm.CustodianID, broadcastErr.Error())
169+
}
170+
if err := session.SendWithContext(context.Context(), broadcastResponse); err != nil {
177171
return nil, errors.Wrapf(err, "failed to send response")
178172
}
179173
return nil, nil
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package orion
8+
9+
import (
10+
"encoding/base64"
11+
12+
errors2 "github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
13+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
14+
"github.com/hyperledger-labs/fabric-smart-client/platform/orion"
15+
driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/orion/driver"
16+
"github.com/hyperledger-labs/fabric-token-sdk/token/services/network/common/rws/keys"
17+
"github.com/hyperledger-labs/orion-sdk-go/pkg/bcdb"
18+
"github.com/pkg/errors"
19+
"go.uber.org/zap/zapcore"
20+
)
21+
22+
type StatusFetcher struct {
23+
dbManager *DBManager
24+
}
25+
26+
func NewStatusFetcher(dbManager *DBManager) *StatusFetcher {
27+
return &StatusFetcher{dbManager: dbManager}
28+
}
29+
30+
func (r *StatusFetcher) FetchStatus(network, namespace string, txID driver.TxID) (*TxStatusResponse, error) {
31+
oSession, code, err := r.fetchCode(network, txID)
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
// fetch token request reference
37+
qe, err := oSession.QueryExecutor(namespace)
38+
if err != nil {
39+
return nil, errors.Wrapf(err, "failed to get query executor [%s] for orion network [%s]", txID, network)
40+
}
41+
key, err := keys.CreateTokenRequestKey(txID)
42+
if err != nil {
43+
return nil, errors.Errorf("can't create for token request '%s'", txID)
44+
}
45+
trRef, err := qe.Get(orionKey(key))
46+
if err != nil {
47+
return nil, errors.Wrapf(err, "failed to get token request reference [%s] for orion network [%s]", txID, network)
48+
}
49+
if logger.IsEnabledFor(zapcore.DebugLevel) {
50+
logger.Debugf("retrieved token request hash for [%s][%s]:[%s]", key, txID, base64.StdEncoding.EncodeToString(trRef))
51+
}
52+
return &TxStatusResponse{
53+
TokenRequestReference: trRef,
54+
Status: code,
55+
}, nil
56+
}
57+
58+
func (r *StatusFetcher) FetchCode(network string, txID driver.TxID) (driver2.ValidationCode, error) {
59+
_, code, err := r.fetchCode(network, txID)
60+
return code, err
61+
}
62+
63+
func (r *StatusFetcher) fetchCode(network string, txID driver.TxID) (*orion.Session, driver2.ValidationCode, error) {
64+
sm, err := r.dbManager.GetSessionManager(network)
65+
if err != nil {
66+
return nil, 0, errors.Wrapf(err, "failed to get session manager for network [%s]", network)
67+
}
68+
oSession, err := sm.GetSession()
69+
if err != nil {
70+
return nil, 0, errors.Wrapf(err, "failed to create session to orion network [%s]", network)
71+
}
72+
ledger, err := oSession.Ledger()
73+
if err != nil {
74+
return nil, 0, errors.Wrapf(err, "failed to get ledger for orion network [%s]", network)
75+
}
76+
tx, err := ledger.GetTransactionByID(txID)
77+
if err != nil {
78+
if errors2.HasType(err, &bcdb.ErrorNotFound{}) {
79+
return oSession, driver2.Unknown, nil
80+
}
81+
return nil, 0, errors.Wrapf(err, "failed to get transaction [%s] for orion network [%s]", txID, network)
82+
}
83+
if tx.ValidationCode() == orion.VALID {
84+
return oSession, driver2.Valid, nil
85+
}
86+
return oSession, driver2.Invalid, nil
87+
}

0 commit comments

Comments
 (0)