当前位置: 首页 > news >正文

腾讯mini项目-【指标监控服务重构】2023-08-26

今日已办

Venus 的 Trace 无感化

定义 handler 函数

  1. fiber.Handler 的主要处理逻辑
  2. 返回处理中出现的 error
  3. 返回处理中响应 json 的函数
// handler
// @Description:
// @Author xzx 2023-08-26 18:00:03
// @Param c
// @Return error
// @Return func() error : function for response json
type handler func(c *fiber.Ctx) (error, func() error)

定义TraceWrapper函数

  1. otel-trace 的逻辑嵌入 handler 中

  2. 启动 span

  3. 执行 handler,记录 span 的 attributes

  4. 根据返回的 err, jsonRespFunc 分情况讨论

  5. 条件处理逻辑
    err != nil && jsonRespFunc == nil存在错误,将错误记录到Span中,结束Span,执行c.Next()
    err != nil && jsonRespFunc != nil存在错误,将错误记录到Span中,结束Span,响应JSON
    err == nil && jsonRespFunc == nil结束Span,执行c.Next()
    err == nil && jsonRespFunc != nil结束Span,响应JSON
  6. 代码实现

// TraceWrapper return fiber.Handler integrate report otel-trace
// @Description integrate otel-trace logic in handler
// @Author xzx 2023-08-26 18:00:03
// @Param f
// @Param opts
// @Return fiber.Handler
func TraceWrapper(f handler, opts ...trace.SpanStartOption) fiber.Handler {return func(c *fiber.Ctx) error {_, span := otelclient.Tracer.Start(c.UserContext(), runtime.GetFunctionName(f), opts...)// execute handler logicerr, jsonRespFunc := f(c)// todo: setSpanAttributesif err != nil {// record span errorspan.RecordError(err)span.SetStatus(codes.Error, err.Error())span.End()if jsonRespFunc != nil {// response error resultreturn jsonRespFunc()}// ignore error, continue handlersreturn c.Next()}span.End()if jsonRespFunc != nil {// response success resultreturn jsonRespFunc()}// err == nil, jsonRespFunc == nilreturn c.Next()}
}

具体的 handler 逻辑

// SplitAndValidate split multi-events and validate uploaded data
// @Description
// @Author xzx 2023-08-26 17:54:21
// @Param c
// @Return Err
// @Return f
func SplitAndValidate(c *fiber.Ctx) (Err error, f func() error) {log.Logger().Debug("split and validate", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "all_upload"),)))RequestIdBaggage, err := baggage.NewMember("request_id", c.GetRespHeader(fiber.HeaderXRequestID))if err != nil {log.Logger().Error("Create Baggage Member Failed", zap.Error(err))}Baggage, err := baggage.New(RequestIdBaggage)if err != nil {log.Logger().Error("Create Baggage Failed", zap.Error(err))}c.SetUserContext(baggage.ContextWithBaggage(c.UserContext(), Baggage))c.Accepts(fiber.MIMEMultipartForm)uploadedData, err := parseJSON(c)if err != nil {log.Logger().Error("failed to parse json", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))c.Status(fiber.StatusBadRequest)return err, func() error {return c.JSON(protocol.Response{Code: protocol.AllFail,Data: err.Error(),})}}if err = uploadedData.Meta.Validate(); err != nil {log.Logger().Error("failed to validate meta", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "wrong_meta"),)))c.Status(fiber.StatusBadRequest)return err, func() error {return c.JSON(protocol.Response{Code: protocol.AllFail,Data: err.Error(),})}}// must use pointer when using Locals of fiber if it's about to modifyevents := make([]*schema.Event, 0, len(uploadedData.Data))parts := make([]*protocol.Part, 0, len(uploadedData.Data))for idx, data := range uploadedData.Data {log.Logger().Debug("split event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))event := &schema.Event{}part := &protocol.Part{}if err = data.Validate(); err != nil {Err = errotelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "wrong_data"),)))part.Code = protocol.ValidationErrorpart.Data = err.Error()}part.ID = data.IDevent.Meta = uploadedData.Metaevent.Data = dataevents = append(events, event)parts = append(parts, part)}c.Locals("meta", uploadedData.Meta)c.Locals("events", events)c.Locals("parts", parts)return Err, nil
}// HandleEvent handle event
// @Description
// @Author xzx 2023-08-26 17:54:23
// @Param c
// @Return Err
// @Return f
func HandleEvent(c *fiber.Ctx) (Err error, f func() error) {log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))events := c.Locals("events").([]*schema.Event)parts := c.Locals("parts").([]*protocol.Part)for idx, event := range events {log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))event.BackendID = uuid.NewString()event.UploadTime = time.Now().UnixMilli()part := parts[idx]part.BackendID = event.BackendIDcountry, region, city, err := ip.ParseIP(event.IP)if err != nil {Err = errlog.Logger().Warn("failed to parse ip", zap.Error(err), zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())))part.Code = protocol.IPParsingErrorpart.Data = err.Error()}log.Logger().Debug("parsed ip", zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())), zap.String("country", country), zap.String("region", region), zap.String("city", city))event.Country = countryevent.Region = regionevent.City = city}return Err, nil
}// WriteKafka write to kafka
// @Description
// @Author xzx 2023-08-26 17:54:26
// @Param c
// @Return Err
// @Return f
func WriteKafka(c *fiber.Ctx) (Err error, f func() error) {log.Logger().Debug("write kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))meta := c.Locals("meta").(schema.Meta)events := c.Locals("events").([]*schema.Event)parts := c.Locals("parts").([]*protocol.Part)// mark if all parts are succeed, which is response for codeisAllSuccess := true// topic is like to_analyzer__0.PERF_CRASHtopic := fmt.Sprintf("to_analyzer__0.%s", meta.Category)traceparent := c.Get(traceparentHeaderKey)if len(traceparent) == 55 {spanId := trace.SpanFromContext(c.UserContext()).SpanContext().SpanID().String()traceparent = traceparent[:36] + spanId + traceparent[52:]}messages := make([]kafka.Message, 0, len(events))for idx, event := range events {// skip if event was failedif parts[idx].Code != 0 {isAllSuccess = falsecontinue}bytes, err := sonic.Marshal(event)if err != nil {Err = errlog.Logger().Error("failed to marshal event", zap.Error(err), zap.Any("event", event), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))parts[idx].Code = protocol.SerializationErrorparts[idx].Data = err.Error()}messages = append(messages, kafka.Message{Topic: topic,Value: bytes,Headers: []kafka.Header{{Key: traceparentHeaderKey, Value: []byte(traceparent)},},})}if len(messages) == 0 { // would not write to kafka since every part were failed for some reasonlog.Logger().Warn("every data were failed to handle, would not write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))c.Status(fiber.StatusBadRequest)return errors.New("every data were failed to handle, check their code and data"), func() error {return c.JSON(protocol.Response{Code:  protocol.AllFail,Data:  "every data were failed to handle, check their code and data",Parts: parts,})}}log.Logger().Info("would write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))kafkaProducer := connector.GetEventKafka()if err := kafkaProducer.WriteMessages(context.Background(), messages...); err != nil {log.Logger().Error("failed to write to kafka", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Any("messages", messages))c.Status(fiber.StatusInternalServerError)return err, func() error {return c.JSON(protocol.Response{Code:  protocol.AllFail,Data:  fmt.Sprintf("failed to write to kafka: %s", err.Error()),Parts: parts,})}}if isAllSuccess {c.Status(fiber.StatusOK)otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(attribute.String("type", "success_upload"))))return nil, func() error {return c.JSON(protocol.Response{Code:  protocol.AllSuccess,Parts: parts,})}} else {c.Status(fiber.StatusPartialContent)return Err, func() error {return c.JSON(protocol.Response{Code:  protocol.PartialSuccess,Data:  "some data were failed to handle, check their code and data",Parts: parts,})}}
}

同步会议

进度

  1. 完成了 venus、profile 逻辑的 otel-trace 接入 handler 无感化,提高代码可扩展性
  2. otel上报的开关,已经 review 完合并了
  3. 部署了jaeger的整套方案
  4. 关于 watermill 和 baserunner 的 banchmark

Review和测试方案

  1. 代码 review
    1. fiber.Handler 接入 otel 无感化,代码可扩展性
    2. 修复部分命名规范、注释规范和代码质量检查中指出的问题
    3. 移除 profile-compose 的 grafana 容器
    4. log 的初始化
    5. 移除 venus-compose 的无用配置
  2. 测试方案
    1. 接入 ck 集群,benchmark 进行 otel-sdk 上报,压测 otel-collector 和 ck 集群
      1. 【卡点】对 collector 压测暂时对 ck 没有造成很大压力
    2. 部署了 jaeger 的整套方案,使用 es 存储 trace、Prometheus 存储 metrics
      1. 【差异】jaeger 的 trace 可视化出来,使用 span_references - followsform 会展示为父子关系的 span
      2. es 已购买集群,等待接入
      3. 【方案】对 jaeger 进行压测,找到 jaeger 出现问题的 qps,要该 qps 来测试 a、b两组的方案
      4. 【方法】在压测的过程用 pprof 来抓取 venus 和 profile 的 cpu、内存的使用情况
    3. 对于 ck 集群的指标
      1. 【插入】ck 的 写入耗时,1/5分钟的写入成功率
      2. 【查询】ck 的 查询耗时,查询成功率
      3. 【资源利用率】ck 的 cpu、内存利用率
    4. 【问题】如何证明我们在监控服务差异、优劣方面的断言,具体的测试流程、测试对象、测试指标对比等

总结

  1. 对比上报相同的 metrics 测试,收集器(jaeger、otel)collector(容器)的差异,展示otel的优势,cpu、内存的,监控 collector 的差异,cadvisor
  2. 相同上报比如 trace,对比 es、ck 的存储大小对比,kibana
  3. 简单拓展说明:otel-log,故障的trace的地方,具体问题的原因
  4. 查询性能方面:性能测试-响应耗时,用 signoz,jaeger 的 web-url 来压测-主要使用Web,控制变量(同一个trace和span,清空缓存,主要 tcp 压测),
  5. 扩展:watermill/baserunner 的对比,高并发场景下比较优秀
  6. 扩展:benchmark 的 hyperscan 和官方正则处理的对比

数据记录、控制变量

cadvisor

image-20230826231631007

services:cadvisor:image: gcr.io/cadvisor/cadvisor:latestcontainer_name: cadvisornetworks:- backendcommand: --url_base_prefix=/cadvisorvolumes:- /:/rootfs:ro- /var/run:/var/run:rw- /sys:/sys:ro- /var/lib/docker/:/var/lib/docker:ro- /dev/disk/:/dev/disk:ro

jaeger-all-in-one 分开部署

为了收集 jaeger-collector 的指标,把 jaeger-all-in-one 分开部署

services:jaeger-collector:image: jaegertracing/jaeger-collectorcontainer_name: jaeger-collectornetworks:- backendcommand: ["--es.server-urls=http://elasticsearch:9200","--es.num-shards=1","--es.num-replicas=0","--log-level=info"]environment:- SPAN_STORAGE_TYPE=elasticsearch- METRICS_STORAGE_TYPE=prometheus- PROMETHEUS_SERVER_URL=http://prometheus:9090depends_on:- elasticsearchjaeger-query:image: jaegertracing/jaeger-querycontainer_name: jaeger-querynetworks:- backendcommand: ["--es.server-urls=http://elasticsearch:9200","--span-storage.type=elasticsearch","--log-level=info"]environment:- SPAN_STORAGE_TYPE=elasticsearch- METRICS_STORAGE_TYPE=prometheus- PROMETHEUS_SERVER_URL=http://prometheus:9090- no_proxy=localhost- QUERY_BASE_PATH=/jaegerdepends_on:- jaeger-agentjaeger-agent:image: jaegertracing/jaeger-agentcontainer_name: jaeger-agentnetworks:- backendcommand: [ "--reporter.grpc.host-port=jaeger-collector:14250" ]environment:- SPAN_STORAGE_TYPE=elasticsearch- METRICS_STORAGE_TYPE=prometheus- PROMETHEUS_SERVER_URL=http://prometheus:9090depends_on:- jaeger-collector

明日待办

  1. 压测
http://www.lryc.cn/news/175909.html

相关文章:

  • 《Essential C++》之(面向过程泛型编程)
  • 机器学习笔记:adaBoost
  • Anchor DETR
  • 适合在家做的副业 整理5个,有电脑就行
  • Android WebSocket
  • Android 按键流程
  • C语言——运算符
  • MySQL数据库入门到精通8--进阶篇( MySQL管理)
  • 硬件基本功--MOS管
  • xdebug3开启profile和trace
  • EfficientFormer:高效低延迟的Vision Transformers
  • 【咕咕送书第二期】| 计算机网络对于考研的重要性?
  • 【力扣】58. 最后一个单词的长度
  • Java编程的精髓:深入理解JVM和性能优化
  • 易云维®智慧工厂数字化管理平台助推工业制造企业数字化转型新动能
  • 0.基本概念——数据结构学习
  • Redis可视化工具-Another Redis Desktop Manager 安装
  • ETLCloud工具让美团数据管理更简单
  • ctfshow 命令执行 (29-39)
  • 如何玩转CSDN AI工具集
  • 软件测试/测试开发丨利用人工智能ChatGPT自动生成PPT
  • Vue 正计时器组件
  • 神仙打架!谷歌和OpenAI竞相推出多模式AI
  • MySQL 字符集
  • java生成PDF的Util
  • 【openwrt学习笔记】新patch的制作和旧patch的修改
  • 【GIT】Git clone https://xxx.git 报错仓库找不到,ssh却可以。
  • Vue系列(三)之 基础语法下篇【事件处理,表单综合案例,组件通信】
  • AI 编码助手 Codewhisperer 安装步骤和使用初体验
  • Python操作Elasticsearch