Skip to content

Add NATS instrumentation #13999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .fossa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,12 @@ targets:
- type: gradle
path: ./
target: ':instrumentation:mongo:mongo-async-3.3:javaagent'
- type: gradle
path: ./
target: ':instrumentation:nats:nats-2.21:javaagent'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it passes for 2.17.2+, should I change 2.21 to 2.17.2 everywhere? (library included)

- type: gradle
path: ./
target: ':instrumentation:nats:nats-2.21:library'
- type: gradle
path: ./
target: ':instrumentation:netty:netty-3.8:javaagent'
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ out/
######################
.vscode
**/bin/
.metals

# Others #
##########
Expand Down
1 change: 1 addition & 0 deletions docs/supported-libraries.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ These are the supported libraries and frameworks:
| [Micrometer](https://micrometer.io/) | 1.5+ (disabled by default) | [opentelemetry-micrometer-1.5](../instrumentation/micrometer/micrometer-1.5/library) | none |
| [MongoDB Driver](https://mongodb.github.io/mongo-java-driver/) | 3.1+ | [opentelemetry-mongo-3.1](../instrumentation/mongo/mongo-3.1/library) | [Database Client Spans], [Database Client Metrics] [6] |
| [MyBatis](https://mybatis.org/mybatis-3/) | 3.2+ | N/A | none |
| [NATS](https://github.com/nats-io/nats.java) | 3.8+ | [nats-2.21](../instrumentation/nats/nats-2.21/library) | [Messaging Spans] |
| [Netty HTTP codec [5]](https://github.com/netty/netty) | 3.8+ | [opentelemetry-netty-4.1](../instrumentation/netty/netty-4.1/library) | [HTTP Client Spans], [HTTP Client Metrics], [HTTP Server Spans], [HTTP Server Metrics] |
| [OpenSearch Rest Client](https://github.com/opensearch-project/opensearch-java) | 1.0+ | | [Database Client Spans], [Database Client Metrics] [6] |
| [OkHttp](https://github.com/square/okhttp/) | 2.2+ | [opentelemetry-okhttp-3.0](../instrumentation/okhttp/okhttp-3.0/library) | [HTTP Client Spans], [HTTP Client Metrics] |
Expand Down
20 changes: 20 additions & 0 deletions instrumentation/nats/nats-2.21/javaagent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Auth-instrumentation for NATS version 2.21

Provides OpenTelemetry auto-instrumentation for [NATS 2.21](https://github.com/nats-io/nats.java).

### Trace propagation

It's recommended to provide `Message` with a writable `Header` structure
to allow propagation between publishers and subscribers. Without headers,
the tracing context will not be propagated in the headers.

```java
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsMessage;

// don't
Message msg = NatsMessage.builder().subject("sub").build();

// do
Message msg = NatsMessage.builder().subject("sub").headers(new Headers()).build();
```
39 changes: 39 additions & 0 deletions instrumentation/nats/nats-2.21/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("io.nats")
module.set("jnats")
versions.set("[2.17.2,)")
assertInverse.set(true)
}
}

dependencies {
library("io.nats:jnats:2.21.0")

implementation(project(":instrumentation:nats:nats-2.21:library"))
testImplementation(project(":instrumentation:nats:nats-2.21:testing"))
}

tasks {
val testMessagingReceive by registering(Test::class) {
filter {
includeTestsMatching("NatsInstrumentationMessagingReceiveTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}

test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
filter {
excludeTestsMatching("NatsInstrumentationMessagingReceiveTest")
}
}

check {
dependsOn(testMessagingReceive)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.nats.v2_21;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;

public final class CompletableFutureWrapper {

private CompletableFutureWrapper() {}

public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context context) {
CompletableFuture<T> result = new CompletableFuture<>();
future.whenComplete(
(T value, Throwable throwable) -> {
try (Scope ignored = context.makeCurrent()) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});

return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.nats.v2_21;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.PRODUCER_INSTRUMENTER;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.impl.Headers;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class ConnectionPublishInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("io.nats.client.Connection"));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isPublic()
.and(named("publish"))
.and(takesArguments(2))
.and(takesArgument(0, String.class))
.and(takesArgument(1, byte[].class)),
ConnectionPublishInstrumentation.class.getName() + "$PublishBodyAdvice");
transformer.applyAdviceToMethod(
isPublic()
.and(named("publish"))
.and(takesArguments(3))
.and(takesArgument(0, String.class))
.and(takesArgument(1, named("io.nats.client.impl.Headers")))
.and(takesArgument(2, byte[].class)),
ConnectionPublishInstrumentation.class.getName() + "$PublishHeadersBodyAdvice");
transformer.applyAdviceToMethod(
isPublic()
.and(named("publish"))
.and(takesArguments(3))
.and(takesArgument(0, String.class))
.and(takesArgument(1, String.class))
.and(takesArgument(2, byte[].class)),
ConnectionPublishInstrumentation.class.getName() + "$PublishReplyToBodyAdvice");
transformer.applyAdviceToMethod(
isPublic()
.and(named("publish"))
.and(takesArguments(4))
.and(takesArgument(0, String.class))
.and(takesArgument(1, String.class))
.and(takesArgument(2, named("io.nats.client.impl.Headers")))
.and(takesArgument(3, byte[].class)),
ConnectionPublishInstrumentation.class.getName() + "$PublishReplyToHeadersBodyAdvice");
transformer.applyAdviceToMethod(
isPublic()
.and(named("publish"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.nats.client.Message"))),
ConnectionPublishInstrumentation.class.getName() + "$PublishMessageAdvice");
}

@SuppressWarnings("unused")
public static class PublishBodyAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Connection connection,
@Advice.Argument(0) String subject,
@Advice.Argument(1) byte[] body,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
Context parentContext = Context.current();
natsRequest = NatsRequest.create(connection, null, subject, null, body);

if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
return;
}

otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
otelScope = otelContext.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
if (otelScope == null) {
return;
}

otelScope.close();
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
}
}

@SuppressWarnings("unused")
public static class PublishHeadersBodyAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Connection connection,
@Advice.Argument(0) String subject,
@Advice.Argument(1) Headers headers,
@Advice.Argument(2) byte[] body,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
Context parentContext = Context.current();
natsRequest = NatsRequest.create(connection, null, subject, headers, body);

if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
return;
}

otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
otelScope = otelContext.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
if (otelScope == null) {
return;
}

otelScope.close();
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
}
}

@SuppressWarnings("unused")
public static class PublishReplyToBodyAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Connection connection,
@Advice.Argument(0) String subject,
@Advice.Argument(1) String replyTo,
@Advice.Argument(2) byte[] body,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
Context parentContext = Context.current();
natsRequest = NatsRequest.create(connection, replyTo, subject, null, body);

if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
return;
}

otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
otelScope = otelContext.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
if (otelScope == null) {
return;
}

otelScope.close();
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
}
}

@SuppressWarnings("unused")
public static class PublishReplyToHeadersBodyAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Connection connection,
@Advice.Argument(0) String subject,
@Advice.Argument(1) String replyTo,
@Advice.Argument(2) Headers headers,
@Advice.Argument(3) byte[] body,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
Context parentContext = Context.current();
natsRequest = NatsRequest.create(connection, replyTo, subject, headers, body);

if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
return;
}

otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
otelScope = otelContext.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
if (otelScope == null) {
return;
}

otelScope.close();
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
}
}

@SuppressWarnings("unused")
public static class PublishMessageAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Connection connection,
@Advice.Argument(0) Message message,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
Context parentContext = Context.current();
natsRequest = NatsRequest.create(connection, message);

if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
return;
}

otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
otelScope = otelContext.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context otelContext,
@Advice.Local("otelScope") Scope otelScope,
@Advice.Local("natsRequest") NatsRequest natsRequest) {
if (otelScope == null) {
return;
}

otelScope.close();
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
}
}
}
Loading
Loading