日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

pthread异步_探索 Flutter 异步消息的实现

發布時間:2025/3/20 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 pthread异步_探索 Flutter 异步消息的实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文作者:趙旭陽
字節跳動資深工程師

一、簡介

我們在進行 Android 開發的時候,會通過創建一個 Handler 并調用其 sendMessage ?或 Post 方法來進行異步消息調用,其背后涉及到了三個面試經常被問的類:Handler,Looper,MessageQueue,內部原理我想做過 Android 開發的基本都了解。
Flutter 使用 dart 開發,其也有類似的異步消息機制,具體參見這篇文章:https://dart.dev/articles/archive/event-loop,按這個文章的說法,在 Dart 應用中有一個事件循環(message loop)和兩個隊列(Event queue 和 Microtask queue)。

  • Event queue : 包含了所有的外部事件:I/O,鼠標點擊,繪制,定時器,Dart isolate 的消息等,其實這塊又根據消息的優先級細分成了兩個隊列,后面會有介紹。

  • Microtask queue :事件處理代碼有時需要在當前 event 之后,且在下一個 event 之前做一些任務。

    • 兩種隊列的消息處理流程大致如圖所示:

    dart 提供了 dart:async 庫來對這兩個隊列進行操作,主要是如下兩個API:

  • Future 類,創建一個定時器事件到 Event queue 尾部。

  • scheduleMicrotask() 方法,添加一個事件到 Microtask queue 尾部

  • 下面將會分析這兩個 API 背后的實現,涉及的是 flutter engine 的源碼,可以參考官方的 wiki 來下載:
    https://github.com/flutter/flutter/wiki/Setting-up-the-Engine-development-environment。

    二、初始化

    1. ?創建 MessageLoop

    • 在啟動 Flutter 的時候,引擎會額外創建三個線程:UI Thread,IO Thread, GPU Thread,并為每個線程都創建一個 MessageLoop,之后各個線程就進入的消息循環的狀態,等待新的消息來處理,具體流程如下:
      /src/flutter/shell/platform/android/android_shell_holder.cc

    ThreadHost?thread_host_;?
    AndroidShellHolder::AndroidShellHolder(
    ????flutter::Settings?settings,
    ????fml::jni::JavaObjectWeakGlobalRef?java_object,
    ????bool?is_background_view)
    ????:?settings_(std::move(settings)),?java_object_(java_object)?{
    ......
    ??if?(is_background_view)?{
    ????thread_host_?=?{thread_label,?ThreadHost::Type::UI};
    ??}?else?{
    ????//?創建三個線程
    ????thread_host_?=?{thread_label,?ThreadHost::Type::UI?|?ThreadHost::Type::GPU?|
    ??????????????????????????????????????ThreadHost::Type::IO};
    ??}
    ......
    }
    • 進一步分析 ThreadHost 的創建,最后來到了創建 Thread,每個 Thread 都會創建一個 MessageLoop 并獲取其 TaskRunner,TaskRunner 是用來外部向 MessageLoop 中 Post Task 的。順帶說一下,在 Android 上 MessageLoop 的實現還是使用的系統自身的 Looper 機制,這里是通過 NDK 的 ALooper 相關接口來實現的。具體代碼如下:
      /src/flutter/fml/thread.cc

    Thread::Thread(const?std::string&?name)?:?joined_(false)?{
    ??fml::AutoResetWaitableEvent?latch;
    ??fml::RefPtr<:taskrunner>?runner;
    ??thread_?=?std::make_unique<:thread>([&latch,?&runner,?name]()?->?void?{
    ????SetCurrentThreadName(name);//?創建?MessageLoop
    ????fml::MessageLoop::EnsureInitializedForCurrentThread();
    ????auto&?loop?=?MessageLoop::GetCurrent();//?獲取?TaskRunner
    ????runner?=?loop.GetTaskRunner();
    ????latch.Signal();
    ????loop.Run();
    ??});
    ??latch.Wait();
    ??task_runner_?=?runner;
    }
    • MessageLoop 創建好后,我們就可以通過 TaskRunner 向其發送 Task 了,這里需要注意 MessageLoop 執行的 Task 僅是一個 無參的閉包 。類似這樣:

    auto?jni_exit_task([key?=?thread_destruct_key_]()?{
    ??FML_CHECK(pthread_setspecific(key,?reinterpret_cast<void*>(1))?==?0);
    });
    thread_host_.ui_thread->GetTaskRunner()->PostTask(jni_exit_task);

    2. ?創建 Root Isolate

    在 dart 語言中是沒有線程的,而是使用類似于線程但相互之間不共享堆內存的 isolate,代表一個獨立的 dart 程序執行環境。同樣 Flutter 的 dart 代碼是運行在一個叫 root isolate 的 isolate 中,下面簡要列下 root isolate 的創建過程。

    a.啟動 dart vm

    這個步驟的具體流程,大家可以順著 /engine-v1.5.4/src/flutter/runtime/dart_vm.cc 中的 DartVM 構造方法去跟進分析。在 dart vm 啟動過程中會創建 vm isolate 和 PortMap,這兩個的具體作用下面有介紹。

    b.創建 root isolate

    root isolate 是在 UI 線程中創建的,具體流程見 /src/flutter/runtime/dart_isolate.cc 的 CreateRootIsolate 方法。由于 isolate 是對當前線程執行環境的一個抽象表示,所以其內部存儲了很多信息,對于異步消息這塊有四個關鍵的信息是需要注意的。

    下面三個字段是定義在 dart vm 層的 Isolate 類中,具體見 /src/third_party/dart/runtime/vm/isolate.h。

    • main_port :可以看做該 isolate 的標識,是一個整數;

    • message_handler :顧名思義,用來管理每個 isolate 的 Event queue,其內部根據 message 的優先級將消息分為了兩個隊列:普通優先級的 queue 和 OOB 優先級的 oob_queue。了解 TCP 協議的應該了解 TCP 中有帶外數據(即優先數據),isolate 的 OOB 優先級也是類似的意思,OOB 優先級的消息會被優先處理,目前看到有這么幾種 OOB 消息:

    //?這些主要和?isolate?的生命周期相關
    kPauseMsg?=?1,
    kResumeMsg?=?2,
    kPingMsg?=?3,
    kKillMsg?=?4,
    kAddExitMsg?=?5,
    kDelExitMsg?=?6,
    kAddErrorMsg?=?7,
    kDelErrorMsg?=?8,
    kErrorFatalMsg?=?9,
    //?可以注意下?kLowMemoryMsg?,如果有大量?OOB?懷疑是內存不夠了
    kInterruptMsg?=?10,?????//?Break?in?the?debugger.
    kInternalKillMsg?=?11,??//?Like?kill,?but?does?not?run?exit?listeners,?etc.
    kLowMemoryMsg?=?12,?????//?Run?compactor,?etc.
    kDrainServiceExtensionsMsg?=?13,??//?Invoke?pending?service?extensions
    • message_notify_callback :message_handler 收到消息后會調用該變量指向的函數去處理;

    Flutter 引擎層會對 dart vm 的 isolate 實例做一層包裝( DartIsolate 類),其內部定義了:

    • microtask_queue: 用來存儲 microtask 消息,可見在 Flutter 引擎中,Microtask queue 并不是由 dart vm 層來管理的

      前面已經說過 isolate 之間是不能直接互相訪問,如圖:

    可以看出 isolate 之間是共享 vm isolate 的堆內存區域的,有點類似于操作系統的內核空間,vm isolate 的堆內存存儲了 dart vm 內部的核心數據(內置類,內置對象)。除了 vm isolate,不同 isolate 之間的堆內存是不能直接訪問的,為此 dart vm 提供了 isolate 之間的通信機制,負責通信路由的大管家就是 PortMap,其內部實現就是一張 Hash 表,Key 為 isolate 的 main_port,Value 為 isolate 的 message_handler。

    三、創建 Future

    使用 dart 開發一定會用到 Future,當我們通過 Future 構造方法創建一個實例的時候,就會創建一個定時器消息到 Event queue,下面我們將分析這個流程。整體架構圖:

    1. ?dart 層創建 Future

    創建 Future 的時候,內部會通過 Timer 構造一個定時器,具體代碼如下:

    /src/out/host_release/dart-sdk/lib/async/future.dart

    factory?Future(FutureOr?computation())?{
    ??_Future?result?=?new?_Future();
    ??Timer.run(()?{try?{
    ??????result._complete(computation());
    ????}?catch?(e,?s)?{
    ??????_completeWithErrorCallback(result,?e,?s);
    ????}
    ??});return?result;
    }

    跟進 Timer 的實現,具體代碼如下:

    /src/out/host_release/dart-sdk/lib/async/timer.dart

    static?void?run(void?callback())?{
    ??new?Timer(Duration.zero,?callback);
    }

    factory?Timer(Duration?duration,?void?callback())?{
    ??if?(Zone.current?==?Zone.root)?{
    ????return?Zone.current.createTimer(duration,?callback);
    ??}
    ......
    }

    這里假定 duration == Duration.zero,Zone.current == Zone.root ,進而到 rootZone 的 createTimer 方法,里面又調用了 Timer 的 _createTimer 方法:

    /src/out/host_release/dart-sdk/lib/async/zone.dart

    class?_RootZone?extends?_Zone?{
    ......
    ??Timer?createTimer(Duration?duration,?void?f())?{
    ????return?Timer._createTimer(duration,?f);
    ??}
    ......
    }

    /src/out/host_release/dart-sdk/lib/async/timer.dart

    external?static?Timer?_createTimer(Duration?duration,?void?callback());

    可以看到 _createTimer 方法是個 external 的,按照 dart 語言的規范,external 方法的實現都在對應的 patch 文件中( timer_patch.dart),內部通過 _TimerFactory._factory 來創建 Timer,具體代碼如下:

    /src/third_party/dart/runtime/lib/timer_patch.dart

    @patch
    class?Timer?{
    ......
    ??@patch
    ??static?Timer?_createTimer(Duration?duration,?void?callback())?{
    ????if?(_TimerFactory._factory?==?null)?{
    ??????_TimerFactory._factory?=?VMLibraryHooks.timerFactory;
    ????}
    ......
    ????int?milliseconds?=?duration.inMilliseconds;
    ????if?(milliseconds?0)?milliseconds?=?0;
    ???//?注意此處將外部的?callback?又包了一層
    ????return?_TimerFactory._factory(milliseconds,?(_)?{
    ??????callback();
    ????},?false);
    ??}
    ......
    }

    通過上面的代碼,我們知道 _TimerFactory._factory = VMLibraryHooks.timerFactory, VMLibraryHooks.timerFactory 又是在 root isolate 初始化時通過調用 _setupHooks 方法設置的,具體代碼如下:

    /src/third_party/dart/runtime/lib/timer_impl.dart

    @pragma("vm:entry-point",?"call")
    _setupHooks()?{
    ??VMLibraryHooks.timerFactory?=?_Timer._factory;
    }
    //?VMLibraryHooks.timerFactory?指向的該方法
    //?我們假設創建的是非?repeating?消息,并且?milliSeconds?為?0
    static?Timer?_factory(
    ??????int?milliSeconds,?void?callback(Timer?timer),?bool?repeating)?{
    ......
    ????return?new?_Timer(milliSeconds,?callback);
    ??}
    }

    factory?_Timer(int?milliSeconds,?void?callback(Timer?timer))?{
    ??return?_createTimer(callback,?milliSeconds,?false);
    }
    //?創建一個?Timer?實例并調用?_enqueue?將其加入到隊列
    static?Timer?_createTimer(
    ????void?callback(Timer?timer),?int?milliSeconds,?bool?repeating)?{
    ......
    ??_Timer?timer?=
    ??????new?_Timer._internal(callback,?wakeupTime,?milliSeconds,?repeating);
    ??timer._enqueue();
    ??return?timer;
    }

    _Timer._internal(
    ????this._callback,?this._wakeupTime,?this._milliSeconds,?this._repeating)
    ????:?_id?=?_nextId();

    //?這里?_milliSeconds?==?0,會向?ZeroTimer?隊列插入消息,然后調用?_notifyZeroHandler
    void?_enqueue()?{
    ??if?(_milliSeconds?==?0)?{
    ????if?(_firstZeroTimer?==?null)?{
    ??????_lastZeroTimer?=?this;
    ??????_firstZeroTimer?=?this;
    ????}?else?{
    ??????_lastZeroTimer._indexOrNext?=?this;
    ??????_lastZeroTimer?=?this;
    ????}
    ????//?Every?zero?timer?gets?its?own?event.
    ????_notifyZeroHandler();
    ??}?else?{
    ???......
    ????//?延遲消息這里先不分析
    ??}
    }

    折騰了一大圈,最后只是構造了一個 _Timer 實例并把其加入到 ZeroTimer 隊列中,如果是延遲消息則會加入到 TimeoutTimerHeap 中,最后調用 _notifyZeroHandler 方法, 其主要做如下操作:

    • 創建 RawReceivePort 并設置一個叫 _handleMessage 方法做為引擎層的回調方法

    • 向引擎層 Event queue 發送一個普通優先級的 ?_ZERO_EVENT ,引擎層處理該消息的時候會最終回調到上面設置的 _handleMessage 方法。

    具體代碼如下:

    /src/third_party/dart/runtime/lib/timer_impl.dart

    static?void?_notifyZeroHandler()?{
    ??if?(_sendPort?==?null)?{
    ????_createTimerHandler();
    ??}
    //?底層會調到?PortMap?的?PostMessage?方法,進而喚醒消息處理,后面會分析這個流程
    ??_sendPort.send(_ZERO_EVENT);
    }
    //?創建和引擎層通信的?RawReceivePort,并設置引擎層的回調方法?_handleMessage
    static?void?_createTimerHandler()?{
    ??assert(_receivePort?==?null);
    ??assert(_sendPort?==?null);
    ??_receivePort?=?new?RawReceivePort(_handleMessage);
    ??_sendPort?=?_receivePort.sendPort;
    ??_scheduledWakeupTime?=?null;
    }

    /src/third_party/dart/runtime/lib/isolate_patch.dart

    @patch
    class?RawReceivePort?{
    ??@patch
    ??factory?RawReceivePort([Function?handler])?{
    ????_RawReceivePortImpl?result?=?new?_RawReceivePortImpl();
    ????result.handler?=?handler;
    ????return?result;
    ??}
    }

    //?最終將回調設置到?_RawReceivePortImpl?的?_handlerMap?中,引擎層會從這個?map?尋找消息的?handler
    @pragma("vm:entry-point")
    class?_RawReceivePortImpl?implements?RawReceivePort?{
    ????void?set?handler(Function?value)?{
    ??????_handlerMap[this._get_id()]?=?value;
    ????}
    }

    _handleMessage 回調方法會收集 Timer 并執行,具體代碼實現如下:

    /src/third_party/dart/runtime/lib/timer_impl.dart

    static?void?_handleMessage(msg)?{
    ??var?pendingTimers;
    ??if?(msg?==?_ZERO_EVENT)?{
    ????//?找到所有的待處理?Timers
    ????pendingTimers?=?_queueFromZeroEvent();
    ????assert(pendingTimers.length?>?0);
    ??}?else?{
    ????......
    ????//?延時消息這里不分析
    ??}
    //?處理Timer,即調用設置的?callback
    ??_runTimers(pendingTimers);
    ......
    }

    2. ?向 Event Queue 發送消息

    前面說到 RawReceiverPort 會向引擎層 Event queue 發送一個 _ZERO_EVENT ?,其內部是通過調用 PortMap 的 PostMessage 方法將消息發送到 Event queue,該方法首先會根據接收方的 port id 找到對應的 message_handler,然后將消息根據優先級保存到相應的 queue 中,最后喚醒 message_notify_callback 回調函數 ,具體代碼如下:

    /src/third_party/dart/runtime/vm/port.cc

    bool?PortMap::PostMessage(Message*?message,?bool?before_events)?{
    ??......
    ??intptr_t?index?=?FindPort(message->dest_port());
    ??......
    ??MessageHandler*?handler?=?map_[index].handler;
    ?......
    ??handler->PostMessage(message,?before_events);
    ??return?true;
    }

    /src/third_party/dart/runtime/vm/message_handler.cc

    void?MessageHandler::PostMessage(Message*?message,?bool?before_events)?{
    ??Message::Priority?saved_priority;
    ??bool?task_running?=?true;
    ?......
    ??//?根據消息優先級進入不同的隊列
    ????if?(message->IsOOB())?{
    ??????oob_queue_->Enqueue(message,?before_events);
    ????}?else?{
    ??????queue_->Enqueue(message,?before_events);
    ????}
    ???......
    //喚醒并處理消息
    ??MessageNotify(saved_priority);
    }

    /src/third_party/dart/runtime/vm/isolate.cc

    void?IsolateMessageHandler::MessageNotify(Message::Priority?priority)?{
    ??if?(priority?>=?Message::kOOBPriority)?{
    ????I->ScheduleInterrupts(Thread::kMessageInterrupt);
    ??}
    //?最后調用的?message_notify_callback?所指向的函數
    ??Dart_MessageNotifyCallback?callback?=?I->message_notify_callback();
    ??if?(callback)?{
    ????(*callback)(Api::CastIsolate(I));
    ??}
    }

    3. ? Event Queue 消息處理

    前面消息已經發送成功并調用了消息處理喚醒的操作,下面我們需要知道 message_notify_callback 所指向的函數的實現, root isolate 在初始化時會設置該變量,具體代碼如下:

    /src/flutter/runtime/dart_isolate.cc

    bool?DartIsolate::Initialize(Dart_Isolate?dart_isolate,?bool?is_root_isolate)?{
    ??......
    ??//?設置?message?handler?的?task?runner?為?UI?Task?Runner
    ??SetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),
    ???????????????????????????????is_root_isolate);
    ??......
    ??return?true;
    }
    void?DartIsolate::SetMessageHandlingTaskRunner(
    ????fml::RefPtr<:taskrunner>?runner,bool?is_root_isolate)?{
    ?......
    ??message_handler().Initialize(
    ??????[runner](std::function<void()>?task)?{?runner->PostTask(task);?});
    }

    進一步跟進分析發現通過 Dart_SetMessageNotifyCallback 將 ?root isolate 的 message_notify_callback 設置為 MessageNotifyCallback 方法,具體代碼如下:

    /src/third_party/tonic/dart_message_handler.cc

    void?DartMessageHandler::Initialize(TaskDispatcher?dispatcher)?{
    ??TONIC_CHECK(!task_dispatcher_?&&?dispatcher);
    ??task_dispatcher_?=?dispatcher;
    ??Dart_SetMessageNotifyCallback(MessageNotifyCallback);
    }

    MessageNotifyCallback 會在 Event queue 收到消息后執行,但其執行過程中并沒有拿到 Event queue 中的消息,而是往 UI Thread 的 MessageLoop Post 了一個 Task 閉包,這個 Task 閉包會通過調用 Dart_HandleMessage 來處理 Event queue 中的消息,具體代碼流程如下:

    /src/third_party/tonic/dart_message_handler.cc

    void?DartMessageHandler::MessageNotifyCallback(Dart_Isolate?dest_isolate)?{
    ??auto?dart_state?=?DartState::From(dest_isolate);
    ??TONIC_CHECK(dart_state);
    ??dart_state->message_handler().OnMessage(dart_state);
    }

    void?DartMessageHandler::OnMessage(DartState*?dart_state)?{
    ??auto?task_dispatcher_?=?dart_state->message_handler().task_dispatcher_;
    ??//?往?ui?線程?MessageLoop?Post?了一個?Task
    ??auto?weak_dart_state?=?dart_state->GetWeakPtr();
    ??task_dispatcher_([weak_dart_state]()?{
    ????if?(auto?dart_state?=?weak_dart_state.lock())?{
    ??????dart_state->message_handler().OnHandleMessage(dart_state.get());
    ????}
    ??});
    }

    void?DartMessageHandler::OnHandleMessage(DartState*?dart_state)?{
    ??......
    ??if?(Dart_IsPausedOnStart())?{
    ????......
    ??}?else?if?(Dart_IsPausedOnExit())?{
    ???......
    ??}?else?{
    ?????//?調用?Dart_HandleMessage?方法處理消息
    ????result?=?Dart_HandleMessage();
    ???......
    ??}
    ?......
    }

    Dart_HandleMessage 的實現很簡單,只是調用 message_handler 的 HandleNextMessage 方法,具體代碼實現如下:

    /src/third_party/dart/runtime/vm/dart_api_impl.cc

    DART_EXPORT?Dart_Handle?Dart_HandleMessage()?{
    ......
    ??if?(I->message_handler()->HandleNextMessage()?!=?MessageHandler::kOK)?{
    ????return?Api::NewHandle(T,?T->StealStickyError());
    ??}
    ??return?Api::Success();
    }

    我們進一步跟進 ?HandleNextMessage 方法的實現,最終來到如下代碼:

    /src/third_party/dart/runtime/vm/message_handler.cc

    //?依次遍歷?message_handler?的消息隊列,對每個消息進程處理
    MessageHandler::MessageStatus?MessageHandler::HandleMessages(
    ????MonitorLocker*?ml,
    ????bool?allow_normal_messages,
    ????bool?allow_multiple_normal_messages)?{
    ......
    ??Message*?message?=?DequeueMessage(min_priority);
    ??while?(message?!=?NULL)?{
    ????......
    ????MessageStatus?status?=?HandleMessage(message);
    ???......
    ????message?=?DequeueMessage(min_priority);
    ??}
    ??return?max_status;
    }

    //?取消息的時候會優先處理?OOB?Message
    Message*?MessageHandler::DequeueMessage(Message::Priority?min_priority)?{
    ??Message*?message?=?oob_queue_->Dequeue();
    ??if?((message?==?NULL)?&&?(min_priority?????message?=?queue_->Dequeue();
    ??}
    ??return?message;
    }

    每個消息的處理都是在 HandleMessage 方法中,該方法會根據不同的消息優先級做相應的處理,具體代碼如下:

    /src/third_party/dart/runtime/vm/isolate.cc

    MessageHandler::MessageStatus?IsolateMessageHandler::HandleMessage(
    ????Message*?message)?{
    ......
    ??Object&?msg_handler?=?Object::Handle(zone);
    //?非?OOB?消息,需要獲取?dart?層的??handler?函數
    ??if?(!message->IsOOB()?&&?(message->dest_port()?!=?Message::kIllegalPort))?{
    ????msg_handler?=?DartLibraryCalls::LookupHandler(message->dest_port());
    ????......
    ??}
    ......
    ??MessageStatus?status?=?kOK;
    ??if?(message->IsOOB())?{
    ???//?處理?OOB?消息,詳細實現可自己看代碼,這里不分析OOB消息
    ???......
    ??}?else?if?(message->dest_port()?==?Message::kIllegalPort)?{
    ????......
    ??}?else?{
    ????......
    ????//?調用前面找到的?msg_handler?來處理普通消息
    ????const?Object&?result?=
    ????????Object::Handle(zone,?DartLibraryCalls::HandleMessage(msg_handler,?msg));
    ???......
    ??}
    ??delete?message;
    ??return?status;
    }

    這里我們主要看普通消息的處理邏輯,首先會通過調用 DartLibraryCalls::LookupHandler 方法來從 dart 層尋找相應的 handler 函數,然后通過 DartLibraryCalls::HandleMessage 執行相應的處理函數,具體實現代碼如下:

    /src/third_party/dart/runtime/vm/dart_entry.cc

    RawObject*?DartLibraryCalls::LookupHandler(Dart_Port?port_id)?{
    ??Thread*?thread?=?Thread::Current();
    ??Zone*?zone?=?thread->zone();
    ??Function&?function?=?Function::Handle(
    ??????zone,?thread->isolate()->object_store()->lookup_port_handler());
    ??const?int?kTypeArgsLen?=?0;
    ??const?int?kNumArguments?=?1;
    //?如果沒有消息處理方法,則進行查找,最終找到的是 RawReceivePortImpl 的?_lookupHandler 方法。
    ??if?(function.IsNull())?{
    ????Library&?isolate_lib?=?Library::Handle(zone,?Library::IsolateLibrary());
    ????ASSERT(!isolate_lib.IsNull());
    ????const?String&?class_name?=?String::Handle(
    ????????zone,?isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
    ????const?String&?function_name?=?String::Handle(
    ????????zone,?isolate_lib.PrivateName(Symbols::_lookupHandler()));
    ????function?=?Resolver::ResolveStatic(isolate_lib,?class_name,?function_name,
    ???????????????????????????????????????kTypeArgsLen,?kNumArguments,
    ???????????????????????????????????????Object::empty_array());
    ????ASSERT(!function.IsNull());
    ????thread->isolate()->object_store()->set_lookup_port_handler(function);
    ??}
    //?執行消息處理函數
    ??const?Array&?args?=?Array::Handle(zone,?Array::New(kNumArguments));
    ??args.SetAt(0,?Integer::Handle(zone,?Integer::New(port_id)));
    ??const?Object&?result?=
    ??????Object::Handle(zone,?DartEntry::InvokeFunction(function,?args));
    ??return?result.raw();
    }

    最終執行的是 RawReceivePortImpl 的 _lookupHandler 方法,在前面在創建 Future 的時候我們已經設置 _handleMessage 到 _handlerMap 中,_lookupHandler 方法會從 _handlerMap 中找到設置的回調方法,最后執行回調方法。具體代碼如下:

    /src/third_party/dart/runtime/lib/isolate_patch.dart

    @pragma("vm:entry-point")
    class?_RawReceivePortImpl?implements?RawReceivePort?{
    ?......
    ??//?Called?from?the?VM?to?retrieve?the?handler?for?a?message.
    ??@pragma("vm:entry-point",?"call")
    ??static?_lookupHandler(int?id)?{
    ????var?result?=?_handlerMap[id];
    ????return?result;
    ??}
    ......
    }

    Future 的創建到這就分析完了,整個過程涉及到了 EventQueue 的消息收發。
    上面主要分析了非延遲消息的處理,如果是延遲的 Timer 會怎么處理呢?這里簡單提一下,在 dart vm 內部還有個 EventHandler 線程,如果是延遲消息則會通過管道向這個線程寫入延遲數據,這個線程會負責延遲計數,到時間了就往引擎層 Post 喚醒消息,具體代碼可參見/src/third_party/dart/runtime/bin/eventhandler.cc,這里就不再贅述,感興趣的可以自行分析。

    至此,通過分析 Future 我們已經把 Event Queue 的消息處理流程了解了。

    四、Microtask

    1. ?向 ?Microtask queue 發送消息

    假設 Zone. current 為 rootZone, 直接看 scheduleMicrotask 方法的實現:

    /src/third_party/dart/sdk/lib/async/schedule_microtask.dart

    void?scheduleMicrotask(void?callback())?{
    ??_Zone?currentZone?=?Zone.current;
    ??if?(identical(_rootZone,?currentZone))?{
    ????_rootScheduleMicrotask(null,?null,?_rootZone,?callback);
    ????return;
    ??}
    ......
    }

    跟進 _rootScheduleMicrotask 方法的實現,最終來到 _scheduleAsyncCallback 方法,該方法做了兩件事情:

    • 將傳入的 callback 加入 callback 隊列

    • 將 _startMicrotaskLoop 作為閉包參數調用 ?_AsyncRun._scheduleImmediate 方法,_startMicrotaskLoop 中會依次執行 callback 隊列保存的回調。

    具體代碼如下:

    /src/third_party/dart/sdk/lib/async/schedule_microtask.dart

    void?_scheduleAsyncCallback(_AsyncCallback?callback)?{
    ??_AsyncCallbackEntry?newEntry?=?new?_AsyncCallbackEntry(callback);
    ??if?(_nextCallback?==?null)?{
    ????_nextCallback?=?_lastCallback?=?newEntry;
    ????if?(!_isInCallbackLoop)?{
    ??????_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    ????}
    ??}?else?{
    ????_lastCallback.next?=?newEntry;
    ????_lastCallback?=?newEntry;
    ??}
    }
    //?該方法被作為回調設置到引擎,會在處理所有的?Microtask?的時候執行?
    void?_startMicrotaskLoop()?{
    ??_isInCallbackLoop?=?true;
    ??try?{
    ????_microtaskLoop();
    ??}?finally?{
    ????_lastPriorityCallback?=?null;
    ????_isInCallbackLoop?=?false;
    ????if?(_nextCallback?!=?null)?{
    ??????_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    ????}
    ??}
    }

    class?_AsyncRun?{
    ??external?static?void?_scheduleImmediate(void?callback());
    }

    根據前面的經驗,會在對應的 patch 文件中找到 _AsyncRun._scheduleImmediate 的實現,其內部調用了 _ScheduleImmediate._closure 指向的方法。

    具體代碼如下:

    /src/third_party/dart/runtime/lib/schedule_microtask_patch.dart

    @patch
    class?_AsyncRun?{
    ??@patch
    ??static?void?_scheduleImmediate(void?callback())?{
    ????if?(_ScheduleImmediate._closure?==?null)?{
    ??????throw?new?UnsupportedError("Microtasks?are?not?supported");
    ????}
    ????_ScheduleImmediate._closure(callback);
    ??}
    }
    //?通過該方法設置?_ScheduleImmediate._closure
    @pragma("vm:entry-point",?"call")
    void?_setScheduleImmediateClosure(_ScheduleImmediateClosure?closure)?{
    ??_ScheduleImmediate._closure?=?closure;
    }

    那么 _ScheduleImmediate._closure 指向的是什么呢?我們需要找到 _setScheduleImmediateClosure 的調用方。root isolate 初始化時會執行一系列的 vm hook 調用,我們從中找到了 _setScheduleImmediateClosure 的調用,具體代碼如下:

    /src/flutter/lib/ui/dart_runtime_hooks.cc

    static?void?InitDartAsync(Dart_Handle?builtin_library,?bool?is_ui_isolate)?{
    ??Dart_Handle?schedule_microtask;
    ??if?(is_ui_isolate)?{
    //?這里的?builtin_library?是?Flutter?擴展的?ui?library
    ????schedule_microtask?=
    ????????GetFunction(builtin_library,?"_getScheduleMicrotaskClosure");
    ??}?else?{
    ????......
    ??}
    ??Dart_Handle?async_library?=?Dart_LookupLibrary(ToDart("dart:async"));
    ??Dart_Handle?set_schedule_microtask?=?ToDart("_setScheduleImmediateClosure");
    ??Dart_Handle?result?=?Dart_Invoke(async_library,?set_schedule_microtask,?1,
    ???????????????????????????????????&schedule_microtask);
    ??PropagateIfError(result);
    }

    進一步跟進,最終找到了 _ScheduleImmediate._closure 指向的方法,是一個 native 實現的函數,具體代碼如下:

    /src/flutter/lib/ui/natives.dart

    Function?_getScheduleMicrotaskClosure()?=>?_scheduleMicrotask;?

    void?_scheduleMicrotask(void?callback())?native?'ScheduleMicrotask';

    跟進 _scheduleMicrotask 的 native 實現,發現其會把傳入的 _startMicrotaskLoop 方法加入到底層的Microtask queue,具體代碼如下:

    /src/flutter/lib/ui/dart_runtime_hooks.cc

    void?ScheduleMicrotask(Dart_NativeArguments?args)?{
    ??Dart_Handle?closure?=?Dart_GetNativeArgument(args,?0);
    ??UIDartState::Current()->ScheduleMicrotask(closure);
    }

    /src/flutter/lib/ui/ui_dart_state.cc

    void?UIDartState::ScheduleMicrotask(Dart_Handle?closure)?{
    ??if?(tonic::LogIfError(closure)?||?!Dart_IsClosure(closure))?{
    ????return;
    ??}
    ??microtask_queue_.ScheduleMicrotask(closure);
    }

    2. ?Microtask queue 消息處理

    前面已經將 _startMicrotaskLoop 方法加入到了 Microtask queue ,那么 Microtask queue 內的方法何時執行呢?我們通過跟進 Microtask queue ?的 RunMicrotasks 方法的調用方,最終找到 Microtask queue 內方法的執行時機 FlushMicrotasksNow,具體代碼如下:

    /src/flutter/lib/ui/ui_dart_state.cc

    void?UIDartState::FlushMicrotasksNow()?{
    ??microtask_queue_.RunMicrotasks();
    }

    再跟進 FlushMicrotasksNow 方法的調用方,發現有兩處調用:

    • 這里是在每一幀開始的時候去執行 Microtask

    /src/flutter/lib/ui/window/window.cc

    void?Window::BeginFrame(fml::TimePoint?frameTime)?{
    ......
    ??UIDartState::Current()->FlushMicrotasksNow();
    ......
    }
    • 另外一處調用是通過 TaskObserve 的形式,具體代碼如下:

    /src/flutter/lib/ui/ui_dart_state.cc

    void?UIDartState::AddOrRemoveTaskObserver(bool?add)?{
    ......
    ??if?(add)?{
    //?這個 add_callback_?是啥呢?
    ????add_callback_(reinterpret_cast<intptr_t>(this),
    ??????????????????[this]()?{?this->FlushMicrotasksNow();?});
    ??}?else?{
    ????remove_callback_(reinterpret_cast<intptr_t>(this));
    ??}
    }

    跟進 add_callback_ 的賦值,這里是android的實現

    /src/flutter/shell/platform/android/flutter_main.cc

    void?FlutterMain::Init(JNIEnv*?env,
    ???????????????????????jclass?clazz,
    ???????????????????????jobject?context,
    ???????????????????????jobjectArray?jargs,
    ???????????????????????jstring?bundlePath,
    ???????????????????????jstring?appStoragePath,
    ???????????????????????jstring?engineCachesPath)?{
    ?......
    ??settings.task_observer_add?=?[](intptr_t?key,?fml::closure?callback)?{
    ????fml::MessageLoop::GetCurrent().AddTaskObserver(key,?std::move(callback));
    ??};
    ......
    }

    FlushMicrotasksNow() 是作為 MessageLoop 的 TaskObserver 來執行的, TaskObserver 會在處理完task之后把該 Task 創建的 MicroTask 全部執行,也就是說在下一個 Task 運行前執行。代碼如下:

    /src/flutter/fml/message_loop_impl.cc

    void?MessageLoopImpl::FlushTasks(FlushType?type)?{
    ......
    ??for?(const?auto&?invocation?:?invocations)?{
    ????invocation();
    ????for?(const?auto&?observer?:?task_observers_)?{
    ??????observer.second();
    ????}
    ??}
    }

    五、結束

    通過前面的分析,Flutter 的異步消息處理流程還是挺復雜的,主要是代碼寫的比較亂,跳轉層次太多,可以通過對整個流程的掌控來尋找UI 線程的優化及監控點,進而降低 UI 線程的處理時間,希望本篇文章讓大家對 Flutter 的異步消息的整體處理流程有更深的理解。

    推薦閱讀
    你們吹捧的鴻蒙,只是另一個Fuchsia
    Android仿微信QQ圖片裁剪
    互聯網 HR 黑話大全,太真實了!

    編程·思維·職場
    歡迎掃碼關注

    總結

    以上是生活随笔為你收集整理的pthread异步_探索 Flutter 异步消息的实现的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。