日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

实现一个简易的RPC

發(fā)布時間:2025/3/20 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 实现一个简易的RPC 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

??點擊上方?好好學(xué)java?,選擇?星標(biāo)?公眾號

重磅資訊、干貨,第一時間送達(dá) 今日推薦:用好Java中的枚舉,真的沒有那么簡單!個人原創(chuàng)+1博客:點擊前往,查看更多 作者:黃青石 鏈接:https://www.cnblogs.com/huangqingshi/p/12289820.html

之前寫了一些關(guān)于RPC原理的文章,但是覺得還得要實現(xiàn)一個。之前看到一句話覺得非常有道理,與大家共勉。不是“不要重復(fù)造輪子”,而是“不要發(fā)明輪子”,所以能造輪子還是需要造的。

這篇文章的梗概如下:

1. ?介紹一下這篇RPC的大致梗概。

2. ?說一下這篇文章需要的技術(shù)和實現(xiàn)。

3. ?用測試用例測試一下。

一、梗概

這篇RPC主要提示了服務(wù)注冊,服務(wù)發(fā)現(xiàn),同步調(diào)用,異步調(diào)用,回調(diào)功能。這些功能已經(jīng)足夠?qū)W習(xí)RPC使用了,對其中的原理就了解的非常清楚了。

二、技術(shù)和實現(xiàn)

采用的技術(shù)有Netty, Zookeeper, Protostuff, Spring,Cglib,Log4j這些基本就能夠達(dá)到這個功能。

Netty的作用就是用于客戶端向服務(wù)端發(fā)送請求,服務(wù)端接收請求后進(jìn)行處理,Netty是一個基于異步事件處理的程序,客戶端和服務(wù)端采用的LengthFieldBasedFrameDecoder,這種解碼方法是最通用的,也就是把長度寫到數(shù)據(jù)包中,即 Length + Data,用這種解碼方法解決拆包粘包的問題。

Zookeeper的作用是用于服務(wù)的注冊,分布式解決方案中的很重要的一個工具。里邊主要做兩件事,一個是創(chuàng)建一個永久的節(jié)點,然后再永久的節(jié)點下邊創(chuàng)建臨時節(jié)點,用作服務(wù)的注冊,同時寫上對應(yīng)的監(jiān)聽事件,如果服務(wù)下線或上線了,將進(jìn)行服務(wù)上下線處理。

Protostuff是基于protoBuff的一種方案,這種方案可以在protoBuff的基礎(chǔ)上省去對應(yīng)的.proto文件,這樣相對來講會更方便一些。它的主要作用是將數(shù)據(jù)進(jìn)行序列化和反序列化,相對于JDK自帶的序列化方案,這種方案有更好更優(yōu)的處理效率。

Spring主要是因為最近項目都比較流行Spring, 所以需要將Spring結(jié)合起來,這樣才能更好的兼容大部分的工程,同時也了解一下Spring的各種機制,本篇主要采用的是自定義一個注解,然后將接口和方法添加上注解,添加好之后,在Spring啟動的時候,獲取該注解的類并且將其封閉到一個Map中,待后續(xù)使用。

Cglib的作用是動態(tài)代碼,客戶端將需要操作的接口類,方法,參數(shù),參數(shù)進(jìn)行進(jìn)行封裝,然后序列化后發(fā)給服務(wù)端,服務(wù)端收到請求之后將結(jié)合注冊在Map中的Bean進(jìn)行方法調(diào)用,采用的就是Cglib,關(guān)于動態(tài)代理我還寫過一篇文章

Log4j用于配置進(jìn)行日志輸出。

接下來咱們一起看下代碼片段:

下邊的是Server的定義,里邊主要有兩個主要的功能,一個是Netty的初始化,采用上邊說的將長度寫到Length里邊, Length占4個字節(jié),剩下的就是數(shù)據(jù)。

效果如下,最大長度為64436即 64K,Length的長度為4個字節(jié)。將Request和Response進(jìn)行解碼和編碼,這個含義是直接將實際的數(shù)據(jù)轉(zhuǎn)化為真實的Request。

* +------------+--------------------+* | Length | Actual Content |* | 0x00000C | "HELLO, WORLD" |* +------------+--------------------+

還有一個比較重要的就是在啟動初始化的時候,將注解RPCServer的類獲取且封裝起來,放到Map里邊用于后續(xù)的調(diào)用。

package com.hqs.server; import com.hqs.client.RPCClientHandler; import com.hqs.codec.RPCDecoder; import com.hqs.codec.RPCEncoder; import com.hqs.protocol.Request; import com.hqs.protocol.Response; import com.hqs.registry.ServiceDiscovery; import com.hqs.registry.ServiceRegistry; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.commons.collections4.MapUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class RPCServer implements ApplicationContextAware, InitializingBean { private String serverAddress; private ServiceRegistry serviceRegistry; private Map<String, Object> handlerMap = new HashMap<>(); private static ThreadPoolExecutor threadPoolExecutor; private EventLoopGroup bossGroup = null; private EventLoopGroup workerGroup = null; public RPCServer(String serverAddress) { this.serverAddress = serverAddress;} public RPCServer(String serverAddress, ServiceRegistry serviceRegistry) { this.serverAddress = serverAddress; this.serviceRegistry = serviceRegistry;}@Override public void afterPropertiesSet() throws Exception {start();}@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, Object> serverBeanMap = applicationContext.getBeansWithAnnotation(RPCService.class); if(!MapUtils.isEmpty(serverBeanMap)) { for(Object serviceBean : serverBeanMap.values()) {String interfaceName = serviceBean.getClass().getAnnotation(RPCService.class).value().getName();handlerMap.put(interfaceName, serviceBean);}}} public void start() throws InterruptedException { if(bossGroup == null && workerGroup == null) {bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Override protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0)).addLast(new RPCDecoder(Request.class)).addLast(new RPCEncoder(Response.class)).addLast(new RPCServerHandler(handlerMap));}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);String[] address = serverAddress.split(":");String host = address[0]; int port = Integer.parseInt(address[1]);ChannelFuture future = serverBootstrap.bind(host, port).sync();System.out.println("servier 啟動"); if(serviceRegistry != null) {serviceRegistry.register(serverAddress);}future.channel().closeFuture().sync();}} public static void submit(Runnable task) { if(threadPoolExecutor == null) { synchronized (RPCServer.class) { if(threadPoolExecutor == null) {threadPoolExecutor = new ThreadPoolExecutor(16, 16, 600L,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(65536));}}}threadPoolExecutor.submit(task);} public RPCServer addService(String interfaceName, Object serviceBean) { if(!handlerMap.containsKey(interfaceName)) {handlerMap.put(interfaceName, serviceBean);} return this;} }

下面是異步調(diào)用接口和動態(tài)代理接口,用于進(jìn)行接口異步調(diào)用,實現(xiàn)動態(tài)代理。

package com.hqs.proxy; import com.hqs.async.RPCFuture; public interface AsyncObjectProxy {RPCFuture call(String funcName, Object... args); }

client端的代理采用的是JDK的代理機制,在初始化ObjectProxy的時候,將需要代理的類傳入,這樣如果類在調(diào)用方法的時候,首先會調(diào)用里邊的invoke方法,這樣就可以在invoke里邊進(jìn)行數(shù)據(jù)請求的初始化工作了。

package com.hqs.proxy; import com.hqs.ConnectionManager; import com.hqs.async.RPCFuture; import com.hqs.client.RPCClientHandler; import com.hqs.protocol.Request; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.UUID; public class ObjectProxy<T> implements InvocationHandler, AsyncObjectProxy{ private Class<T> clazz; public ObjectProxy(Class<T> clazz) { this.clazz = clazz;}@Override public RPCFuture call(String funcName, Object... args) {RPCClientHandler handler = ConnectionManager.getInstance().chooseHandler();Request request = createRquest(this.clazz.getName(), funcName, args);RPCFuture future = handler.sendRequest(request); return future;}@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (Object.class == method.getDeclaringClass()) {String name = method.getName(); if ("equals".equals(name)) { return proxy == args[0];} else if ("hashCode".equals(name)) { return System.identityHashCode(proxy);} else if ("toString".equals(name)) { return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy)) +", with InvocationHandler " + this;} else { throw new IllegalStateException(String.valueOf(method));}}Request request = new Request();request.setRequestId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameters(args);Class[] parameterTypes = new Class[args.length]; for(int i = 0; i < args.length; i++) {parameterTypes[i] = getClassType(args[i]);}request.setParameterTypes(parameterTypes);System.out.println("requestId:" + request.getRequestId() + " className:" + request.getClassName());RPCClientHandler handler = ConnectionManager.getInstance().chooseHandler();RPCFuture future = handler.sendRequest(request); return future.get();} private Request createRquest(String className, String methodName, Object[] args) {Request request = new Request();request.setRequestId(UUID.randomUUID().toString());request.setClassName(className);request.setMethodName(methodName);request.setParameters(args);Class[] parameterTypes = new Class[args.length]; for(int i = 0; i < args.length; i++) {parameterTypes[i] = getClassType(args[i]);}request.setParameterTypes(parameterTypes);System.out.println("requestId:" + request.getRequestId() + " className:" + className); return request;} private Class<?> getClassType(Object obj) {Class<?> classType = obj.getClass();String typeName = classType.getName(); switch (typeName) { case "java.lang.Integer": return Integer.TYPE; case "java.lang.Long": return Long.TYPE; case "java.lang.Float": return Float.TYPE; case "java.lang.Double": return Double.TYPE; case "java.lang.Character": return Character.TYPE; case "java.lang.Boolean": return Boolean.TYPE; case "java.lang.Short": return Short.TYPE; case "java.lang.Byte": return Byte.TYPE;} return classType;} }

異步回調(diào)方法接口和異步處理類RPCFuture,該類實現(xiàn)了Future類,這個類里有的方法大家應(yīng)該比較常用。cancel(), isCancelled(), isDone(), get(), get(long timeout, TimeUnit unit),其中g(shù)et是同步調(diào)用,什么時候執(zhí)行完成之后什么時候繼續(xù)執(zhí)行后續(xù)操作,get(long timeout, TimeUnit unit)用于在某個時間內(nèi)不給到回執(zhí)的話,將會不丟棄掉請求。

package com.hqs.async; public interface AsyncRPCCallback { void success(Object result); void fail(Exception e);} package com.hqs.async; import com.hqs.client.RPCClient; import com.hqs.protocol.Request; import com.hqs.protocol.Response; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 用于實現(xiàn)異步調(diào)用 */ public class RPCFuture implements Future<Object> { private Sync sync; private Request request; private Response response; private long startTime; private long responseTimeThreshold = 5000L; private List<AsyncRPCCallback> pendingCallbacks = new ArrayList<>(); private Lock lock = new ReentrantLock(); public RPCFuture(Request request) { this.sync = new Sync(); this.request = request; this.startTime = System.currentTimeMillis();}@Override public boolean cancel(boolean mayInterruptIfRunning) { return false;}@Override public boolean isCancelled() { throw new UnsupportedOperationException();}@Override public boolean isDone() { return sync.isDone();}@Override public Object get() throws InterruptedException, ExecutionException {sync.acquire(1); if(this.response != null) { return this.response.getResult();} return null;}@Override public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { boolean success = sync.tryAcquireNanos(1, unit.toNanos(timeout)); if(success) { if(this.response != null) { return this.response.getResult();} return null;} return new RuntimeException("Timeout exception. Request id: " + this.request.getRequestId() + ". Request class name: " + this.request.getClassName() + ". Request method: " + this.request.getMethodName());} public void done(Response response) { this.response = response;sync.release(1);invokeCallbacks(); long responseTime = System.currentTimeMillis() - startTime; if(responseTime > responseTimeThreshold) {System.out.println("Service response time is too slow. Request id = " + response.getRequestId());}} private void invokeCallbacks() {lock.lock(); try { for( AsyncRPCCallback asyncRPCCallback : pendingCallbacks) {runCallback(asyncRPCCallback);}} finally {lock.unlock();}} public RPCFuture addCallback(AsyncRPCCallback callback) {lock.lock(); try { if(isDone()) {runCallback(callback);} else { this.pendingCallbacks.add(callback);}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();} return this;} private void runCallback(final AsyncRPCCallback callback) { final Response response = this.response;RPCClient.submit(new Runnable() {@Override public void run() { if(!response.isError()) {callback.success(response.getResult());} else {callback.fail(new RuntimeException("Response error", new Throwable(response.getError())));}}});} static class Sync extends AbstractQueuedSynchronizer { //future statusprivate final int done = 1; private final int pending = 0;@Override protected boolean tryAcquire(int arg) { return getState() == done;}@Override protected boolean tryRelease(int arg) { if(getState() == pending) { if(compareAndSetState(pending, done)) { return true;} else { return false;}} else { return true;}} public boolean isDone() { return getState() == done;}} }

服務(wù)的注冊和服務(wù)發(fā)現(xiàn)類,里邊包括了zk的連接,設(shè)置ZK的監(jiān)聽,創(chuàng)建永久節(jié)點和臨時節(jié)點。

package com.hqs.registry; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; public class ServiceRegistry { private CountDownLatch latch = new CountDownLatch(1); private String registryAddress; public ServiceRegistry(String registryAddress) { this.registryAddress = registryAddress;} public void register(String data) { if(data != null) {ZooKeeper zk = connectServer(); if(zk != null) {AddRootNode(zk);createNode(zk, data);}}} private ZooKeeper connectServer() {ZooKeeper zk = null; try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {@Override public void process(WatchedEvent event) { if(event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e) {e.printStackTrace();} return zk;} private void AddRootNode(ZooKeeper zk) { try {Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false); if(s == null) {zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}} catch (Exception e) {e.printStackTrace();}} private void createNode(ZooKeeper zk, String data) { try { byte[] dataBytes= data.getBytes();String path = zk.create(Constant.ZK_DATA_PATH, dataBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);System.out.println("createNode:path" + path + " data:" + data);} catch (Exception e) {e.printStackTrace();}} } package com.hqs.registry; import com.hqs.ConnectionManager; import org.apache.zookeeper.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; public class ServiceDiscovery { private CountDownLatch latch = new CountDownLatch(1); private volatile List<String> dataList = new ArrayList<>(); private String registryAddress; private ZooKeeper zooKeeper; public ServiceDiscovery(String registryAddress) { this.registryAddress = registryAddress;zooKeeper = connectServer(); if(zooKeeper != null) { try {watchNode(zooKeeper);} catch (Exception e) { try {watchNode(zooKeeper);} catch (Exception e1) {e1.printStackTrace();}}}} private ZooKeeper connectServer() {ZooKeeper zk = null; try {zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {@Override public void process(WatchedEvent event) { if(event.getState() == Event.KeeperState.SyncConnected) {latch.countDown();}}});latch.await();} catch (Exception e ) {e.printStackTrace();} return zk;} private void watchNode(final ZooKeeper zk) { try {List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {@Override public void process(WatchedEvent event) { if(event.getType() == Event.EventType.NodeDataChanged) {watchNode(zk);}}});List<String> dataList = new ArrayList<>(); for(String node : nodeList) { byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);dataList.add(new String(bytes));} this.dataList = dataList;UpdateConnectServer();} catch (KeeperException | InterruptedException e) {e.printStackTrace();}} private void UpdateConnectServer() {ConnectionManager.getInstance().UpdateConnectedServer(dataList);} public void close() { if(zooKeeper != null) { try {zooKeeper.close();} catch (Exception e) {e.printStackTrace();}}}}

三、測試

大部分代碼功能已經(jīng)在上邊描述了,當(dāng)然還有很多細(xì)節(jié)需要了解,比如AQS,RentrantLock,Condition,這個需要自行了解一下。下邊咱們來看一下測試用例。

啟動zookeeper,然后啟動RPCBootServiceWithSpring,將下邊每個測試的類進(jìn)行調(diào)用,依次是同步調(diào)用,異步調(diào)用,同步callback調(diào)用。

package com.hqs.spring; import com.hqs.HelloService; import com.hqs.async.AsyncRPCCallback; import com.hqs.async.RPCFuture; import com.hqs.client.RPCClient; import com.hqs.proxy.AsyncObjectProxy; import com.hqs.registry.ServiceDiscovery; import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:client.xml") public class ServiceTest { private static final Logger logger = LoggerFactory.getLogger(ServiceTest.class);@Autowired private RPCClient rpcClient;@Test public void syncTest() {HelloService helloService = rpcClient.create(HelloService.class);String result = helloService.sayHi("hqs");System.out.println(result);Assert.assertEquals("Hi hqs", result);}@Test public void asyncInvokeTest() {ServiceDiscovery serviceDiscovery = new ServiceDiscovery("127.0.0.1:2181");RPCClient rpcClient = new RPCClient(serviceDiscovery);AsyncObjectProxy asyncClient = rpcClient.createAsync(HelloService.class);RPCFuture future = asyncClient.call("sayHi", "hqs"); try {String result = (String) future.get(5, TimeUnit.SECONDS);Assert.assertEquals("Hi hqs", result);System.out.println(result);} catch (Exception e) {e.printStackTrace();}}@Test public void syncCallbackTest() {ServiceDiscovery serviceDiscovery = new ServiceDiscovery("127.0.0.1:2181");RPCClient rpcClient = new RPCClient(serviceDiscovery);AsyncObjectProxy asyncClient = rpcClient.createAsync(HelloService.class);RPCFuture future = asyncClient.call("sayHi", "hqs"); final CountDownLatch latch = new CountDownLatch(1);future.addCallback(new AsyncRPCCallback() {@Override public void success(Object result) {System.out.println("result:" + result.toString());Assert.assertEquals("Hi hqs", result);latch.countDown();}@Override public void fail(Exception e) {System.out.println("fail:" + e.getMessage());latch.countDown();}}); try {latch.await();} catch (Exception e) {e.printStackTrace();}}@After public void setTear() { if (rpcClient != null) {rpcClient.stop();}} } ![](https://img2018.cnblogs.com/i-beta/1236784/202002/1236784-20200210123031597-125855471.png)

運行上邊的結(jié)果都通過了,說明能正常運行。

如果想要看更詳細(xì)的代碼訪問:https://github.com/stonehqs/MyNettyRpc

歡迎指正。

總結(jié)

以上是生活随笔為你收集整理的实现一个简易的RPC的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。