分布式系统监控系统zipkin入门
zipkin為分布式鏈路調(diào)用監(jiān)控系統(tǒng),聚合各業(yè)務(wù)系統(tǒng)調(diào)用延遲數(shù)據(jù),達(dá)到鏈路調(diào)用監(jiān)控跟蹤。
?
architecture
如圖,在復(fù)雜的調(diào)用鏈路中假設(shè)存在一條調(diào)用鏈路響應(yīng)緩慢,如何定位其中延遲高的服務(wù)呢?
- 日志: 通過分析調(diào)用鏈路上的每個服務(wù)日志得到結(jié)果
- zipkin:使用zipkin的web UI可以一眼看出延遲高的服務(wù)
如圖所示,各業(yè)務(wù)系統(tǒng)在彼此調(diào)用時,將特定的跟蹤消息傳遞至zipkin,zipkin在收集到跟蹤信息后將其聚合處理、存儲、展示等,用戶可通過web UI方便?
獲得網(wǎng)絡(luò)延遲、調(diào)用鏈路、系統(tǒng)依賴等等。
zipkin主要涉及四個組件?collector?storage?search?web UI
- Collector接收各service傳輸?shù)臄?shù)據(jù)
- Cassandra作為Storage的一種,也可以是mysql等,默認(rèn)存儲在內(nèi)存中,配置cassandra可以參考這里
- Query負(fù)責(zé)查詢Storage中存儲的數(shù)據(jù),提供簡單的JSON API獲取數(shù)據(jù),主要提供給web UI使用
- Web?提供簡單的web界面
2.安裝
執(zhí)行如下命令下載jar包
wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'- 1
- 1
其為一個spring?boot?工程,直接運(yùn)行jar
nohup java -jar zipkin.jar &- 1
- 1
訪問?http://ip:9411?
概念:
使用zipkin涉及幾個概念
-
Span:基本工作單元,一次鏈路調(diào)用(可以是RPC,DB等沒有特定的限制)創(chuàng)建一個span,通過一個64位ID標(biāo)識它,?
span通過還有其他的數(shù)據(jù),例如描述信息,時間戳,key-value對的(Annotation)tag信息,parent-id等,其中parent-id?
可以表示span調(diào)用鏈路來源,通俗的理解span就是一次請求信息 -
Trace:類似于樹結(jié)構(gòu)的Span集合,表示一條調(diào)用鏈路,存在唯一標(biāo)識
-
Annotation: 注解,用來記錄請求特定事件相關(guān)信息(例如時間),通常包含四個注解信息
cs - Client Start,表示客戶端發(fā)起請求
sr - Server Receive,表示服務(wù)端收到請求
ss - Server Send,表示服務(wù)端完成處理,并將結(jié)果發(fā)送給客戶端
cr - Client Received,表示客戶端獲取到服務(wù)端返回信息
-
BinaryAnnotation:提供一些額外信息,一般已key-value對出現(xiàn)
概念說完,來看下完整的調(diào)用鏈路?
上圖表示一請求鏈路,一條鏈路通過Trace Id唯一標(biāo)識,Span標(biāo)識發(fā)起的請求信息,各span通過parent id?關(guān)聯(lián)起來,如圖?
整個鏈路的依賴關(guān)系如下:?
完成鏈路調(diào)用的記錄后,如何來計算調(diào)用的延遲呢,這就需要利用Annotation信息
sr-cs 得到請求發(fā)出延遲
ss-sr 得到服務(wù)端處理延遲
cr-cs 得到真?zhèn)€鏈路完成延遲
?
brave
作為各調(diào)用鏈路,只需要負(fù)責(zé)將指定格式的數(shù)據(jù)發(fā)送給zipkin即可,利用brave可快捷完成操作。
首先導(dǎo)入jar包pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.6.RELEASE</version>
</parent>
<!-- https://mvnrepository.com/artifact/io.zipkin.brave/brave-core -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-core</artifactId>
<version>3.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.zipkin.brave/brave-http -->
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-http</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-spancollector-http</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-web-servlet-filter</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-okhttp</artifactId>
<version>3.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.1</version>
</dependency>
</dependencies>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
利用spring boot創(chuàng)建工程
Application.Java
package com.lkl.zipkin;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*
* Created by liaokailin on 16/7/27.
*/
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(Application.class);
app.run(args);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
建立controller對外提供服務(wù)
HomeController.java
RestController
@RequestMapping("/")
public class HomeController {
@Autowired
private OkHttpClient client;
private Random random = new Random();
@RequestMapping("start")
public String start() throws InterruptedException, IOException {
int sleep= random.nextInt(100);
TimeUnit.MILLISECONDS.sleep(sleep);
Request request = new Request.Builder().url("http://localhost:9090/foo").get().build();
Response response = client.newCall(request).execute();
return " [service1 sleep " + sleep+" ms]" + response.body().toString();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
HomeController中利用OkHttpClient調(diào)用發(fā)起http請求。在每次發(fā)起請求時則需要通過brave記錄Span信息,并異步傳遞給zipkin?
作為被調(diào)用方(服務(wù)端)也同樣需要完成以上操作.
ZipkinConfig.java
package com.lkl.zipkin.config;
import com.github.kristofa.brave.Brave;
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.SpanCollector;
import com.github.kristofa.brave.http.DefaultSpanNameProvider;
import com.github.kristofa.brave.http.HttpSpanCollector;
import com.github.kristofa.brave.okhttp.BraveOkHttpRequestResponseInterceptor;
import com.github.kristofa.brave.servlet.BraveServletFilter;
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by liaokailin on 16/7/27.
*/
@Configuration
public class ZipkinConfig {
@Autowired
private ZipkinProperties properties;
@Bean
public SpanCollector spanCollector() {
HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().connectTimeout(properties.getConnectTimeout()).readTimeout(properties.getReadTimeout())
.compressionEnabled(properties.isCompressionEnabled()).flushInterval(properties.getFlushInterval()).build();
return HttpSpanCollector.create(properties.getUrl(), config, new EmptySpanCollectorMetricsHandler());
}
@Bean
public Brave brave(SpanCollector spanCollector){
Brave.Builder builder = new Brave.Builder(properties.getServiceName()); //指定state
builder.spanCollector(spanCollector);
builder.traceSampler(Sampler.ALWAYS_SAMPLE);
Brave brave = builder.build();
return brave;
}
@Bean
public BraveServletFilter braveServletFilter(Brave brave){
BraveServletFilter filter = new BraveServletFilter(brave.serverRequestInterceptor(),brave.serverResponseInterceptor(),new DefaultSpanNameProvider());
return filter;
}
@Bean
public OkHttpClient okHttpClient(Brave brave){
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(new BraveOkHttpRequestResponseInterceptor(brave.clientRequestInterceptor(), brave.clientResponseInterceptor(), new DefaultSpanNameProvider()))
.build();
return client;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
-
SpanCollector?配置收集器
-
Brave?各工具類的封裝,其中builder.traceSampler(Sampler.ALWAYS_SAMPLE)設(shè)置采樣比率,0-1之間的百分比
-
BraveServletFilter?作為攔截器,需要serverRequestInterceptor,serverResponseInterceptor?分別完成sr和ss操作
-
OkHttpClient?添加攔截器,需要clientRequestInterceptor,clientResponseInterceptor?分別完成cs和cr操作,該功能由?
brave中的brave-okhttp模塊提供,同樣的道理如果需要記錄數(shù)據(jù)庫的延遲只要在數(shù)據(jù)庫操作前后完成cs和cr即可,當(dāng)然brave提供其封裝。
以上還缺少一個配置信息ZipkinProperties.java
package com.lkl.zipkin.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* Created by liaokailin on 16/7/28.
*/
@Configuration
@ConfigurationProperties(prefix = "com.zipkin")
public class ZipkinProperties {
private String serviceName;
private String url;
private int connectTimeout;
private int readTimeout;
private int flushInterval;
private boolean compressionEnabled;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public int getConnectTimeout() {
return connectTimeout;
}
public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}
public int getReadTimeout() {
return readTimeout;
}
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}
public int getFlushInterval() {
return flushInterval;
}
public void setFlushInterval(int flushInterval) {
this.flushInterval = flushInterval;
}
public boolean isCompressionEnabled() {
return compressionEnabled;
}
public void setCompressionEnabled(boolean compressionEnabled) {
this.compressionEnabled = compressionEnabled;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
則可以在配置文件application.properties中配置相關(guān)信息
com.zipkin.serviceName=service1
com.zipkin.url=http://110.173.14.57:9411
com.zipkin.connectTimeout=6000
com.zipkin.readTimeout=6000
com.zipkin.flushInterval=1
com.zipkin.compressionEnabled=true
server.port=8080
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 1
- 2
- 3
- 4
- 5
- 6
- 7
那么其中的service1即完成,同樣的道理,修改配置文件(調(diào)整com.zipkin.serviceName,以及server.port)以及controller對應(yīng)的方法構(gòu)造若干服務(wù)
service1?中訪問http://localhost:8080/start需要訪問http://localhost:9090/foo,則構(gòu)造server2提供該方法
server2配置
com.zipkin.serviceName=service2
com.zipkin.url=http://110.173.14.57:9411
com.zipkin.connectTimeout=6000
com.zipkin.readTimeout=6000
com.zipkin.flushInterval=1
com.zipkin.compressionEnabled=true
server.port=9090
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
controller方法
@RequestMapping("foo")
public String foo() throws InterruptedException, IOException {
Random random = new Random();
int sleep= random.nextInt(100);
TimeUnit.MILLISECONDS.sleep(sleep);
Request request = new Request.Builder().url("http://localhost:9091/bar").get().build(); //service3
Response response = client.newCall(request).execute();
String result = response.body().string();
request = new Request.Builder().url("http://localhost:9092/tar").get().build(); //service4
response = client.newCall(request).execute();
result += response.body().string();
return " [service2 sleep " + sleep+" ms]" + result;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
在server2中調(diào)用server3和server4中的方法
方法分別為
@RequestMapping("bar")
public String bar() throws InterruptedException, IOException { //service3 method
Random random = new Random();
int sleep= random.nextInt(100);
TimeUnit.MILLISECONDS.sleep(sleep);
return " [service3 sleep " + sleep+" ms]";
}
@RequestMapping("tar")
public String tar() throws InterruptedException, IOException { //service4 method
Random random = new Random();
int sleep= random.nextInt(1000);
TimeUnit.MILLISECONDS.sleep(sleep);
return " [service4 sleep " + sleep+" ms]";
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
將工程修改后編譯成jar形式
執(zhí)行
nohup java -jar server4.jar &
nohup java -jar server3.jar &
nohup java -jar server2.jar &
nohup java -jar server1.jar &
- 1
- 2
- 3
- 4
- 5
- 6
- 1
- 2
- 3
- 4
- 5
- 6
訪問http://localhost:8080/start后查看zipkin的web UI
點(diǎn)擊條目可以查看具體的延遲信息
服務(wù)之間的依賴為?
?
brave 源碼
以上完成了基本的操作,下面將從源碼角度來看下brave的實(shí)現(xiàn)
首先從SpanCollector來入手
@Bean
public SpanCollector spanCollector() {
HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().connectTimeout(properties.getConnectTimeout()).readTimeout(properties.getReadTimeout())
.compressionEnabled(properties.isCompressionEnabled()).flushInterval(properties.getFlushInterval()).build();
return HttpSpanCollector.create(properties.getUrl(), config, new EmptySpanCollectorMetricsHandler());
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
從名稱上看HttpSpanCollector是基于http的span收集器,因此超時配置是必須的,默認(rèn)給出的超時時間較長,flushInterval表示span的傳遞?
間隔,實(shí)際為定時任務(wù)執(zhí)行的間隔時間.在HttpSpanCollector中覆寫了父類方法sendSpans
@Override
protected void sendSpans(byte[] json) throws IOException {
// intentionally not closing the connection, so as to use keep-alives
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
connection.setConnectTimeout(config.connectTimeout());
connection.setReadTimeout(config.readTimeout());
connection.setRequestMethod("POST");
connection.addRequestProperty("Content-Type", "application/json");
if (config.compressionEnabled()) {
connection.addRequestProperty("Content-Encoding", "gzip");
ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
try (GZIPOutputStream compressor = new GZIPOutputStream(gzipped)) {
compressor.write(json);
}
json = gzipped.toByteArray();
}
connection.setDoOutput(true);
connection.setFixedLengthStreamingMode(json.length);
connection.getOutputStream().write(json);
try (InputStream in = connection.getInputStream()) {
while (in.read() != -1) ; // skip
} catch (IOException e) {
try (InputStream err = connection.getErrorStream()) {
if (err != null) { // possible, if the connection was dropped
while (err.read() != -1) ; // skip
}
}
throw e;
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
可以看出最終span信息是通過HttpURLConnection實(shí)現(xiàn)的,同樣道理就可以推理brave對brave-spring-resttemplate-interceptors模塊的實(shí)現(xiàn),?
只是換了一種http封裝。
Brave
@Bean
public Brave brave(SpanCollector spanCollector){
Brave.Builder builder = new Brave.Builder(properties.getServiceName()); //指定state
builder.spanCollector(spanCollector);
builder.traceSampler(Sampler.ALWAYS_SAMPLE);
Brave brave = builder.build();
return brave;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
Brave類包裝了各種工具類
public Brave build() {
return new Brave(this);
}
- 1
- 2
- 3
- 1
- 2
- 3
創(chuàng)建一個Brave
private Brave(Builder builder) {
serverTracer = ServerTracer.builder()
.randomGenerator(builder.random)
.spanCollector(builder.spanCollector)
.state(builder.state)
.traceSampler(builder.sampler).build();
clientTracer = ClientTracer.builder()
.randomGenerator(builder.random)
.spanCollector(builder.spanCollector)
.state(builder.state)
.traceSampler(builder.sampler).build();
localTracer = LocalTracer.builder()
.randomGenerator(builder.random)
.spanCollector(builder.spanCollector)
.spanAndEndpoint(SpanAndEndpoint.LocalSpanAndEndpoint.create(builder.state))
.traceSampler(builder.sampler).build();
serverRequestInterceptor = new ServerRequestInterceptor(serverTracer);
serverResponseInterceptor = new ServerResponseInterceptor(serverTracer);
clientRequestInterceptor = new ClientRequestInterceptor(clientTracer);
clientResponseInterceptor = new ClientResponseInterceptor(clientTracer);
serverSpanAnnotationSubmitter = AnnotationSubmitter.create(SpanAndEndpoint.ServerSpanAndEndpoint.create(builder.state));
serverSpanThreadBinder = new ServerSpanThreadBinder(builder.state);
clientSpanThreadBinder = new ClientSpanThreadBinder(builder.state);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
封裝了*Tracer,*Interceptor,*Binder等
其中?serverTracer當(dāng)服務(wù)作為服務(wù)端時處理span信息,clientTracer當(dāng)服務(wù)作為客戶端時處理span信息
Filter
BraveServletFilter是http模塊提供的攔截器功能,傳遞serverRequestInterceptor,serverResponseInterceptor,spanNameProvider等參數(shù)?
其中spanNameProvider表示如何處理span的名稱,默認(rèn)使用method名稱,spring boot中申明的filter bean?默認(rèn)攔截所有請求
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException {
String alreadyFilteredAttributeName = getAlreadyFilteredAttributeName();
boolean hasAlreadyFilteredAttribute = request.getAttribute(alreadyFilteredAttributeName) != null;
if (hasAlreadyFilteredAttribute) {
// Proceed without invoking this filter...
filterChain.doFilter(request, response);
} else {
final StatusExposingServletResponse statusExposingServletResponse = new StatusExposingServletResponse((HttpServletResponse) response);
requestInterceptor.handle(new HttpServerRequestAdapter(new ServletHttpServerRequest((HttpServletRequest) request), spanNameProvider));
try {
filterChain.doFilter(request, statusExposingServletResponse);
} finally {
responseInterceptor.handle(new HttpServerResponseAdapter(new HttpResponse() {
@Override
public int getHttpStatusCode() {
return statusExposingServletResponse.getStatus();
}
}));
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
首先來看requestInterceptor.handle方法,
public void handle(ServerRequestAdapter adapter) {
serverTracer.clearCurrentSpan();
final TraceData traceData = adapter.getTraceData();
Boolean sample = traceData.getSample();
if (sample != null && Boolean.FALSE.equals(sample)) {
serverTracer.setStateNoTracing();
LOGGER.fine("Received indication that we should NOT trace.");
} else {
if (traceData.getSpanId() != null) {
LOGGER.fine("Received span information as part of request.");
SpanId spanId = traceData.getSpanId();
serverTracer.setStateCurrentTrace(spanId.traceId, spanId.spanId,
spanId.nullableParentId(), adapter.getSpanName());
} else {
LOGGER.fine("Received no span state.");
serverTracer.setStateUnknown(adapter.getSpanName());
}
serverTracer.setServerReceived();
for(KeyValueAnnotation annotation : adapter.requestAnnotations())
{
serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
其中serverTracer.clearCurrentSpan()清除當(dāng)前線程上的span信息,調(diào)用ThreadLocalServerClientAndLocalSpanState中的
@Override
public void setCurrentServerSpan(final ServerSpan span) {
if (span == null) {
currentServerSpan.remove();
} else {
currentServerSpan.set(span);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
currentServerSpan為ThreadLocal對象
private final static ThreadLocal<ServerSpan> currentServerSpan = new ThreadLocal<ServerSpan>() {- 1
- 1
回到ServerRequestInterceptor#handle()方法中final TraceData traceData = adapter.getTraceData()
@Override
public TraceData getTraceData() {
final String sampled = serverRequest.getHttpHeaderValue(BraveHttpHeaders.Sampled.getName());
if (sampled != null) {
if (sampled.equals("0") || sampled.toLowerCase().equals("false")) {
return TraceData.builder().sample(false).build();
} else {
final String parentSpanId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.ParentSpanId.getName());
final String traceId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.TraceId.getName());
final String spanId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.SpanId.getName());
if (traceId != null && spanId != null) {
SpanId span = getSpanId(traceId, spanId, parentSpanId);
return TraceData.builder().sample(true).spanId(span).build();
}
}
}
return TraceData.builder().build();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
其中SpanId span = getSpanId(traceId, spanId, parentSpanId)?將構(gòu)造一個SpanId對象
private SpanId getSpanId(String traceId, String spanId, String parentSpanId) {
return SpanId.builder()
.traceId(convertToLong(traceId))
.spanId(convertToLong(spanId))
.parentId(parentSpanId == null ? null : convertToLong(parentSpanId)).build();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 1
- 2
- 3
- 4
- 5
- 6
- 7
將traceId,spanId,parentId關(guān)聯(lián)起來,其中設(shè)置parentId方法為
public Builder parentId(@Nullable Long parentId) {
if (parentId == null) {
this.flags |= FLAG_IS_ROOT;
} else {
this.flags &= ~FLAG_IS_ROOT;
}
this.parentId = parentId;
return this;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
如果parentId為空為根節(jié)點(diǎn),則執(zhí)行this.flags |= FLAG_IS_ROOT?,因此后續(xù)在判斷節(jié)點(diǎn)是否為根節(jié)點(diǎn)時,只需要執(zhí)行(flags & FLAG_IS_ROOT) == FLAG_IS_ROOT即可.
構(gòu)造完SpanId后看
serverTracer.setStateCurrentTrace(spanId.traceId, spanId.spanId,
spanId.nullableParentId(), adapter.getSpanName());
- 1
- 2
- 1
- 2
設(shè)置當(dāng)前Span
public void setStateCurrentTrace(long traceId, long spanId, @Nullable Long parentSpanId, @Nullable String name) {
checkNotBlank(name, "Null or blank span name");
spanAndEndpoint().state().setCurrentServerSpan(
ServerSpan.create(traceId, spanId, parentSpanId, name));
}
- 1
- 2
- 3
- 4
- 5
- 6
- 1
- 2
- 3
- 4
- 5
- 6
ServerSpan.create創(chuàng)建Span信息
static ServerSpan create(long traceId, long spanId, @Nullable Long parentSpanId, String name) {
Span span = new Span();
span.setTrace_id(traceId);
span.setId(spanId);
if (parentSpanId != null) {
span.setParent_id(parentSpanId);
}
span.setName(name);
return create(span, true);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
構(gòu)造了一個包含Span信息的AutoValue_ServerSpan對象
通過setCurrentServerSpan設(shè)置到當(dāng)前線程上
繼續(xù)看serverTracer.setServerReceived()方法
public void setServerReceived() {
submitStartAnnotation(zipkinCoreConstants.SERVER_RECV);
}
- 1
- 2
- 3
- 1
- 2
- 3
為當(dāng)前請求設(shè)置了server received event
void submitStartAnnotation(String annotationName) {
Span span = spanAndEndpoint().span();
if (span != null) {
Annotation annotation = Annotation.create(
currentTimeMicroseconds(),
annotationName,
spanAndEndpoint().endpoint()
);
synchronized (span) {
span.setTimestamp(annotation.timestamp);
span.addToAnnotations(annotation);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
在這里為Span信息設(shè)置了Annotation信息,后續(xù)的
for(KeyValueAnnotation annotation : adapter.requestAnnotations())
{
serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
}
- 1
- 2
- 3
- 4
- 5
- 1
- 2
- 3
- 4
- 5
設(shè)置了BinaryAnnotation信息,adapter.requestAnnotations()在構(gòu)造HttpServerRequestAdapter時已完成
@Override
public Collection<KeyValueAnnotation> requestAnnotations() {
KeyValueAnnotation uriAnnotation = KeyValueAnnotation.create(
TraceKeys.HTTP_URL, serverRequest.getUri().toString());
return Collections.singleton(uriAnnotation);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 1
- 2
- 3
- 4
- 5
- 6
以上將Span信息(包括sr)存儲在當(dāng)前線程中,接下來繼續(xù)看BraveServletFilter#doFilter方法的finally部分
responseInterceptor.handle(new HttpServerResponseAdapter(new HttpResponse() {
@Override //獲取http狀態(tài)碼
public int getHttpStatusCode() {
return statusExposingServletResponse.getStatus();
}
}));
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 1
- 2
- 3
- 4
- 5
- 6
- 7
handle方法
public void handle(ServerResponseAdapter adapter) {
// We can submit this in any case. When server state is not set or
// we should not trace this request nothing will happen.
LOGGER.fine("Sending server send.");
try {
for(KeyValueAnnotation annotation : adapter.responseAnnotations())
{
serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
}
serverTracer.setServerSend();
} finally {
serverTracer.clearCurrentSpan();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
首先配置BinaryAnnotation信息,然后執(zhí)行serverTracer.setServerSend,在finally中清除當(dāng)前線程中的Span信息(不管前面是否清楚成功,最終都將執(zhí)行該不走),ThreadLocal中的數(shù)據(jù)要做到有始有終
看serverTracer.setServerSend()
public void setServerSend() {
if (submitEndAnnotation(zipkinCoreConstants.SERVER_SEND, spanCollector())) {
spanAndEndpoint().state().setCurrentServerSpan(null);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 1
- 2
- 3
- 4
- 5
- 6
終于看到spanCollector收集器了,說明下面將看是收集Span信息,這里為ss注解
boolean submitEndAnnotation(String annotationName, SpanCollector spanCollector) {
Span span = spanAndEndpoint().span();
if (span == null) {
return false;
}
Annotation annotation = Annotation.create(
currentTimeMicroseconds(),
annotationName,
spanAndEndpoint().endpoint()
);
span.addToAnnotations(annotation);
if (span.getTimestamp() != null) {
span.setDuration(annotation.timestamp - span.getTimestamp());
}
spanCollector.collect(span);
return true;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
首先獲取當(dāng)前線程中的Span信息,然后處理注解信息,通過annotation.timestamp - span.getTimestamp()計算延遲,?
調(diào)用spanCollector.collect(span)進(jìn)行收集Span信息,那么Span信息是同步收集的嗎?肯定不是的,接著看
調(diào)用spanCollector.collect(span)則執(zhí)行FlushingSpanCollector中的collect方法
@Override
public void collect(Span span) {
metrics.incrementAcceptedSpans(1);
if (!pending.offer(span)) {
metrics.incrementDroppedSpans(1);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
首先進(jìn)行的是metrics統(tǒng)計信息,可以自定義該SpanCollectorMetricsHandler信息收集各指標(biāo)信息,利用如grafana等展示信息
pending.offer(span)將span信息存儲在BlockingQueue中,然后通過定時任務(wù)去取出阻塞隊列中的值,偷偷摸摸的上傳span信息
定時任務(wù)利用了Flusher類來執(zhí)行,在構(gòu)造FlushingSpanCollector時構(gòu)造了Flusher類
static final class Flusher implements Runnable {
final Flushable flushable;
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Flusher(Flushable flushable, int flushInterval) {
this.flushable = flushable;
this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
}
@Override
public void run() {
try {
flushable.flush();
} catch (IOException ignored) {
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
創(chuàng)建了一個核心線程數(shù)為1的線程池,每間隔flushInterval秒執(zhí)行一次Span信息上傳,執(zhí)行flush方法
@Override
public void flush() {
if (pending.isEmpty()) return;
List<Span> drained = new ArrayList<Span>(pending.size());
pending.drainTo(drained);
if (drained.isEmpty()) return;
int spanCount = drained.size();
try {
reportSpans(drained);
} catch (IOException e) {
metrics.incrementDroppedSpans(spanCount);
} catch (RuntimeException e) {
metrics.incrementDroppedSpans(spanCount);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
首先將阻塞隊列中的值全部取出存如集合中,最后調(diào)用reportSpans(List<Span> drained)抽象方法,該方法在AbstractSpanCollector得到覆寫
@Override
protected void reportSpans(List<Span> drained) throws IOException {
byte[] encoded = codec.writeSpans(drained);
sendSpans(encoded);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 1
- 2
- 3
- 4
- 5
- 6
轉(zhuǎn)換成字節(jié)流后調(diào)用sendSpans抽象方法發(fā)送Span信息,此時就回到一開始說的HttpSpanCollector通過HttpURLConnection實(shí)現(xiàn)的sendSpans方法。
具體使用可以參考:https://github.com/liaokailin/zipkin#architecture,下載這個maven項目并按照里面的說明運(yùn)行即可。
總結(jié)
以上是生活随笔為你收集整理的分布式系统监控系统zipkin入门的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MySQL主从延时这么长,要怎么优化?
- 下一篇: 推荐系统算法总结(三)——FM与DNN