源码角度了解Skywalking之服务端OAP对Trace的处理

时间:2022-10-11 19:14:51

源码角度了解Skywalking之服务端OAP对Trace的处理

从前几篇的文章我们知道Skywalking对Trace信息进行生成收集后,将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送给OAP服务端,服务端处理对应的模块是skywalking-trace-receiver-plugin模块

TraceModuleProvider向GRPCHandlerRegister中添加处理器TraceSegmentReportServiceHandler

接收Agent数据

TraceSegmentReportServiceHandler的collect()方法接收Agent的数据,调用SegmentParseV2.Producer的send()方法发送

SegmentParseV2.Producer的send()方法:

public void send(UpstreamSegment segment, SegmentSource source) {
    SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
    segmentParse.setStandardizationWorker(standardizationWorker);
    segmentParse.parse(new BufferData<>(segment), source);
}
  1. 创建SegmentParseV2解析器
  2. 调用SegmentParseV2的parse()方法

SegmentParseV2的parse()方法:

源码角度了解Skywalking之服务端OAP对Trace的处理

  1. 创建SpanListener集合
  2. 获取UpstreamSegment对象
  3. 获取UpstreamSegment对象中关联的所有的TraceID
  4. 如果UpstreamSegment中的SegmentObject实例为空,就解析UpstreamSegment实例得到SegmentObject对象进行填充
  5. 重新检查段信息是否来自文件缓冲区,如果缓存中不存这个段信息对应的服务实例Id,然后返回true
  6. 把SegmentObject对象封装成SegmentDecorator对象,这里是装饰者模式的体现
  7. 调用preBuild()方法进行预构建操作,预构建不成功写入缓存文件中,构建成功会通知具体的监听器来进行构建

预构建

SegmentParseV2的preBuild()方法:

源码角度了解Skywalking之服务端OAP对Trace的处理

  1. 构建SegmentCoreInfo对象中的segmentId
  2. 调用notifyGlobalsListener()方法通知这个TraceSegment所关联的TraceId对应的监听器进行解析TraceId,需要采样的进行采样
  3. 将Segment信息填充到SegmentCoreInfo对象中
  4. 遍历TraceSegment的所有span,如果是TraceSegment的第一个span,调用notifyFirstListener()方法解析第一个span,将SegmentCoreInfo对象的属性添加到Segment对象中,记录firstEndpointId和firstEndpointName,其实就是对应的请求URL,根据Span类型通知不同的监听类

通知监听器者构建

这个方法遍历所有的span调用SegmentSpanListener的build()方法,设置Segment信息的端点id和端点名后调用SourceReceiverImpl的receive()方法,最终调用SegmentDispatcher的dispatch()方法

SegmentDispatcher的dispatch()方法:

@Override public void dispatch(Segment source) {
    SegmentRecord segment = new SegmentRecord();
    segment.setSegmentId(source.getSegmentId());
    segment.setTraceId(source.getTraceId());
    segment.setServiceId(source.getServiceId());
    segment.setServiceInstanceId(source.getServiceInstanceId());
    segment.setEndpointName(source.getEndpointName());
    segment.setEndpointId(source.getEndpointId());
    segment.setStartTime(source.getStartTime());
    segment.setEndTime(source.getEndTime());
    segment.setLatency(source.getLatency());
    segment.setIsError(source.getIsError());
    segment.setDataBinary(source.getDataBinary());
    segment.setTimeBucket(source.getTimeBucket());
    segment.setVersion(source.getVersion());

    RecordStreamProcessor.getInstance().in(segment);
}

组装SegmentRecord对象,通过RecordStreamProcessor创建实例,in()方法中调用RecordPersistentWorker来批量异步插入ES数据库中。

总结

这篇文章主要讲解了Skywalking的OAP接收到Agent发来的Trace信息的处理逻辑,入口是TraceSegmentReportServiceHandler的collect()方法,会对Agent封装的UpstreamSegment对象进行反序列化,构建Segment、Span等信息,最终由RecordStreamProcessor来批量异步把SegmentRecord写入ES数据库中

❤️ 感谢大家

如果你觉得这篇内容对你挺有有帮助的话:

  1. 欢迎关注我❤️,点赞????????,评论????,转发????
  2. 有不当之处欢迎批评指正。