zipkin微服务调用链分析

时间:2023-03-10 02:32:40
zipkin微服务调用链分析

1.zipkin的作用

在微服务架构下,一个http请求从发出到响应,中间可能经过了N多服务的调用,或者N多逻辑操作,

如何监控某个服务,或者某个逻辑操作的执行情况,对分析耗时操作,性能瓶颈具有很大价值,

zipkin帮助我们实现了这一监控功能。

2.启动zipkin

下载可执行文件: https://zipkin.io/quickstart.sh | bash -s

java -jar zipkin.jar

运行结果:

zipkin微服务调用链分析

zipkin监听9411端口,通过浏览器查看:

zipkin微服务调用链分析

3.python中zipkin的实现模块py_zipkin

创建flask项目,新建demo.py

import requests
from flask import Flask
from py_zipkin.zipkin import zipkin_span,create_http_headers_for_new_span
import time app = Flask(__name__) app.config.update({
"ZIPKIN_HOST":"127.0.0.1",
"ZIPKIN_PORT":"9411",
"APP_PORT":5000,
# any other app config-y things
}) def do_stuff():
time.sleep(2)
headers = create_http_headers_for_new_span()
requests.get('http://localhost:6000/service1/', headers=headers)
return 'OK' def http_transport(encoded_span):
# encoding prefix explained in https://github.com/Yelp/py_zipkin#transport
#body = b"\x0c\x00\x00\x00\x01"+encoded_span
body=encoded_span
zipkin_url="http://127.0.0.1:9411/api/v1/spans"
#zipkin_url = "http://{host}:{port}/api/v1/spans".format(
# host=app.config["ZIPKIN_HOST"], port=app.config["ZIPKIN_PORT"])
headers = {"Content-Type": "application/x-thrift"} # You'd probably want to wrap this in a try/except in case POSTing fails
r=requests.post(zipkin_url, data=body, headers=headers)
print(type(encoded_span))
print(encoded_span)
print(body)
print(r)
print(r.content) @app.route('/')
def index():
with zipkin_span(
service_name='webapp',
span_name='index',
transport_handler=http_transport,
port=5000,
sample_rate=100, #0.05, # Value between 0.0 and 100.0
):
with zipkin_span(service_name='webapp', span_name='do_stuff'):
do_stuff()
time.sleep(1)
return 'OK', 200 if __name__=='__main__':
app.run(host="0.0.0.0",port=5000,debug=True)

新建server1.py

from flask import request
import requests
from flask import Flask
from py_zipkin.zipkin import zipkin_span,ZipkinAttrs
import time import MySQLdb app = Flask(__name__)
app.config.update({
"ZIPKIN_HOST":"127.0.0.1",
"ZIPKIN_PORT":"9411",
"APP_PORT":5000,
# any other app config-y things
}) def do_stuff():
time.sleep(2)
with zipkin_span(service_name='service1', span_name='service1_db_search'):
db_search()
return 'OK' def db_search():
# 打开数据库连接
db = MySQLdb.connect("127.0.0.1", "username", "psw", "db", charset='utf8')
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# 使用execute方法执行SQL语句
cursor.execute("SELECT VERSION()")
# 使用 fetchone() 方法获取一条数据
data = cursor.fetchone()
print("Database version : %s " % data)
# 关闭数据库连接
db.close() def http_transport(encoded_span):
# encoding prefix explained in https://github.com/Yelp/py_zipkin#transport
#body = b"\x0c\x00\x00\x00\x01" + encoded_span
body=encoded_span
zipkin_url="http://127.0.0.1:9411/api/v1/spans"
#zipkin_url = "http://{host}:{port}/api/v1/spans".format(
# host=app.config["ZIPKIN_HOST"], port=app.config["ZIPKIN_PORT"])
headers = {"Content-Type": "application/x-thrift"} # You'd probably want to wrap this in a try/except in case POSTing fails
requests.post(zipkin_url, data=body, headers=headers) @app.route('/service1/')
def index():
with zipkin_span(
service_name='service1',
zipkin_attrs=ZipkinAttrs(
trace_id=request.headers['X-B3-TraceID'],
span_id=request.headers['X-B3-SpanID'],
parent_span_id=request.headers['X-B3-ParentSpanID'],
flags=request.headers['X-B3-Flags'],
is_sampled=request.headers['X-B3-Sampled'],
),
span_name='index_service1',
transport_handler=http_transport,
port=6000,
sample_rate=100, #0.05, # Value between 0.0 and 100.0
):
with zipkin_span(service_name='service1', span_name='service1_do_stuff'):
do_stuff()
return 'OK', 200 if __name__=='__main__':
app.run(host="0.0.0.0",port=6000,debug=True)

运行demo.py

zipkin微服务调用链分析

运行server1.py

zipkin微服务调用链分析

访问5000端口

zipkin微服务调用链分析

查看调用链:

zipkin微服务调用链分析

zipkin微服务调用链分析

可以看到,有webapp和services两个service,5个span标签,可以清楚看到service和service,service和span,span和span之间的关系,和各span耗时情况。

4.py_zipkin代码分析

上例中我们主要使用了py_zipkin的两个对象,zipkin_span和create_http_headers_for_new_span

1)zipkin_span

with zipkin_span(
service_name='webapp',
span_name='index',
transport_handler=http_transport,
port=5000,
sample_rate=100, #0.05, # Value between 0.0 and 100.0
):

service_name:服务名

span_name:标签名,用来标志服务里面的某个操作

transport_handler:处理函数,post数据到zipkin

port:服务端口号

sample_rate:待研究

需要注意的是,在一个服务里面,只有root-span需要定义transport_handler,port等参数,非root-span只有service_name是必须的,其他参数继承root-span

with zipkin_span(service_name='webapp', span_name='do_stuff'):
do_stuff()

zip_span还可以是用装饰器的方式,来包裹对应的操作

@zipkin_span(service_name='webapp', span_name='do_stuff')
def do_stuff():
time.sleep(2)
headers = create_http_headers_for_new_span()
requests.get('http://localhost:6000/service1/', headers=headers)
return 'OK'

但是,我这边试了装饰器的方式,不起作用

transport_handler的定义如下:

def http_transport(encoded_span):
# encoding prefix explained in https://github.com/Yelp/py_zipkin#transport
#body = b"\x0c\x00\x00\x00\x01"+encoded_span
body=encoded_span
zipkin_url="http://127.0.0.1:9411/api/v1/spans"
#zipkin_url = "http://{host}:{port}/api/v1/spans".format(
# host=app.config["ZIPKIN_HOST"], port=app.config["ZIPKIN_PORT"])
headers = {"Content-Type": "application/x-thrift"} # You'd probably want to wrap this in a try/except in case POSTing fails
r=requests.post(zipkin_url, data=body, headers=headers)

zipkin会把编码后的span,通过接口post到zipkin,我们通过浏览器就可以看到调用链详情了。

2)create_http_headers_for_new_span

跳到一个新的服务时,通过create_http_headers_for_new_span生成新的span信息,包括trace_id,span_id,parent_span_id等,服务之间就做好关联了

with zipkin_span(
service_name='service1',
zipkin_attrs=ZipkinAttrs(
trace_id=request.headers['X-B3-TraceID'],
span_id=request.headers['X-B3-SpanID'],
parent_span_id=request.headers['X-B3-ParentSpanID'],
flags=request.headers['X-B3-Flags'],
is_sampled=request.headers['X-B3-Sampled'],
),
span_name='index_service1',
transport_handler=http_transport,
port=6000,
sample_rate=100, #0.05, # Value between 0.0 and 100.0
):