日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

简易RPC框架实现

發布時間:2025/3/8 编程问答 13 豆豆
生活随笔 收集整理的這篇文章主要介紹了 简易RPC框架实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

寫在最前面

PRC(Remote Procedure Call) 遠程過程調用。通俗的講就是程序通過RPC框架調用遠程主機的方法就如同調用本地方法一樣。Dubbo就是這樣一個Rpc框架,本文主要參考Dubbo的設計思路,簡易實現了Rpc框架。 本文涉及到知識點包括:

  • Jdk 動態代理
  • serialization 序列化
  • Netty 相關
  • Zookeeper 使用

1、Rpc框架

Rpc 框架一般分為三個部分,Registry(注冊中心)、Provider(提供者)、Consumer(消費者)。

  • Registry 服務的注冊中心,可以通過zookeeper、redis等實現。
  • Provider 服務提供者被調用方,提供服務供消費者調用
  • Consumer 消費者,通過訂閱相應的服務獲取需要調用服務的ip和端口號調用遠程provider提供的服務。
  • 2、代理

    java中常見的代理有JDK動態代理、Cglib動態代理、靜態代理(ASM等字節碼技術)。

    2.1、JDK 代理

    舉個例子

    @Override@SuppressWarnings("unchecked")public <T> T createProxyBean(Class<T> rpcServiceInterface) {return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{rpcServiceInterface}, new AryaRpcInvocationHandler());} 復制代碼

    JDK代理生成代理對象主要通過java.lang.reflect.Proxy類的newProxyInstance方法。JDK代理需要被代理對象必須實現接口。

    2.2、Cglib

    Cglib實際上是對ASM的易用性封裝,Cglib不需要目標對象必須實現某一個接口,相對JDK動態代理更加靈活。

    Enhancer en = new Enhancer(); en.setSuperclass(clazz); en.setCallback(new MethodInterceptor() { @Override public Object intercept(Object arg0, Method method, Object[] args, MethodProxy arg3) throws Throwable { Object o = method.invoke(object, args); return o; } }); return en.create(); 復制代碼

    2.3、靜態代理

    通過字節碼技術對class文件進行修改,使用和學習成本相對較高,需要對Class的文件結構以及各種符號引用有比較深的認識,才能較好的使用,因為是對字節碼的修改所以相對的性能上也比動態代理要好一些。

    3、序列化

    我們知道數據在網絡上傳輸都是通過二進制流的形式進行進行的。當Consumer調用Provider時傳輸的參數需要先進行序列化,provider接收到參數時需要進行反序列化才能拿到需要的參數數據,所以序列化的性能對RPC的調用性能有很大的影響。目前主流的序列化方式有很多包括:Kryo、Protostuff、hessian。等

    Protostuff是google序列化Protosbuff的開源實現,項目中我們用到它的序列化方式

    /** * @author HODO */ public class ProtostuffSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {Class targetClass = object.getClass();RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);LinkedBuffer linkedBuffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);return ProtostuffIOUtil.toByteArray(object, schema, linkedBuffer);}@SuppressWarnings("unchecked")@Overridepublic <T> T deserialize(byte[] bytes, Class<T> targetClass) {RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);T object = (T) schema.newMessage();ProtostuffIOUtil.mergeFrom(bytes, object, schema);return object;} }復制代碼

    4、Netty

    Netty是一個高性能、異步事件驅動的NIO框架,它提供了對TCP、UDP和文件傳輸的支持。舉個例子: Netty 服務端代碼

    public class NettyServer {private ApplicationContext applicationContext;public NettyServer(ApplicationContext applicationContext) {this.applicationContext = applicationContext;}public void init(int port) {EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker);bootstrap.channel(NioServerSocketChannel.class);bootstrap.option(ChannelOption.SO_BACKLOG, 1024);bootstrap.option(ChannelOption.TCP_NODELAY, true);bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);bootstrap.localAddress(port);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline channelPipeline = socketChannel.pipeline();channelPipeline.addLast(new NettyServerHandler(applicationContext));}});ChannelFuture f = bootstrap.bind().sync();if (f.isSuccess()) {System.out.println("Netty端口號:" + port);}f.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}} 復制代碼

    Netty 客服端代碼

    public class NettyClient {private int port;private String host;private final CountDownLatch countDownLatch = new CountDownLatch(1);SerializerFactory serializerFactory = new SerializerFactory();Serializer serializer = serializerFactory.getSerialize(ProtostuffSerializer.class);public NettyClient(String host, int port) {this.port = port;this.host = host;}public NettyClient(String inetAddress) {if (inetAddress != null && inetAddress.length() != 0) {String[] strings = inetAddress.split(":");this.host = strings[0];this.port = Integer.valueOf(strings[1]);}}public RpcResponse invoker(RpcRequest rpcRequest) throws InterruptedException {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();final NettyClientHandler clientHandler = new NettyClientHandler();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(clientHandler);}});ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)).sync();serializer.serialize(rpcRequest);future.channel().writeAndFlush(Unpooled.buffer().writeBytes(serializer.serialize(rpcRequest)));countDownLatch.await();// 等待鏈接關閉//future.channel().closeFuture().sync();return clientHandler.getRpcResponse();} finally {eventLoopGroup.shutdownGracefully();}}public class NettyClientHandler extends ChannelInboundHandlerAdapter {private RpcResponse rpcResponse;/*** 接收 Rpc 調用結果** @param ctx netty 容器* @param msg 服務端答復消息* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);rpcResponse = serializer.deserialize(req, RpcResponse.class);countDownLatch.countDown();}RpcResponse getRpcResponse() {return rpcResponse;}} }復制代碼

    5、注冊中心 zookeeper

    選用了zookeeper作為注冊中心,在建議Rpc框架中提供了注冊中心的擴展。只要實現RegistryManager接口即可。zookeeper常用的命令行:

    1、客服端腳本連接zookeeper服務器不指定-server默認連接本地服務

    ./zkCli -service ip:port

    2、創建

    create [-s] [-e] path data acl

    創建一個節點-s -e分別指定節點的類型和特性:順序和臨時節點默認創建的是臨時節點,acl用于權限控制

    3、讀取

    ls path只能看指定節點下的一級節點

    get path查看指定節點的數據和屬性信息

    4、更新

    set path data [version]

    可以指定更新操作是基于哪一個版本當更新的 path 不存在時報 Node does not exist

    5、刪除

    `delete path [version]``

    6、Spring 支持

    在框架中還提供了兩個注解@RpcConsumer和RpcProvider 在項目中只要引入

    <dependency><groupId>com.yoku.arya</groupId><artifactId>arya</artifactId><version>1.0-SNAPSHOT</version></dependency> 復制代碼

    在provider端容器注入

    @Beanpublic RpcProviderProcessor rpcProviderProcessor() {return new RpcProviderProcessor();} 復制代碼

    在comsumer端容器注入

    @Beanpublic RpcConsumerProcessor rpcConsumerProcessor() {return new RpcConsumerProcessor();} 復制代碼

    項目完整的代碼 arya github.com/hoodoly/ary…

    框架使用Demo github.com/hoodoly/ary…

    歡迎 star

    聯系方式:gunriky@163.com 有問題可以直接聯系

    總結

    以上是生活随笔為你收集整理的简易RPC框架实现的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。