日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RPC框架系列——Protocol Buffers

發布時間:2024/4/17 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RPC框架系列——Protocol Buffers 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

http://blog.jeoygin.org/2011/09/rpc-framework-protocol-buffers.html

1.下載與安裝

  官方網站:http://code.google.com/p/protobuf/

  下載地址:http://protobuf.googlecode.com/files/protobuf-2.4.1.tar.bz2

  protocol buffers并沒有實現RPC通信,可以使用第三方的RPC實現protobuf-socket-rpc,下載地址是:http://protobuf-socket-rpc.googlecode.com/files/protobuf-socket-rpc-2.0.jar

  • cd /usr/local/src
  • wget http://protobuf.googlecode.com/files/protobuf-2.4.1.tar.bz2
  • tar jxvf protobuf-2.4.1.tar.bz2
  • cd protobuf-2.4.1
  • ./configure
  • make
  • make check
  • make install
  •   下面將編譯生成jar包,以便在java中使用Protocol Buffers,需確保已安裝了maven。

  • cd java
  • mvn test
  • mvn install
  • mvn package
  •   安裝、編譯后在target/目錄下會生成protobuf-java-2.4.1.jar。

    2.消息結構與服務接口

      首先需要編寫一個.proto文件,結構化數據被稱為Message。

  • package protobuf;
  • ?
  • option java_package = "protobuf";
  • option java_outer_classname = "PersonProtos";
  • option java_generic_services = true;
  • ?
  • message Person {
  • ?? ?required string name = 1;
  • ?? ?required int32 id = 2;
  • ?? ?optional string email = 3;
  • ?
  • ?? ?enum PhoneType {
  • ?? ? ? ?MOBILE = 0;
  • ?? ? ? ?HOME = 1;
  • ?? ? ? ?WORK = 2;
  • ?? ?}
  • ?
  • ?? ?message PhoneNumber {
  • ?? ? ? ?required string number = 1;
  • ?? ? ? ?optional PhoneType type = 2 [default = HOME];
  • ?? ?}
  • ?
  • ?? ?repeated PhoneNumber phone = 4;
  • ?
  • ?? ?service PhoneService {
  • ?? ? ? ?rpc GetPhone (Phone) returns (Phone);
  • ?? ?}
  • }
  •   消息的成員需要指定其規則:

        (1) required:這個域在消息中必須剛好有1個;

        (2) optional:這個域在消息中可以有0或1個;

        (3) repeated:這個域在消息中可以有從多個,包括0個。

      Protobuf的類型與Java類型的映射關系:

  • double? ?->? double
  • float? ? ->? float
  • int32? ? ->? int
  • int64? ? ->? long
  • uint32? ?->? int[1]
  • uint64? ?->? long[1]
  • sint32? ?->? int
  • sint64? ?->? long
  • fixed32? ->? int[1]
  • fixed64? ->? long[1]
  • sfixed32 ->? int
  • sfixed64 ->? long
  • bool? ? ?->? boolean
  • string? ?->? String
  • bytes? ? ->? ByteString
  •   編寫完.proto文件后,就可以使用下面的命令將會在protobuf目錄中生成源文件PersonProtos.java

  • protoc –java_out=. person.proto
  • 3.序列化

      先看下面一個例子:

  • message Test1 {
  • ?? ?required int32 a = 1;
  • }
  •   創建一個Test1消息,并且把a設置為150,那么序列化后有如下3個字節:

  • 08 96 01
  • 3.1.varint編碼

      varint編碼的序列化使用一個或多個字節,數字越大使用的字節數越多。對于序列化后的字節,除了最后一個字節,都有一個most significant bit(msb):表示后邊是否有更多的字節。整數序列化時按7位一組,每個字節的低7位保存一組,第一個字節存儲最低位一組,即使用little endian。

      比如300序列化后的字節序列是:

  • 10101100 00000010
  •   先去掉每個字節的msb:

  • 0101100 0000010
  •   交換字節的順序:

  • 0000010 0101100 -> 100101100 -> 256 + 32 + 8 + 4 = 300
  • 3.2.消息結構

      一個protocol buffer message是一個key/value對序列。每一key/value對的key實際是兩個值:.proto文件中的field number以及wire type。可用的wire type如下所示:


    Type Meaning Used For
    0 Varint int32, int64, uint32, uint64, sint32, sint64, bool, enum
    1 64-bit fixed64, sfixed64, double
    2 Length-delimited string, bytes, embedded messages, packed repeated fields
    3 Start group groups (deprecated)
    4 End group groups (deprecated)
    5 32-bit fixed32, sfixed32, float

      每一個key是一個varint,值是(field_number << 3) | wire_type,即低三位存儲wire type。

    3.3.有符號整數

      有符號整數使用ZigZag編碼來將有符號整數映射到無符號整數。


    Signed Original Encoded As
    0 0
    -1 1
    1 2
    -2 3
    2147483647 4294967294
    -2147483648 4294967294

    3.4.非varint編碼

  • message Test2 {
  • ?? ?required string b = 2;
  • }
  •   將b的值設置為“testing”,編碼結果為:

  • 12 07 74 65 73 74 69 6e 67
  •   這里的key是0×12:field_number = 2, type = 2。字符串的長度是7。

    3.5.嵌套消息

  • message Tes3 {
  • ?? ?required Test1 c = 3;
  • }
  •   c的成員a的值設置為150,編碼結果為:

  • 1a 03 08 96 01
  •   后三個字節和Test1一樣,之前的數字3表示長度。

    3.5.Repeated域

  • message Test4 {
  • ?? ?repeated int32 d = 4;
  • }
  •   {3, 270, 86942}編碼結果為:

  • 22? ? ? ? // tag (field number 4, wire type 2)
  • 06? ? ? ? // payload size (6 bytes)
  • 03? ? ? ? // first element (varint 3)
  • 8E 02? ? ?// second element (varint 270)
  • 9E A7 05? // third element (varint 86942)
  • 4.rpc通信實現

      使用protocol buffers的第三方rpc實現protobuf-socket-rpc。

      假設protocol buffers生成的類是protobuf. MessageProtos,其中定義了一個消息類Message和一個服務類MessageService,MessageService中定義了一個接口getMessage(RpcController, Message request)。

      服務接口實現MessageServiceImpl.java:

  • package?protobuf;
  • ?
  • import?com.google.protobuf.RpcController;
  • import?com.google.protobuf.ServiceException;
  • import?protobuf.MessageProtos.Message;
  • import?protobuf.MessageProtos.MessageService.BlockingInterface;
  • ?
  • public?class?MessageServiceImpl?implements?BlockingInterface?{
  • ? ? @Override
  • ? ??public?Message?getMessage(RpcController?controller,?Message?request)
  • ? ? ? ? ? ??throws?ServiceException?{
  • ? ? ? ??// process request? ? ? ?
  •     ……
  • ? ? ? ??return?request;
  • ? ??}
  • }
  •   服務端實現Server.java:

  • package?protobuf;
  • ?
  • import?java.util.concurrent.Executors;
  • ?
  • import?com.googlecode.protobuf.socketrpc.RpcServer;
  • import?com.googlecode.protobuf.socketrpc.ServerRpcConnectionFactory;
  • import?com.googlecode.protobuf.socketrpc.SocketRpcConnectionFactories;
  • import?protobuf.MessageProtos.MessageService;
  • ?
  • public?class?Server?{
  • ? ??private?int?port;
  • ? ??private?int?threadPoolSize;
  • ?
  • ? ??public?Server(int?port,?int?threadPoolSize)?{
  • ? ? ? ??this.port?=?port;
  • ? ? ? ??this.threadPoolSize?=?threadPoolSize;
  • ? ??}
  • ?
  • ? ??public?void?run()?{
  • ? ? ? ??// Start server
  • ? ? ? ??ServerRpcConnectionFactory?rpcConnectionFactory?=SocketRpcConnectionFactories
  • ? ? ? ? ? ? ? ? .createServerRpcConnectionFactory(port);
  • ? ? ? ??RpcServer?server?=?new?RpcServer(rpcConnectionFactory,
  • ? ? ? ? ? ? ? ??Executors.newFixedThreadPool(threadPoolSize),?true);
  • ? ? ? ??server.registerBlockingService(MessageService
  • ? ? ? ? ? ? ? ? .newReflectiveBlockingService(new?MessageServiceImpl()));
  • ? ? ? ??server.run();
  • ? ??}
  • ?
  • ? ??public?static?void?main(String[]?args)?{
  • ? ? ? ??if?(args.length?!=?2)?{
  • ? ? ? ? ? ??System.out.println("Usage: Server port thread_pool_size");
  • ? ? ? ? ? ??return;
  • ? ? ? ??}
  • ?
  • ? ? ? ??int?port?=?Integer.parseInt(args[0]);
  • ? ? ? ??int?size?=?Integer.parseInt(args[1]);
  • ?
  • ? ? ? ??new?Server(port,?size).run();
  • ? ??}
  • }
  •   客戶端實現Client.java:

  • package?protobuf;
  • ?
  • import?protobuf.MessageProtos.Message;
  • import?protobuf.MessageProtos.MessageService;
  • import?protobuf.MessageProtos.MessageService.BlockingInterface;
  • ?
  • import?com.google.protobuf.BlockingRpcChannel;
  • import?com.google.protobuf.ByteString;
  • import?com.google.protobuf.RpcController;
  • import?com.google.protobuf.ServiceException;
  • import?com.googlecode.protobuf.socketrpc.RpcChannels;
  • import?com.googlecode.protobuf.socketrpc.RpcConnectionFactory;
  • import?com.googlecode.protobuf.socketrpc.SocketRpcConnectionFactories;
  • import?com.googlecode.protobuf.socketrpc.SocketRpcController;
  • ?
  • public?class?Client?{
  • ? ??private?int?port;
  • ? ??private?String?host;
  • ? ??private?int?size;
  • ? ??private?int?count;
  • ?
  • ? ??public?Client(int?port,?String?host,?int?size,?int?count)?{
  • ? ? ? ??super();
  • ? ? ? ??this.port?=?port;
  • ? ? ? ??this.host?=?host;
  • ? ? ? ??this.size?=?size;
  • ? ? ? ??this.count?=?count;
  • ? ??}
  • ?
  • ? ??public?long?run()?{
  • ? ? ? ??// Create channel
  • ? ? ? ??RpcConnectionFactory?connectionFactory?=?SocketRpcConnectionFactories
  • ? ? ? ? ? ? ? ? .createRpcConnectionFactory(host,?port);
  • ? ? ? ??BlockingRpcChannel?channel?=?RpcChannels
  • ? ? ? ? ? ? ? ? .newBlockingRpcChannel(connectionFactory);
  • ?
  • ? ? ? ??// Call service
  • ? ? ? ??BlockingInterface?service?=?MessageService.newBlockingStub(channel);
  • ? ? ? ??RpcController?controller?=?new?SocketRpcController();
  • ? ? ? ??Message.Builder?message?=?Message.newBuilder();
  • ? ? ? ??// initiate the message
  • ? ? ? ? …
  • ?
  • ? ? ? ??long?start?=?0;
  • ? ? ? ??long?end?=?0;
  • ? ? ? ??try?{
  • ? ? ? ? ? ??start?=?System.currentTimeMillis();
  • ? ? ? ? ? ??for?(int?i?=?0;?i?<?count;?i++)?{
  • ? ? ? ? ? ? ? ??service.getMessage(controller,?message.build());
  • ? ? ? ? ? ??}
  • ? ? ? ? ? ??end?=?System.currentTimeMillis();
  • ? ? ? ? ? ??System.out.println(end?-?start);
  • ? ? ? ??}?catch?(ServiceException?e)?{
  • ? ? ? ? ? ??e.printStackTrace();
  • ? ? ? ??}
  • ?
  • ? ? ? ??// Check success
  • ? ? ? ??if?(controller.failed())?{
  • ? ? ? ? ? ??System.err.println(String.format("Rpc failed %s : %s",
  • ? ? ? ? ? ? ? ? ? ??((SocketRpcController)?controller).errorReason(),
  • ? ? ? ? ? ? ? ? ? ??controller.errorText()));
  • ? ? ? ??}
  • ?
  • ? ? ? ??return?end?-?start;
  • ? ??}
  • ?
  • ? ??public?static?void?main(String[]?args)?{
  • ? ? ? ??if?(args.length?!=?4)?{
  • ? ? ? ? ? ??System.out.println("Usage: Client host port dataSize count");
  • ? ? ? ? ? ??return;
  • ? ? ? ??}
  • ? ? ? ??String?host?=?args[0];
  • ? ? ? ??int?port?=?Integer.parseInt(args[1]);
  • ? ? ? ??int?size?=?Integer.parseInt(args[2]);
  • ? ? ? ??int?count?=?Integer.parseInt(args[3]);
  • ?
  • ? ? ? ??new?Client(port,?host,?size,?count).run();
  • ? ??}
  • }
  • 5.參考資料

      (1) Protocol Buffers Documentation:?http://code.google.com/apis/protocolbuffers/docs/overview.html


    總結

    以上是生活随笔為你收集整理的RPC框架系列——Protocol Buffers的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。