日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

关于“豪猪”,你理解的透彻吗?【Hystrix是个什么玩意儿】

發(fā)布時間:2024/2/28 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 关于“豪猪”,你理解的透彻吗?【Hystrix是个什么玩意儿】 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1. 什么是Hystrix

?

?

Hystrix是Netflix的一個開源框架,地址如下:https://github.com/Netflix/Hystrix

?

中文名為“豪豬”,即平時很溫順,在感受到危險(xiǎn)的時候,用刺保護(hù)自己;在危險(xiǎn)過去后,還是一個溫順的肉球。

?

所以,整個框架的核心業(yè)務(wù)也就是這2點(diǎn):

?

  • 何時需要保護(hù)

  • 如何保護(hù)

  • ?

    2. 何時需要保護(hù)

    ?

    對于一個系統(tǒng)而言,它往往承擔(dān)著2層角色,服務(wù)提供者與服務(wù)消費(fèi)者。對于服務(wù)消費(fèi)者而言最大的痛苦就是如何“明哲保身”,做過網(wǎng)關(guān)項(xiàng)目的同學(xué)肯定感同身受

    上面是一個常見的系統(tǒng)依賴關(guān)系,底層的依賴往往很多,通信協(xié)議包括 socket、HTTP、Dubbo、WebService等等。當(dāng)通信層發(fā)生網(wǎng)絡(luò)抖動以及所依賴的系統(tǒng)發(fā)生業(yè)務(wù)響應(yīng)異常時,我們業(yè)務(wù)本身所提供的服務(wù)能力也直接會受到影響。

    ?

    這種效果傳遞下去就很有可能造成雪崩效應(yīng),即整個業(yè)務(wù)聯(lián)調(diào)發(fā)生異常,比如業(yè)務(wù)整體超時,或者訂單數(shù)據(jù)不一致。

    ?

    那么核心問題就來了,如何檢測業(yè)務(wù)處于異常狀態(tài)?

    ?

    成功率!成功率直接反映了業(yè)務(wù)的數(shù)據(jù)流轉(zhuǎn)狀態(tài),是最直接的業(yè)務(wù)表現(xiàn)。

    ?

    當(dāng)然,也可以根據(jù)超時時間做判斷,比如 Sentinel 的實(shí)現(xiàn)。其實(shí)這里概念上可以做一個轉(zhuǎn)化,用時間做超時控制,超時=失敗,這依然是一個成功率的概念。

    ?

    3. 如何保護(hù) 

    ?

    如同豪豬一樣,“刺”就是他的保護(hù)工具,所有的攻擊都會被刺無情的懟回去。

    ?

    在 Hystrix 的實(shí)現(xiàn)中,這就出現(xiàn)了“熔斷器”的概念,即當(dāng)前的系統(tǒng)是否處于需要保護(hù)的狀態(tài)。

    ?

    當(dāng)熔斷器處于開啟的狀態(tài)時,所有的請求都不會真正的走之前的業(yè)務(wù)邏輯,而是直接返回一個約定的信息,即 FallBack。通過這種快速失敗原則保護(hù)我們的系統(tǒng)。?

    ?

    但是,系統(tǒng)不應(yīng)該永遠(yuǎn)處于“有刺”的狀態(tài),當(dāng)危險(xiǎn)過后需要恢復(fù)正常。

    ?

    于是對熔斷器的核心操作就是如下幾個功能:

    ?

  • 如果成功率過低,就打開熔斷器,阻止正常業(yè)務(wù)

  • 隨著時間的流動,熔斷器處于半打開狀態(tài),嘗試性放入一筆請求

  •   熔斷器的核心 API 如下圖:?

    ?

    ?

    4. 限流、熔斷、隔離、降級

    ?

    這四個概念是我們談起微服務(wù)會經(jīng)常談到的概念,這里我們討論的是 Hystrix 的實(shí)現(xiàn)方式。

    ?

    限流

    ?

    • 這里的限流與 Guava 的 RateLimiter 的限流差異比較大,一個是為了“保護(hù)自我”,一個是“保護(hù)下游”

    • 當(dāng)對服務(wù)進(jìn)行限流時,超過的流量將直接 Fallback,即熔斷。而 RateLimiter 關(guān)心的其實(shí)是“流量整形”,將不規(guī)整流量在一定速度內(nèi)規(guī)整

    ?

    熔斷

    ?

    • 當(dāng)我的應(yīng)用無法提供服務(wù)時,我要對上游請求熔斷,避免上游把我壓垮

    • 當(dāng)我的下游依賴成功率過低時,我要對下游請求熔斷,避免下游把我拖垮

    ?

    降級

    ?

    • 降級與熔斷緊密相關(guān),熔斷后業(yè)務(wù)如何表現(xiàn),約定一個快速失敗的 Fallback,即為服務(wù)降級

    ?

    隔離

    ?

    • 業(yè)務(wù)之間不可互相影響,不同業(yè)務(wù)需要有獨(dú)立的運(yùn)行空間

    • 最徹底的,可以采用物理隔離,不同的機(jī)器部

    • 次之,采用進(jìn)程隔離,一個機(jī)器多個 Tomcat

    • 次之,請求隔離

    • 由于 Hystrix 框架所屬的層級為代碼層,所以實(shí)現(xiàn)的是請求隔離,線程池或信號量

    ?

    5. 源碼分析

    ?

    ?

    先上一個 Hystrix 的業(yè)務(wù)流程圖

    ?

    ?

    可以看到 Hystrix 的請求都要經(jīng)過 HystrixCommand 的包裝,其核心邏輯在 AbstractComman.java 類中。

    ?

    下面的源碼是基于 RxJava 的,看之前最好先了解下 RxJava 的常見用法與邏輯,否則看起來會很迷惑。

    ?

    簡單的說,RxJava 就是基于回調(diào)的函數(shù)式編程。通俗的說,就等同于策略模式的匿名內(nèi)部類實(shí)現(xiàn)。

    ?

    5.1 熔斷器

    ?

    首先看信號量是如何影響我們請求的:

    ?

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {// 自定義擴(kuò)展executionHook.onStart(_cmd);//判斷熔斷器是否允許請求過來if (circuitBreaker.attemptExecution()) {//獲得分組信號量,如果沒有采用信號量分組,返回默認(rèn)通過的信號量實(shí)現(xiàn)final TryableSemaphore executionSemaphore = getExecutionSemaphore();final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);//調(diào)用終止的回調(diào)函數(shù)final Action0 singleSemaphoreRelease = new Action0() {@Overridepublic void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {executionSemaphore.release();}}};//調(diào)用異常的回調(diào)函數(shù)final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {@Overridepublic void call(Throwable t) {eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);}};//根據(jù)信號量嘗試競爭信號量if (executionSemaphore.tryAcquire()) {try {//競爭成功,注冊執(zhí)行參數(shù)executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());return executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);} catch (RuntimeException e) {return Observable.error(e);}} else {//競爭失敗,進(jìn)入fallbackreturn handleSemaphoreRejectionViaFallback();}} else {//熔斷器已打開,進(jìn)入fallbackreturn handleShortCircuitViaFallback();}}

    ?

    什么時候熔斷器可以放請求進(jìn)來:

    ?

    @Overridepublic boolean attemptExecution() {//動態(tài)屬性判斷,熔斷器是否強(qiáng)制開著,如果強(qiáng)制開著,就不允許請求if (properties.circuitBreakerForceOpen().get()) {return false;}//如果強(qiáng)制關(guān)閉,就允許請求if (properties.circuitBreakerForceClosed().get()) {return true;}//如果當(dāng)前是關(guān)閉,就允許請求if (circuitOpened.get() == -1) {return true;} else {//如果當(dāng)前開著,就看是否已經(jīng)過了"滑動窗口",過了就可以請求,不過就不可以if (isAfterSleepWindow()) {//only the first request after sleep window should execute//if the executing command succeeds, the status will transition to CLOSED//if the executing command fails, the status will transition to OPEN//if the executing command gets unsubscribed, the status will transition to OPEN//這里使用CAS的方式,只有一個請求能過來,即"半關(guān)閉"狀態(tài)if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {return true;} else {return false;}} else {return false;}}}}

    ?

    這里有個重要概念就是"滑動窗口":

    ?

    ?

    private boolean isAfterSleepWindow() {final long circuitOpenTime = circuitOpened.get();final long currentTime = System.currentTimeMillis();final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();//滑動窗口的判斷就是看看熔斷器打開的時間與現(xiàn)在相比是否超過了配置的滑動窗口return currentTime > circuitOpenTime + sleepWindowTime;}

    ?

    5.2 隔離

    ?

    如果將業(yè)務(wù)請求進(jìn)行隔離?

    ?

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {//判斷隔離策略是什么,是線程池隔離還是信號量隔離    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)//線程池隔離的運(yùn)行邏輯如下return Observable.defer(new Func0<Observable<R>>() {@Overridepublic Observable<R> call() {executionResult = executionResult.setExecutionOccurred();if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));}//按照配置生成監(jiān)控?cái)?shù)據(jù)metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {// the command timed out in the wrapping thread so we will return immediately// and not increment any of the counters below or other such logicreturn Observable.error(new RuntimeException("timed out before executing run()"));}if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {//we have not been unsubscribed, so should proceedHystrixCounters.incrementGlobalConcurrentThreads();threadPool.markThreadExecution();// store the command that is being runendCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());executionResult = executionResult.setExecutedInThread();/*** If any of these hooks throw an exception, then it appears as if the actual execution threw an error*/try {//執(zhí)行擴(kuò)展點(diǎn)邏輯executionHook.onThreadStart(_cmd);executionHook.onRunStart(_cmd);executionHook.onExecutionStart(_cmd);return getUserExecutionObservable(_cmd);} catch (Throwable ex) {return Observable.error(ex);}} else {//command has already been unsubscribed, so return immediatelyreturn Observable.empty();}}//注冊各種場景的回調(diào)函數(shù)}).doOnTerminate(new Action0() {@Overridepublic void call() {if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {handleThreadEnd(_cmd);}if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {//if it was never started and received terminal, then no need to clean up (I don't think this is possible)}//if it was unsubscribed, then other cleanup handled it}}).doOnUnsubscribe(new Action0() {@Overridepublic void call() {if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {handleThreadEnd(_cmd);}if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {//if it was never started and was cancelled, then no need to clean up}//if it was terminal, then other cleanup handled it}//將邏輯放在線程池的調(diào)度器上執(zhí)行,即將上述邏輯放入線程池中}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {@Overridepublic Boolean call() {return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;}}));} else {//走到這里就是信號量隔離,在當(dāng)前線程中執(zhí)行,沒有調(diào)度器return Observable.defer(new Func0<Observable<R>>() {@Overridepublic Observable<R> call() {executionResult = executionResult.setExecutionOccurred();if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));}metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);// semaphore isolated// store the command that is being runendCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());try {executionHook.onRunStart(_cmd);executionHook.onExecutionStart(_cmd);return getUserExecutionObservable(_cmd); ?//the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw} catch (Throwable ex) {//If the above hooks throw, then use that as the result of the run methodreturn Observable.error(ex);}}});}}

    ?

    5.3 核心運(yùn)行流程

    ?

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();//執(zhí)行發(fā)生的回調(diào)final Action1<R> markEmits = new Action1<R>() {@Overridepublic void call(R r) {if (shouldOutputOnNextEvents()) {executionResult = executionResult.addEvent(HystrixEventType.EMIT);eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);}if (commandIsScalar()) {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());circuitBreaker.markSuccess();}}};//執(zhí)行成功的回調(diào),標(biāo)記下狀態(tài),熔斷器根據(jù)這個狀態(tài)維護(hù)熔斷邏輯final Action0 markOnCompleted = new Action0() {@Overridepublic void call() {if (!commandIsScalar()) {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());circuitBreaker.markSuccess();}}};//執(zhí)行失敗的回調(diào)final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {@Overridepublic Observable<R> call(Throwable t) {circuitBreaker.markNonSuccess();Exception e = getExceptionFromThrowable(t);executionResult = executionResult.setExecutionException(e);//各種回調(diào)進(jìn)行各種fallbackif (e instanceof RejectedExecutionException) {return handleThreadPoolRejectionViaFallback(e);} else if (t instanceof HystrixTimeoutException) {return handleTimeoutViaFallback();} else if (t instanceof HystrixBadRequestException) {return handleBadRequestByEmittingError(e);} else {/** Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.*/if (e instanceof HystrixBadRequestException) {eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);return Observable.error(e);}return handleFailureViaFallback(e);}}};final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {@Overridepublic void call(Notification<? super R> rNotification) {setRequestContextIfNeeded(currentRequestContext);}};Observable<R> execution;if (properties.executionTimeoutEnabled().get()) {execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));} else {execution = executeCommandWithSpecifiedIsolation(_cmd);}//注冊各種回調(diào)函數(shù)return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted).onErrorResumeNext(handleFallback).doOnEach(setRequestContext);}

    ?

    6. 小結(jié)

    ?

    • Hystrix 是基于單機(jī)應(yīng)用的熔斷限流框架

    • 根據(jù)熔斷器的滑動窗口判斷當(dāng)前請求是否可以執(zhí)行

    • 線程競爭實(shí)現(xiàn)“半關(guān)閉”狀態(tài),拿一個請求試試是否可以關(guān)閉熔斷器

    • 線程池隔離將請求丟到線程池中運(yùn)行,限流依靠線程池拒絕策略

    • 信號量隔離在當(dāng)前線程中運(yùn)行,限流依靠并發(fā)請求數(shù)

    • 當(dāng)信號量競爭失敗/線程池隊(duì)列滿,就進(jìn)入限流模式,執(zhí)行 Fallback

    • 當(dāng)熔斷器開啟,就熔斷請求,執(zhí)行 Fallback 

    • 整個框架采用的 RxJava 的編程模式,回調(diào)函數(shù)滿天飛 

    總結(jié)

    以上是生活随笔為你收集整理的关于“豪猪”,你理解的透彻吗?【Hystrix是个什么玩意儿】的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。