diff --git a/Makefile b/Makefile index fec27e218..507b92cf2 100755 --- a/Makefile +++ b/Makefile @@ -30,8 +30,7 @@ download-fabric: include $(TOP)/checks.mk .PHONY: unit-tests -unit-tests: - +unit-tests: testing-docker-images @export FAB_BINS=$(FAB_BINS); go test -cover $(shell go list ./... | grep -v '/integration/') cd integration/nwo/; go test -cover ./... diff --git a/platform/common/core/generic/vault/helpers_test.go b/platform/common/core/generic/vault/helpers_test.go new file mode 100644 index 000000000..3765a40f9 --- /dev/null +++ b/platform/common/core/generic/vault/helpers_test.go @@ -0,0 +1,1004 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package vault_test + +import ( + "testing" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/core" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault/txidstore" + driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" + "github.com/hyperledger/fabric-protos-go/ledger/rwset" + "github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil" + "github.com/stretchr/testify/assert" +) + +type vc int + +const ( + _ vc = iota + valid + invalid + busy + unknown +) + +type vcProvider struct{} + +func (p *vcProvider) ToInt32(code vc) int32 { return int32(code) } +func (p *vcProvider) FromInt32(code int32) vc { + return vc(code) +} +func (p *vcProvider) Unknown() vc { return unknown } +func (p *vcProvider) Busy() vc { return busy } +func (p *vcProvider) Valid() vc { return valid } +func (p *vcProvider) Invalid() vc { return invalid } + +func newInterceptor(qe vault.QueryExecutor, txidStore vault.TXIDStoreReader[vc], txid core.TxID) vault.TxInterceptor { + return vault.NewInterceptor[vc](qe, txidStore, txid, &vcProvider{}) +} + +var SingleDBCases = []struct { + Name string + Fn func(*testing.T, driver.VersionedPersistence) +}{ + {"Merge", TTestMerge}, + {"Inspector", TTestInspector}, + {"InterceptorErr", TTestInterceptorErr}, + {"InterceptorConcurrency", TTestInterceptorConcurrency}, + {"QueryExecutor", TTestQueryExecutor}, + {"ShardLikeCommit", TTestShardLikeCommit}, + {"VaultErr", TTestVaultErr}, +} + +var DoubleDBCases = []struct { + Name string + Fn func(*testing.T, driver.VersionedPersistence, driver.VersionedPersistence) +}{ + {"Run", TTestRun}, +} + +func TTestInterceptorErr(t *testing.T, ddb driver.VersionedPersistence) { + tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) + assert.NoError(t, err) + vault1 := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) + rws, err := vault1.NewRWSet("txid") + assert.NoError(t, err) + + _, err = rws.GetState("foo", "bar", 15) + assert.EqualError(t, err, "invalid get option [15]") + _, err = rws.GetState("foo", "bar", 15, 16) + assert.EqualError(t, err, "a single getoption is supported, 2 provided") + + _, err = rws.GetStateMetadata("foo", "bar", 15) + assert.EqualError(t, err, "invalid get option [15]") + _, err = rws.GetStateMetadata("foo", "bar", 15, 16) + assert.EqualError(t, err, "a single getoption is supported, 2 provided") + + rws.Done() + + _, err = rws.GetStateMetadata("foo", "bar") + assert.EqualError(t, err, "this instance was closed") + _, err = rws.GetState("foo", "bar") + assert.EqualError(t, err, "this instance was closed") + err = rws.SetState("foo", "bar", []byte("whocares")) + assert.EqualError(t, err, "this instance was closed") + err = rws.SetStateMetadata("foo", "bar", nil) + assert.EqualError(t, err, "this instance was closed") + err = rws.DeleteState("foo", "bar") + assert.EqualError(t, err, "this instance was closed") + _, _, err = rws.GetReadAt("foo", 12312) + assert.EqualError(t, err, "this instance was closed") + _, _, err = rws.GetWriteAt("foo", 12312) + assert.EqualError(t, err, "this instance was closed") + err = rws.AppendRWSet([]byte("foo")) + assert.EqualError(t, err, "this instance was closed") + + rws, err = vault1.NewRWSet("validtxid") + assert.NoError(t, err) + rws.Done() + err = vault1.CommitTX("validtxid", 2, 3) + assert.NoError(t, err) + rws, err = vault1.NewRWSet("validtxid") + assert.NoError(t, err) + err = rws.IsValid() + assert.EqualError(t, err, "duplicate txid validtxid") +} + +func TTestInterceptorConcurrency(t *testing.T, ddb driver.VersionedPersistence) { + ns := "namespace" + k := "key1" + mk := "meyakey1" + + tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) + assert.NoError(t, err) + vault1 := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) + rws, err := vault1.NewRWSet("txid") + assert.NoError(t, err) + + v, err := rws.GetState(ns, k) + assert.NoError(t, err) + assert.Nil(t, v) + + err = ddb.BeginUpdate() + assert.NoError(t, err) + err = ddb.SetState(ns, k, []byte("val"), 35, 1) + assert.NoError(t, err) + err = ddb.Commit() + assert.NoError(t, err) + + _, _, err = rws.GetReadAt(ns, 0) + assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version 0:0, current value at version 35:1") + + _, err = rws.GetState(ns, k) + assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version 0:0, current value at version 35:1") + + mv, err := rws.GetStateMetadata(ns, mk) + assert.NoError(t, err) + assert.Nil(t, mv) + + err = ddb.BeginUpdate() + assert.NoError(t, err) + err = ddb.SetStateMetadata(ns, mk, map[string][]byte{"k": []byte("v")}, 36, 2) + assert.NoError(t, err) + err = ddb.Commit() + assert.NoError(t, err) + + _, err = rws.GetStateMetadata(ns, mk) + assert.EqualError(t, err, "invalid metadata read: previous value returned at version 0:0, current value at version 36:2") +} + +func TTestQueryExecutor(t *testing.T, ddb driver.VersionedPersistence) { + ns := "namespace" + + tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) + assert.NoError(t, err) + vault := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) + + err = ddb.BeginUpdate() + assert.NoError(t, err) + err = ddb.SetState(ns, "k2", []byte("k2_value"), 35, 1) + assert.NoError(t, err) + err = ddb.SetState(ns, "k3", []byte("k3_value"), 35, 2) + assert.NoError(t, err) + err = ddb.SetState(ns, "k1", []byte("k1_value"), 35, 3) + assert.NoError(t, err) + err = ddb.SetState(ns, "k111", []byte("k111_value"), 35, 4) + assert.NoError(t, err) + err = ddb.Commit() + assert.NoError(t, err) + + qe, err := vault.NewQueryExecutor() + assert.NoError(t, err) + defer qe.Done() + + v, err := qe.GetState(ns, "k1") + assert.NoError(t, err) + assert.Equal(t, []byte("k1_value"), v) + v, err = qe.GetState(ns, "barfobarfs") + assert.NoError(t, err) + assert.Equal(t, []byte(nil), v) + + itr, err := qe.GetStateRangeScanIterator(ns, "", "") + defer itr.Close() + assert.NoError(t, err) + + res := make([]driver.VersionedRead, 0, 4) + for n, err := itr.Next(); n != nil; n, err = itr.Next() { + assert.NoError(t, err) + res = append(res, *n) + } + assert.Len(t, res, 4) + assert.Equal(t, []driver.VersionedRead{ + {Key: "k1", Raw: []byte("k1_value"), Block: 35, IndexInBlock: 3}, + {Key: "k111", Raw: []byte("k111_value"), Block: 35, IndexInBlock: 4}, + {Key: "k2", Raw: []byte("k2_value"), Block: 35, IndexInBlock: 1}, + {Key: "k3", Raw: []byte("k3_value"), Block: 35, IndexInBlock: 2}, + }, res) + + itr, err = ddb.GetStateRangeScanIterator(ns, "k1", "k3") + defer itr.Close() + assert.NoError(t, err) + + res = make([]driver.VersionedRead, 0, 3) + for n, err := itr.Next(); n != nil; n, err = itr.Next() { + assert.NoError(t, err) + res = append(res, *n) + } + assert.Len(t, res, 3) + assert.Equal(t, []driver.VersionedRead{ + {Key: "k1", Raw: []byte("k1_value"), Block: 35, IndexInBlock: 3}, + {Key: "k111", Raw: []byte("k111_value"), Block: 35, IndexInBlock: 4}, + {Key: "k2", Raw: []byte("k2_value"), Block: 35, IndexInBlock: 1}, + }, res) + + itr, err = ddb.GetStateSetIterator(ns, "k1", "k2", "k111") + defer itr.Close() + assert.NoError(t, err) + + res = make([]driver.VersionedRead, 0, 3) + for n, err := itr.Next(); n != nil; n, err = itr.Next() { + assert.NoError(t, err) + res = append(res, *n) + } + assert.Len(t, res, 3) + assert.Equal(t, []driver.VersionedRead{ + {Key: "k1", Raw: []byte("k1_value"), Block: 35, IndexInBlock: 3}, + {Key: "k2", Raw: []byte("k2_value"), Block: 35, IndexInBlock: 1}, + {Key: "k111", Raw: []byte("k111_value"), Block: 35, IndexInBlock: 4}, + }, res) + + itr, err = ddb.GetStateSetIterator(ns, "k1", "k5") + defer itr.Close() + assert.NoError(t, err) + + res = make([]driver.VersionedRead, 0, 2) + for n, err := itr.Next(); n != nil; n, err = itr.Next() { + assert.NoError(t, err) + res = append(res, *n) + } + assert.Len(t, res, 2) + assert.Equal(t, []driver.VersionedRead{ + {Key: "k1", Raw: []byte("k1_value"), Block: 35, IndexInBlock: 3}, + {Key: "k5"}, + }, res) +} + +func TTestShardLikeCommit(t *testing.T, ddb driver.VersionedPersistence) { + ns := "namespace" + k1 := "key1" + k2 := "key2" + + // Populate the DB with some data at some height + err := ddb.BeginUpdate() + assert.NoError(t, err) + err = ddb.SetState(ns, k1, []byte("k1val"), 35, 1) + assert.NoError(t, err) + err = ddb.SetState(ns, k2, []byte("k2val"), 37, 3) + assert.NoError(t, err) + err = ddb.Commit() + assert.NoError(t, err) + + tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) + assert.NoError(t, err) + vault := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) + + // SCENARIO 1: there is a read conflict in the proposed rwset + // create the read-write set + rwsb := rwsetutil.NewRWSetBuilder() + rwsb.AddToReadSet(ns, k1, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 35, TxNum: 1})) + rwsb.AddToReadSet(ns, k2, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 37, TxNum: 2})) + rwsb.AddToWriteSet(ns, k1, []byte("k1FromTxidInvalid")) + rwsb.AddToWriteSet(ns, k2, []byte("k2FromTxidInvalid")) + simRes, err := rwsb.GetTxSimulationResults() + assert.NoError(t, err) + rwsBytes, err := simRes.GetPubSimulationBytes() + assert.NoError(t, err) + + // give it to the kvs and check whether it's valid - it won't be + rwset, err := vault.GetRWSet("txid-invalid", rwsBytes) + assert.NoError(t, err) + err = rwset.IsValid() + assert.EqualError(t, err, "invalid read: vault at version namespace:key2 37:3, read-write set at version 37:2") + + // close the read-write set, even in case of error + rwset.Done() + + // check the status, it should be busy + code, _, err := vault.Status("txid-invalid") + assert.NoError(t, err) + assert.Equal(t, busy, code) + + // now in case of error we won't commit the read-write set, so we should discard it + err = vault.DiscardTx("txid-invalid", "") + assert.NoError(t, err) + + // check the status, it should be invalid + code, _, err = vault.Status("txid-invalid") + assert.NoError(t, err) + assert.Equal(t, invalid, code) + + // SCENARIO 2: there is no read conflict + // create the read-write set + rwsb = rwsetutil.NewRWSetBuilder() + rwsb.AddToReadSet(ns, k1, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 35, TxNum: 1})) + rwsb.AddToReadSet(ns, k2, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 37, TxNum: 3})) + rwsb.AddToWriteSet(ns, k1, []byte("k1FromTxidValid")) + rwsb.AddToWriteSet(ns, k2, []byte("k2FromTxidValid")) + + simRes, err = rwsb.GetTxSimulationResults() + assert.NoError(t, err) + rwsBytes, err = simRes.GetPubSimulationBytes() + assert.NoError(t, err) + + // give it to the kvs and check whether it's valid - it will be + rwset, err = vault.GetRWSet("txid-valid", rwsBytes) + assert.NoError(t, err) + err = rwset.IsValid() + assert.NoError(t, err) + + // close the read-write set + rwset.Done() + + // presumably the cross-shard protocol continues... + + // check the status, it should be busy + code, _, err = vault.Status("txid-valid") + assert.NoError(t, err) + assert.Equal(t, busy, code) + + // we're now asked to really commit + err = vault.CommitTX("txid-valid", 38, 10) + assert.NoError(t, err) + + // check the status, it should be valid + code, _, err = vault.Status("txid-valid") + assert.NoError(t, err) + assert.Equal(t, valid, code) + + // check the content of the kvs after that + v, b, tx, err := ddb.GetState(ns, k1) + assert.NoError(t, err) + assert.Equal(t, []byte("k1FromTxidValid"), v) + assert.Equal(t, uint64(38), b) + assert.Equal(t, uint64(10), tx) + + v, b, tx, err = ddb.GetState(ns, k2) + assert.NoError(t, err) + assert.Equal(t, []byte("k2FromTxidValid"), v) + assert.Equal(t, uint64(38), b) + assert.Equal(t, uint64(10), tx) + + // all Interceptors should be gone + assert.Len(t, vault.Interceptors, 0) +} + +func TTestVaultErr(t *testing.T, ddb driver.VersionedPersistence) { + tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) + assert.NoError(t, err) + vault1 := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) + err = vault1.CommitTX("non-existent", 0, 0) + assert.ErrorContains(t, err, "read-write set for txid non-existent could not be found") + err = vault1.DiscardTx("non-existent", "") + assert.EqualError(t, err, "read-write set for txid non-existent could not be found") + + ncrwset, err := vault1.NewRWSet("not-closed") + assert.NoError(t, err) + _, err = vault1.NewRWSet("not-closed") + assert.EqualError(t, err, "duplicate read-write set for txid not-closed") + _, err = vault1.GetRWSet("not-closed", []byte(nil)) + assert.EqualError(t, err, "programming error: previous read-write set for not-closed has not been closed") + err = vault1.CommitTX("not-closed", 0, 0) + assert.ErrorContains(t, err, "attempted to retrieve read-write set for not-closed when done has not been called") + err = vault1.DiscardTx("not-closed", "") + assert.EqualError(t, err, "attempted to retrieve read-write set for not-closed when done has not been called") + + // as a sanity-check we close it now and will be able to discard it + ncrwset.Done() + err = vault1.DiscardTx("not-closed", "pineapple") + assert.NoError(t, err) + vc, message, err := vault1.Status("not-closed") + assert.NoError(t, err) + assert.Equal(t, "pineapple", message) + assert.Equal(t, invalid, vc) + + _, err = vault1.GetRWSet("bogus", []byte("barf")) + assert.Contains(t, err.Error(), "cannot parse invalid wire-format data") + + txRWSet := &rwset.TxReadWriteSet{ + NsRwset: []*rwset.NsReadWriteSet{ + {Rwset: []byte("barf")}, + }, + } + rwsb, err := proto.Marshal(txRWSet) + assert.NoError(t, err) + + _, err = vault1.GetRWSet("bogus", rwsb) + assert.Contains(t, err.Error(), "cannot parse invalid wire-format data") + + code, _, err := vault1.Status("unknown-txid") + assert.NoError(t, err) + assert.Equal(t, unknown, code) +} + +func TTestMerge(t *testing.T, ddb driver.VersionedPersistence) { + ns := "namespace" + k1 := "key1" + k2 := "key2" + k3 := "key3" + txid := "txid" + ne1Key := "notexist1" + ne2Key := "notexist2" + + // create DB and kvs + tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) + assert.NoError(t, err) + vault2 := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) + err = ddb.BeginUpdate() + assert.NoError(t, err) + err = ddb.SetState(ns, k1, []byte("v1"), 35, 1) + assert.NoError(t, err) + err = ddb.Commit() + assert.NoError(t, err) + + rws, err := vault2.NewRWSet(txid) + assert.NoError(t, err) + v, err := rws.GetState(ns, k1) + assert.NoError(t, err) + assert.Equal(t, []byte("v1"), v) + v, err = rws.GetState(ns, ne1Key) + assert.NoError(t, err) + assert.Equal(t, []byte(nil), v) + err = rws.SetState(ns, k2, []byte("v2")) + assert.NoError(t, err) + err = rws.SetStateMetadata(ns, k3, map[string][]byte{"k3": []byte("v3")}) + assert.NoError(t, err) + + rwsb := rwsetutil.NewRWSetBuilder() + rwsb.AddToReadSet(ns, k1, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 35, TxNum: 1})) + rwsb.AddToReadSet(ns, ne2Key, nil) + rwsb.AddToWriteSet(ns, k1, []byte("newv1")) + rwsb.AddToMetadataWriteSet(ns, k1, map[string][]byte{"k1": []byte("v1")}) + simRes, err := rwsb.GetTxSimulationResults() + assert.NoError(t, err) + rwsBytes, err := simRes.GetPubSimulationBytes() + assert.NoError(t, err) + + err = rws.AppendRWSet(rwsBytes) + assert.NoError(t, err) + assert.Equal(t, vault.NamespaceKeyedMetaWrites{ + "namespace": { + "key1": {"k1": []byte("v1")}, + "key3": {"k3": []byte("v3")}, + }, + }, rws.RWs().MetaWrites) + assert.Equal(t, vault.Writes{"namespace": { + "key1": []byte("newv1"), + "key2": []byte("v2"), + }}, rws.RWs().Writes) + assert.Equal(t, vault.Reads{ + "namespace": { + "key1": {Block: 35, TxNum: 1}, + "notexist1": {Block: 0, TxNum: 0}, + "notexist2": {Block: 0, TxNum: 0}, + }, + }, rws.RWs().Reads) + + rwsb = rwsetutil.NewRWSetBuilder() + rwsb.AddToReadSet(ns, k1, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 36, TxNum: 1})) + simRes, err = rwsb.GetTxSimulationResults() + assert.NoError(t, err) + rwsBytes, err = simRes.GetPubSimulationBytes() + assert.NoError(t, err) + + err = rws.AppendRWSet(rwsBytes) + assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version 35:1, current value at version 35:1") + + rwsb = rwsetutil.NewRWSetBuilder() + rwsb.AddToWriteSet(ns, k2, []byte("v2")) + simRes, err = rwsb.GetTxSimulationResults() + assert.NoError(t, err) + rwsBytes, err = simRes.GetPubSimulationBytes() + assert.NoError(t, err) + + err = rws.AppendRWSet(rwsBytes) + assert.EqualError(t, err, "duplicate write entry for key namespace:key2") + + err = rws.AppendRWSet([]byte("barf")) + assert.Contains(t, err.Error(), "cannot parse invalid wire-format data") + + txRWSet := &rwset.TxReadWriteSet{ + NsRwset: []*rwset.NsReadWriteSet{ + {Rwset: []byte("barf")}, + }, + } + rwsBytes, err = proto.Marshal(txRWSet) + assert.NoError(t, err) + + err = rws.AppendRWSet(rwsBytes) + assert.Contains(t, err.Error(), "cannot parse invalid wire-format data") + + rwsb = rwsetutil.NewRWSetBuilder() + rwsb.AddToMetadataWriteSet(ns, k3, map[string][]byte{"k": []byte("v")}) + simRes, err = rwsb.GetTxSimulationResults() + assert.NoError(t, err) + rwsBytes, err = simRes.GetPubSimulationBytes() + assert.NoError(t, err) + + err = rws.AppendRWSet(rwsBytes) + assert.EqualError(t, err, "duplicate metadata write entry for key namespace:key3") +} + +func TTestInspector(t *testing.T, ddb driver.VersionedPersistence) { + txid := "txid" + ns := "ns" + k1 := "k1" + k2 := "k2" + + // create DB and kvs + tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) + assert.NoError(t, err) + vault := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) + err = ddb.BeginUpdate() + assert.NoError(t, err) + err = ddb.SetState(ns, k1, []byte("v1"), 35, 1) + assert.NoError(t, err) + err = ddb.Commit() + assert.NoError(t, err) + + rws, err := vault.NewRWSet(txid) + assert.NoError(t, err) + v, err := rws.GetState(ns, k1) + assert.NoError(t, err) + assert.Equal(t, []byte("v1"), v) + err = rws.SetState(ns, k2, []byte("v2")) + assert.NoError(t, err) + rws.Done() + + b, err := rws.Bytes() + assert.NoError(t, err) + + i, err := vault.InspectRWSet(b) + assert.NoError(t, err) + assert.NoError(t, i.IsValid()) + + // the ephemeral rwset can "see" its own writes + v, err = i.GetState(ns, k2) + assert.NoError(t, err) + assert.Equal(t, []byte("v2"), v) + + k, v, err := i.GetWriteAt(ns, 0) + assert.NoError(t, err) + assert.Equal(t, k2, k) + assert.Equal(t, []byte("v2"), v) + + k, v, err = i.GetReadAt(ns, 0) + assert.NoError(t, err) + assert.Equal(t, k1, k) + assert.Equal(t, []byte(nil), v) + + assert.Equal(t, 1, i.NumReads(ns)) + assert.Equal(t, 1, i.NumWrites(ns)) + assert.Equal(t, []string{"ns"}, i.Namespaces()) + + i.Done() + + // check filtering + i, err = vault.InspectRWSet(b, "pineapple") + assert.NoError(t, err) + assert.NoError(t, i.IsValid()) + assert.Empty(t, i.Namespaces()) + i.Done() + + i, err = vault.InspectRWSet(b, ns) + assert.NoError(t, err) + assert.NoError(t, i.IsValid()) + assert.Equal(t, []string{ns}, i.Namespaces()) + i.Done() +} + +func TTestRun(t *testing.T, db1, db2 driver.VersionedPersistence) { + ns := "namespace" + k1 := "key1" + k1Meta := "key1Meta" + k2 := "key2" + txid := "txid1" + + // create and populate 2 DBs + err := db1.BeginUpdate() + assert.NoError(t, err) + err = db1.SetState(ns, k1, []byte("v1"), 35, 1) + assert.NoError(t, err) + err = db1.SetStateMetadata(ns, k1Meta, map[string][]byte{"metakey": []byte("metavalue")}, 35, 1) + assert.NoError(t, err) + err = db1.Commit() + assert.NoError(t, err) + + err = db2.BeginUpdate() + assert.NoError(t, err) + err = db2.SetState(ns, k1, []byte("v1"), 35, 1) + assert.NoError(t, err) + err = db2.SetStateMetadata(ns, k1Meta, map[string][]byte{"metakey": []byte("metavalue")}, 35, 1) + assert.NoError(t, err) + err = db2.Commit() + assert.NoError(t, err) + + compare(t, ns, db1, db2) + + // create 2 vaults + tidstore1, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(db1), &vcProvider{}) + assert.NoError(t, err) + tidstore2, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(db2), &vcProvider{}) + assert.NoError(t, err) + vault1 := vault.New[vc](db1, tidstore1, &vcProvider{}, newInterceptor) + vault2 := vault.New[vc](db2, tidstore2, &vcProvider{}, newInterceptor) + + rws, err := vault1.NewRWSet(txid) + assert.NoError(t, err) + + rws2, err := vault2.NewRWSet(txid) + assert.NoError(t, err) + rws2.Done() + + // GET K1 + v, err := rws.GetState(ns, k1, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Nil(t, v) + + v, err = rws.GetState(ns, k1 /* , fabric.FromStorage */) + assert.NoError(t, err) + assert.Equal(t, []byte("v1"), v) + + v, err = rws.GetState(ns, k1, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, []byte("v1"), v) + + // GET K1Meta + vMap, err := rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Nil(t, vMap) + + vMap, err = rws.GetStateMetadata(ns, k1Meta /* , fabric.FromStorage */) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) + + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) + + // SET K1 + err = rws.SetState(ns, k1, []byte("v1_updated")) + assert.NoError(t, err) + + // GET K1 after setting it + v, err = rws.GetState(ns, k1, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Equal(t, []byte("v1_updated"), v) + + v, err = rws.GetState(ns, k1, driver2.FromStorage) + assert.NoError(t, err) + assert.Equal(t, []byte("v1"), v) + + v, err = rws.GetState(ns, k1, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, []byte("v1_updated"), v) + + // SET K1 + err = rws.SetStateMetadata(ns, k1Meta, map[string][]byte{"newmetakey": []byte("newmetavalue")}) + assert.NoError(t, err) + + // GET K1Meta after setting it + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) + + vMap, err = rws.GetStateMetadata(ns, k1Meta /* , fabric.FromStorage */) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) + + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) + + // GET K2 + v, err = rws.GetState(ns, k2, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Nil(t, v) + + v, err = rws.GetState(ns, k2, driver2.FromStorage) + assert.NoError(t, err) + assert.Nil(t, v) + + v, err = rws.GetState(ns, k2, driver2.FromBoth) + assert.NoError(t, err) + assert.Nil(t, v) + + // SET K2 + err = rws.SetState(ns, k2, []byte("v2_updated")) + assert.NoError(t, err) + + // GET K2 after setting it + v, err = rws.GetState(ns, k2, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Equal(t, []byte("v2_updated"), v) + + v, err = rws.GetState(ns, k2, driver2.FromStorage) + assert.NoError(t, err) + assert.Nil(t, v) + + v, err = rws.GetState(ns, k2, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, []byte("v2_updated"), v) + + // we're done with this read-write set, we serialise it + rws.Done() + rwsBytes, err := rws.Bytes() + assert.NoError(t, err) + assert.NotNil(t, rwsBytes) + + assert.NoError(t, vault1.Match(txid, rwsBytes)) + assert.Error(t, vault1.Match(txid, []byte("pineapple"))) + + // we open the read-write set fabric.From the other kvs + rws, err = vault2.GetRWSet(txid, rwsBytes) + assert.NoError(t, err) + + assert.Equal(t, []string{ns}, rws.Namespaces()) + // we check reads positionally + nReads := rws.NumReads(ns) + assert.Equal(t, 3, nReads) + rKey, rKeyVal, err := rws.GetReadAt(ns, 0) + assert.NoError(t, err) + assert.Equal(t, k1, rKey) + assert.Equal(t, []byte("v1"), rKeyVal) + rKey, rKeyVal, err = rws.GetReadAt(ns, 1) + assert.NoError(t, err) + assert.Equal(t, k1Meta, rKey) + assert.Empty(t, rKeyVal) + rKey, rKeyVal, err = rws.GetReadAt(ns, 2) + assert.NoError(t, err) + assert.Equal(t, k2, rKey) + assert.Equal(t, []byte(nil), rKeyVal) + _, _, err = rws.GetReadAt(ns, 3) + assert.EqualError(t, err, "no read at position 3 for namespace namespace") + nReads = rws.NumReads("barf") + assert.Equal(t, 0, nReads) + // we check writes positionally + nWrites := rws.NumWrites(ns) + assert.Equal(t, 2, nWrites) + nWrites = rws.NumWrites("barfobarfs") + assert.Equal(t, 0, nWrites) + wKey, wKeyVal, err := rws.GetWriteAt(ns, 0) + assert.NoError(t, err) + assert.Equal(t, k1, wKey) + assert.Equal(t, []byte("v1_updated"), wKeyVal) + wKey, wKeyVal, err = rws.GetWriteAt(ns, 1) + assert.NoError(t, err) + assert.Equal(t, k2, wKey) + assert.Equal(t, []byte("v2_updated"), wKeyVal) + _, _, err = rws.GetWriteAt(ns, 2) + assert.EqualError(t, err, "no write at position 2 for namespace namespace") + + // GET K1 + v, err = rws.GetState(ns, k1, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Equal(t, []byte("v1_updated"), v) + + v, err = rws.GetState(ns, k1, driver2.FromStorage) + assert.NoError(t, err) + assert.Equal(t, []byte("v1"), v) + + v, err = rws.GetState(ns, k1, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, []byte("v1_updated"), v) + + // GET K2 + v, err = rws.GetState(ns, k2, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Equal(t, []byte("v2_updated"), v) + + v, err = rws.GetState(ns, k2, driver2.FromStorage) + assert.NoError(t, err) + assert.Nil(t, v) + + v, err = rws.GetState(ns, k2, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, []byte("v2_updated"), v) + + // GET K1Meta + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) + + vMap, err = rws.GetStateMetadata(ns, k1Meta /* , fabric.FromStorage */) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) + + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) + + // DELETE K1 + err = rws.DeleteState(ns, k1) + assert.NoError(t, err) + + // GET K1 + v, err = rws.GetState(ns, k1, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Nil(t, v) + + v, err = rws.GetState(ns, k1, driver2.FromStorage) + assert.NoError(t, err) + assert.Equal(t, []byte("v1"), v) + + v, err = rws.GetState(ns, k1, driver2.FromBoth) + assert.NoError(t, err) + assert.Nil(t, v) + + // we're done with this read-write set, we serialise it + rws.Done() + rwsBytes, err = rws.Bytes() + assert.NoError(t, err) + assert.NotNil(t, rwsBytes) + + // we open the read-write set fabric.From the first kvs again + rws, err = vault1.GetRWSet(txid, rwsBytes) + assert.NoError(t, err) + + assert.Equal(t, []string{ns}, rws.Namespaces()) + // we check reads positionally + nReads = rws.NumReads(ns) + assert.Equal(t, 3, nReads) + rKey, rKeyVal, err = rws.GetReadAt(ns, 0) + assert.NoError(t, err) + assert.Equal(t, k1, rKey) + assert.Equal(t, []byte("v1"), rKeyVal) + rKey, rKeyVal, err = rws.GetReadAt(ns, 1) + assert.NoError(t, err) + assert.Equal(t, k1Meta, rKey) + assert.Empty(t, rKeyVal) + rKey, rKeyVal, err = rws.GetReadAt(ns, 2) + assert.NoError(t, err) + assert.Equal(t, k2, rKey) + assert.Equal(t, []byte(nil), rKeyVal) + _, _, err = rws.GetReadAt(ns, 3) + assert.EqualError(t, err, "no read at position 3 for namespace namespace") + nReads = rws.NumReads("barf") + assert.Equal(t, 0, nReads) + // we check writes positionally + nWrites = rws.NumWrites(ns) + assert.Equal(t, 2, nWrites) + nWrites = rws.NumWrites("barfobarfs") + assert.Equal(t, 0, nWrites) + wKey, wKeyVal, err = rws.GetWriteAt(ns, 0) + assert.NoError(t, err) + assert.Equal(t, k1, wKey) + assert.Equal(t, []byte(nil), wKeyVal) + wKey, wKeyVal, err = rws.GetWriteAt(ns, 1) + assert.NoError(t, err) + assert.Equal(t, k2, wKey) + assert.Equal(t, []byte("v2_updated"), wKeyVal) + _, _, err = rws.GetWriteAt(ns, 2) + assert.EqualError(t, err, "no write at position 2 for namespace namespace") + + // GET K2 + v, err = rws.GetState(ns, k2, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Equal(t, []byte("v2_updated"), v) + + v, err = rws.GetState(ns, k2, driver2.FromStorage) + assert.NoError(t, err) + assert.Nil(t, v) + + v, err = rws.GetState(ns, k2, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, []byte("v2_updated"), v) + + // GET K1 + v, err = rws.GetState(ns, k1, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Nil(t, v) + + v, err = rws.GetState(ns, k1, driver2.FromStorage) + assert.NoError(t, err) + assert.Equal(t, []byte("v1"), v) + + v, err = rws.GetState(ns, k1, driver2.FromBoth) + assert.NoError(t, err) + assert.Nil(t, v) + + // GET K1Meta + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) + + vMap, err = rws.GetStateMetadata(ns, k1Meta /* , fabric.FromStorage */) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) + + vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) + + // we're done with this read-write set + rws.Done() + + compare(t, ns, db1, db2) + + // we expect a busy txid in the Store + code, _, err := vault1.Status(txid) + assert.NoError(t, err) + assert.Equal(t, busy, code) + code, _, err = vault2.Status(txid) + assert.NoError(t, err) + assert.Equal(t, busy, code) + + compare(t, ns, db1, db2) + + // we commit it in both + err = vault1.CommitTX(txid, 35, 2) + assert.NoError(t, err) + err = vault2.CommitTX(txid, 35, 2) + assert.NoError(t, err) + + // all Interceptors should be gone + assert.Len(t, vault1.Interceptors, 0) + assert.Len(t, vault2.Interceptors, 0) + + compare(t, ns, db1, db2) + // we expect a valid txid in the Store + code, _, err = vault1.Status(txid) + assert.NoError(t, err) + assert.Equal(t, valid, code) + code, _, err = vault2.Status(txid) + assert.NoError(t, err) + assert.Equal(t, valid, code) + + compare(t, ns, db1, db2) + + v1, b1, t1, err := db1.GetState(ns, k1) + assert.NoError(t, err) + v2, b2, t2, err := db2.GetState(ns, k1) + assert.NoError(t, err) + assert.Nil(t, v1) + assert.Equal(t, uint64(0), b1) + assert.Equal(t, uint64(0), t1) + assert.Equal(t, v1, v2) + assert.Equal(t, b1, b2) + assert.Equal(t, t1, t2) + + v1, b1, t1, err = db1.GetState(ns, k2) + assert.NoError(t, err) + v2, b2, t2, err = db2.GetState(ns, k2) + assert.NoError(t, err) + assert.Equal(t, []byte("v2_updated"), v1) + assert.Equal(t, uint64(35), b1) + assert.Equal(t, uint64(2), t1) + assert.Equal(t, v1, v2) + assert.Equal(t, b1, b2) + assert.Equal(t, t1, t2) + + meta1, b1, t1, err := db1.GetStateMetadata(ns, k1Meta) + assert.NoError(t, err) + meta2, b2, t2, err := db2.GetStateMetadata(ns, k1Meta) + assert.NoError(t, err) + assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, meta1) + assert.Equal(t, uint64(35), b1) + assert.Equal(t, uint64(2), t1) + assert.Equal(t, meta1, meta2) + assert.Equal(t, b1, b2) + assert.Equal(t, t1, t2) +} + +func compare(t *testing.T, ns string, db1, db2 driver.VersionedPersistence) { + // we expect the underlying databases to be identical + itr, err := db1.GetStateRangeScanIterator(ns, "", "") + defer itr.Close() + assert.NoError(t, err) + + res1 := make([]driver.VersionedRead, 0, 4) + for n, err := itr.Next(); n != nil; n, err = itr.Next() { + assert.NoError(t, err) + res1 = append(res1, *n) + } + itr, err = db2.GetStateRangeScanIterator(ns, "", "") + defer itr.Close() + assert.NoError(t, err) + + res2 := make([]driver.VersionedRead, 0, 4) + for n, err := itr.Next(); n != nil; n, err = itr.Next() { + assert.NoError(t, err) + res2 = append(res2, *n) + } + + assert.Equal(t, res1, res2) +} diff --git a/platform/common/core/generic/vault/vault_test.go b/platform/common/core/generic/vault/vault_test.go index 4e3c8eb91..499ab89d9 100644 --- a/platform/common/core/generic/vault/vault_test.go +++ b/platform/common/core/generic/vault/vault_test.go @@ -9,1038 +9,196 @@ package vault_test import ( "fmt" "os" + "path" "path/filepath" + "reflect" "testing" - "github.com/golang/protobuf/proto" - "github.com/hyperledger-labs/fabric-smart-client/platform/common/core" - "github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault" - "github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault/txidstore" "github.com/hyperledger-labs/fabric-smart-client/platform/common/core/generic/vault/txidstore/mocks" - driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/common/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" _ "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver/badger" _ "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver/memory" - "github.com/hyperledger/fabric-protos-go/ledger/rwset" - "github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset" - "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver/sql" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) //go:generate counterfeiter -o mocks/config.go -fake-name Config . config -type vc int - -const ( - _ vc = iota - valid - invalid - busy - unknown -) - -type vcProvider struct{} - -func (p *vcProvider) ToInt32(code vc) int32 { return int32(code) } -func (p *vcProvider) FromInt32(code int32) vc { - return vc(code) -} -func (p *vcProvider) Unknown() vc { return unknown } -func (p *vcProvider) Busy() vc { return busy } -func (p *vcProvider) Valid() vc { return valid } -func (p *vcProvider) Invalid() vc { return invalid } - -func newInterceptor(qe vault.QueryExecutor, txidStore vault.TXIDStoreReader[vc], txid core.TxID) vault.TxInterceptor { - return vault.NewInterceptor[vc](qe, txidStore, txid, &vcProvider{}) -} - -type config interface { - db.Config -} - var tempDir string -func TestMerge(t *testing.T) { - ns := "namespace" - k1 := "key1" - k2 := "key2" - k3 := "key3" - txid := "txid" - ne1Key := "notexist1" - ne2Key := "notexist2" - - // create DB and kvs - ddb, err := db.OpenVersioned(nil, "memory", "", nil) - assert.NoError(t, err) - tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) - assert.NoError(t, err) - vault2 := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) - err = ddb.BeginUpdate() - assert.NoError(t, err) - err = ddb.SetState(ns, k1, []byte("v1"), 35, 1) - assert.NoError(t, err) - err = ddb.Commit() - assert.NoError(t, err) - - rws, err := vault2.NewRWSet(txid) - assert.NoError(t, err) - v, err := rws.GetState(ns, k1) - assert.NoError(t, err) - assert.Equal(t, []byte("v1"), v) - v, err = rws.GetState(ns, ne1Key) - assert.NoError(t, err) - assert.Equal(t, []byte(nil), v) - err = rws.SetState(ns, k2, []byte("v2")) - assert.NoError(t, err) - err = rws.SetStateMetadata(ns, k3, map[string][]byte{"k3": []byte("v3")}) - assert.NoError(t, err) - - rwsb := rwsetutil.NewRWSetBuilder() - rwsb.AddToReadSet(ns, k1, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 35, TxNum: 1})) - rwsb.AddToReadSet(ns, ne2Key, nil) - rwsb.AddToWriteSet(ns, k1, []byte("newv1")) - rwsb.AddToMetadataWriteSet(ns, k1, map[string][]byte{"k1": []byte("v1")}) - simRes, err := rwsb.GetTxSimulationResults() - assert.NoError(t, err) - rwsBytes, err := simRes.GetPubSimulationBytes() - assert.NoError(t, err) - - err = rws.AppendRWSet(rwsBytes) - assert.NoError(t, err) - assert.Equal(t, vault.NamespaceKeyedMetaWrites{ - "namespace": { - "key1": {"k1": []byte("v1")}, - "key3": {"k3": []byte("v3")}, - }, - }, rws.RWs().MetaWrites) - assert.Equal(t, vault.Writes{"namespace": { - "key1": []byte("newv1"), - "key2": []byte("v2"), - }}, rws.RWs().Writes) - assert.Equal(t, vault.Reads{ - "namespace": { - "key1": {Block: 35, TxNum: 1}, - "notexist1": {Block: 0, TxNum: 0}, - "notexist2": {Block: 0, TxNum: 0}, - }, - }, rws.RWs().Reads) - - rwsb = rwsetutil.NewRWSetBuilder() - rwsb.AddToReadSet(ns, k1, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 36, TxNum: 1})) - simRes, err = rwsb.GetTxSimulationResults() - assert.NoError(t, err) - rwsBytes, err = simRes.GetPubSimulationBytes() - assert.NoError(t, err) - - err = rws.AppendRWSet(rwsBytes) - assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version 35:1, current value at version 35:1") - - rwsb = rwsetutil.NewRWSetBuilder() - rwsb.AddToWriteSet(ns, k2, []byte("v2")) - simRes, err = rwsb.GetTxSimulationResults() - assert.NoError(t, err) - rwsBytes, err = simRes.GetPubSimulationBytes() - assert.NoError(t, err) - - err = rws.AppendRWSet(rwsBytes) - assert.EqualError(t, err, "duplicate write entry for key namespace:key2") - - err = rws.AppendRWSet([]byte("barf")) - assert.Contains(t, err.Error(), "cannot parse invalid wire-format data") - - txRWSet := &rwset.TxReadWriteSet{ - NsRwset: []*rwset.NsReadWriteSet{ - {Rwset: []byte("barf")}, - }, +func TestMemory(t *testing.T) { + for _, c := range SingleDBCases { + ddb, terminate, err := openMemory() + assert.NoError(t, err) + t.Run(c.Name, func(xt *testing.T) { + defer ddb.Close() + defer terminate() + c.Fn(xt, ddb) + }) } - rwsBytes, err = proto.Marshal(txRWSet) - assert.NoError(t, err) - - err = rws.AppendRWSet(rwsBytes) - assert.Contains(t, err.Error(), "cannot parse invalid wire-format data") - - rwsb = rwsetutil.NewRWSetBuilder() - rwsb.AddToMetadataWriteSet(ns, k3, map[string][]byte{"k": []byte("v")}) - simRes, err = rwsb.GetTxSimulationResults() - assert.NoError(t, err) - rwsBytes, err = simRes.GetPubSimulationBytes() - assert.NoError(t, err) - - err = rws.AppendRWSet(rwsBytes) - assert.EqualError(t, err, "duplicate metadata write entry for key namespace:key3") -} - -func TestInspector(t *testing.T) { - txid := "txid" - ns := "ns" - k1 := "k1" - k2 := "k2" - - // create DB and kvs - ddb, err := db.OpenVersioned(nil, "memory", "", nil) - assert.NoError(t, err) - tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) - assert.NoError(t, err) - vault := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) - err = ddb.BeginUpdate() - assert.NoError(t, err) - err = ddb.SetState(ns, k1, []byte("v1"), 35, 1) - assert.NoError(t, err) - err = ddb.Commit() - assert.NoError(t, err) - - rws, err := vault.NewRWSet(txid) - assert.NoError(t, err) - v, err := rws.GetState(ns, k1) - assert.NoError(t, err) - assert.Equal(t, []byte("v1"), v) - err = rws.SetState(ns, k2, []byte("v2")) - assert.NoError(t, err) - rws.Done() - - b, err := rws.Bytes() - assert.NoError(t, err) - - i, err := vault.InspectRWSet(b) - assert.NoError(t, err) - assert.NoError(t, i.IsValid()) - - // the ephemeral rwset can "see" its own writes - v, err = i.GetState(ns, k2) - assert.NoError(t, err) - assert.Equal(t, []byte("v2"), v) - - k, v, err := i.GetWriteAt(ns, 0) - assert.NoError(t, err) - assert.Equal(t, k2, k) - assert.Equal(t, []byte("v2"), v) - - k, v, err = i.GetReadAt(ns, 0) - assert.NoError(t, err) - assert.Equal(t, k1, k) - assert.Equal(t, []byte(nil), v) - - assert.Equal(t, 1, i.NumReads(ns)) - assert.Equal(t, 1, i.NumWrites(ns)) - assert.Equal(t, []string{"ns"}, i.Namespaces()) - - i.Done() - - // check filtering - i, err = vault.InspectRWSet(b, "pineapple") - assert.NoError(t, err) - assert.NoError(t, i.IsValid()) - assert.Empty(t, i.Namespaces()) - i.Done() - - i, err = vault.InspectRWSet(b, ns) - assert.NoError(t, err) - assert.NoError(t, i.IsValid()) - assert.Equal(t, []string{ns}, i.Namespaces()) - i.Done() -} - -func testRun(t *testing.T, db1, db2 driver.VersionedPersistence) { - ns := "namespace" - k1 := "key1" - k1Meta := "key1Meta" - k2 := "key2" - txid := "txid1" - - // create and populate 2 DBs - err := db1.BeginUpdate() - assert.NoError(t, err) - err = db1.SetState(ns, k1, []byte("v1"), 35, 1) - assert.NoError(t, err) - err = db1.SetStateMetadata(ns, k1Meta, map[string][]byte{"metakey": []byte("metavalue")}, 35, 1) - assert.NoError(t, err) - err = db1.Commit() - assert.NoError(t, err) - - err = db2.BeginUpdate() - assert.NoError(t, err) - err = db2.SetState(ns, k1, []byte("v1"), 35, 1) - assert.NoError(t, err) - err = db2.SetStateMetadata(ns, k1Meta, map[string][]byte{"metakey": []byte("metavalue")}, 35, 1) - assert.NoError(t, err) - err = db2.Commit() - assert.NoError(t, err) - - compare(t, ns, db1, db2) - - // create 2 vaults - tidstore1, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(db1), &vcProvider{}) - assert.NoError(t, err) - tidstore2, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(db2), &vcProvider{}) - assert.NoError(t, err) - vault1 := vault.New[vc](db1, tidstore1, &vcProvider{}, newInterceptor) - vault2 := vault.New[vc](db2, tidstore2, &vcProvider{}, newInterceptor) - - rws, err := vault1.NewRWSet(txid) - assert.NoError(t, err) - - rws2, err := vault2.NewRWSet(txid) - assert.NoError(t, err) - rws2.Done() - - // GET K1 - v, err := rws.GetState(ns, k1, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Nil(t, v) - - v, err = rws.GetState(ns, k1 /* , fabric.FromStorage */) - assert.NoError(t, err) - assert.Equal(t, []byte("v1"), v) - - v, err = rws.GetState(ns, k1, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, []byte("v1"), v) - - // GET K1Meta - vMap, err := rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Nil(t, vMap) - - vMap, err = rws.GetStateMetadata(ns, k1Meta /* , fabric.FromStorage */) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) - - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) - - // SET K1 - err = rws.SetState(ns, k1, []byte("v1_updated")) - assert.NoError(t, err) - - // GET K1 after setting it - v, err = rws.GetState(ns, k1, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Equal(t, []byte("v1_updated"), v) - - v, err = rws.GetState(ns, k1, driver2.FromStorage) - assert.NoError(t, err) - assert.Equal(t, []byte("v1"), v) - - v, err = rws.GetState(ns, k1, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, []byte("v1_updated"), v) - - // SET K1 - err = rws.SetStateMetadata(ns, k1Meta, map[string][]byte{"newmetakey": []byte("newmetavalue")}) - assert.NoError(t, err) - - // GET K1Meta after setting it - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) - - vMap, err = rws.GetStateMetadata(ns, k1Meta /* , fabric.FromStorage */) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) - - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) - - // GET K2 - v, err = rws.GetState(ns, k2, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Nil(t, v) - - v, err = rws.GetState(ns, k2, driver2.FromStorage) - assert.NoError(t, err) - assert.Nil(t, v) - - v, err = rws.GetState(ns, k2, driver2.FromBoth) - assert.NoError(t, err) - assert.Nil(t, v) - - // SET K2 - err = rws.SetState(ns, k2, []byte("v2_updated")) - assert.NoError(t, err) - - // GET K2 after setting it - v, err = rws.GetState(ns, k2, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Equal(t, []byte("v2_updated"), v) - - v, err = rws.GetState(ns, k2, driver2.FromStorage) - assert.NoError(t, err) - assert.Nil(t, v) - - v, err = rws.GetState(ns, k2, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, []byte("v2_updated"), v) - - // we're done with this read-write set, we serialise it - rws.Done() - rwsBytes, err := rws.Bytes() - assert.NoError(t, err) - assert.NotNil(t, rwsBytes) - - assert.NoError(t, vault1.Match(txid, rwsBytes)) - assert.Error(t, vault1.Match(txid, []byte("pineapple"))) - - // we open the read-write set fabric.From the other kvs - rws, err = vault2.GetRWSet(txid, rwsBytes) - assert.NoError(t, err) - - assert.Equal(t, []string{ns}, rws.Namespaces()) - // we check reads positionally - nReads := rws.NumReads(ns) - assert.Equal(t, 3, nReads) - rKey, rKeyVal, err := rws.GetReadAt(ns, 0) - assert.NoError(t, err) - assert.Equal(t, k1, rKey) - assert.Equal(t, []byte("v1"), rKeyVal) - rKey, rKeyVal, err = rws.GetReadAt(ns, 1) - assert.NoError(t, err) - assert.Equal(t, k1Meta, rKey) - assert.Equal(t, []byte(nil), rKeyVal) - rKey, rKeyVal, err = rws.GetReadAt(ns, 2) - assert.NoError(t, err) - assert.Equal(t, k2, rKey) - assert.Equal(t, []byte(nil), rKeyVal) - _, _, err = rws.GetReadAt(ns, 3) - assert.EqualError(t, err, "no read at position 3 for namespace namespace") - nReads = rws.NumReads("barf") - assert.Equal(t, 0, nReads) - // we check writes positionally - nWrites := rws.NumWrites(ns) - assert.Equal(t, 2, nWrites) - nWrites = rws.NumWrites("barfobarfs") - assert.Equal(t, 0, nWrites) - wKey, wKeyVal, err := rws.GetWriteAt(ns, 0) - assert.NoError(t, err) - assert.Equal(t, k1, wKey) - assert.Equal(t, []byte("v1_updated"), wKeyVal) - wKey, wKeyVal, err = rws.GetWriteAt(ns, 1) - assert.NoError(t, err) - assert.Equal(t, k2, wKey) - assert.Equal(t, []byte("v2_updated"), wKeyVal) - _, _, err = rws.GetWriteAt(ns, 2) - assert.EqualError(t, err, "no write at position 2 for namespace namespace") - - // GET K1 - v, err = rws.GetState(ns, k1, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Equal(t, []byte("v1_updated"), v) - - v, err = rws.GetState(ns, k1, driver2.FromStorage) - assert.NoError(t, err) - assert.Equal(t, []byte("v1"), v) - - v, err = rws.GetState(ns, k1, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, []byte("v1_updated"), v) - - // GET K2 - v, err = rws.GetState(ns, k2, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Equal(t, []byte("v2_updated"), v) - - v, err = rws.GetState(ns, k2, driver2.FromStorage) - assert.NoError(t, err) - assert.Nil(t, v) - - v, err = rws.GetState(ns, k2, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, []byte("v2_updated"), v) - - // GET K1Meta - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) - - vMap, err = rws.GetStateMetadata(ns, k1Meta /* , fabric.FromStorage */) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) - - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) - - // DELETE K1 - err = rws.DeleteState(ns, k1) - assert.NoError(t, err) - - // GET K1 - v, err = rws.GetState(ns, k1, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Nil(t, v) - - v, err = rws.GetState(ns, k1, driver2.FromStorage) - assert.NoError(t, err) - assert.Equal(t, []byte("v1"), v) - - v, err = rws.GetState(ns, k1, driver2.FromBoth) - assert.NoError(t, err) - assert.Nil(t, v) - - // we're done with this read-write set, we serialise it - rws.Done() - rwsBytes, err = rws.Bytes() - assert.NoError(t, err) - assert.NotNil(t, rwsBytes) - - // we open the read-write set fabric.From the first kvs again - rws, err = vault1.GetRWSet(txid, rwsBytes) - assert.NoError(t, err) - - assert.Equal(t, []string{ns}, rws.Namespaces()) - // we check reads positionally - nReads = rws.NumReads(ns) - assert.Equal(t, 3, nReads) - rKey, rKeyVal, err = rws.GetReadAt(ns, 0) - assert.NoError(t, err) - assert.Equal(t, k1, rKey) - assert.Equal(t, []byte("v1"), rKeyVal) - rKey, rKeyVal, err = rws.GetReadAt(ns, 1) - assert.NoError(t, err) - assert.Equal(t, k1Meta, rKey) - assert.Equal(t, []byte(nil), rKeyVal) - rKey, rKeyVal, err = rws.GetReadAt(ns, 2) - assert.NoError(t, err) - assert.Equal(t, k2, rKey) - assert.Equal(t, []byte(nil), rKeyVal) - _, _, err = rws.GetReadAt(ns, 3) - assert.EqualError(t, err, "no read at position 3 for namespace namespace") - nReads = rws.NumReads("barf") - assert.Equal(t, 0, nReads) - // we check writes positionally - nWrites = rws.NumWrites(ns) - assert.Equal(t, 2, nWrites) - nWrites = rws.NumWrites("barfobarfs") - assert.Equal(t, 0, nWrites) - wKey, wKeyVal, err = rws.GetWriteAt(ns, 0) - assert.NoError(t, err) - assert.Equal(t, k1, wKey) - assert.Equal(t, []byte(nil), wKeyVal) - wKey, wKeyVal, err = rws.GetWriteAt(ns, 1) - assert.NoError(t, err) - assert.Equal(t, k2, wKey) - assert.Equal(t, []byte("v2_updated"), wKeyVal) - _, _, err = rws.GetWriteAt(ns, 2) - assert.EqualError(t, err, "no write at position 2 for namespace namespace") - - // GET K2 - v, err = rws.GetState(ns, k2, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Equal(t, []byte("v2_updated"), v) - - v, err = rws.GetState(ns, k2, driver2.FromStorage) - assert.NoError(t, err) - assert.Nil(t, v) - - v, err = rws.GetState(ns, k2, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, []byte("v2_updated"), v) - - // GET K1 - v, err = rws.GetState(ns, k1, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Nil(t, v) - - v, err = rws.GetState(ns, k1, driver2.FromStorage) - assert.NoError(t, err) - assert.Equal(t, []byte("v1"), v) - - v, err = rws.GetState(ns, k1, driver2.FromBoth) - assert.NoError(t, err) - assert.Nil(t, v) - - // GET K1Meta - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromIntermediate) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) - - vMap, err = rws.GetStateMetadata(ns, k1Meta /* , fabric.FromStorage */) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"metakey": []byte("metavalue")}, vMap) - - vMap, err = rws.GetStateMetadata(ns, k1Meta, driver2.FromBoth) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, vMap) - - // we're done with this read-write set - rws.Done() - - compare(t, ns, db1, db2) - - // we expect a busy txid in the Store - code, _, err := vault1.Status(txid) - assert.NoError(t, err) - assert.Equal(t, busy, code) - code, _, err = vault2.Status(txid) - assert.NoError(t, err) - assert.Equal(t, busy, code) - compare(t, ns, db1, db2) - - // we commit it in both - err = vault1.CommitTX(txid, 35, 2) - assert.NoError(t, err) - err = vault2.CommitTX(txid, 35, 2) - assert.NoError(t, err) - - // all Interceptors should be gone - assert.Len(t, vault1.Interceptors, 0) - assert.Len(t, vault2.Interceptors, 0) - - compare(t, ns, db1, db2) - // we expect a valid txid in the Store - code, _, err = vault1.Status(txid) - assert.NoError(t, err) - assert.Equal(t, valid, code) - code, _, err = vault2.Status(txid) - assert.NoError(t, err) - assert.Equal(t, valid, code) - - compare(t, ns, db1, db2) - - v1, b1, t1, err := db1.GetState(ns, k1) - assert.NoError(t, err) - v2, b2, t2, err := db2.GetState(ns, k1) - assert.NoError(t, err) - assert.Nil(t, v1) - assert.Equal(t, uint64(0), b1) - assert.Equal(t, uint64(0), t1) - assert.Equal(t, v1, v2) - assert.Equal(t, b1, b2) - assert.Equal(t, t1, t2) - - v1, b1, t1, err = db1.GetState(ns, k2) - assert.NoError(t, err) - v2, b2, t2, err = db2.GetState(ns, k2) - assert.NoError(t, err) - assert.Equal(t, []byte("v2_updated"), v1) - assert.Equal(t, uint64(35), b1) - assert.Equal(t, uint64(2), t1) - assert.Equal(t, v1, v2) - assert.Equal(t, b1, b2) - assert.Equal(t, t1, t2) - - meta1, b1, t1, err := db1.GetStateMetadata(ns, k1Meta) - assert.NoError(t, err) - meta2, b2, t2, err := db2.GetStateMetadata(ns, k1Meta) - assert.NoError(t, err) - assert.Equal(t, map[string][]byte{"newmetakey": []byte("newmetavalue")}, meta1) - assert.Equal(t, uint64(35), b1) - assert.Equal(t, uint64(2), t1) - assert.Equal(t, meta1, meta2) - assert.Equal(t, b1, b2) - assert.Equal(t, t1, t2) -} - -func compare(t *testing.T, ns string, db1, db2 driver.VersionedPersistence) { - // we expect the underlying databases to be identical - itr, err := db1.GetStateRangeScanIterator(ns, "", "") - defer itr.Close() - assert.NoError(t, err) - - res1 := make([]driver.VersionedRead, 0, 4) - for n, err := itr.Next(); n != nil; n, err = itr.Next() { + for _, c := range DoubleDBCases { + db1, terminate1, err := openMemory() assert.NoError(t, err) - res1 = append(res1, *n) + db2, terminate2, err := openMemory() + assert.NoError(t, err) + t.Run(c.Name, func(xt *testing.T) { + defer db1.Close() + defer db2.Close() + defer terminate1() + defer terminate2() + c.Fn(xt, db1, db2) + }) } - itr, err = db2.GetStateRangeScanIterator(ns, "", "") - defer itr.Close() - assert.NoError(t, err) +} - res2 := make([]driver.VersionedRead, 0, 4) - for n, err := itr.Next(); n != nil; n, err = itr.Next() { +func TestBadger(t *testing.T) { + //for _, c := range SingleDBCases { + // ddb, terminate, err := openBadger("DB-TestVaultBadgerDB1") + // assert.NoError(t, err) + // t.Run(c.Name, func(xt *testing.T) { + // defer ddb.Close() + // defer terminate() + // c.Fn(xt, ddb) + // }) + //} + + for _, c := range DoubleDBCases { + db1, terminate1, err := openBadger("DB-TestVaultBadgerDB1") + assert.NoError(t, err) + db2, terminate2, err := openBadger("DB-TestVaultBadgerDB2") assert.NoError(t, err) - res2 = append(res2, *n) + t.Run(c.Name, func(xt *testing.T) { + defer db1.Close() + defer db2.Close() + defer terminate1() + defer terminate2() + c.Fn(xt, db1, db2) + }) } - - assert.Equal(t, res1, res2) } -func TestVaultInMem(t *testing.T) { - db1, err := db.OpenVersioned(nil, "memory", "", nil) - assert.NoError(t, err) - db2, err := db.OpenVersioned(nil, "memory", "", nil) - assert.NoError(t, err) - testRun(t, db1, db2) +func TestSqlite(t *testing.T) { + tempDir = t.TempDir() + + //for _, c := range SingleDBCases { + // ddb, terminate, err := openSqlite("node1") + // assert.NoError(t, err) + // t.Run(c.Name, func(xt *testing.T) { + // defer ddb.Close() + // defer terminate() + // c.Fn(xt, ddb) + // }) + //} + + for _, c := range DoubleDBCases { + db1, terminate1, err := openSqlite("node1") + assert.NoError(t, err) + db2, terminate2, err := openSqlite("node2") + assert.NoError(t, err) + t.Run(c.Name, func(xt *testing.T) { + defer db1.Close() + defer db2.Close() + defer terminate1() + defer terminate2() + c.Fn(xt, db1, db2) + }) + } } -func TestVaultBadger(t *testing.T) { - c := &mocks.Config{} - c.UnmarshalKeyReturns(nil) - c.IsSetReturns(false) - db1, err := db.OpenVersioned(nil, "badger", filepath.Join(tempDir, "DB-TestVaultBadgerDB1"), c) - assert.NoError(t, err) - db2, err := db.OpenVersioned(nil, "badger", filepath.Join(tempDir, "DB-TestVaultBadgerDB2"), c) - assert.NoError(t, err) - defer db1.Close() - defer db2.Close() - - testRun(t, db1, db2) +func TestPostgres(t *testing.T) { + //for _, c := range SingleDBCases { + // ddb, terminate, err := openPostgres("node1") + // assert.NoError(t, err) + // t.Run(c.Name, func(xt *testing.T) { + // defer ddb.Close() + // defer terminate() + // c.Fn(xt, ddb) + // }) + //} + + for _, c := range DoubleDBCases { + db1, terminate1, err := openPostgres("node1") + assert.NoError(t, err) + db2, terminate2, err := openPostgres("node2") + assert.NoError(t, err) + t.Run(c.Name, func(xt *testing.T) { + defer db1.Close() + defer db2.Close() + defer terminate1() + defer terminate2() + c.Fn(xt, db1, db2) + }) + } } -func TestVaultErr(t *testing.T) { - ddb, err := db.OpenVersioned(nil, "memory", "", nil) - assert.NoError(t, err) - tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) - assert.NoError(t, err) - vault1 := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) - err = vault1.CommitTX("non-existent", 0, 0) - assert.ErrorContains(t, err, "read-write set for txid non-existent could not be found") - err = vault1.DiscardTx("non-existent", "") - assert.EqualError(t, err, "read-write set for txid non-existent could not be found") - - ncrwset, err := vault1.NewRWSet("not-closed") - assert.NoError(t, err) - _, err = vault1.NewRWSet("not-closed") - assert.EqualError(t, err, "duplicate read-write set for txid not-closed") - _, err = vault1.GetRWSet("not-closed", []byte(nil)) - assert.EqualError(t, err, "programming error: previous read-write set for not-closed has not been closed") - err = vault1.CommitTX("not-closed", 0, 0) - assert.ErrorContains(t, err, "attempted to retrieve read-write set for not-closed when done has not been called") - err = vault1.DiscardTx("not-closed", "") - assert.EqualError(t, err, "attempted to retrieve read-write set for not-closed when done has not been called") - - // as a sanity-check we close it now and will be able to discard it - ncrwset.Done() - err = vault1.DiscardTx("not-closed", "pineapple") - assert.NoError(t, err) - vc, message, err := vault1.Status("not-closed") - assert.NoError(t, err) - assert.Equal(t, "pineapple", message) - assert.Equal(t, invalid, vc) - - _, err = vault1.GetRWSet("bogus", []byte("barf")) - assert.Contains(t, err.Error(), "cannot parse invalid wire-format data") - - txRWSet := &rwset.TxReadWriteSet{ - NsRwset: []*rwset.NsReadWriteSet{ - {Rwset: []byte("barf")}, - }, +func TestMain(m *testing.M) { + var err error + tempDir, err = os.MkdirTemp("", "vault-test") + if err != nil { + fmt.Fprintf(os.Stderr, "failed to create temporary directory: %v", err) + os.Exit(-1) } - rwsb, err := proto.Marshal(txRWSet) - assert.NoError(t, err) - - _, err = vault1.GetRWSet("bogus", rwsb) - assert.Contains(t, err.Error(), "cannot parse invalid wire-format data") + defer os.RemoveAll(tempDir) - code, _, err := vault1.Status("unknown-txid") - assert.NoError(t, err) - assert.Equal(t, unknown, code) + m.Run() } -func TestInterceptorErr(t *testing.T) { - ddb, err := db.OpenVersioned(nil, "memory", "", nil) - assert.NoError(t, err) - tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) - assert.NoError(t, err) - vault1 := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) - rws, err := vault1.NewRWSet("txid") - assert.NoError(t, err) +// Open DB utils - _, err = rws.GetState("foo", "bar", 15) - assert.EqualError(t, err, "invalid get option [15]") - _, err = rws.GetState("foo", "bar", 15, 16) - assert.EqualError(t, err, "a single getoption is supported, 2 provided") - - _, err = rws.GetStateMetadata("foo", "bar", 15) - assert.EqualError(t, err, "invalid get option [15]") - _, err = rws.GetStateMetadata("foo", "bar", 15, 16) - assert.EqualError(t, err, "a single getoption is supported, 2 provided") - - rws.Done() - - _, err = rws.GetStateMetadata("foo", "bar") - assert.EqualError(t, err, "this instance was closed") - _, err = rws.GetState("foo", "bar") - assert.EqualError(t, err, "this instance was closed") - err = rws.SetState("foo", "bar", []byte("whocares")) - assert.EqualError(t, err, "this instance was closed") - err = rws.SetStateMetadata("foo", "bar", nil) - assert.EqualError(t, err, "this instance was closed") - err = rws.DeleteState("foo", "bar") - assert.EqualError(t, err, "this instance was closed") - _, _, err = rws.GetReadAt("foo", 12312) - assert.EqualError(t, err, "this instance was closed") - _, _, err = rws.GetWriteAt("foo", 12312) - assert.EqualError(t, err, "this instance was closed") - err = rws.AppendRWSet([]byte("foo")) - assert.EqualError(t, err, "this instance was closed") - - rws, err = vault1.NewRWSet("validtxid") - assert.NoError(t, err) - rws.Done() - err = vault1.CommitTX("validtxid", 2, 3) - assert.NoError(t, err) - rws, err = vault1.NewRWSet("validtxid") - assert.NoError(t, err) - err = rws.IsValid() - assert.EqualError(t, err, "duplicate txid validtxid") +func openMemory() (driver.VersionedPersistence, func(), error) { + persistence, err := db.OpenVersioned(nil, "memory", "", nil) + return persistence, func() {}, err } -func TestInterceptorConcurrency(t *testing.T) { - ns := "namespace" - k := "key1" - mk := "meyakey1" - - ddb, err := db.OpenVersioned(nil, "memory", "", nil) - assert.NoError(t, err) - tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) - assert.NoError(t, err) - vault1 := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) - rws, err := vault1.NewRWSet("txid") - assert.NoError(t, err) - - v, err := rws.GetState(ns, k) - assert.NoError(t, err) - assert.Nil(t, v) - - err = ddb.BeginUpdate() - assert.NoError(t, err) - err = ddb.SetState(ns, k, []byte("val"), 35, 1) - assert.NoError(t, err) - err = ddb.Commit() - assert.NoError(t, err) - - _, _, err = rws.GetReadAt(ns, 0) - assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version 0:0, current value at version 35:1") - - _, err = rws.GetState(ns, k) - assert.EqualError(t, err, "invalid read [namespace:key1]: previous value returned at version 0:0, current value at version 35:1") - - mv, err := rws.GetStateMetadata(ns, mk) - assert.NoError(t, err) - assert.Nil(t, mv) - - err = ddb.BeginUpdate() - assert.NoError(t, err) - err = ddb.SetStateMetadata(ns, mk, map[string][]byte{"k": []byte("v")}, 36, 2) - assert.NoError(t, err) - err = ddb.Commit() - assert.NoError(t, err) - - _, err = rws.GetStateMetadata(ns, mk) - assert.EqualError(t, err, "invalid metadata read: previous value returned at version 0:0, current value at version 36:2") +func openBadger(dir string) (driver.VersionedPersistence, func(), error) { + c := &mocks.Config{} + c.UnmarshalKeyReturns(nil) + c.IsSetReturns(false) + persistence, err := db.OpenVersioned(nil, "badger", filepath.Join(tempDir, dir), c) + return persistence, func() {}, err } -func TestQueryExecutor(t *testing.T) { - ns := "namespace" - - ddb, err := db.OpenVersioned(nil, "memory", "", nil) - assert.NoError(t, err) - tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) - assert.NoError(t, err) - vault := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) - - err = ddb.BeginUpdate() - assert.NoError(t, err) - err = ddb.SetState(ns, "k2", []byte("k2_value"), 35, 1) - assert.NoError(t, err) - err = ddb.SetState(ns, "k3", []byte("k3_value"), 35, 2) - assert.NoError(t, err) - err = ddb.SetState(ns, "k1", []byte("k1_value"), 35, 3) - assert.NoError(t, err) - err = ddb.SetState(ns, "k111", []byte("k111_value"), 35, 4) - assert.NoError(t, err) - err = ddb.Commit() - assert.NoError(t, err) - - qe, err := vault.NewQueryExecutor() - assert.NoError(t, err) - defer qe.Done() - - v, err := qe.GetState(ns, "k1") - assert.NoError(t, err) - assert.Equal(t, []byte("k1_value"), v) - v, err = qe.GetState(ns, "barfobarfs") - assert.NoError(t, err) - assert.Equal(t, []byte(nil), v) +type dbConfig sql.Opts - itr, err := qe.GetStateRangeScanIterator(ns, "", "") - defer itr.Close() - assert.NoError(t, err) - - res := make([]driver.VersionedRead, 0, 4) - for n, err := itr.Next(); n != nil; n, err = itr.Next() { - assert.NoError(t, err) - res = append(res, *n) +func (c *dbConfig) IsSet(string) bool { panic("not supported") } +func (c *dbConfig) UnmarshalKey(key string, rawVal interface{}) error { + if len(key) > 0 { + return errors.New("invalid key") } - assert.Len(t, res, 4) - assert.Equal(t, []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, IndexInBlock: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, IndexInBlock: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, IndexInBlock: 1}, - {Key: "k3", Raw: []byte("k3_value"), Block: 35, IndexInBlock: 2}, - }, res) - - itr, err = ddb.GetStateRangeScanIterator(ns, "k1", "k3") - defer itr.Close() - assert.NoError(t, err) - - res = make([]driver.VersionedRead, 0, 3) - for n, err := itr.Next(); n != nil; n, err = itr.Next() { - assert.NoError(t, err) - res = append(res, *n) + fmt.Printf("here opts: %v", reflect.TypeOf(rawVal)) + if val, ok := rawVal.(*sql.Opts); ok { + *val = sql.Opts(*c) + return nil } - assert.Len(t, res, 3) - assert.Equal(t, []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, IndexInBlock: 3}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, IndexInBlock: 4}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, IndexInBlock: 1}, - }, res) - - itr, err = ddb.GetStateSetIterator(ns, "k1", "k2", "k111") - defer itr.Close() - assert.NoError(t, err) - - res = make([]driver.VersionedRead, 0, 3) - for n, err := itr.Next(); n != nil; n, err = itr.Next() { - assert.NoError(t, err) - res = append(res, *n) - } - assert.Len(t, res, 3) - assert.Equal(t, []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, IndexInBlock: 3}, - {Key: "k2", Raw: []byte("k2_value"), Block: 35, IndexInBlock: 1}, - {Key: "k111", Raw: []byte("k111_value"), Block: 35, IndexInBlock: 4}, - }, res) - - itr, err = ddb.GetStateSetIterator(ns, "k1", "k5") - defer itr.Close() - assert.NoError(t, err) - - res = make([]driver.VersionedRead, 0, 2) - for n, err := itr.Next(); n != nil; n, err = itr.Next() { - assert.NoError(t, err) - res = append(res, *n) - } - assert.Len(t, res, 2) - assert.Equal(t, []driver.VersionedRead{ - {Key: "k1", Raw: []byte("k1_value"), Block: 35, IndexInBlock: 3}, - {Key: "k5"}, - }, res) + return errors.New("invalid pointer type") } -func TestShardLikeCommit(t *testing.T) { - ns := "namespace" - k1 := "key1" - k2 := "key2" - - // Populate the DB with some data at some height - ddb, err := db.OpenVersioned(nil, "memory", "", nil) - assert.NoError(t, err) - err = ddb.BeginUpdate() - assert.NoError(t, err) - err = ddb.SetState(ns, k1, []byte("k1val"), 35, 1) - assert.NoError(t, err) - err = ddb.SetState(ns, k2, []byte("k2val"), 37, 3) - assert.NoError(t, err) - err = ddb.Commit() - assert.NoError(t, err) - - tidstore, err := txidstore.NewSimpleTXIDStore[vc](db.Unversioned(ddb), &vcProvider{}) - assert.NoError(t, err) - vault := vault.New[vc](ddb, tidstore, &vcProvider{}, newInterceptor) - - // SCENARIO 1: there is a read conflict in the proposed rwset - // create the read-write set - rwsb := rwsetutil.NewRWSetBuilder() - rwsb.AddToReadSet(ns, k1, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 35, TxNum: 1})) - rwsb.AddToReadSet(ns, k2, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 37, TxNum: 2})) - rwsb.AddToWriteSet(ns, k1, []byte("k1FromTxidInvalid")) - rwsb.AddToWriteSet(ns, k2, []byte("k2FromTxidInvalid")) - simRes, err := rwsb.GetTxSimulationResults() - assert.NoError(t, err) - rwsBytes, err := simRes.GetPubSimulationBytes() - assert.NoError(t, err) - - // give it to the kvs and check whether it's valid - it won't be - rwset, err := vault.GetRWSet("txid-invalid", rwsBytes) - assert.NoError(t, err) - err = rwset.IsValid() - assert.EqualError(t, err, "invalid read: vault at version namespace:key2 37:3, read-write set at version 37:2") - - // close the read-write set, even in case of error - rwset.Done() - - // check the status, it should be busy - code, _, err := vault.Status("txid-invalid") - assert.NoError(t, err) - assert.Equal(t, busy, code) - - // now in case of error we won't commit the read-write set, so we should discard it - err = vault.DiscardTx("txid-invalid", "") - assert.NoError(t, err) - - // check the status, it should be invalid - code, _, err = vault.Status("txid-invalid") - assert.NoError(t, err) - assert.Equal(t, invalid, code) - - // SCENARIO 2: there is no read conflict - // create the read-write set - rwsb = rwsetutil.NewRWSetBuilder() - rwsb.AddToReadSet(ns, k1, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 35, TxNum: 1})) - rwsb.AddToReadSet(ns, k2, rwsetutil.NewVersion(&kvrwset.Version{BlockNum: 37, TxNum: 3})) - rwsb.AddToWriteSet(ns, k1, []byte("k1FromTxidValid")) - rwsb.AddToWriteSet(ns, k2, []byte("k2FromTxidValid")) - - simRes, err = rwsb.GetTxSimulationResults() - assert.NoError(t, err) - rwsBytes, err = simRes.GetPubSimulationBytes() - assert.NoError(t, err) - - // give it to the kvs and check whether it's valid - it will be - rwset, err = vault.GetRWSet("txid-valid", rwsBytes) - assert.NoError(t, err) - err = rwset.IsValid() - assert.NoError(t, err) - - // close the read-write set - rwset.Done() - - // presumably the cross-shard protocol continues... - - // check the status, it should be busy - code, _, err = vault.Status("txid-valid") - assert.NoError(t, err) - assert.Equal(t, busy, code) - - // we're now asked to really commit - err = vault.CommitTX("txid-valid", 38, 10) - assert.NoError(t, err) - - // check the status, it should be valid - code, _, err = vault.Status("txid-valid") - assert.NoError(t, err) - assert.Equal(t, valid, code) - - // check the content of the kvs after that - v, b, tx, err := ddb.GetState(ns, k1) - assert.NoError(t, err) - assert.Equal(t, []byte("k1FromTxidValid"), v) - assert.Equal(t, uint64(38), b) - assert.Equal(t, uint64(10), tx) - - v, b, tx, err = ddb.GetState(ns, k2) - assert.NoError(t, err) - assert.Equal(t, []byte("k2FromTxidValid"), v) - assert.Equal(t, uint64(38), b) - assert.Equal(t, uint64(10), tx) - - // all Interceptors should be gone - assert.Len(t, vault.Interceptors, 0) +func openSqlite(key string) (driver.VersionedPersistence, func(), error) { + conf := &dbConfig{ + Driver: "sqlite", + DataSource: fmt.Sprintf("%s.sqlite", path.Join(tempDir, key)), + MaxOpenConns: 0, + SkipPragmas: false, + } + persistence, err := (&sql.Driver{}).NewVersioned(nil, "test_table", conf) + return persistence, func() {}, err } -func TestMain(m *testing.M) { - var err error - tempDir, err = os.MkdirTemp("", "vault-test") +func openPostgres(name string) (driver.VersionedPersistence, func(), error) { + postgresConfig := sql.DefaultConfig(fmt.Sprintf("%s-db", name)) + conf := &dbConfig{ + Driver: "postgres", + DataSource: postgresConfig.DataSource(), + MaxOpenConns: 50, + SkipPragmas: false, + } + terminate, err := sql.StartPostgresWithFmt(map[string]*sql.PostgresConfig{name: postgresConfig}) if err != nil { - fmt.Fprintf(os.Stderr, "failed to create temporary directory: %v", err) - os.Exit(-1) + return nil, func() {}, err } - defer os.RemoveAll(tempDir) - - m.Run() + persistence, err := (&sql.Driver{}).NewVersioned(nil, "test_table", conf) + return persistence, terminate, err } diff --git a/platform/view/services/db/dbtest/helpers.go b/platform/view/services/db/dbtest/helpers.go index 6c1b853ab..9f720687c 100644 --- a/platform/view/services/db/dbtest/helpers.go +++ b/platform/view/services/db/dbtest/helpers.go @@ -149,6 +149,22 @@ func TTestRangeQueries(t *testing.T, db driver.TransactionalVersionedPersistence } assert.Len(t, res, 3) assert.Equal(t, expected, res) + + expected = []driver.VersionedRead{ + {Key: "k1", Raw: []byte("k1_value"), Block: 35, IndexInBlock: 3}, + {Key: "k3", Raw: []byte("k3_value"), Block: 35, IndexInBlock: 2}, + } + itr, err = db.GetStateSetIterator(ns, "k1", "k3") + assert.NoError(t, err) + defer itr.Close() + + res = make([]driver.VersionedRead, 0, 2) + for n, err := itr.Next(); n != nil; n, err = itr.Next() { + assert.NoError(t, err) + res = append(res, *n) + } + assert.Len(t, res, 2) + assert.Equal(t, expected, res) } func TTestMeta(t *testing.T, db driver.TransactionalVersionedPersistence) { @@ -813,6 +829,20 @@ func TTestUnversionedRange(t *testing.T, db driver.Persistence) { {Key: "k111", Raw: []byte("k111_value")}, {Key: "k2", Raw: []byte("k2_value")}, }, res) + + itr, err = db.GetStateSetIterator(ns, "k1", "k2") + assert.NoError(t, err) + defer itr.Close() + res = make([]driver.Read, 0, 2) + for n, err := itr.Next(); n != nil; n, err = itr.Next() { + assert.NoError(t, err) + res = append(res, *n) + } + assert.Len(t, res, 2) + assert.Equal(t, []driver.Read{ + {Key: "k1", Raw: []byte("k1_value")}, + {Key: "k2", Raw: []byte("k2_value")}, + }, res) } func TTestUnversionedSimple(t *testing.T, db driver.Persistence) { diff --git a/platform/view/services/db/driver/sql/unversioned.go b/platform/view/services/db/driver/sql/unversioned.go index 01fa379c7..351dfb7e1 100644 --- a/platform/view/services/db/driver/sql/unversioned.go +++ b/platform/view/services/db/driver/sql/unversioned.go @@ -91,7 +91,7 @@ func (db *Unversioned) GetStateSetIterator(ns string, keys ...string) (driver.Re if len(keys) == 0 { return &EmptyIterator{}, nil } - query := fmt.Sprintf("SELECT pkey, val FROM %s WHERE ns = ? AND pkey = ANY(%s);", db.table, "?"+strings.Repeat(",?", len(keys)-1)) + query := fmt.Sprintf("SELECT pkey, val FROM %s WHERE ns = $1 AND pkey IN %s", db.table, generateParamSet(2, len(keys))) logger.Debug(query, ns, keys) rows, err := db.readDB.Query(query, append([]any{ns}, castAny(keys)...)...) @@ -104,6 +104,14 @@ func (db *Unversioned) GetStateSetIterator(ns string, keys ...string) (driver.Re }, nil } +func generateParamSet(offset, count int) string { + params := make([]string, count) + for i := 0; i < count; i++ { + params[i] = fmt.Sprintf("$%d", i+offset) + } + return fmt.Sprintf("(%s)", strings.Join(params, ", ")) +} + func castAny[A any](as []A) []any { if as == nil { return nil diff --git a/platform/view/services/db/driver/sql/versioned.go b/platform/view/services/db/driver/sql/versioned.go index afb36f7b7..06e0a200a 100644 --- a/platform/view/services/db/driver/sql/versioned.go +++ b/platform/view/services/db/driver/sql/versioned.go @@ -12,7 +12,6 @@ import ( "encoding/gob" "errors" "fmt" - "strings" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" errors2 "github.com/pkg/errors" @@ -187,7 +186,7 @@ func (db *Persistence) GetStateSetIterator(ns string, keys ...string) (driver.Ve if len(keys) == 0 { return &EmptyVersionedIterator{}, nil } - query := fmt.Sprintf("SELECT pkey, block, txnum, val FROM %s WHERE ns = ? AND pkey = ANY(%s);", db.table, "?"+strings.Repeat(",?", len(keys)-1)) + query := fmt.Sprintf("SELECT pkey, block, txnum, val FROM %s WHERE ns = $1 AND pkey IN %s", db.table, generateParamSet(2, len(keys))) logger.Debug(query, ns, keys) rows, err := db.readDB.Query(query, append([]any{ns}, castAny(keys)...)...)