当前位置: 首页 > news >正文

Skywalking Kafka Tracing实现

背景

Skywalking默认场景下,Tracing对于消息队列的发送场景,无法将TraceId传递到下游消费者,但对于微服务场景下,是有大量消息队列的业务场景的,这显然无法满足业务预期。

解决方案

Skywalking的官方社区中,有用户提出了该场景问题,Skywalking在补充工具包中,提供了对Kafka的tracing支持。

skywalking kafka problem

代码实现:

<dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-kafka</artifactId><version>${skywalking.version}</version></dependency>

对于该工具包,默认情况下,是针对KafkaTemplate进行trace,即如果使用KafkaTemplate发送消息,代码层面无需做任何改动。

如果没有使用KafkaTemplate的场景,toolkit也提供的了注解的支持:

public class ConsumerThread2 extends Thread {@Overridepublic void run() {Properties consumerProperties = new Properties();//...consumerProperties.put()KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());while (true) {if (pollAndInvoke(consumer)) break;}consumer.close();}@KafkaPollAndInvokeprivate boolean pollAndInvoke(KafkaConsumer<String, String> consumer) {try {Thread.sleep(1000);} catch (InterruptedException e) {}ConsumerRecords<String, String> records = consumer.poll(100);if (!records.isEmpty()) {OkHttpClient client = new OkHttpClient.Builder().build();Request request = new Request.Builder().url("http://localhost:8080/kafka-scenario/case/kafka-thread2-ping").build();Response response = null;try {response = client.newCall(request).execute();} catch (IOException e) {}response.body().close();return true;}return false;}
}

异步线程Tracing

对于Kafka消息的发送,经常会配合异步线程池的场景使用,Tracing的基本原理是基于ThreadLocal进行实现的,那么对于异步场景,是会丢失TraceId,通常的解决方式,是需要手动将主线程的TraceId手动赋值给子线程,但这种方式需要手动代码侵入,并不友好。

幸运的是,Skywalking的toolkit中提供了对于异步线程tracing的支持。

<dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-trace</artifactId><version>${skywalking.version}</version>
</dependency>

推荐用法:

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(RunnableWrapper.of(new Runnable() {@Override public void run() {//your code}
}));

或者:

 @TraceCrossThreadpublic static class MyCallable<String> implements Callable<String> {@Overridepublic String call() throws Exception {return null;}}
...ExecutorService executorService = Executors.newFixedThreadPool(1);executorService.submit(new MyCallable());

PS:事实上,RunnableWrapper也是基于@TraceCrossThread实现。

相关文档:
https://skywalking.apache.org/docs/skywalking-java/v8.16.0/en/setup/service-agent/java-agent/application-toolkit-kafka/

https://skyapm.github.io/document-cn-translation-of-skywalking/zh/6.1.0/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.html

https://blog.51cto.com/knifeedge/5268667

https://blog.csdn.net/lijunwyf/article/details/107954543

http://www.lryc.cn/news/136683.html

相关文章:

  • Perl 解析字符串为日期对象并获取多天前的日期字符串
  • C语言问题 - 关于一维数组和二维数组用*a+i形式表达
  • 验证码识别DLL ,滑块识别SDK,OCR图片转文字,机器视觉找物品
  • 【图论】最小生成树的应用
  • C++类模板的特化(三)
  • 基于YOLOV8模型的课堂场景下人脸目标检测系统(PyTorch+Pyside6+YOLOv8模型)
  • java八股文面试[数据结构]——Map有哪些子类
  • 司徒理财:8.23今日黄金原油走势分析附操作策略
  • 使用动态IP是否会影响网络
  • Linux学习笔记-常用指令说明
  • MyBatisPlus进阶版
  • 安防视频云平台EasyNVR视频汇聚平台硬件无法进入服务器的问题处理方法
  • 流媒体内容分发终极解决方案:当融合CDN与P2P视频交付结合
  • 根据源码,模拟实现 RabbitMQ - 内存数据管理(4)
  • Apache Flume架构和原理
  • 代码随想录算法训练营day38 | LeetCode 509. 斐波那契数 70. 爬楼梯 746. 使用最小花费爬楼梯
  • Linux基本指令【下】
  • 向量检索:基于ResNet预训练模型构建以图搜图系统
  • SpringBoot 响应头添加版本号、打包项目后缀添加版本号和时间
  • 优化指南:带宽限制的可行策略
  • 计算机提示mfc120u.dll缺失(找不到)怎么解决
  • Java基于SpringBoot+Vue实现酒店客房管理系统(2.0 版本)
  • 微服务架构2.0--云原生时代
  • C++day2作业(2023.8.22)
  • 在 Spring Boot 中使用 OpenAI ChatGPT API
  • 【leetcode】225.用队列实现栈
  • 机器学习中XGBoost算法调参技巧
  • 第1章:计算机网络体系结构
  • 【Java 动态数据统计图】动态数据统计思路Demo(动态,排序,containsKey)三(115)
  • 【游戏评测】河洛群侠传一周目玩后感