日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Web3j监听功能代码研究

發布時間:2024/1/1 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Web3j监听功能代码研究 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Web3j監聽功能代碼研究

高洪濤 2021-03-19

?

本周深入研究了web3j工具包實現以太坊的監聽功能,實現了交易監聽和代幣監聽的方法,對監聽過程中的常見問題進行了處理,本文就是對這部分開發經驗的總結。

1 web3j 版本

Web3官網:https://www.web3labs.com/web3j-sdk

Docs: https://docs.web3j.io/latest/quickstart/

?

我使用了3個版本的web3j, 3.6、4.5.5、4.8.4,分別進行說明。

1.1 Web3j 3.6版本

3.6版本可以實現各種監聽。來源已經搞不清楚了,是一個文件夾,包含有許多jar文件。

?

這個文件夾打包鏈接:?

使用時需要添加到編譯路徑中。

?

1.2 4.5.5版本

我下載了4.5.5版本工具包,只有一個文件,里面包含了各種jar包:console-4.5.5-all.jar

?

經過測試,該包可以正常的完成查詢和交易,但是無法監聽,提示缺少rxjava相關jar,我就沒有再折騰,放棄了。

?

1.3 4.8.4版本

該版本可以實現監聽。從官網自動下載,maven工程中添加依賴:

<dependency>

????? ? <groupId>org.web3j</groupId>

????? ? <artifactId>core</artifactId>

????? ? <version>4.8.4</version>

?? ? </dependency>

要實現監聽,還需要添加另一個依賴:

<dependency>

????? <groupId>io.reactivex.rxjava3</groupId>

????? <artifactId>rxjava</artifactId>

????? <version>3.0.11</version>

?? </dependency>

使用到了json需要添加依賴:

<!-- JSONObject對象依賴的jar 開始 -->

????? <dependency>

????????? <groupId>commons-beanutils</groupId>

????????? <artifactId>commons-beanutils</artifactId>

????????? <version>1.9.3</version>

????? </dependency>

????? <dependency>

????????? <groupId>commons-collections</groupId>

????????? <artifactId>commons-collections</artifactId>

????????? <version>3.2.1</version>

????? </dependency>

????? <dependency>

????????? <groupId>commons-lang</groupId>

????????? <artifactId>commons-lang</artifactId>

????????? <version>2.6</version>

????? </dependency>

????? <dependency>

????????? <groupId>commons-logging</groupId>

????????? <artifactId>commons-logging</artifactId>

????????? <version>1.1.1</version>

????? </dependency>

????? <dependency>

????????? <groupId>net.sf.ezmorph</groupId>

????????? <artifactId>ezmorph</artifactId>

????????? <version>1.0.6</version>

????? </dependency>

????? <dependency>

????????? <groupId>net.sf.json-lib</groupId>

????????? <artifactId>json-lib</artifactId>

????????? <version>2.2.3</version>

????????? <classifier>jdk15</classifier>

????????? <!-- jdk版本 -->

????? </dependency>

?? ?<!-- Json依賴架包下載結束 -->

2 3.6版本監聽

?

我認為監聽有3種類型,分別是:

代幣監聽:監聽ERC20代幣交易,從startBlock區塊開始監聽token轉賬事件

重放交易:監聽過往交易,需要指定開始和結束區塊號

交易監聽:從當前區塊開始監聽交易

其中交易監聽收到的交易事件最多,包含了代幣交易。代幣監聽優點是直接過濾指定的代幣轉賬事件,用起來方便。重放交易是查詢歷史交易記錄,可以針對某段時間查詢交易。

?

2.1 代幣監聽

一般步驟:

  • 指定監聽事件event;
  • 指定過濾器filter,包含起始區塊,代幣合約列表;
  • 啟動監聽,
  • 檢查交易地址是否是自己需要的,是的話就調用具體的事件處理函數。
  • ?

    說明: 再檢查交易地址是否是自己需要的這一步,一般做法是采用地址字符串比較,這樣非常費時間,我把關注的地址保存在hashMap中,查找時直接調用htAddress.containsKey(fromAddress),這樣速度最快。

    ?

    public List<String> contracts;? //代幣合約地址列表,可以存放多個地址

    public Subscription tokenSubscription;?? //token事件訂閱對象

    public Subscription ethMissSubscription; //ETH交易空檔事件訂閱對象

    public Subscription ethSubscription;???? //ETH交易事件訂閱對象

    /*啟動監聽, startBlock區塊開始監聽token轉賬事件

    ?????????? 代幣監聽會出現的問題: 如果啟動區塊距離當前區塊稍遠,非常可能的情況是中間出現的交易太多,監視代碼內部出現空指針異常。

    ?????????? 如果監聽啟動時接近當前區塊問題出現概率小。

    ?? ?*/

    ??? public void startTransferListen_Token(BigInteger startBlock) {

    ????? // 要監聽的合約事件

    ????? Event event = new Event("Transfer",

    ???????????? Arrays.asList(

    ?????????????????? new TypeReference<Address>() {},

    ?????????????????? new TypeReference<Address>() {},

    ?????????????????? new TypeReference<Uint>(){}));

    ????? //過濾器

    ????? EthFilter filter = new EthFilter(

    ???????????? DefaultBlockParameter.valueOf(startBlock),

    ???????????? DefaultBlockParameterName.LATEST,

    ???????????? contracts);

    ????? filter.addSingleTopic(EventEncoder.encode(event));

    ?????

    ??????? //注冊監聽,解析日志中的事件?????

    ????? block_TokenSub = startBlock.intValue();

    ??????????? tokenSubscription = web3j.ethLogObservable(filter).subscribe(log -> {

    ??????????? ?

    ??????? ???? block_TokenSub = log.getBlockNumber().intValue();

    ?

    ??????? ???? String token = log.getAddress();? //這是Token合約地址??????????? ??

    ?? ??????????? String txHash = log.getTransactionHash();

    ?? ??????????? List<String> topics = log.getTopics();? // 提取轉賬記錄???????????????

    ?? ??????????? String fromAddress = "0x"+topics.get(1).substring(26);

    ?? ????????? ??String toAddress = "0x"+topics.get(2).substring(26);

    ?? ???????????

    ?? ??????????? System.out.println("? ---token ="+token+",? txHash ="+txHash);

    ?? ???????????

    ?? ??????????? //檢查發送地址、接收地址是否屬于系統用戶, 不是系統用戶就不予處理

    ?? ??????????? if(htAddress.containsKey(fromAddress) || htAddress.containsKey(toAddress)) {???????????????

    ?? ??????????? ? String value1 = log.getData();

    ?? ??????????? ? BigInteger big = new BigInteger(value1.substring(2), 16);

    ?? ??????????? ? BigDecimal value = Convert.fromWei(big.toString(), Convert.Unit.ETHER);

    ?? //??????????????????? System.out.println("value="+value);

    ?? ??????????? ? String timestamp = "";

    ?? ??????????? ?

    ?? ??????????? ? try {

    ?? ??????????? ???? EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(log.getBlockNumber()), false).send();

    ?? ??????????? ???? timestamp = String.valueOf(ethBlock.getBlock().getTimestamp());

    ?? ??????????? ? } catch (IOException e) {

    ?? ??????????? ???? System.out.println("Block timestamp get failure,block number is {}" + log.getBlockNumber());

    ?? ??????????? ???? System.out.println("Block timestamp get failure,{}"+? e.getMessage());

    ?? ??????????? ? }

    ?? ??????????? ?

    ?? ??????????? ? //執行關鍵的回調函數

    ?? ??????????? callBack_Token(token,txHash,fromAddress,toAddress,value,timestamp);

    ?? ??????????? }

    ?? ??????? }, error->{

    ?? ??????? ? System.out.println(" ### tokenSubscription?? error= "+ error);

    ?? ??????? ? error.printStackTrace();

    ?? ??????? });

    ???????? System.out.println("tokenSubscription ="+tokenSubscription);

    ???????? System.out.println(tokenSubscription.isUnsubscribed());

    ??? }

    ?

    2.2 重放交易

    重放交易功能很重要,尤其涉及充幣業務時,如果充幣運行服務器停機維護,那么在此期間的代幣充值就無法知曉造成遺漏損失。解決方法時充幣運行服務器實時記錄自己監聽的區塊高度,記錄在數據庫中,下次啟動時查找這個區塊到最新區塊之間的交易。

    說明: 當指定的區塊交易重放完畢,該監聽就自動終止。ethMissSubscription.isUnsubscribed()返回值就是false。

    //啟動監聽以太坊上的過往交易

    ??? public void startReplayListen_ETH(BigInteger startBlockNum) {

    ??? ?? System.out.println("? startReplayListen_ETH:? startBlockNum="+startBlockNum);

    ??? ?? //回放空檔期間的交易

    ??? ?? BigInteger currentBlockNum=null;

    ????? try {

    ????????? //獲取當前區塊號

    ????????? currentBlockNum = web3j.ethBlockNumber().send().getBlockNumber();

    ????????? System.out.println("? 000 currentBlockNum= "+currentBlockNum.intValue());

    ????????? if(startBlockNum.compareTo(currentBlockNum) > 0) {

    ???????????? return;? //測試曾經出現 currentBlockNum得到錯誤數字,比startBlockNum還小,這時不能啟動監聽

    ????????? }

    ????? } catch (IOException e) {

    ????????? // TODO Auto-generated catch block

    ????????? System.out.println("? 111 getBlockNumber() Error: ");

    ????????? e.printStackTrace();

    ????????? return;?? //出現異常不能啟動監聽

    ????? }????

    ?????

    ?????

    ??????? //創建開始與結束區塊, 重放這段時間內的交易,防止遺漏

    ??????? DefaultBlockParameter startBlock = new DefaultBlockParameterNumber(startBlockNum);

    ??????? DefaultBlockParameter endBlock = new DefaultBlockParameterNumber(currentBlockNum);

    ??????? System.out.println("[ startTransferListen_ETH:? miss? startBlock="+startBlockNum+", endBlock="+currentBlockNum+"]");

    ???????

    ??????? block_EthMissSub = startBlockNum.intValue();

    ??????? ethMissSubscription = web3j.replayTransactionsObservable(startBlock, endBlock)

    ???????? ???.subscribe(tx -> {

    ??????????? ? //更新檢查過的區塊高度

    ??????????? ? block_EthMissSub = tx.getBlockNumber().intValue();??????????????? ??

    ??????????? ? System.out.println("? ---replayPastTransactionsFlowable??? block_EthMissSub = "+block_EthMissSub);

    ??????????? ?

    ??????????? ? String fromAddress = tx.getFrom();

    ??????????? ? String toAddress?? = tx.getTo();

    // ??????????????? ?System.out.println("toAddress="+toAddress);

    ??????????? ? if(htAddress.containsKey(fromAddress) || htAddress.containsKey(toAddress)) {? //發現了指定地址上的交易

    ??????????? ???? ?String txHash = tx.getHash();

    ??????????????? ?

    ???????????????????? BigDecimal value = Convert.fromWei(tx.getValue().toString(), Convert.Unit.ETHER);

    ???????????????????? String timestamp = "";

    ???????????????????? try {

    ???????????????????????? EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(tx.getBlockNumber()), false).send();

    ???????????????????????? timestamp = String.valueOf(ethBlock.getBlock().getTimestamp());

    ???????????????????? } catch (IOException e) {

    ???????????????????? ?? ?System.out.println("Block timestamp get failure,block number is {}" + tx.getBlockNumber());

    ???????????????????????? System.out.println("Block timestamp get failure,{}"+? e.getMessage());

    ???????????????????? }

    ?

    ???????????????????? // 監聽以太坊上是否有系統生成地址的交易

    ???????????????????? callBack_ETH(txHash,fromAddress,toAddress,value,timestamp);

    ??????????? ? }

    ??????????? ? ?

    ??????????? }, error->{

    ??????????? ? System.out.println("?? ### replayPastTransactionsFlowable? error= "+ error);

    ??????????? ? error.printStackTrace();

    ??????????? });

    ???????

    ??? }

    ?

    2.3 交易監聽

    這種方式監聽每一筆交易,以太坊上交易量太大,只能自己過濾出關注的交易進行處理。要盡可能的快速處理。可以考慮線程池模型進行處理。

    //啟動監聽以太坊上的交易

    ??? public void startTransactionListen_ETH() {??? ?

    ??? ??

    ??????? //監聽當前區塊以后的交易

    ??????? ethSubscription = web3j.transactionObservable().subscribe(tx -> {

    ?? ??????? ? //更新檢查過的區塊高度

    ?? ??????? ? block_EthSub = tx.getBlockNumber().intValue();?

    ?? ??????? ? System.out.println("? ---transactionFlowable? block_EthSub = "+block_EthSub);

    ?? ??????? ?

    ?? ??????? ? String txHash = tx.getHash();??????? ?

    ?? ??????? ? String fromAddress = tx.getFrom();

    ?? ??????? ? String toAddress = tx.getTo();

    ?? ??????? ? if(htAddress.containsKey(fromAddress) || htAddress.containsKey(toAddress)) {? //發現了指定地址上的交易

    ??????????????? BigDecimal value = Convert.fromWei(tx.getValue().toString(), Convert.Unit.ETHER);

    ???????????? ??? String timestamp = "";

    ??????????????? try {

    ??????????????? ??? EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(tx.getBlockNumber()), false).send();

    ??????????????? ??? timestamp = String.valueOf(ethBlock.getBlock().getTimestamp());

    ??????????????? } catch (IOException e) {

    ?????????????????? ?System.out.println("Block timestamp get failure,block number is {}" + tx.getBlockNumber());

    ?????????????????? ?System.out.println("Block timestamp get failure,{}"+? e.getMessage());

    ??????????????? }

    ????????????

    ??????????????? // 監聽以太坊上是否有系統生成地址的交易

    ???????????? ??? callBack_ETH(txHash,fromAddress,toAddress,value,timestamp);

    ???????????? ?}

    ?? ??????? }, error->{

    ?? ??????? ? System.out.println("?? ### transactionFlowable? error= "+ error);

    ?? ??????? ? error.printStackTrace();

    ?? ??????? });

    ??? }

    ?

    最后回調函數示例:

    //token轉賬事件的處理函數

    ??? public void? callBack_Token(String token, String txHash, String from, String to, BigDecimal value, String timestamp) {

    ??? ?? System.out.println("----callBack_Token:");

    ??? ?? System.out.println("??? token = "+token);

    ??? ?? System.out.println("??? txHash = "+token);

    ??? ?? System.out.println("??? from = "+from);

    ??? ?? System.out.println("??? to = "+to);

    ??? ?? System.out.println("??? value = "+value.doubleValue());

    ??? ??

    ??? }

    ?

    3 4.8.4版本監聽

    版本升級后原來的監聽函數改變了,用法如下:

    public Disposable? tokenSubscription;?? //token事件訂閱對象, 如果監視啟動成功,isDisposed()返回false;否則監視失敗返回true

    ?? public Disposable? ethMissSubscription; //ETH交易空檔事件訂閱對象

    ?? public Disposable? ethSubscription;???? //ETH交易事件訂閱對象

    ?

    tokenSubscription = web3j.ethLogFlowable(filter)

    ??????? ? .subscribe(log -> {……});

    ?

    ??????????

    ?

    ethMissSubscription = web3j.replayPastTransactionsFlowable(startBlock, endBlock)

    ??????????? .subscribe(tx -> {……});

    ?

    ethSubscription = web3j.transactionFlowable()

    ??????? ? .subscribe(tx -> {……});

    ?

    判斷監聽對象是否運行:

    tokenSubscription.isDisposed()

    ?

    原來通過監聽對象取消監聽:

    ethSubscription.cancel();

    現在沒有這個方法啦, 就是不能主動停止監聽啦。

    ?

    4 常見問題

    4.1 監聽無法啟動

    指定監聽開始區塊高度后,出現啟動監聽失敗,監聽對象為false。原因未知,我多次實踐經驗:

    開始區塊距離最新區塊越遠越容易失敗;

    一個開始區塊啟動監視成功,以后該區塊重新監聽也大概率成功,小概率失敗;

    即使監聽成功,持續運行期間內部常常出現空指針異常,可能導致監視停止運行;

    對于監聽成功啟動后出現的停止運行問題,我的做法是另開一個線程專門檢查監聽對象的狀態,一旦發現停止運行就立即重新啟動監聽,該方法有效。

    ?

    -----End-----

    總結

    以上是生活随笔為你收集整理的Web3j监听功能代码研究的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。