Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d4619f0

Browse files
garyrussell authored and artembilan committed Oct 6, 2022
GH-2198: Observability Documentation
Resolves #2417 Refactor for latest snapshots; add documentation generation. Use NOOP registry and supplier for context. GH-2417: Fix class tangles. Disable auto doc generation and polish manually. Remove unnecessary `Supplier` casts. Remove incorrect `@Nullable`. Add Remote Service Name to Contexts Include the cluster id if possible. * Move deps to the latest snapshots
1 parent 239e350 commit d4619f0

18 files changed

+348
-146
lines changed
 

‎build.gradle

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
buildscript {
22
ext.kotlinVersion = '1.7.0'
3+
ext.isCI = System.getenv('GITHUB_ACTION') || System.getenv('bamboo_buildKey')
34
repositories {
45
mavenCentral()
56
gradlePluginPortal()
@@ -66,15 +67,16 @@ ext {
6667
junitJupiterVersion = '5.9.0'
6768
kafkaVersion = '3.3.1'
6869
log4jVersion = '2.18.0'
69-
micrometerVersion = '1.10.0-M6'
70-
micrometerTracingVersion = '1.0.0-M8'
70+
micrometerDocsVersion = "1.0.0-SNAPSHOT"
71+
micrometerVersion = '1.10.0-SNAPSHOT'
72+
micrometerTracingVersion = '1.0.0-SNAPSHOT'
7173
mockitoVersion = '4.5.1'
72-
reactorVersion = '2022.0.0-M6'
74+
reactorVersion = '2022.0.0-SNAPSHOT'
7375
scalaVersion = '2.13'
7476
springBootVersion = '2.6.11' // docs module
75-
springDataVersion = '2022.0.0-M6'
76-
springRetryVersion = '2.0.0-M1'
77-
springVersion = '6.0.0-M6'
77+
springDataVersion = '2022.0.0-SNAPSHOT'
78+
springRetryVersion = '2.0.0-SNAPSHOT'
79+
springVersion = '6.0.0-SNAPSHOT'
7880
zookeeperVersion = '3.6.3'
7981

8082
idPrefix = 'kafka'
@@ -241,7 +243,7 @@ subprojects { subproject ->
241243
}
242244

243245
task updateCopyrights {
244-
onlyIf { gitPresent && !System.getenv('GITHUB_ACTION') && !System.getenv('bamboo_buildKey') }
246+
onlyIf { !isCI }
245247
if (gitPresent) {
246248
inputs.files(modifiedFiles.filter { f -> f.path.contains(subproject.name) })
247249
}
@@ -300,13 +302,17 @@ subprojects { subproject ->
300302
tasks.withType(Javadoc) {
301303
options.addBooleanOption('Xdoclint:syntax', true) // only check syntax with doclint
302304
options.addBooleanOption('Werror', true) // fail build on Javadoc warnings
303-
}
305+
}
304306

305307
}
306308

307309
project ('spring-kafka') {
308310
description = 'Spring Kafka Support'
309311

312+
configurations {
313+
adoc
314+
}
315+
310316
dependencies {
311317
api 'org.springframework:spring-context'
312318
api 'org.springframework:spring-messaging'
@@ -346,7 +352,36 @@ project ('spring-kafka') {
346352
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
347353
testImplementation 'io.micrometer:micrometer-tracing-test'
348354
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
355+
356+
adoc "io.micrometer:micrometer-docs-generator-spans:$micrometerDocsVersion"
357+
adoc "io.micrometer:micrometer-docs-generator-metrics:$micrometerDocsVersion"
349358
}
359+
360+
def inputDir = file('src/main/java/org/springframework/kafka/support/micrometer').absolutePath
361+
def outputDir = rootProject.file('spring-kafka-docs/src/main/asciidoc').absolutePath
362+
363+
task generateObservabilityMetricsDocs(type: JavaExec) {
364+
onlyIf { !isCI }
365+
mainClass = 'io.micrometer.docs.metrics.DocsFromSources'
366+
inputs.dir(inputDir)
367+
outputs.dir(outputDir)
368+
classpath configurations.adoc
369+
args inputDir, '.*', outputDir
370+
}
371+
372+
task generateObservabilitySpansDocs(type: JavaExec) {
373+
onlyIf { !isCI }
374+
mainClass = 'io.micrometer.docs.spans.DocsFromSources'
375+
inputs.dir(inputDir)
376+
outputs.dir(outputDir)
377+
classpath configurations.adoc
378+
args inputDir, '.*', outputDir
379+
}
380+
381+
// javadoc {
382+
// finalizedBy generateObservabilityMetricsDocs, generateObservabilitySpansDocs
383+
// }
384+
350385
}
351386

352387
project ('spring-kafka-test') {
@@ -564,7 +599,7 @@ task distZip(type: Zip, dependsOn: [docsZip]) { //, schemaZip]) {
564599
into "${baseDir}"
565600
}
566601

567-
from("$project.rootDir") {
602+
from("$project.rootDir") {
568603
include 'LICENSE.txt'
569604
into "${baseDir}"
570605
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[[observability-conventions]]
2+
=== Observability - Conventions
3+
4+
Below you can find a list of all `GlobalObservabilityConventions` and `ObservabilityConventions` declared by this project.
5+
6+
.ObservationConvention implementations
7+
|===
8+
|ObservationConvention Class Name | Applicable ObservationContext Class Name
9+
|`KafkaListenerObservation$DefaultKafkaListenerObservationConvention`|`KafkaRecordReceiverContext`
10+
|`KafkaTemplateObservation$DefaultKafkaTemplateObservationConvention`|`KafkaRecordSenderContext`
11+
|===
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
[[observability-metrics]]
2+
=== Observability - Metrics
3+
4+
Below you can find a list of all samples declared by this project.
5+
6+
[[observability-metrics-listener-observation]]
7+
==== Listener Observation
8+
9+
____
10+
Observation for Apache Kafka listeners.
11+
____
12+
13+
**Metric name** `spring.kafka.listener` (defined by convention class `KafkaListenerObservation$DefaultKafkaListenerObservationConvention`). **Type** `timer` and **base unit** `seconds`.
14+
15+
Name of the enclosing class `KafkaListenerObservation`.
16+
17+
IMPORTANT: All tags must be prefixed with `spring.kafka.listener` prefix!
18+
19+
.Low cardinality Keys
20+
[cols="a,a"]
21+
|===
22+
|Name | Description
23+
|`spring.kafka.listener.id`|Listener id (or listener container bean name).
24+
|===
25+
26+
[[observability-metrics-template-observation]]
27+
==== Template Observation
28+
29+
____
30+
Observation for KafkaTemplates.
31+
____
32+
33+
**Metric name** `spring.kafka.template` (defined by convention class `KafkaTemplateObservation$DefaultKafkaTemplateObservationConvention`). **Type** `timer` and **base unit** `seconds`.
34+
35+
Name of the enclosing class `KafkaTemplateObservation`.
36+
37+
IMPORTANT: All tags must be prefixed with `spring.kafka.template` prefix!
38+
39+
.Low cardinality Keys
40+
[cols="a,a"]
41+
|===
42+
|Name | Description
43+
|`spring.kafka.template.name`|Bean name of the template.
44+
|===
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
[[observability-spans]]
2+
=== Observability - Spans
3+
4+
Below you can find a list of all spans declared by this project.
5+
6+
[[observability-spans-listener-observation]]
7+
==== Listener Observation Span
8+
9+
> Observation for Apache Kafka listeners.
10+
11+
**Span name** `spring.kafka.listener` (defined by convention class `KafkaListenerObservation$DefaultKafkaListenerObservationConvention`).
12+
13+
Name of the enclosing class `KafkaListenerObservation`.
14+
15+
IMPORTANT: All tags and event names must be prefixed with `spring.kafka.listener` prefix!
16+
17+
.Tag Keys
18+
|===
19+
|Name | Description
20+
|`spring.kafka.listener.id`|Listener id (or listener container bean name).
21+
|===
22+
23+
[[observability-spans-template-observation]]
24+
==== Template Observation Span
25+
26+
> Observation for KafkaTemplates.
27+
28+
**Span name** `spring.kafka.template` (defined by convention class `KafkaTemplateObservation$DefaultKafkaTemplateObservationConvention`).
29+
30+
Name of the enclosing class `KafkaTemplateObservation`.
31+
32+
IMPORTANT: All tags and event names must be prefixed with `spring.kafka.template` prefix!
33+
34+
.Tag Keys
35+
|===
36+
|Name | Description
37+
|`spring.kafka.template.name`|Bean name of the template.
38+
|===

‎spring-kafka-docs/src/main/asciidoc/appendix.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ dependencies {
4949

5050
The test scope dependencies are only needed if you are using the embedded Kafka broker in tests.
5151

52+
[appendix]
53+
[[observation-gen]]
54+
== Micrometer Observation Documentation
55+
56+
include::./_metrics.adoc[]
57+
58+
include::./_spans.adoc[]
59+
60+
include::./_conventions.adoc[]
61+
5262
[appendix]
5363
[[history]]
5464
== Change History

‎spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3415,6 +3415,8 @@ The default implementations add the `bean.name` tag for template observations an
34153415

34163416
You can either subclass `DefaultKafkaTemplateObservationConvention` or `DefaultKafkaListenerObservationConvention` or provide completely new implementations.
34173417

3418+
See <<observation-gen>> for details of the observations that are recorded.
3419+
34183420
[[transactions]]
34193421
==== Transactions
34203422

‎spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.springframework.context.ApplicationContextAware;
5858
import org.springframework.core.log.LogAccessor;
5959
import org.springframework.kafka.KafkaException;
60+
import org.springframework.lang.Nullable;
6061

6162
/**
6263
* An admin that delegates to an {@link AdminClient} to create topics defined
@@ -95,6 +96,8 @@ public class KafkaAdmin extends KafkaResourceFactory
9596

9697
private boolean modifyTopicConfigs;
9798

99+
private String clusterId;
100+
98101
/**
99102
* Create an instance with an {@link AdminClient} based on the supplied
100103
* configuration.
@@ -197,6 +200,10 @@ public final boolean initialize() {
197200
}
198201
if (adminClient != null) {
199202
try {
203+
synchronized (this) {
204+
this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout,
205+
TimeUnit.MILLISECONDS);
206+
}
200207
addOrModifyTopicsIfNeeded(adminClient, newTopics);
201208
return true;
202209
}
@@ -218,6 +225,20 @@ public final boolean initialize() {
218225
return false;
219226
}
220227

228+
@Override
229+
@Nullable
230+
public String clusterId() {
231+
if (this.clusterId == null) {
232+
try (AdminClient client = createAdmin()) {
233+
this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.MILLISECONDS);
234+
}
235+
catch (Exception ex) {
236+
LOGGER.error(ex, "Could not obtain cluster info");
237+
}
238+
}
239+
return this.clusterId;
240+
}
241+
221242
@Override
222243
public void createOrModifyTopics(NewTopic... topics) {
223244
try (AdminClient client = createAdmin()) {

‎spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdminOperations.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@
2121
import org.apache.kafka.clients.admin.NewTopic;
2222
import org.apache.kafka.clients.admin.TopicDescription;
2323

24+
import org.springframework.lang.Nullable;
25+
2426
/**
2527
* Provides a number of convenience methods wrapping {@code AdminClient}.
2628
*
@@ -49,4 +51,12 @@ public interface KafkaAdminOperations {
4951
*/
5052
Map<String, TopicDescription> describeTopics(String... topicNames);
5153

54+
/**
55+
* Return the cluster id, if available.
56+
* @return the cluster id from describe-cluster, or {@code null} if unavailable.
57+
* @since 3.0
58+
*/
59+
@Nullable
60+
String clusterId();
61+
5262
}

‎spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848

4949
import org.springframework.beans.factory.BeanNameAware;
5050
import org.springframework.beans.factory.DisposableBean;
51-
import org.springframework.beans.factory.ObjectProvider;
5251
import org.springframework.beans.factory.SmartInitializingSingleton;
5352
import org.springframework.context.ApplicationContext;
5453
import org.springframework.context.ApplicationContextAware;
@@ -64,9 +63,9 @@
6463
import org.springframework.kafka.support.TopicPartitionOffset;
6564
import org.springframework.kafka.support.converter.MessagingMessageConverter;
6665
import org.springframework.kafka.support.converter.RecordMessageConverter;
67-
import org.springframework.kafka.support.micrometer.DefaultKafkaTemplateObservationConvention;
6866
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;
6967
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation;
68+
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
7069
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;
7170
import org.springframework.kafka.support.micrometer.MicrometerHolder;
7271
import org.springframework.lang.Nullable;
@@ -145,7 +144,11 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
145144

146145
private KafkaTemplateObservationConvention observationConvention;
147146

148-
private ObservationRegistry observationRegistry;
147+
private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
148+
149+
private KafkaAdmin kafkaAdmin;
150+
151+
private String clusterId;
149152

150153
/**
151154
* Create an instance using the supplied producer factory and autoFlush false.
@@ -418,16 +421,25 @@ public void setObservationConvention(KafkaTemplateObservationConvention observat
418421

419422
@Override
420423
public void afterSingletonsInstantiated() {
421-
if (this.observationEnabled && this.observationRegistry == null && this.applicationContext != null) {
422-
ObjectProvider<ObservationRegistry> registry =
423-
this.applicationContext.getBeanProvider(ObservationRegistry.class);
424-
this.observationRegistry = registry.getIfUnique();
424+
if (this.observationEnabled && this.applicationContext != null) {
425+
this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class).getIfUnique();
426+
this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
427+
if (this.kafkaAdmin != null) {
428+
this.clusterId = this.kafkaAdmin.clusterId();
429+
}
425430
}
426431
else if (this.micrometerEnabled) {
427432
this.micrometerHolder = obtainMicrometerHolder();
428433
}
429434
}
430435

436+
private String clusterId() {
437+
if (this.kafkaAdmin != null && this.clusterId == null) {
438+
this.clusterId = this.kafkaAdmin.clusterId();
439+
}
440+
return this.clusterId;
441+
}
442+
431443
@Override
432444
public void onApplicationEvent(ContextStoppedEvent event) {
433445
if (this.customProducerFactory) {
@@ -668,15 +680,10 @@ protected void closeProducer(Producer<K, V> producer, boolean inTx) {
668680
}
669681

670682
private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K, V> producerRecord) {
671-
Observation observation;
672-
if (!this.observationEnabled || this.observationRegistry == null) {
673-
observation = Observation.NOOP;
674-
}
675-
else {
676-
observation = KafkaTemplateObservation.TEMPLATE_OBSERVATION.observation(
677-
this.observationConvention, DefaultKafkaTemplateObservationConvention.INSTANCE,
678-
new KafkaRecordSenderContext(producerRecord, this.beanName), this.observationRegistry);
679-
}
683+
Observation observation = KafkaTemplateObservation.TEMPLATE_OBSERVATION.observation(
684+
this.observationConvention, DefaultKafkaTemplateObservationConvention.INSTANCE,
685+
() -> new KafkaRecordSenderContext(producerRecord, this.beanName, this::clusterId),
686+
this.observationRegistry);
680687
try {
681688
observation.start();
682689
return doSend(producerRecord, observation);
@@ -695,7 +702,7 @@ private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K,
695702
* RecordMetadata}.
696703
*/
697704
protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord,
698-
@Nullable Observation observation) {
705+
Observation observation) {
699706

700707
final Producer<K, V> producer = getTheProducer(producerRecord.topic());
701708
this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord));

‎spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.springframework.core.task.SimpleAsyncTaskExecutor;
8080
import org.springframework.kafka.KafkaException;
8181
import org.springframework.kafka.core.ConsumerFactory;
82+
import org.springframework.kafka.core.KafkaAdmin;
8283
import org.springframework.kafka.core.KafkaResourceHolder;
8384
import org.springframework.kafka.event.ConsumerFailedToStartEvent;
8485
import org.springframework.kafka.event.ConsumerPartitionPausedEvent;
@@ -107,8 +108,8 @@
107108
import org.springframework.kafka.support.LogIfLevelEnabled;
108109
import org.springframework.kafka.support.TopicPartitionOffset;
109110
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
110-
import org.springframework.kafka.support.micrometer.DefaultKafkaListenerObservationConvention;
111111
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
112+
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
112113
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
113114
import org.springframework.kafka.support.micrometer.MicrometerHolder;
114115
import org.springframework.kafka.support.serializer.DeserializationException;
@@ -365,9 +366,9 @@ protected void doStart() {
365366
}
366367
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
367368
ListenerType listenerType = determineListenerType(listener);
368-
ObservationRegistry observationRegistry = null;
369+
ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
369370
ApplicationContext applicationContext = getApplicationContext();
370-
if (applicationContext != null) {
371+
if (applicationContext != null && containerProperties.isObservationEnabled()) {
371372
ObjectProvider<ObservationRegistry> registry =
372373
applicationContext.getBeanProvider(ObservationRegistry.class);
373374
observationRegistry = registry.getIfUnique();
@@ -777,6 +778,11 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
777778

778779
private final ObservationRegistry observationRegistry;
779780

781+
@Nullable
782+
private final KafkaAdmin kafkaAdmin;
783+
784+
private String clusterId;
785+
780786
private Map<TopicPartition, OffsetMetadata> definedPartitions;
781787

782788
private int count;
@@ -827,7 +833,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
827833

828834
@SuppressWarnings(UNCHECKED)
829835
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
830-
@Nullable ObservationRegistry observationRegistry) {
836+
ObservationRegistry observationRegistry) {
831837

832838
this.observationRegistry = observationRegistry;
833839
Properties consumerProperties = propertiesFromProperties();
@@ -909,6 +915,31 @@ else if (listener instanceof MessageListener) {
909915
this.lastReceivePartition = new HashMap<>();
910916
this.lastAlertPartition = new HashMap<>();
911917
this.wasIdlePartition = new HashMap<>();
918+
this.kafkaAdmin = obtainAdmin();
919+
obtainClusterId();
920+
}
921+
922+
@Nullable
923+
private KafkaAdmin obtainAdmin() {
924+
ApplicationContext applicationContext = getApplicationContext();
925+
if (applicationContext != null) {
926+
return applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
927+
}
928+
return null;
929+
}
930+
931+
@Nullable
932+
private String clusterId() {
933+
if (this.clusterId == null && this.kafkaAdmin != null) {
934+
obtainClusterId();
935+
}
936+
return this.clusterId;
937+
}
938+
939+
private void obtainClusterId() {
940+
if (this.kafkaAdmin != null) {
941+
this.clusterId = this.kafkaAdmin.clusterId();
942+
}
912943
}
913944

914945
@Nullable
@@ -2706,16 +2737,11 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
27062737
Iterator<ConsumerRecord<K, V>> iterator) {
27072738

27082739
Object sample = startMicrometerSample();
2709-
Observation observation;
2710-
if (!this.containerProperties.isObservationEnabled() || this.observationRegistry == null) {
2711-
observation = Observation.NOOP;
2712-
}
2713-
else {
2714-
observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
2715-
this.containerProperties.getObservationConvention(),
2716-
DefaultKafkaListenerObservationConvention.INSTANCE,
2717-
new KafkaRecordReceiverContext(record, getListenerId()), this.observationRegistry);
2718-
}
2740+
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
2741+
this.containerProperties.getObservationConvention(),
2742+
DefaultKafkaListenerObservationConvention.INSTANCE,
2743+
() -> new KafkaRecordReceiverContext(record, getListenerId(), this::clusterId),
2744+
this.observationRegistry);
27192745
return observation.observe(() -> {
27202746
try {
27212747
invokeOnMessage(record);

‎spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaListenerObservationConvention.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

‎spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/DefaultKafkaTemplateObservationConvention.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

‎spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
package org.springframework.kafka.support.micrometer;
1818

19+
import io.micrometer.common.KeyValues;
1920
import io.micrometer.common.docs.KeyName;
2021
import io.micrometer.observation.Observation.Context;
2122
import io.micrometer.observation.ObservationConvention;
22-
import io.micrometer.observation.docs.DocumentedObservation;
23+
import io.micrometer.observation.docs.ObservationDocumentation;
2324

2425
/**
2526
* Spring for Apache Kafka Observation for listeners.
@@ -28,10 +29,10 @@
2829
* @since 3.0
2930
*
3031
*/
31-
public enum KafkaListenerObservation implements DocumentedObservation {
32+
public enum KafkaListenerObservation implements ObservationDocumentation {
3233

3334
/**
34-
* Observation for Kafka listeners.
35+
* Observation for Apache Kafka listeners.
3536
*/
3637
LISTENER_OBSERVATION {
3738

@@ -59,7 +60,7 @@ public KeyName[] getLowCardinalityKeyNames() {
5960
public enum ListenerLowCardinalityTags implements KeyName {
6061

6162
/**
62-
* Listener id.
63+
* Listener id (or listener container bean name).
6364
*/
6465
LISTENER_ID {
6566

@@ -72,4 +73,37 @@ public String asString() {
7273

7374
}
7475

76+
/**
77+
* Default {@link KafkaListenerObservationConvention} for Kafka listener key values.
78+
*
79+
* @author Gary Russell
80+
* @since 3.0
81+
*
82+
*/
83+
public static class DefaultKafkaListenerObservationConvention implements KafkaListenerObservationConvention {
84+
85+
/**
86+
* A singleton instance of the convention.
87+
*/
88+
public static final DefaultKafkaListenerObservationConvention INSTANCE =
89+
new DefaultKafkaListenerObservationConvention();
90+
91+
@Override
92+
public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
93+
return KeyValues.of(KafkaListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(),
94+
context.getListenerId());
95+
}
96+
97+
@Override
98+
public String getContextualName(KafkaRecordReceiverContext context) {
99+
return context.getSource() + " receive";
100+
}
101+
102+
@Override
103+
public String getName() {
104+
return "spring.kafka.listener";
105+
}
106+
107+
}
108+
75109
}

‎spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support.micrometer;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.util.function.Supplier;
2021

2122
import org.apache.kafka.clients.consumer.ConsumerRecord;
2223
import org.apache.kafka.common.header.Header;
@@ -36,7 +37,7 @@ public class KafkaRecordReceiverContext extends ReceiverContext<ConsumerRecord<?
3637

3738
private final ConsumerRecord<?, ?> record;
3839

39-
public KafkaRecordReceiverContext(ConsumerRecord<?, ?> record, String listenerId) {
40+
public KafkaRecordReceiverContext(ConsumerRecord<?, ?> record, String listenerId, Supplier<String> clusterId) {
4041
super((carrier, key) -> {
4142
Header header = carrier.headers().lastHeader(key);
4243
if (header == null) {
@@ -47,6 +48,8 @@ public KafkaRecordReceiverContext(ConsumerRecord<?, ?> record, String listenerId
4748
setCarrier(record);
4849
this.record = record;
4950
this.listenerId = listenerId;
51+
String cluster = clusterId.get();
52+
setRemoteServiceName("Apache Kafka" + (cluster != null ? ": " + cluster : ""));
5053
}
5154

5255
public String getListenerId() {

‎spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support.micrometer;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.util.function.Supplier;
2021

2122
import org.apache.kafka.clients.producer.ProducerRecord;
2223

@@ -35,11 +36,13 @@ public class KafkaRecordSenderContext extends SenderContext<ProducerRecord<?, ?>
3536

3637
private final String destination;
3738

38-
public KafkaRecordSenderContext(ProducerRecord<?, ?> record, String beanName) {
39+
public KafkaRecordSenderContext(ProducerRecord<?, ?> record, String beanName, Supplier<String> clusterId) {
3940
super((carrier, key, value) -> record.headers().add(key, value.getBytes(StandardCharsets.UTF_8)));
4041
setCarrier(record);
4142
this.beanName = beanName;
4243
this.destination = record.topic();
44+
String cluster = clusterId.get();
45+
setRemoteServiceName("Apache Kafka" + (cluster != null ? ": " + cluster : ""));
4346
}
4447

4548
public String getBeanName() {

‎spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaTemplateObservation.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
package org.springframework.kafka.support.micrometer;
1818

19+
import io.micrometer.common.KeyValues;
1920
import io.micrometer.common.docs.KeyName;
2021
import io.micrometer.observation.Observation.Context;
2122
import io.micrometer.observation.ObservationConvention;
22-
import io.micrometer.observation.docs.DocumentedObservation;
23+
import io.micrometer.observation.docs.ObservationDocumentation;
2324

2425
/**
2526
* Spring for Apache Kafka Observation for
@@ -29,10 +30,10 @@
2930
* @since 3.0
3031
*
3132
*/
32-
public enum KafkaTemplateObservation implements DocumentedObservation {
33+
public enum KafkaTemplateObservation implements ObservationDocumentation {
3334

3435
/**
35-
* {@link org.springframework.kafka.core.KafkaTemplate} observation.
36+
* Observation for KafkaTemplates.
3637
*/
3738
TEMPLATE_OBSERVATION {
3839

@@ -72,4 +73,37 @@ public String asString() {
7273

7374
}
7475

76+
/**
77+
* Default {@link KafkaTemplateObservationConvention} for Kafka template key values.
78+
*
79+
* @author Gary Russell
80+
* @since 3.0
81+
*
82+
*/
83+
public static class DefaultKafkaTemplateObservationConvention implements KafkaTemplateObservationConvention {
84+
85+
/**
86+
* A singleton instance of the convention.
87+
*/
88+
public static final DefaultKafkaTemplateObservationConvention INSTANCE =
89+
new DefaultKafkaTemplateObservationConvention();
90+
91+
@Override
92+
public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
93+
return KeyValues.of(KafkaTemplateObservation.TemplateLowCardinalityTags.BEAN_NAME.asString(),
94+
context.getBeanName());
95+
}
96+
97+
@Override
98+
public String getContextualName(KafkaRecordSenderContext context) {
99+
return context.getDestination() + " send";
100+
}
101+
102+
@Override
103+
public String getName() {
104+
return "spring.kafka.template";
105+
}
106+
107+
}
108+
75109
}

‎spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationIntegrationTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.stream.Collectors;
2626

27+
import org.apache.kafka.clients.admin.AdminClientConfig;
2728
import org.apache.kafka.clients.consumer.ConsumerRecord;
2829

2930
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@@ -35,6 +36,7 @@
3536
import org.springframework.kafka.core.ConsumerFactory;
3637
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3738
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
39+
import org.springframework.kafka.core.KafkaAdmin;
3840
import org.springframework.kafka.core.KafkaTemplate;
3941
import org.springframework.kafka.core.ProducerFactory;
4042
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -83,10 +85,16 @@ public SampleTestRunnerConsumer yourCode() {
8385
.collect(Collectors.toList());
8486
SpanAssert.assertThat(producerSpans.get(0))
8587
.hasTag("spring.kafka.template.name", "template");
88+
assertThat(producerSpans.get(0).getRemoteServiceName())
89+
.startsWith("Apache Kafka: ")
90+
.doesNotEndWith("Kafka: ");
8691
SpanAssert.assertThat(producerSpans.get(1))
8792
.hasTag("spring.kafka.template.name", "template");
8893
SpanAssert.assertThat(consumerSpans.get(0))
8994
.hasTagWithKey("spring.kafka.listener.id");
95+
assertThat(consumerSpans.get(0).getRemoteServiceName())
96+
.startsWith("Apache Kafka: ")
97+
.doesNotEndWith("Kafka: ");
9098
assertThat(consumerSpans.get(0).getTags().get("spring.kafka.listener.id")).isIn("obs1-0", "obs2-0");
9199
SpanAssert.assertThat(consumerSpans.get(1))
92100
.hasTagWithKey("spring.kafka.listener.id");
@@ -128,6 +136,11 @@ ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
128136
return new DefaultKafkaConsumerFactory<>(consumerProps);
129137
}
130138

139+
@Bean
140+
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
141+
return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
142+
}
143+
131144
@Bean
132145
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
133146
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);

‎spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4545
import org.springframework.kafka.core.KafkaTemplate;
4646
import org.springframework.kafka.core.ProducerFactory;
47+
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
48+
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
4749
import org.springframework.kafka.test.EmbeddedKafkaBroker;
4850
import org.springframework.kafka.test.context.EmbeddedKafka;
4951
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -195,8 +197,11 @@ KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
195197
}
196198

197199
@Bean
198-
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(ConsumerFactory<Integer, String> cf) {
199-
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
200+
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
201+
ConsumerFactory<Integer, String> cf) {
202+
203+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
204+
new ConcurrentKafkaListenerContainerFactory<>();
200205
factory.setConsumerFactory(cf);
201206
factory.getContainerProperties().setObservationEnabled(true);
202207
return factory;

0 commit comments

Comments
 (0)
Please sign in to comment.