Skip to content

dispose context and close sessions #635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Jul 22, 2024
Merged
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1bded0c
dispose context and close sessions
adecaro Jul 18, 2024
6fd6227
fixup! dispose context and close sessions
adecaro Jul 18, 2024
ecc69d1
fix stream hash
adecaro Jul 18, 2024
f917e1d
fixup! fix stream hash
adecaro Jul 18, 2024
277fb8e
fixup! fix stream hash
adecaro Jul 18, 2024
418accb
workaround: wait before deleting the context used for the response
adecaro Jul 18, 2024
03150db
better error handling for sendWithCaschedStreams
adecaro Jul 18, 2024
88e9a8d
fixup! better error handling for sendWithCaschedStreams
adecaro Jul 18, 2024
d0c33f3
fixup! better error handling for sendWithCaschedStreams
adecaro Jul 18, 2024
34ccd73
fixup! better error handling for sendWithCaschedStreams
adecaro Jul 18, 2024
cb6033a
fixup! better error handling for sendWithCaschedStreams
adecaro Jul 18, 2024
c216564
dispose context of the initiator
adecaro Jul 18, 2024
036d1d4
better logging
adecaro Jul 18, 2024
251d66b
fixup! better logging
adecaro Jul 18, 2024
8a5ceb0
fixup! better logging
adecaro Jul 18, 2024
a73d1d4
remove address re-programming
adecaro Jul 18, 2024
afa2ab0
lock fix
adecaro Jul 19, 2024
c044fab
visualize dig dep in debug mode only
adecaro Jul 19, 2024
0d1bfad
Infof to Debugf
adecaro Jul 19, 2024
d2027e3
print caller
adecaro Jul 20, 2024
29a9e96
libp2p more options
adecaro Jul 21, 2024
3658afc
libp2p: streams are identified by remote peer id only and not closed
adecaro Jul 22, 2024
99a7843
fixup! libp2p: streams are identified by remote peer id only and not …
adecaro Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration/nwo/common/context/context.go
Original file line number Diff line number Diff line change
@@ -188,7 +188,7 @@ func (c *Context) AddIdentityAlias(id string, alias string) {
func (c *Context) PlatformByName(name string) api.Platform {
p, ok := c.PlatformsByName[name]
if !ok {
logger.Errorf("cannot find platform with name [%s], platforms available [%v]", c.PlatformsByName)
logger.Errorf("cannot find platform with name [%s], platforms available [%v]", name, c.PlatformsByName)
}
return p
}
9 changes: 9 additions & 0 deletions pkg/utils/errors/errors.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,11 @@ func HasCause(source, target error) bool {
return source != nil && target != nil && errors.Is(source, target)
}

// Is recursively checks errors wrapped using Wrapf until it detects the target error
func Is(source, target error) bool {
return source != nil && target != nil && errors.Is(source, target)
}

// Wrapf wraps an error in a way compatible with HasCause
func Wrapf(err error, format string, args ...any) error {
return errors.Wrapf(err, format, args...)
@@ -31,3 +36,7 @@ func Wrap(err error, message string) error {
// Errorf formats according to the given format specifier and returns
// the result as an error, delegating to the underlying errors package.
func Errorf(format string, args ...any) error {
	return errors.Errorf(format, args...)
}

// New returns a new error with the supplied message, delegating to the
// underlying errors package so that callers only need to depend on this
// package for error construction.
func New(msg string) error {
	return errors.New(msg)
}
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package state
package utils

import (
"crypto/rand"
6 changes: 4 additions & 2 deletions platform/fabric/services/state/namespace.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,8 @@ import (
"encoding/base64"
"encoding/json"

"github.com/hyperledger-labs/fabric-smart-client/pkg/utils"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabric"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/services/endorser"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
@@ -472,14 +474,14 @@ func (n *Namespace) getStateID(s interface{}) (string, error) {
}
case LinearState:
logger.Debugf("LinearState...")
key = GenerateUUID()
key = utils.GenerateUUID()
key = d.SetLinearID(key)
case EmbeddingState:
logger.Debugf("EmbeddingState...")
return n.getStateID(d.GetState())
default:
logger.Debugf("default...")
key = base64.StdEncoding.EncodeToString(GenerateBytesUUID())
key = base64.StdEncoding.EncodeToString(utils.GenerateBytesUUID())
}
return key, nil
}
2 changes: 1 addition & 1 deletion platform/orion/sdk/dig/sdk.go
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ func (p *SDK) Install() error {
}

func (p *SDK) Start(ctx context.Context) error {
defer logger.Infof("SDK installation complete:\n%s", digutils.Visualize(p.Container()))
defer logger.Debugf("SDK installation complete:\n%s", digutils.Visualize(p.Container()))
if err := p.SDK.Start(ctx); err != nil {
return err
}
24 changes: 19 additions & 5 deletions platform/view/core/manager/manager.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (
"reflect"
"runtime/debug"
"sync"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
@@ -215,6 +216,7 @@ func (cm *manager) InitiateViewWithIdentity(view view.View, id view.Identity, c
cm.contexts[childContext.ID()] = childContext
cm.m.Contexts.Set(float64(len(cm.contexts)))
cm.contextsSync.Unlock()
defer cm.deleteContext(id, childContext.ID())

if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("[%s] InitiateView [view:%s], [ContextID:%s]", id, getIdentifier(view), childContext.ID())
@@ -351,24 +353,35 @@ func (cm *manager) respond(responder view.View, id view.Identity, msg *view.Mess
}
}()

if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("[%s] Respond [from:%s], [sessionID:%s], [contextID:%s], [view:%s]", id, msg.FromEndpoint, msg.SessionID, msg.ContextID, getIdentifier(responder))
}

// get context
var isNew bool
ctx, isNew, err = cm.newContext(id, msg)
if err != nil {
return nil, nil, errors.WithMessagef(err, "failed getting context for [%s,%s,%v]", msg.ContextID, id, msg)
}

// todo: if a new contxt has been created to run the responder,
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf(
"[%s] Respond [from:%s], [sessionID:%s], [contextID:%s](%v), [view:%s]",
id,
msg.FromEndpoint,
msg.SessionID,
msg.ContextID,
isNew,
getIdentifier(responder),
)
}

// todo: if a new context has been created to run the responder,
// then dispose the context when the responder terminates
// run view
if isNew {
// delete context at the end of the execution
res, err = func(ctx view.Context, responder view.View) (interface{}, error) {
defer func() {
// TODO: this is a workaround
// give some time to flush anything can be in queues
time.Sleep(5 * time.Second)
cm.deleteContext(id, ctx.ID())
}()
return ctx.RunView(responder)
@@ -406,6 +419,7 @@ func (cm *manager) newContext(id view.Identity, msg *view.Message) (view.Context
viewContext.Session().Info().ID,
)
}
viewContext.Dispose()
delete(cm.contexts, contextID)
cm.m.Contexts.Set(float64(len(cm.contexts)))
ok = false
6 changes: 3 additions & 3 deletions platform/view/services/comm/comm.go
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ func NewService(hostProvider host.GeneratorProvider, endpointService EndpointSer
func (s *Service) Start(ctx context.Context) {
go func() {
for {
logger.Infof("start communication service...")
logger.Debugf("start communication service...")
if err := s.init(); err != nil {
logger.Errorf("failed to initialize communication service [%s], wait a bit and try again", err)
time.Sleep(10 * time.Second)
@@ -131,7 +131,7 @@ func (s *Service) init() error {
certFile := s.ConfigService.GetPath("fsc.identity.cert.file")
if len(p2pBootstrapNode) == 0 {
// this is a bootstrap node
logger.Infof("new p2p bootstrap node [%s]", p2pListenAddress)
logger.Debugf("new p2p bootstrap node [%s]", p2pListenAddress)

h, err := s.HostProvider.NewBootstrapHost(p2pListenAddress, keyFile, certFile)
if err != nil {
@@ -156,7 +156,7 @@ func (s *Service) init() error {
return errors.WithMessagef(err, "failed to get the endpoint of the bootstrap node from [%s:%s], [%s]", p2pBootstrapNode, bootstrapNodeID, endpoints[view.P2PPort])
}
addr = addr + "/p2p/" + string(pkID)
logger.Infof("new p2p node [%s,%s]", p2pListenAddress, addr)
logger.Debugf("new p2p node [%s,%s]", p2pListenAddress, addr)
h, err := s.HostProvider.NewHost(p2pListenAddress, keyFile, certFile, addr)
if err != nil {
return err
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ type metricsReporter struct {
}

func newReporter(m *metrics) *metricsReporter {
logger.Infof("Initialized bandwidth reporter.\n")
logger.Debugf("Initialized bandwidth reporter.\n")
return &metricsReporter{m}
}

53 changes: 41 additions & 12 deletions platform/view/services/comm/host/libp2p/host.go
Original file line number Diff line number Diff line change
@@ -13,23 +13,28 @@ import (
"sync/atomic"
"time"

host2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/utils"
"github.com/libp2p/go-libp2p/core/peerstore"
"go.uber.org/zap/zapcore"

utils2 "github.com/hyperledger-labs/fabric-smart-client/pkg/utils"
host2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto"
host3 "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/security/noise"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"
)

var logger = flogging.MustGetLogger("libp2p-host")
var logger = flogging.MustGetLogger("view-sdk.services.comm.libp2p-host")

const (
viewProtocol = "/fsc/view/1.0.0"
@@ -93,7 +98,6 @@ func (h *host) Start(newStreamCallback func(stream host2.P2PStream)) error {
if err := h.start(false, newStreamCallback); err != nil {
return err
}

return nil
}

@@ -103,10 +107,22 @@ func newLibP2PHost(listenAddress host2.PeerIPAddress, priv crypto.PrivKey, metri
return nil, err
}

connManager, err := connmgr.NewConnManager(100, 400, connmgr.WithGracePeriod(time.Minute))
if err != nil {
return nil, errors.Wrap(err, "failed creating conn manager for libp2p host")
}
opts := []libp2p.Option{
libp2p.ListenAddrs(addr),
libp2p.Identity(priv),
libp2p.ForceReachabilityPublic(),
// support TLS connections
libp2p.Security(libp2ptls.ID, libp2ptls.New),
// support noise connections
libp2p.Security(noise.ID, noise.New),
// support any other default transports (TCP)
libp2p.DefaultTransports,
// Let's prevent our peer from having too many
// connections by attaching a connection manager.
libp2p.ConnectionManager(connManager), libp2p.ForceReachabilityPublic(),
libp2p.BandwidthReporter(newReporter(metrics)),
}

@@ -137,11 +153,10 @@ func newLibP2PHost(listenAddress host2.PeerIPAddress, priv crypto.PrivKey, metri
}

func (h *host) StreamHash(input host2.StreamInfo) host2.StreamHash {
return streamHash(input.RemotePeerID)
return streamHash(input)
}

func (h *host) Close() error {

err := h.Host.Close()
atomic.StoreInt32(&h.stopFinder, 1)
return err
@@ -153,7 +168,7 @@ func (h *host) NewStream(ctx context.Context, info host2.StreamInfo) (host2.P2PS
return nil, err
}

if len(info.RemotePeerAddress) != 0 && strings.HasPrefix(info.RemotePeerAddress, "/ip4/") {
if len(info.RemotePeerAddress) != 0 && !strings.HasPrefix(info.RemotePeerAddress, "/ip4/") {
// reprogram the addresses of the peer before opening a new stream, if it is not in the right form yet
ps := h.Host.Peerstore()
current := ps.Addrs(ID)
@@ -181,8 +196,11 @@ func (h *host) NewStream(ctx context.Context, info host2.StreamInfo) (host2.P2PS
if err != nil {
return nil, errors.Wrapf(err, "failed to create new stream to [%s]", ID)
}

return &stream{Stream: nwStream}, nil
info.RemotePeerID = nwStream.Conn().RemotePeer().String()
return &stream{
Stream: nwStream,
info: info,
}, nil
}

func (h *host) startFinder() {
@@ -227,7 +245,18 @@ func (h *host) start(failAdv bool, newStreamCallback func(stream host2.P2PStream
}

h.Host.SetStreamHandler(viewProtocol, func(s network.Stream) {
newStreamCallback(&stream{Stream: s})
uuid := utils2.GenerateUUID()
newStreamCallback(
&stream{
Stream: s,
info: host2.StreamInfo{
RemotePeerID: s.Conn().RemotePeer().String(),
RemotePeerAddress: s.Conn().RemoteMultiaddr().String(),
ContextID: uuid,
SessionID: uuid,
},
},
)
})

h.finderWg.Add(1)
14 changes: 11 additions & 3 deletions platform/view/services/comm/host/libp2p/stream.go
Original file line number Diff line number Diff line change
@@ -15,21 +15,29 @@ import (

// stream wraps a libp2p network stream together with the metadata
// captured when the stream was created; Hash derives the stream's
// cache key from this metadata.
type stream struct {
	network.Stream
	// info identifies this stream (remote peer ID/address, context and
	// session IDs) as filled in by the host when the stream is opened.
	info host2.StreamInfo
}

// RemotePeerID returns the identifier of the peer sitting on the other
// side of the underlying connection.
func (s *stream) RemotePeerID() host2.PeerID {
	remote := s.Conn().RemotePeer()
	return remote.String()
}

// RemotePeerAddress returns the multiaddress of the peer sitting on the
// other side of the underlying connection.
func (s *stream) RemotePeerAddress() host2.PeerIPAddress {
	remoteAddr := s.Conn().RemoteMultiaddr()
	return remoteAddr.String()
}

func (s *stream) Hash() host2.StreamHash {
return streamHash(s.RemotePeerID())
return streamHash(s.info)
}

// Context returns the context associated with this stream. Currently a
// placeholder: a fresh context.TODO() is handed back on every call.
func (s *stream) Context() context.Context {
	return context.TODO()
}

func streamHash(peerID host2.PeerID) host2.StreamHash {
return peerID
// Close is deliberately a no-op: the underlying network stream is kept
// open so it can be recycled for later exchanges with the same peer.
func (s *stream) Close() error {
	return nil
}

// streamHash derives the cache key under which a stream is stored.
// Only the remote peer ID is used, so every stream towards the same
// peer shares one key and can be recycled regardless of the session or
// context it was originally opened for.
func streamHash(info host2.StreamInfo) host2.StreamHash {
	// This allows us to recycle the streams towards the same peer
	return info.RemotePeerID
}
4 changes: 2 additions & 2 deletions platform/view/services/comm/host/rest/client.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ type client struct {
}

func newClient(streamProvider clientStreamProvider, nodeID host2.PeerID, rootCAs []string, tlsEnabled bool) (*client, error) {
logger.Infof("Creating p2p client for node ID [%s] with tlsEnabled = %v", nodeID, tlsEnabled)
logger.Debugf("Creating p2p client for node ID [%s] with tlsEnabled = %v", nodeID, tlsEnabled)
caCertPool, err := newRootCACertPool(rootCAs)
if err != nil {
return nil, errors.Wrapf(err, "failed to read root CA certs")
@@ -40,7 +40,7 @@ func newClient(streamProvider clientStreamProvider, nodeID host2.PeerID, rootCAs
nodeID: nodeID,
streamProvider: streamProvider,
}
logger.Infof("Created p2p client for node ID [%s] with %d root CAs and InsecureSkipVerify = %v", nodeID, len(rootCAs), c.tlsConfig.InsecureSkipVerify)
logger.Debugf("Created p2p client for node ID [%s] with %d root CAs and InsecureSkipVerify = %v", nodeID, len(rootCAs), c.tlsConfig.InsecureSkipVerify)
return c, nil
}

7 changes: 4 additions & 3 deletions platform/view/services/comm/host/rest/host.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ package rest
import (
"context"
"crypto/tls"
"fmt"
"net/http"

host2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
@@ -23,7 +24,7 @@ const (
contextIDLabel tracing.LabelName = "context_id"
)

var logger = flogging.MustGetLogger("rest-p2p-host")
var logger = flogging.MustGetLogger("view-sdk.services.comm.rest-p2p-host")

type host struct {
routing routing2.ServiceDiscovery
@@ -38,7 +39,7 @@ type StreamProvider interface {
}

func NewHost(nodeID host2.PeerID, listenAddress host2.PeerIPAddress, routing routing2.ServiceDiscovery, tracerProvider trace.TracerProvider, streamProvider StreamProvider, keyFile, certFile string, rootCACertFiles []string) (*host, error) {
logger.Infof("Creating new host for node [%s] on [%s] with key, cert at: [%s], [%s]", nodeID, listenAddress, keyFile, certFile)
logger.Debugf("Creating new host for node [%s] on [%s] with key, cert at: [%s], [%s]", nodeID, listenAddress, keyFile, certFile)
p2pClient, err := newClient(streamProvider, nodeID, rootCACertFiles, len(keyFile) > 0 && len(certFile) > 0)
if err != nil {
return nil, errors.Wrapf(err, "failed to create client")
@@ -94,5 +95,5 @@ func (h *host) Close() error {
func (h *host) Wait() {}

func StreamHash(info host2.StreamInfo) host2.StreamHash {
return info.RemotePeerAddress
return fmt.Sprintf("%s.%s.%s.%s", info.RemotePeerID, info.RemotePeerAddress, info.SessionID, info.ContextID)
}
6 changes: 3 additions & 3 deletions platform/view/services/comm/host/rest/routing/idrouter.go
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ func NewEndpointServiceIDRouter(es endpointService) *endpointServiceIDRouter {
}

func (r *endpointServiceIDRouter) Lookup(id host2.PeerID) ([]host2.PeerIPAddress, bool) {
logger.Infof("Looking up endpoint of peer [%s]", id)
logger.Debugf("Looking up endpoint of peer [%s]", id)
identity, err := r.es.GetIdentity("", []byte(id))
if err != nil {
logger.Errorf("failed getting identity for peer [%s]", id)
@@ -43,10 +43,10 @@ func (r *endpointServiceIDRouter) Lookup(id host2.PeerID) ([]host2.PeerIPAddress
return []host2.PeerIPAddress{}, false
}
if address, ok := addresses[driver.P2PPort]; ok {
logger.Infof("Found endpoint of peer [%s]: [%s]", id, address)
logger.Debugf("Found endpoint of peer [%s]: [%s]", id, address)
return []host2.PeerIPAddress{address}, true
}
logger.Infof("Did not find endpoint of peer [%s]", id)
logger.Debugf("Did not find endpoint of peer [%s]", id)
return []host2.PeerIPAddress{}, false
}

Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ func newStaticLabelRouter(configPath string) (*staticLabelRouter, error) {
return nil, errors.Wrapf(err, "failed to unmarshal config")
}

logger.Infof("Found routes: %v", wrapper.Routes)
logger.Debugf("Found routes: %v", wrapper.Routes)
return &wrapper.Routes, nil
}

2 changes: 1 addition & 1 deletion platform/view/services/comm/host/rest/routing/routing.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
)

var logger = flogging.MustGetLogger("rest-p2p-routing")
var logger = flogging.MustGetLogger("view-sdk.services.comm.rest-p2p-routing")

// ServiceDiscovery is the interface that resolves the IP addresses given the ID of a peer
type ServiceDiscovery interface {
8 changes: 4 additions & 4 deletions platform/view/services/comm/host/rest/server.go
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ func newServer(streamProvider serverStreamProvider, listenAddress host2.PeerIPAd
}

func newHandler(streamProvider serverStreamProvider, newStreamCallback func(stream host2.P2PStream)) *gin.Engine {
logger.Infof("Creating GIN engine for p2p REST endpoint.")
logger.Debugf("Creating GIN engine for p2p REST endpoint.")
r := gin.New()
r.Use(gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string {
// your custom format
@@ -73,10 +73,10 @@ func (s *server) Start(newStreamCallback func(stream host2.P2PStream)) error {

var err error
if len(s.certFile) == 0 || len(s.keyFile) == 0 {
logger.Infof("Starting up REST server without TLS on [%s]...", s.srv.Addr)
logger.Debugf("Starting up REST server without TLS on [%s]...", s.srv.Addr)
err = s.srv.ListenAndServe()
} else {
logger.Infof("Starting up REST server with TLS on [%s]...", s.srv.Addr)
logger.Debugf("Starting up REST server with TLS on [%s]...", s.srv.Addr)
err = s.srv.ListenAndServeTLS(s.certFile, s.keyFile)
}
if !errors.Is(err, http.ErrServerClosed) {
@@ -86,7 +86,7 @@ func (s *server) Start(newStreamCallback func(stream host2.P2PStream)) error {
}

func (s *server) Close() error {
logger.Infof("Shutting down server on [%s]", s.srv.Addr)
logger.Debugf("Shutting down server on [%s]", s.srv.Addr)
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownRelease()
return s.srv.Shutdown(shutdownCtx)
2 changes: 1 addition & 1 deletion platform/view/services/comm/host/rest/websocket/common.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
)

var logger = flogging.MustGetLogger("view-sdk.host.rest")
var logger = flogging.MustGetLogger("view-sdk.services.comm.rest.host")

// StreamMeta is the first message sent from the websocket client to transmit metadata information
type StreamMeta struct {
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ func (c *MultiplexedProvider) NewClientStream(info host2.StreamInfo, ctx context
return nil, errors.Wrapf(err, "failed to open websocket")
}
conn = newClientConn(wsConn, c.tracer, c.m, func() {
logger.Infof("Closing websocket client for [%s@%s]...", src, info.RemotePeerAddress)
logger.Debugf("Closing websocket client for [%s@%s]...", src, info.RemotePeerAddress)
c.mu.Lock()
defer c.mu.Unlock()
delete(c.clients, url.String())
@@ -133,7 +133,7 @@ func (c *multiplexedClientConn) newClientSubConn(ctx context.Context, src host2.
sc := c.newSubConn(NewSubConnId())
c.subConns[sc.id] = sc
c.mu.Unlock()
logger.Infof("Created client subconn with id [%s]", sc.id)
logger.Debugf("Created client subconn with id [%s]", sc.id)
spanContext := trace.SpanContextFromContext(ctx)
marshalledSpanContext, err := tracing.MarshalContext(spanContext)
if err != nil {
@@ -168,7 +168,7 @@ func (c *multiplexedClientConn) readIncoming() {
sc.reads <- streamEOF
}
err := c.Conn.Close()
logger.Infof("Client connection closed: %v", err)
logger.Debugf("Client connection closed: %v", err)
}()
var mm MultiplexedMessage
for {
@@ -212,7 +212,7 @@ func (c *multiplexedServerConn) readIncoming(newStreamCallback func(pStream host
sc.reads <- streamEOF
}
err := c.Conn.Close()
logger.Infof("Connection closed: %v", err)
logger.Debugf("Connection closed: %v", err)
}()
var mm MultiplexedMessage
for {
@@ -245,7 +245,7 @@ func (c *multiplexedServerConn) newServerSubConn(newStreamCallback func(pStream
if err := json.Unmarshal(mm.Msg, &meta); err != nil {
logger.Errorf("failed to read meta info: %v", err)
}
logger.Debugf("Read meta info: [%s,%s]: %s", meta.ContextID, meta.SessionID, meta.SpanContext)
logger.Debugf("Read meta info: [%s] [%s,%s]: %s", mm.ID, meta.ContextID, meta.SessionID, meta.SpanContext)
// Propagating the request context will not make a difference (see comment in newClientStream)
spanContext, err := tracing.UnmarshalContext(meta.SpanContext)
if err != nil {
@@ -315,14 +315,14 @@ func (c *multiplexedBaseConn) readCloses(ctx context.Context) {
for {
select {
case <-ctx.Done():
logger.Infof("Stop waiting for closes")
logger.Debugf("Stop waiting for closes")
c.mu.Lock()
c.m.ClosedSubConns.With(sideLabel, c.side).Add(float64(len(c.subConns)))
c.subConns = make(map[SubConnId]*subConn)
c.mu.Unlock()
return
case id := <-c.closes:
logger.Infof("Closing sub conn [%v]", id)
logger.Debugf("Closing sub conn [%v]", id)
c.mu.Lock()
delete(c.subConns, id)
c.mu.Unlock()
@@ -336,7 +336,7 @@ func (c *multiplexedBaseConn) readOutgoing(ctx context.Context) {
for {
select {
case <-ctx.Done():
logger.Infof("Closing all outgoing connections")
logger.Debugf("Closing all outgoing connections")
return
case msg := <-c.writes:
c.writeMu.Lock()
@@ -369,6 +369,7 @@ func (c *subConn) ReadMessage() (messageType int, p []byte, err error) {
r := <-c.reads
return websocket.TextMessage, r.value, r.err
}

func (c *subConn) WriteMessage(_ int, data []byte) error {
c.writes <- MultiplexedMessage{c.id, data}
return <-c.writeErrs
2 changes: 1 addition & 1 deletion platform/view/services/comm/io/logger.go
Original file line number Diff line number Diff line change
@@ -8,4 +8,4 @@ package io

import "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"

var logger = flogging.MustGetLogger("view-sdk.io")
var logger = flogging.MustGetLogger("view-sdk.services.comm.io")
2 changes: 1 addition & 1 deletion platform/view/services/comm/io/streamio/reader.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
)

var logger = flogging.MustGetLogger("view-sdk.apps.ipo")
var logger = flogging.MustGetLogger("view-sdk.services.comm.io")

// MsgReader wraps a message-based Read function
type MsgReader interface {
3 changes: 2 additions & 1 deletion platform/view/services/comm/master.go
Original file line number Diff line number Diff line change
@@ -94,13 +94,14 @@ func (p *P2PNode) DeleteSessions(ctx context.Context, sessionID string) {
p.sessionsMutex.Lock()
defer p.sessionsMutex.Unlock()

for key := range p.sessions {
for key, session := range p.sessions {
// if key starts with sessionID, delete it
if strings.HasPrefix(key, sessionID) {
span.AddEvent("delete_session", tracing.WithAttributes(tracing.String("session_key", sessionIDLabel)))
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("deleting session [%s]", key)
}
session.closeInternal()
delete(p.sessions, key)
}
}
54 changes: 33 additions & 21 deletions platform/view/services/comm/p2p.go
Original file line number Diff line number Diff line change
@@ -16,13 +16,13 @@ import (

"github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
proto2 "github.com/hyperledger-labs/fabric-smart-client/pkg/utils/proto"
host2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/tracing"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap/zapcore"
)
@@ -35,7 +35,7 @@ const (

var errStreamNotFound = errors.New("stream not found")

var logger = flogging.MustGetLogger("view-sdk")
var logger = flogging.MustGetLogger("view-sdk.services.comm")

type messageWithStream struct {
message *view.Message
@@ -183,30 +183,32 @@ func (p *P2PNode) dispatchMessages(ctx context.Context) {

func (p *P2PNode) sendWithCachedStreams(streamHash string, msg proto.Message) error {
if len(streamHash) == 0 {
logger.Debugf("Empty stream hash probably because of uninitialized data. New stream must be created.")
return errStreamNotFound
logger.Debugf("empty stream hash probably because of uninitialized data. New stream must be created.")
return errors.Wrapf(errStreamNotFound, "stream hash is empty")
}
p.streamsMutex.RLock()
defer p.streamsMutex.RUnlock()
logger.Debugf("send msg to stream hash [%s] of [%d] with #stream [%d]", streamHash, len(p.streams), len(p.streams[streamHash]))
for _, stream := range p.streams[streamHash] {
err := stream.send(msg)
if err == nil {
logger.Debugf("send msg [%v] with stream [%s]", msg, stream.stream.Hash())
return nil
}
// TODO: handle the case in which there's an error
logger.Errorf("error while sending message [%s] to stream with hash [%s]: %s", msg, streamHash, err)
}

return errStreamNotFound
return errors.Wrapf(errStreamNotFound, "all [%d] streams for hash [%s] failed to send", len(p.streams), streamHash)
}

// sendTo sends the passed messaged to the p2p peer with the passed ID.
// If no address is specified, then p2p will use one of the IP addresses associated to the peer in its peer store.
// If an address is specified, then the peer store will be updated with the passed address.
func (p *P2PNode) sendTo(ctx context.Context, info host2.StreamInfo, msg proto.Message) error {
streamHash := p.host.StreamHash(info)
if err := p.sendWithCachedStreams(streamHash, msg); err != errStreamNotFound {
return err
if err := p.sendWithCachedStreams(streamHash, msg); !errors.Is(err, errStreamNotFound) {
return errors.Wrap(err, "error while sending message to cached stream")
}

nwStream, err := p.host.NewStream(ctx, info)
@@ -216,7 +218,11 @@ func (p *P2PNode) sendTo(ctx context.Context, info host2.StreamInfo, msg proto.M
}
p.handleOutgoingStream(nwStream)

return p.sendWithCachedStreams(nwStream.Hash(), msg)
err = p.sendWithCachedStreams(nwStream.Hash(), msg)
if err != nil {
return errors.Wrap(err, "error while sending message to freshly created stream")
}
return nil
}

func (p *P2PNode) handleOutgoingStream(stream host2.P2PStream) {
@@ -235,9 +241,14 @@ func (p *P2PNode) handleStream(stream host2.P2PStream) {
node: p,
}

logger.Debugf("Adding new stream [%s]", stream.Hash())
streamHash := stream.Hash()
p.streamsMutex.Lock()
logger.Debugf(
"adding new stream handler to hash [%s](of [%d]) with #handlers [%d]",
streamHash,
len(p.streams),
len(p.streams[streamHash]),
)
p.streams[streamHash] = append(p.streams[streamHash], sh)
p.m.StreamHashes.Set(float64(len(p.streams)))
p.m.ActiveStreams.Add(1)
@@ -281,26 +292,24 @@ func (s *streamHandler) handleIncoming() {
if err != nil {
if s.node.isStopping {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("error reading message while closing. ignoring.", err.Error())
logger.Debugf("error reading message while closing, ignoring [%s]", err)
}
span.End()
break
}

logger.Debugf("error reading message: [%s][%s]", err.Error(), debug.Stack())
streamHash := s.stream.Hash()
logger.Debugf("error reading message from stream [%s]: [%s][%s]", streamHash, err, debug.Stack())

// remove stream handler
streamHash := s.node.host.StreamHash(host2.StreamInfo{
RemotePeerID: s.stream.RemotePeerID(),
RemotePeerAddress: s.stream.RemotePeerAddress(),
ContextID: msg.ContextID,
SessionID: msg.SessionID,
})
s.node.streamsMutex.Lock()
logger.Debugf("Removing stream [%s]. Total streams found: %d", streamHash, len(s.node.streams[streamHash]))
logger.Debugf("removing stream [%s], total streams found: %d", streamHash, len(s.node.streams[streamHash]))
for i, thisSH := range s.node.streams[streamHash] {
if thisSH == s {
s.node.streams[streamHash] = append(s.node.streams[streamHash][:i], s.node.streams[streamHash][i+1:]...)
if len(s.node.streams[streamHash]) == 0 {
delete(s.node.streams, streamHash)
}
s.node.m.StreamHashes.Set(float64(len(s.node.streams)))
s.node.m.ActiveStreams.Add(-1)
s.wg.Done()
@@ -339,9 +348,12 @@ func (s *streamHandler) close(ctx context.Context) {
_, span := s.node.messageTracer.Start(ctx, "stream_close")
span.AddLink(trace.LinkFromContext(s.stream.Context()))
defer span.End()
s.reader.Close()
s.writer.Close()
s.stream.Close()
// no need to close reader and writer when closing the stream
//s.reader.Close()
//s.writer.Close()
if err := s.stream.Close(); err != nil {
logger.Errorf("error closing stream [%s]: [%s]", s.stream.Hash(), err)
}
s.node.m.ClosedStreams.Add(1)
// s.wg.Wait()
}
42 changes: 36 additions & 6 deletions platform/view/services/comm/session.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ package comm

import (
"context"
"runtime/debug"
"sync"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
@@ -69,32 +70,43 @@ func (n *NetworkStreamSession) Receive() <-chan *view.Message {

// Close releases all the resources allocated by this session: it takes
// the sessions lock and delegates the actual teardown to closeInternal.
//
// NOTE(review): this diff view shows both a deferred
// sessionsMutex.Unlock() here and an explicit sessionsMutex.Unlock()
// inside closeInternal (further down in this file); if both lines
// survive in the applied version the mutex is unlocked twice, which
// panics at runtime. The stripped diff markers make it impossible to
// tell which unlock was removed — confirm against the merged file.
func (n *NetworkStreamSession) Close() {
	defer logger.Debugf("Closing session [%s]", n.sessionID)
	n.node.sessionsMutex.Lock()
	defer n.node.sessionsMutex.Unlock()

	n.closeInternal()
}

func (n *NetworkStreamSession) closeInternal() {
if n.closed {
return
}

logger.Debugf("closing session [%s] with [%d] streams", n.sessionID, len(n.streams))
toClose := make([]*streamHandler, 0, len(n.streams))
for stream := range n.streams {
logger.Debugf("session [%s], stream [%s], refCtr [%d]", n.sessionID, stream.stream.Hash(), stream.refCtr)
stream.refCtr--
if stream.refCtr == 0 {
toClose = append(toClose, stream)
}
}
n.node.sessionsMutex.Unlock()

if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("Closing session stream [%s]", n.sessionID)
logger.Debugf("closing session [%s]'s streams [%d]", n.sessionID, len(toClose))
}
for _, stream := range toClose {
stream.close(context.TODO())
}

if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("Closing session incoming [%s]", n.sessionID)
logger.Debugf("closing session [%s]'s streams [%d] done", n.sessionID, len(toClose))
}
close(n.incoming)
n.closed = true
n.streams = make(map[*streamHandler]struct{})

if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("Closing session [%s] done", n.sessionID)
logger.Debugf("closing session [%s] done", n.sessionID)
}
}

@@ -113,7 +125,25 @@ func (n *NetworkStreamSession) sendWithStatus(ctx context.Context, payload []byt
Payload: payload,
})
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("sent message [len:%d] to [%s:%s] with err [%s]", len(payload), string(n.endpointID), n.endpointAddress, err)
logger.Debugf(
"sent message [len:%d] to [%s:%s] from [%s] with err [%s]",
len(payload),
string(n.endpointID),
n.endpointAddress,
n.callerViewID,
err,
)
if len(n.callerViewID) == 0 {
logger.Debugf(
"sent message [len:%d] to [%s:%s] from [%s] with err [%s][%s]",
len(payload),
string(n.endpointID),
n.endpointAddress,
n.callerViewID,
err,
debug.Stack(),
)
}
}
return err
}