

gRPC Java: Distributed Call-Chain Tracing in Practice

Published: 2023/12/3

The basic OpenTracing model

A trace chain involves the following key data structures and concepts:

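span: represents one distributed call. It carries an id, a parentId (the id of its parent span), a traceId, the service name, and other key attributes, and should be kept as lean as possible;

trace: represents the whole request chain, that is, a series of spans. Its ID runs through the entire call chain: every span must carry this traceId, so the traceId is passed along the whole chain;

cs (client send): the client issues the request, marking the start of the span;

sr (server receive): the server receives the request and starts processing it. sr - cs is the network latency plus any clock skew between client and server;

ss (server send): the server finishes processing and returns the response. ss - sr is the server-side processing time;

cr (client receive): the client receives the server's response, marking the end of the span. cr - ss is the network latency plus any clock skew between client and server.

Client call time = cr - cs

Server processing time = ss - sr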
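The timing relationships above can be checked with a small calculation. The following is a minimal sketch, not part of the original code: SpanTimings is a hypothetical helper, and the timestamps are assumed to be in microseconds.

```java
// Hypothetical helper: derives durations from the four annotation timestamps (microseconds).
final class SpanTimings {
    final long cs, sr, ss, cr;

    SpanTimings(long cs, long sr, long ss, long cr) {
        this.cs = cs;
        this.sr = sr;
        this.ss = ss;
        this.cr = cr;
    }

    /** Total time seen by the client: cr - cs. */
    long clientDuration() {
        return cr - cs;
    }

    /** Time the server spent handling the request: ss - sr. */
    long serverDuration() {
        return ss - sr;
    }

    /** Network time for both directions combined: (cr - cs) - (ss - sr). */
    long networkTime() {
        return clientDuration() - serverDuration();
    }
}
```

Because cr - cs is measured entirely on the client clock and ss - sr entirely on the server clock, networkTime() is not affected by clock skew between the two hosts, whereas sr - cs and cr - ss on their own are.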

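Basic architectural requirements for distributed call tracing

The tracing system should be minimally intrusive to business code, add little overhead, stay highly available and tolerate faults, and lose as few spans as possible.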

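Distributed call tracing in practice with gRPC

Creating TraceContext

TraceContext stores spans in a ThreadLocal and passes the traceId and spanId on to downstream services. zebra wraps the propagation of the thread context; for details, see the article on how gRPC passes common parameters separately from business parameters. The TraceContext code is as follows: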

import java.util.ArrayList;
import java.util.List;

import zipkin2.Span;

// Holds the trace state of the current thread. RpcContext is zebra's thread-local context.
public class TraceContext {

    private static final String SPAN_LIST_KEY = "spanList";
    public static final String TRACE_ID_KEY = "traceId";
    public static final String SPAN_ID_KEY = "spanId";

    // Annotation values: client send, client receive, server receive, server send.
    public static final String ANNO_CS = "cs";
    public static final String ANNO_CR = "cr";
    public static final String ANNO_SR = "sr";
    public static final String ANNO_SS = "ss";

    private TraceContext() {}

    public static void setTraceId(String traceId) {
        RpcContext.getContext().set(TRACE_ID_KEY, traceId);
    }

    public static String getTraceId() {
        return (String) RpcContext.getContext().get(TRACE_ID_KEY);
    }

    public static String getSpanId() {
        return (String) RpcContext.getContext().get(SPAN_ID_KEY);
    }

    public static void setSpanId(String spanId) {
        RpcContext.getContext().set(SPAN_ID_KEY, spanId);
    }

    // Requires start() to have been called on this thread, otherwise the list is null.
    @SuppressWarnings("unchecked")
    public static void addSpan(Span span) {
        ((List<Span>) RpcContext.getContext().get(SPAN_LIST_KEY)).add(span);
    }

    @SuppressWarnings("unchecked")
    public static List<Span> getSpans() {
        return (List<Span>) RpcContext.getContext().get(SPAN_LIST_KEY);
    }

    public static void clear() {
        RpcContext.getContext().remove(TRACE_ID_KEY);
        RpcContext.getContext().remove(SPAN_ID_KEY);
        RpcContext.getContext().remove(SPAN_LIST_KEY);
    }

    // Resets the context and prepares an empty span list for a new trace.
    public static void start() {
        clear();
        RpcContext.getContext().set(SPAN_LIST_KEY, new ArrayList<Span>());
    }
}
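The RpcContext class that TraceContext relies on is not shown in this article. As a rough illustration of the idea, a per-thread key/value store can be sketched as follows. SimpleRpcContext is a hypothetical stand-in written for this example, not zebra's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for zebra's RpcContext: one key/value map per thread.
final class SimpleRpcContext {
    private static final ThreadLocal<SimpleRpcContext> LOCAL =
            ThreadLocal.withInitial(SimpleRpcContext::new);

    private final Map<String, Object> values = new HashMap<>();

    private SimpleRpcContext() {}

    /** Returns the context bound to the calling thread, creating it on first use. */
    static SimpleRpcContext getContext() {
        return LOCAL.get();
    }

    void set(String key, Object value) {
        values.put(key, value);
    }

    Object get(String key) {
        return values.get(key);
    }

    void remove(String key) {
        values.remove(key);
    }
}
```

A value set on one thread is invisible to every other thread, which is what keeps concurrent requests from mixing up their trace IDs. The flip side is that code running on a different thread, such as an asynchronous callback, does not see the values unless they are copied over explicitly.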

Creating TraceAgent

TraceAgent uploads span data to Kafka. The code is as follows:

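import java.util.List;

import zipkin2.Span;
import zipkin2.reporter.AsyncReporter;

// KafkaSender and Encoding come from the zipkin-reporter libraries;
// GrpcProperties and SpringContextUtils are zebra classes.
public class TraceAgent {

    private final GrpcProperties grpcProperties;
    private final KafkaSender sender;
    private final AsyncReporter<Span> report;

    public TraceAgent() {
        grpcProperties = SpringContextUtils.getBean(GrpcProperties.class);
        // Spans are JSON-encoded and published to the "zipkin" topic.
        sender = KafkaSender.newBuilder().bootstrapServers(grpcProperties.getCallChainUpdAddr())
                .topic("zipkin").encoding(Encoding.JSON).build();
        report = AsyncReporter.builder(sender).build();
    }

    // Hands every span to the asynchronous reporter.
    public void send(final List<Span> spans) {
        spans.forEach(report::report);
    }
}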
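To relate TraceAgent to the requirements of high performance and low intrusiveness: an asynchronous reporter typically buffers spans in memory and lets a background sender ship them in batches, so the business thread never waits on Kafka. The sketch below illustrates that idea with a bounded queue from the standard library only. BoundedReporter is a hypothetical class for this example and is not how zipkin's AsyncReporter is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of a non-blocking reporter; not zipkin's AsyncReporter.
final class BoundedReporter<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedReporter(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Never blocks the caller: when the buffer is full the item is dropped and counted. */
    boolean report(T item) {
        boolean accepted = queue.offer(item);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    /** Called by a background sender: removes up to max buffered items as one batch. */
    List<T> drain(int max) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, max);
        return batch;
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

The design choice here is to prefer dropping spans over slowing down requests when the buffer is full. The dropped counter makes the loss rate observable, so the buffer size can be tuned against it.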

Creating ZebraClientTracing

ZebraClientTracing records span information on the calling side. The code is as follows:

import java.util.concurrent.TimeUnit;

import com.google.common.base.Stopwatch;
import org.springframework.stereotype.Component;

import zipkin2.Endpoint;
import zipkin2.Span;

@Component
public class ZebraClientTracing {

    public Span startTrace(String method) {
        String id = String.valueOf(IdUtils.get());
        String traceId;
        if (TraceContext.getTraceId() == null) {
            // No trace in progress: this call is the root, so its span id doubles as the trace id.
            TraceContext.start();
            traceId = id;
        } else {
            traceId = TraceContext.getTraceId();
        }
        // Zipkin timestamps are in microseconds.
        long timestamp = System.currentTimeMillis() * 1000;
        // Register the local endpoint
        Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost())
                .serviceName(EtcdRegistry.serviceName).port(50003).build();
        // Initialize the span; parentId stays null for a root span
        Span consumerSpan = Span.newBuilder().localEndpoint(endpoint).id(id).traceId(traceId)
                .parentId(TraceContext.getSpanId()).name(EtcdRegistry.serviceName).timestamp(timestamp)
                .addAnnotation(timestamp, TraceContext.ANNO_CS).putTag("method", method)
                .putTag("pkgId", RpcContext.getContext().getAttachment("pkg")).build();
        // Put the trace id and span id into the context so they are passed downstream
        RpcContext.getContext().get().put(TraceContext.TRACE_ID_KEY, consumerSpan.traceId());
        RpcContext.getContext().get().put(TraceContext.SPAN_ID_KEY, String.valueOf(consumerSpan.id()));
        return consumerSpan;
    }

    public void endTrace(Span span, Stopwatch watch, int code) {
        span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_CR)
                .duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", String.valueOf(code)).build();
        // Add the finished span to the context so that it is part of the reported list.
        TraceContext.addSpan(span);
        // Each TraceAgent builds its own Kafka sender; sharing one instance avoids that cost.
        TraceAgent traceAgent = new TraceAgent();
        traceAgent.send(TraceContext.getSpans());
    }
}

Creating ZebraServerTracing

ZebraServerTracing records span information on the server side. The code is as follows:

import java.util.concurrent.TimeUnit;

import com.google.common.base.Stopwatch;
import org.springframework.stereotype.Component;

import zipkin2.Endpoint;
import zipkin2.Span;

@Component
public class ZebraServerTracing {

    public Span startTrace(String method) {
        // Trace id and caller span id as propagated by the client
        String traceId = (String) RpcContext.getContext().get(TraceContext.TRACE_ID_KEY);
        String parentSpanId = (String) RpcContext.getContext().get(TraceContext.SPAN_ID_KEY);
        String id = String.valueOf(IdUtils.get());
        // start() clears the context, so the propagated ids are written back afterwards
        TraceContext.start();
        TraceContext.setTraceId(traceId);
        TraceContext.setSpanId(parentSpanId);
        // Zipkin timestamps are in microseconds.
        long timestamp = System.currentTimeMillis() * 1000;
        Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost())
                .serviceName(EtcdRegistry.serviceName).port(50003).build();
        Span providerSpan = Span.newBuilder().id(id).parentId(parentSpanId).traceId(traceId)
                .name(EtcdRegistry.serviceName).timestamp(timestamp).localEndpoint(endpoint)
                .addAnnotation(timestamp, TraceContext.ANNO_SR).putTag("method", method)
                .putTag("pkgId", RpcContext.getContext().getAttachment("pkg"))
                .build();
        return providerSpan;
    }

    public void endTrace(Span span, Stopwatch watch, int code) {
        span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_SS)
                .duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", String.valueOf(code)).build();
        // Add the finished span (with the ss annotation and duration) to the reported list.
        TraceContext.addSpan(span);
        // Each TraceAgent builds its own Kafka sender; sharing one instance avoids that cost.
        TraceAgent traceAgent = new TraceAgent();
        traceAgent.send(TraceContext.getSpans());
    }
}
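The way the two tracing classes link spans together can be reduced to three fields: traceId, id and parentId. The sketch below isolates that logic under simplifying assumptions. SpanLink is a hypothetical record-like class for illustration only, and the ids are passed in by the caller instead of being generated.

```java
// Hypothetical minimal span record holding only the three linking fields.
final class SpanLink {
    final String traceId;
    final String id;
    final String parentId;

    SpanLink(String traceId, String id, String parentId) {
        this.traceId = traceId;
        this.id = id;
        this.parentId = parentId;
    }

    /**
     * Client side: with no trace in progress the new span is the root and its id
     * is reused as the trace id; otherwise it joins the current trace as a child.
     */
    static SpanLink clientSpan(String currentTraceId, String currentSpanId, String newId) {
        String traceId = (currentTraceId == null) ? newId : currentTraceId;
        return new SpanLink(traceId, newId, currentSpanId);
    }

    /** Server side: the span id propagated by the caller becomes the parent id. */
    static SpanLink serverSpan(String propagatedTraceId, String propagatedSpanId, String newId) {
        return new SpanLink(propagatedTraceId, newId, propagatedSpanId);
    }
}
```

For example, a root client span created with id "1" gets traceId "1" and no parent. The server span created from the propagated values then carries traceId "1" and parentId "1", which is what lets the tracing backend reassemble the chain.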

Creating the gRPC client interceptor

import java.util.Map;

import com.google.common.base.Stopwatch;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import zipkin2.Span;

public class HeaderClientInterceptor implements ClientInterceptor {

    private static final Logger log = LogManager.getLogger(HeaderClientInterceptor.class);

    private final ZebraClientTracing clientTracing;

    public static ClientInterceptor instance() {
        return new HeaderClientInterceptor();
    }

    private HeaderClientInterceptor() {
        clientTracing = SpringContextUtils.getBean(ZebraClientTracing.class);
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            // Whether the API gateway asked for call-chain tracing
            final boolean isGatewayTracing =
                    "1".equals(RpcContext.getContext().getAttachment(ZebraConstants.ZEBRA_OPEN_TRACING));
            // Whether this call belongs to a trace that is already in progress
            final boolean isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY) != null;
            Stopwatch watch = null;
            Span span = null;

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                if (isSubTracing || isGatewayTracing) {
                    span = clientTracing.startTrace(method.getFullMethodName());
                    watch = Stopwatch.createStarted();
                }
                // Copy after startTrace so the new trace id and span id travel with the request
                copyThreadLocalToMetadata(headers);
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        super.onClose(status, trailers);
                        if (isSubTracing || isGatewayTracing) {
                            clientTracing.endTrace(span, watch, status.getCode().value());
                        }
                    }
                }, headers);
            }
        };
    }

    // Serializes the thread-local attachments and values into request headers as JSON.
    private void copyThreadLocalToMetadata(Metadata headers) {
        Map<String, String> attachments = RpcContext.getContext().getAttachments();
        Map<String, Object> values = RpcContext.getContext().get();
        try {
            if (!attachments.isEmpty()) {
                headers.put(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS, SerializerUtil.toJson(attachments));
            }
            if (!values.isEmpty()) {
                headers.put(GrpcUtil.GRPC_CONTEXT_VALUES, SerializerUtil.toJson(values));
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}

Creating the gRPC server interceptor

import java.net.InetSocketAddress;
import java.util.Map;

import com.google.common.base.Stopwatch;
import com.google.gson.reflect.TypeToken;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import zipkin2.Span;

public class HeaderServerInterceptor implements ServerInterceptor {

    private static final Logger log = LogManager.getLogger(HeaderServerInterceptor.class);

    private final ZebraServerTracing serverTracing;

    public static ServerInterceptor instance() {
        return new HeaderServerInterceptor();
    }

    private HeaderServerInterceptor() {
        serverTracing = SpringContextUtils.getBean(ZebraServerTracing.class);
    }

    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, final Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
            Stopwatch watch = null;
            Span span = null;

            @Override
            public void request(int numMessages) {
                InetSocketAddress remoteAddress = (InetSocketAddress) call.getAttributes()
                        .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                RpcContext.getContext().setAttachment(ZebraConstants.REMOTE_ADDRESS, remoteAddress.getHostString());
                // Restore the caller's context first so the propagated trace id is visible below
                copyMetadataToThreadLocal(headers);
                // Trace only when the caller propagated a trace id; start the span once per call
                if (span == null && RpcContext.getContext().get(TraceContext.TRACE_ID_KEY) != null) {
                    span = serverTracing.startTrace(call.getMethodDescriptor().getFullMethodName());
                    watch = Stopwatch.createStarted();
                }
                log.debug("FullMethodName:{},RemoteAddress={},attachments={},context={}",
                        call.getMethodDescriptor().getFullMethodName(), remoteAddress.getHostString(),
                        headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS), headers.get(GrpcUtil.GRPC_CONTEXT_VALUES));
                super.request(numMessages);
            }

            @Override
            public void close(Status status, Metadata trailers) {
                delegate().close(status, trailers);
                if (span != null) {
                    serverTracing.endTrace(span, watch, status.getCode().value());
                }
            }
        }, headers);
    }

    // Restores the attachments and values that the client serialized into the headers as JSON.
    private void copyMetadataToThreadLocal(Metadata headers) {
        String attachments = headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS);
        String values = headers.get(GrpcUtil.GRPC_CONTEXT_VALUES);
        try {
            if (attachments != null) {
                Map<String, String> attachmentsMap = SerializerUtil.fromJson(attachments,
                        new TypeToken<Map<String, String>>() {
                        }.getType());
                RpcContext.getContext().setAttachments(attachmentsMap);
            }
            if (values != null) {
                Map<String, Object> valuesMap = SerializerUtil.fromJson(values,
                        new TypeToken<Map<String, Object>>() {
                        }.getType());
                for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
                    RpcContext.getContext().set(entry.getKey(), entry.getValue());
                }
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}
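The two interceptors cooperate through a simple round trip: the client serializes its context map into one header value, and the server parses that value back into a map. The sketch below shows the round trip in isolation. The article's code uses JSON via zebra's SerializerUtil; to stay within the standard library, this example swaps in URL-encoded key=value pairs instead, and ContextHeaderCodec is a hypothetical class. It assumes Java 10 or later for the Charset overloads of URLEncoder and URLDecoder.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical codec: URL-encoded key=value pairs stand in for the JSON used in the article.
final class ContextHeaderCodec {

    /** Client side: flattens the context map into a single header value. */
    static String encode(Map<String, String> values) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : values.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8))
              .append('=')
              .append(URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    /** Server side: rebuilds the context map from the header value. */
    static Map<String, String> decode(String header) {
        Map<String, String> values = new LinkedHashMap<>();
        if (header == null || header.isEmpty()) {
            return values;
        }
        for (String pair : header.split("&")) {
            int eq = pair.indexOf('=');
            values.put(URLDecoder.decode(pair.substring(0, eq), StandardCharsets.UTF_8),
                       URLDecoder.decode(pair.substring(eq + 1), StandardCharsets.UTF_8));
        }
        return values;
    }
}
```

Whatever encoding is chosen, the important property is that decode(encode(m)) returns the original map, including values that contain separator characters, so the traceId and spanId arrive on the server unchanged.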
