Today I learned OpenTelemetry with SmallRye reactive messaging and KafkaStream
Finally my application is on production : several micro-services using event-driven architecture where several producers can trigger same behaviour.
Suddenly a bug! Some triggers are weird, but who is triggering them? Which micro-services? Impossible to tell...so I put logs in the code trying to point the issue, but it is hard to isolate the root cause in a system that produces thousands of message per second.
Then one of my coworkers mention OpenTelemetry and Jaeger...
So I look a bit, and to make it simple, I understand that OpenTelemetry track a call by using headers and Jaeger is the tool to visualise them.
Seems like it is the perfect solution!
But to every perfect solution comes some unexpected issues...
The stack
The services are Quarkus apps where some use :
- SmallRye Reactive Messaging annotations Incoming / Outgoing
- Apache KafkaStream
SmallRye Reactive Messaging
To use OpenTelemetry with SmallRye Reactive Messaging, you can follow Quarkus guide here I will summarise it as what is interesting for us is at the beginning and the end of the guide.
I have to add those dependencies in the pom:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry-exporter-jaeger</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
</dependency>
And configure my exporter in applications.properties
quarkus.application.name=kafka-opentelemetry
quarkus.opentelemetry.enabled=true
quarkus.opentelemetry.tracer.exporter.otlp.endpoint=http://localhost:4317
To start the jaeger locally, I followed this chapter . I add to fix the port setup to make it works:
jaeger-all-in-one:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "14268:14268"
- "14250:14250"
So if you have a method that only consume or produce data, then the lib does the job. When you have a method that consume and produce data, then the trouble starts.
To show you the issue, I have done this example:
produceData produces a "SmallRye Hello X" messages every second (produceData) on channel "demo-kafka-tracing-string-out"
toUpperCase consume from this channel, transform the String to uppercase and send the new String to channel "demo-kafka-tracing-uppercase-out"
- sortString will consume from this channel, sort the String and send the sorted value to channel "demo-kafka-tracing-sort-out"
- displayMessage will consume from last channel and log the value.
@ApplicationScoped
public class KafkaTracingService {
public static final Logger LOGGER = Logger.getLogger(KafkaTracingService.class);
@Outgoing("demo-kafka-tracing-string-out")
public Multi<Message<String>> produceData() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1)).map(tick -> Message.of("SmallRye Hello " + tick));
}
@Incoming("demo-kafka-tracing-uppercase-in")
@Outgoing("demo-kafka-tracing-uppercase-out")
public Message<String> toUpperCase(ConsumerRecord<String, String> message) {
return Message.of(message.value().toUpperCase(Locale.ROOT));
}
@Incoming("demo-kafka-tracing-sort-in")
@Outgoing("demo-kafka-tracing-sort-out")
public Message<String> sortString(ConsumerRecord<String, String> message) {
var tempArray = message.value().toCharArray();
Arrays.sort(tempArray);
return Message.of(new String(tempArray));
}
@Incoming("demo-kafka-tracing-display-in")
public void displayMessage(ConsumerRecord<String, String> message) {
LOGGER.infov("Message received {0}", message.value());
}
}
And the properties for that:
mp.messaging.outgoing.demo-kafka-tracing-string-out.connector=smallrye-kafka
mp.messaging.outgoing.demo-kafka-tracing-uppercase-out.connector=smallrye-kafka
mp.messaging.outgoing.demo-kafka-tracing-sort-out.connector=smallrye-kafka
mp.messaging.incoming.demo-kafka-tracing-uppercase-in.connector=smallrye-kafka
mp.messaging.incoming.demo-kafka-tracing-sort-in.connector=smallrye-kafka
mp.messaging.incoming.demo-kafka-tracing-display-in.connector=smallrye-kafka
mp.messaging.outgoing.demo-kafka-tracing-string-out.topic=string-out
mp.messaging.outgoing.demo-kafka-tracing-uppercase-out.topic=uppercase-out
mp.messaging.outgoing.demo-kafka-tracing-sort-out.topic=sort-out
mp.messaging.incoming.demo-kafka-tracing-uppercase-in.topic=string-out
mp.messaging.incoming.demo-kafka-tracing-sort-in.topic=uppercase-out
mp.messaging.incoming.demo-kafka-tracing-display-in.topic=sort-out
mp.messaging.outgoing.demo-kafka-tracing-string-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.demo-kafka-tracing-uppercase-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.demo-kafka-tracing-sort-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.demo-kafka-tracing-uppercase-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.demo-kafka-tracing-sort-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.demo-kafka-tracing-display-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
When the application is running, the trace looks like that:
If we open one string-out for example, we only see two span
So the guide says to add:
Metadata.of(TracingMetadata.withPrevious(Context.current()));
I tried, but it does not fix our issue. The correct thing to add to our Consumer/Producer is :
TracingMetadata.withCurrent(Context.current())
Now the code is:
@Outgoing("demo-kafka-tracing-string-out")
public Multi<Message<String>> produceData() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1)).map(tick -> Message.of("SmallRye Hello " + tick));
}
@Incoming("demo-kafka-tracing-uppercase-in")
@Outgoing("demo-kafka-tracing-uppercase-out")
public Message<String> toUpperCase(ConsumerRecord<String, String> message) {
return Message.of(message.value().toUpperCase(Locale.ROOT)).addMetadata(TracingMetadata.withCurrent(
Context.current()));
}
@Incoming("demo-kafka-tracing-sort-in")
@Outgoing("demo-kafka-tracing-sort-out")
public Message<String> sortString(ConsumerRecord<String, String> message) {
var tempArray = message.value().toCharArray();
Arrays.sort(tempArray);
return Message.of(new String(tempArray)).addMetadata(TracingMetadata.withCurrent(
Context.current()));
}
@Incoming("demo-kafka-tracing-display-in")
public void displayMessage(ConsumerRecord<String, String> message) {
LOGGER.infov("Message received {0}", message.value());
}
And if I run the application again, then I can see this trace:
And if i select one string-out, i will see the whole chain.
KafkaStream
For KafkaStream, there is no solution. I had to create my own implementation to make it work. I merged the TracingKafkaClientSupplier from opentracing-kafka-stream with KafkaSource / KafkaSink from smallrye-reactive-messaging-kafka.
Now I have my own KafkaClientSupplier that is used by KafkaStream to create Consumer and Producer.
So this is my KafkaClientSupplier. I decided to extends DefaultKafkaClientSupplier instead of duplicate it.
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
public class KafkaStreamClientSupplier extends DefaultKafkaClientSupplier {
@Override
public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
return new TracingKafkaProducer<>(super.getProducer(config));
}
@Override
public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
return new TracingKafkaConsumer<>(super.getConsumer(config));
}
}
Now I have my two new classes:
- TracingKafkaProducer
- TracingKafkaConsumer
TracingKafkaProducer
TracingKafkaProducer is a copy of the same class in opentracing-kafka-client. I have changed the send method to add the tracing part using OpenTelemetry:
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
createOutgoingTrace(record);
return producer.send(record, callback);
}
private void createOutgoingTrace(ProducerRecord<K, V> message) {
TracingMetadata tracingMetadata =TracingMetadata.empty();
if (message.headers() != null) {
// Read tracing headers
Context context = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.extract(Context.current(), message.headers(), HeaderExtractAdapter.GETTER);
tracingMetadata = TracingMetadata.withCurrent(context);
}
final SpanBuilder spanBuilder = TRACER.spanBuilder(message.topic() + " send")
.setSpanKind(SpanKind.PRODUCER);
if (tracingMetadata.getCurrentContext() != null) {
// Handle possible parent span
final Context parentSpanContext = tracingMetadata.getCurrentContext();
if (parentSpanContext != null) {
spanBuilder.setParent(parentSpanContext);
} else {
spanBuilder.setNoParent();
}
} else {
spanBuilder.setNoParent();
}
final Span span = spanBuilder.startSpan();
Scope scope = span.makeCurrent();
// Set Span attributes
if (message.partition() != null) {
span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_PARTITION, message.partition());
}
span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka");
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, message.topic());
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic");
// Set span onto headers
GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.inject(Context.current(), message.headers(), HeaderInjectAdapter.SETTER);
span.end();
scope.close();
}
TracingKafkaConsumer
TracingKafkaProducer is a copy of the same class in opentracing-kafka-client. I have changed the poll method to add the tracing part using OpenTelemetry:
@Override
public ConsumerRecords<K, V> poll(Duration duration) {
ConsumerRecords<K, V> records = consumer.poll(duration);
for (ConsumerRecord<K, V> record : records) {
incomingTrace(record);
}
return records;
}
public void incomingTrace(ConsumerRecord<K, V> kafkaRecord) {
if (TRACER != null) {
TracingMetadata tracingMetadata =TracingMetadata.empty();
if (kafkaRecord.headers() != null) {
// Read tracing headers
Context context = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.extract(Context.root(), kafkaRecord.headers(), HeaderExtractAdapter.GETTER);
tracingMetadata = TracingMetadata.withPrevious(context);
}
final SpanBuilder spanBuilder = TRACER.spanBuilder(kafkaRecord.topic() + " receive")
.setSpanKind(SpanKind.CONSUMER);
// Handle possible parent span
final Context parentSpanContext = tracingMetadata.getPreviousContext();
if (parentSpanContext != null) {
spanBuilder.setParent(parentSpanContext);
} else {
spanBuilder.setNoParent();
}
final Span span = spanBuilder.startSpan();
// Set Span attributes
span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_PARTITION, kafkaRecord.partition());
span.setAttribute("offset", kafkaRecord.offset());
span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka");
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, kafkaRecord.topic());
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic");
final String groupId = consumer.groupMetadata().groupId();
final String clientId = consumer.groupMetadata().groupInstanceId().orElse("");
span.setAttribute("messaging.consumer_id", constructConsumerId(groupId, clientId));
span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, groupId);
if (!clientId.isEmpty()) {
span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID, clientId);
}
span.makeCurrent();
// Set span onto headers
GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.inject(Context.current(), kafkaRecord.headers(), HeaderInjectAdapter.SETTER);
span.end();
}
}
private String constructConsumerId(String groupId, String clientId) {
String consumerId = groupId;
if (!clientId.isEmpty()) {
consumerId += " - " + clientId;
}
return consumerId;
}
Usage
To inject it in our Quarkus apps, I just have to create a Produces bean:
@Produces
public KafkaClientSupplier kafkaClientSupplier() {
return new KafkaStreamClientSupplier();
}
Example
For the demo, I have a KafkaStream that reproduce the workflow of the example I have in SmallRye part.
@ApplicationScoped
public class KafkaStreamService {
public static final Logger LOGGER = Logger.getLogger(KafkaStreamService.class);
@Outgoing("demo-kafka-stream-string-out")
public Multi<Message<String>> produceData() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(tick -> Message.of("Stream Hello " + tick));
}
@Produces
public Topology buildTopology() {
var builder = new StreamsBuilder();
builder.stream(
"stream-string-out",
Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(object -> object.toUpperCase(Locale.ROOT))
.to("stream-uppercase-out",
Produced.with(Serdes.String(), Serdes.String()));
builder.stream(
"stream-uppercase-out",
Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(object -> {
var tempArray = object.toCharArray();
Arrays.sort(tempArray);
return new String(tempArray);
})
.to("stream-sort-out",
Produced.with(Serdes.String(), Serdes.String()));
builder.stream(
"stream-sort-out",
Consumed.with(Serdes.String(), Serdes.String()))
.process(() -> new Processor<String, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(String s, String s2) {
LOGGER.infov("Message received from stream {0}", s2);
}
@Override
public void close() {
}
});
return builder.build();
}
@Produces
public KafkaClientSupplier kafkaClientSupplier() {
return new KafkaStreamClientSupplier();
}
}
And if I run my app, I will see the trace: And the chaining:
Conclusion
It was not a simple journey, debugging inside of OpenTelemetry lib, to understand OpenTracing lib also and learning about Metadata but at the end, we have a nice tracing solution and now, I can finally go back to investigate my bug !!! You can find the whole demo project on my github