分布式理论、架构设计(自定义RPC)
會不斷更新!沖沖沖!跳轉連接
https://blog.csdn.net/qq_35349982/category_10317485.html
分布式理論、架構設計(自定義RPC)
1.分布式架構
1.1什么是分布式系統
分布式系統,就是一個業務拆分成多個子業務,分布在不同的服務器節點,共同構成的系統稱為分布式系統,同一個分布式系統中的服務器節點在空間部署上是可以隨意分布的,這些服務器可能放在不同的機柜中,也可能在不同的機房中,甚至分布在不同的城市。
1.2分布式與集群的區別
集群:多個人在一起作同樣的事 。 分布式 :多個人在一起作不同的事 。1.3特點
(1)分布性 (2)對等性 (3)并發性 (4)缺乏全局時鐘 (5)故障總是會發生1.4 分布式系統面臨的問題
- 通信異常
網絡本身的不可靠性,因此每次網絡通信都會伴隨著網絡不可用的風險
- 網絡分區
整個系統的網絡環境被切分成了若干個孤立的區域,分布式系統就會出現局部小集群,
- 節點故障
組成分布式系統的服務器節點出現的宕機或"僵死"現象
- 三態
分布式系統每一次請求與響應存在特有的“三態”概念,即成功、失敗和超時。
分布式理論
1.一致性
- 分布式一致性分布式數據一致性,指的是數據在多份副本中存儲時,各副本中的數據是一致的。
- 副本一致性 - 強一致性 - 弱一致性 - 讀寫一致性 - 單調讀一致性 - 因果一致性 - 最終一致性2.CAP定理
CAP 理論含義是,一個分布式系統不可能同時滿足一致性(C:Consistency),可用性(A: Availability)和分區容錯性(P:Partition tolerance)這三個基本需求,最多只能同時滿足其中的2個。
- 分布式一致性的特點?
1.由于存在數據庫同步過程,寫操作的響應會有一定的延遲
2.為了保定數據的一致性,對資源暫時鎖定,待數據同步完成后釋放鎖定資源
3.如果請求數據同步失敗的節點則會返回錯誤信息, 一定不會返回舊數據.
3.BASE 理論
BASE:全稱:**Basically Available(基本可用),Soft state(軟狀態),和 Eventually consistent(最終一致性)**三個短語的縮寫.
BASE是對CAP中一致性和可用性權衡的結果,BASE理論的核心思想是:即使無法做到強一致性,但每個應用都可以根據自身業務特點,采用適當的方式來使系統達到最終一致性。
4.分布式事務
1. 四大特性- Atomicity(原子性)- **Consistency(一致性)**- **Isolation**(隔離性)- Durablity(持久性)5.一致性協議 2PC
2PC ( Two-Phase Commit縮寫)即兩階段提交協議,是將整個事務流程分為兩個階段,準備階段(Preparephase)、提交階段(commit phase),2是指兩個階段,P是指準備階段,C是指提交階段。
6.一致性協議 3PC
3PC,全稱 “three phase commit”,是 2PC 的改進版,將 2PC 的 “提交事務請求” 過程一分為二,共形成了由CanCommit、PreCommit和doCommit三個階段組成的事務處理協議。
7.一致性算法 Paxos
提案(Proposal)。最終要達成一致的value就在提案里
提案 (Proposal):Proposal信息包括提案編號 (Proposal ID) 和提議的值 (Value)
Client:客戶端
? 客戶端向分布式系統發出請求,并等待響應。例如,對分布式文件服務器中文件的寫請求。
Proposer:提案發起者
? 提案者提倡客戶請求,試圖說服Acceptor對此達成一致,并在發生沖突時充當協調者以推動協議向前發展
Acceptor:決策者,可以批準提案
? Acceptor可以接受(accept)提案;如果某個提案被選定(chosen),那么該提案里的value就被選定了
Learners:最終決策的學習者 學習者充當該協議的復制因素
8.一致性算法 Raft
http://thesecretlivesofdata.com/raft/ 動畫演示
在Raft中,任何時候一個服務器都可以扮演下面的角色之一:
? 領導者(leader):處理客戶端交互,日志復制等動作,一般一次只有一個領導者
? 候選者(candidate):候選者就是在選舉過程中提名自己的實體,一旦選舉成功,則成為領導者
? 跟隨者(follower):類似選民,完全被動的角色,這樣的服務器等待被通知投票
2.分布式系統設計策略
- 心跳檢測
- 高可用設計
- 容錯性
- 負載均衡
3.分布式架構網絡通信
3.1基本原理
在底層層面去看,網絡通信需要做的就是將流從一臺計算機傳輸到另外一臺計算機,基于傳輸協議和網絡IO來實現,其中傳輸協議比較出名的有tcp、udp等等,tcp、udp都是在基于Socket概念上為某類應用場景而擴展出的傳輸協議,網絡IO,主要有bio、nio、aio三種方式,
3.2什么是RPC
RPC全稱為remote procedure call,即遠程過程調用。
借助RPC可以做到像本地調用一樣調用遠程服務,是一種進程間的通信方式
3.3RMI
Java RMI 指的是遠程方法調用 (Remote Method Invocation),是java原生支持的遠程調用 ,采用JRMP(JavaRemote Messageing protocol)作為通信協議,可以認為是純java版本的分布式遠程調用解決方案, RMI主要用于不同虛擬機之間的通信,這些虛擬機可以在不同的主機上、也可以在同一個主機上,這里的通信可以理解為一個虛擬機上的對象調用另一個虛擬機上對象的方法
3.4BIO,NIO,AIO
- 同步和異步
同步(synchronize)、異步(asychronize)是指應用程序和內核的交互而言的.
同步:指用戶進程觸發IO操作等待或者輪訓的方式查看IO操作是否就緒。
異步:當一個異步進程調用發出之后,調用者不會立刻得到結果。而是在調用發出之后,被調用者通過狀態、通知來通知調用者,或者通過回調函數來處理這個調用。
- 阻塞和非阻塞
阻塞和非阻塞是針對于進程訪問數據的時候,根據IO操作的就緒狀態來采取不同的方式
阻塞: 阻塞方式下讀取和寫入將一直等待,
非阻塞: 非阻塞方式下,讀取和寫入方法會理解返回一個狀態值.
BIO 同步阻塞IO。B代表blocking
服務器實現模式為一個連接一個線程,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷,當然可以通過線程池機制改善
NIO 同步非阻塞IO (non-blocking IO / new io)
服務器實現模式為一個請求一個通道,即客戶端發送的連接請求都會注冊到多路復用器上**,多路復用器輪詢到連接有IO請求時才啟動一個線程進行處理**
AIO 異步非阻塞IO。A代表asynchronize
**當有流可以讀時,操作系統會將可以讀的流傳入read方法的緩沖區,并通知應用程序,對于寫操作,OS將write方法的流寫入完畢是操作系統會主動通知應用程序。**因此read和write都是異步 的,完成后會調用回調函數。
3.5Netty
1.概念
Netty 是由 JBOSS 提供一個異步的、 基于事件驅動的網絡編程框架。
2.為什么使用Netty
NIO缺點
-
NIO 的類庫和 API 繁雜,使用麻煩。你需要熟練掌握 Selector、ServerSocketChannel、SocketChannel、
-
ByteBuffffer 等.
-
可靠性不強,開發工作量和難度都非常大
-
NIO 的 Bug。例如 Epoll Bug,它會導致 Selector 空輪詢,最終導致 CPU 100%。
Netty優點
-
對各種傳輸協議提供統一的 API
-
高度可定制的線程模型——單線程、一個或多個線程池
-
更好的吞吐量,更低的等待延遲
-
更少的資源消耗
-
最小化不必要的內存拷貝
阻塞的例子
老張煮開水。 老張,水壺兩把(普通水壺,簡稱水壺;會響的水壺,簡稱響水壺)。 1 老張把水壺放到火上,站立著等水開。(同步阻塞) 2 老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。(同步非阻塞) 3 老張把響水壺放到火上,立等水開。(異步阻塞) 4 老張把響水壺放到火上,去客廳看電視,水壺響之前不再去看它了,響了再去拿壺。(異步非阻塞)3.Netty使用
1.執行邏輯
2.服務器端
// 接收客戶端請求,打印在控制臺 public class NettyServer {public static void main(String[] args) throws InterruptedException {//1.創建2個線程池對象//bossGroup 負責接收用戶連接NioEventLoopGroup bossGroup = new NioEventLoopGroup();//workGroup 負責處理用戶的io讀寫操作NioEventLoopGroup workGroup = new NioEventLoopGroup();//2.創建啟動引導類ServerBootstrap serverBootstrap = new ServerBootstrap();//3.設置啟動引導類//添加到組中,兩個線程池,第一個位置的線程池就負責接收,第二個參數就負責讀寫serverBootstrap.group(bossGroup,workGroup)//給我們當前設置一個通道類型.channel(NioServerSocketChannel.class)//綁定一個初始化監聽.childHandler(new ChannelInitializer<NioSocketChannel>() {//事件監聽Channel通道protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//獲取pipeLineChannelPipeline pipeline = nioSocketChannel.pipeline();//綁定編碼pipeline.addFirst(new StringEncoder());pipeline.addLast(new StringDecoder());//綁定我們的業務邏輯pipeline.addLast(new SimpleChannelInboundHandler<String>() {protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {//獲取入棧信息,打印客戶端傳遞的數據System.out.println(msg);}});}});//4.啟動引導類綁定端口ChannelFuture future = serverBootstrap.bind(9999).sync();//5.關閉通道future.channel().closeFuture().sync();} }3.客戶端
//客戶端給服務器發送數據 public class NettyClient {public static void main(String[] args) throws InterruptedException {//1.創建連接池對象NioEventLoopGroup group = new NioEventLoopGroup();//2.創建客戶端的啟動引導類 BootStrapBootstrap bootstrap = new Bootstrap();//3.配置啟動引導類bootstrap.group(group)//設置通道為Nio.channel(NioSocketChannel.class)//設置Channel初始化監聽.handler(new ChannelInitializer<Channel>() {//當前該方法監聽channel是否初始化protected void initChannel(Channel channel) throws Exception {//設置編碼channel.pipeline().addLast(new StringEncoder());}});//4.使用啟動引導類連接服務器 , 獲取一個channelChannel channel = bootstrap.connect("127.0.0.1", 9999).channel();//5.循環寫數據給服務器while (true) {//給服務器寫數據channel.writeAndFlush("hello server .. this is client ...");Thread.sleep(2000);}} }4.手寫RPC
1.流程
-
客戶端
-
創建代理對象
-
創建線程池對象
-
聲明自定義處理器(UserClientHandler)
-
初始化客戶端
- 初始化自定義處理器UserClientHandler
- 創建連接池對象
- 初始化客戶端
- 配置客戶端屬性
- 設置通道(NIO),設置協議TCP
- 監聽channel 并初始化
- 設置管道
- 設置編碼
- 添加自定義處理器
設置服務器連接
給自定義處理器設置參數
線程處理處理call() 寫操作,并返回結果
服務端
初始化服務器
設置兩個線程池
配置引導類
- 設置通道為NIO
- 創建監聽channel
- 獲取管道對象
- 設置編碼
- 設置自定義處理器
4.綁定端口
##2.代碼
1.序列化
//采用JSON的方式,定義JSONSerializer的實現類:(其他序列化方式,可以自行實現序列化接口) public class JSONSerializer implements Serializer{/*** java對象轉換為二進制** @param object* @return*/public byte[] serialize(Object object) throws IOException {return JSON.toJSONBytes(object);//return new byte[0];}/*** 二進制轉換成java對象** @param clazz* @param bytes* @param <T>* @return*/public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException {return JSON.parseObject(bytes, clazz);//return null;} }2.客戶端
1.反射,初始化類
/*** 消費者*/ public class RPCConsumer {//1.創建一個線程池對象 -- 它要處理我們自定義事件private static ExecutorService executorService =Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//2.聲明一個自定義事件處理器 UserClientHandlerprivate static UserClientHandler userClientHandler;//3.編寫方法,初始化客戶端 ( 創建連接池 bootStrap 設置bootstrap 連接服務器)public static void initClient() throws InterruptedException {//1) 初始化UserClientHandleruserClientHandler = new UserClientHandler();//2)創建連接池對象EventLoopGroup group = new NioEventLoopGroup();//3)創建客戶端的引導對象Bootstrap bootstrap = new Bootstrap();//4)配置啟動引導對象bootstrap.group(group)//設置通道為NIO.channel(NioSocketChannel.class)//設置請求協議為TCP.option(ChannelOption.TCP_NODELAY,true)//監聽channel 并初始化.handler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//獲取ChannelPipelineChannelPipeline pipeline = socketChannel.pipeline();//設置編碼pipeline.addLast( new RpcEncoder(RpcRequest.class, new JSONSerializer()));pipeline.addLast(new StringDecoder());//添加自定義事件處理器pipeline.addLast(userClientHandler);}});//5)連接服務端bootstrap.connect("127.0.0.1",8999).sync();}//============================================//創建代理對象//IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class, rpcRequest);//4.編寫一個方法,使用JDK的動態代理創建對象// serviceClass 接口類型,根據哪個接口生成子類代理對象; providerParam : "UserService#sayHello#"public static Object createProxy(Class<?> serviceClass, final RpcRequest rpcRequest){return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class[]{serviceClass}, new InvocationHandler() {public Object invoke(Object o, Method method, Object[] objects) throws Throwable {//1)初始化客戶端clietif(userClientHandler == null){initClient();}//2)給UserClientHandler 設置param參數userClientHandler.setParam(rpcRequest);//3).使用線程池,開啟一個線程處理處理call() 寫操作,并返回結果Object result = executorService.submit(userClientHandler).get();//4)return 結果return result;}});}}2.自定義事件處理器
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {//1.定義成員變量private ChannelHandlerContext context; //事件處理器上下文對象 (存儲handler信息,寫操作)private String result; // 記錄服務器返回的數據private RpcRequest rpcRequest; //記錄將要返送給服務器的數據//2.實現channelActive 客戶端和服務器連接時,該方法就自動執行@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//初始化ChannelHandlerContextthis.context = ctx;}//3.實現channelRead 當我們讀到服務器數據,該方法自動執行@Overridepublic synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//將讀到的服務器的數據msg ,設置為成員變量的值result = msg.toString();notify();}//4.將客戶端的數寫到服務器public synchronized Object call() throws Exception {//context給服務器寫數據context.writeAndFlush(rpcRequest);wait();return result;}//5.設置參數的方法public void setParam(RpcRequest param){this.rpcRequest = param;} }3.服務端
1.獲取掃描容器中的類
@Component public class ApplicationContextUtil implements ApplicationContextAware{private static ApplicationContext applicationContext;//聲明一個靜態變量保存public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {System.out.println("applicationContext正在初始化");this.applicationContext=applicationContext;}public static <T> T getBean(Class<T> clazz){if(applicationContext==null){System.out.println("applicationContext是空的");}else{// System.out.println("applicationContext不是空的");}return applicationContext.getBean(clazz);}public static ApplicationContext getApplicationContext(){return applicationContext;} }2.自定義處理器
/*** 自定義的業務處理器*/ public class UserServiceHandler extends ChannelInboundHandlerAdapter {//當客戶端讀取數據時,該方法會被調用@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String split = "{"+StrUtil.split(msg.toString(), "{")[1];//反序列化RpcRequest rpcRequest = JSON.parseObject(split, RpcRequest.class);//從容器中獲取對象UserServiceImpl userServiceImpl = (UserServiceImpl)ApplicationContextUtil.getBean(Class.forName(rpcRequest.getClassName()));// UserServiceImpl userServiceImpl = (UserServiceImpl)Class.forName(rpcRequest.getClassName());//字節碼對象Class<?> aClass = Class.forName(rpcRequest.getClassName());//執行指定方法Method method = aClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object invoke = method.invoke(userServiceImpl, rpcRequest.getParameters());ctx.writeAndFlush(invoke);} }3.服務器類
public class nettyServer {//創建一個方法啟動服務器public static void startServer(String ip , int port) throws InterruptedException {// UserServiceImpl bean = ApplicationContextUtil.getBean(UserServiceImpl.class);// System.out.println(bean);//1.創建兩個線程池對象NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workGroup = new NioEventLoopGroup();//2.創建服務端的啟動引導對象ServerBootstrap serverBootstrap = new ServerBootstrap();//3.配置啟動引導對象serverBootstrap.group(bossGroup,workGroup)//設置通道為NIO.channel(NioServerSocketChannel.class)//創建監聽channel.childHandler(new ChannelInitializer<NioSocketChannel>() {protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {//獲取管道對象ChannelPipeline pipeline = nioSocketChannel.pipeline();//給管道對象pipeLine 設置編碼pipeline.addLast(new StringEncoder());pipeline.addLast(new StringDecoder()); // pipeline.addLast(new MyEncoder()); // // pipeline.addLast(new MyDecoder());//把我們自頂一個ChannelHander添加到通道中pipeline.addLast(new UserServiceHandler());}});//4.綁定端口serverBootstrap.bind(8999).sync();} }參考答案記錄
編碼類
public class RpcEncoder extends MessageToByteEncoder{private Class<?> clazz;private Serializer serializer;public RpcEncoder(Class<?> clazz, Serializer serializer) {this.clazz = clazz;this.serializer = serializer;}@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {if (clazz != null && clazz.isInstance(msg)) {byte[] bytes = serializer.serialize(msg);byteBuf.writeInt(bytes.length);byteBuf.writeBytes(bytes);}} }解碼類
public class RpcDecoder extends ByteToMessageDecoder{private Class<?> clazz;private Serializer serializer;public RpcDecoder(Class<?> clazz, Serializer serializer) {this.clazz = clazz;this.serializer = serializer;}@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {//因為之前編碼的時候寫入一個Int型,4個字節來表示長度if (byteBuf.readableBytes() < 4) {return;}//標記當前讀的位置byteBuf.markReaderIndex();int dataLength = byteBuf.readInt();if (byteBuf.readableBytes() < dataLength) {byteBuf.resetReaderIndex();return;}byte[] data = new byte[dataLength];//將byteBuf中的數據讀入data字節數組byteBuf.readBytes(data);Object obj = serializer.deserialize(clazz, data);list.add(obj);} }客戶端的動態代理類
public class RpcConsumer {//創建線程池對象private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static UserClientHandler userClientHandler;//1.創建一個代理對象 providerName:UserService#sayHello are you ok?public Object createProxy(final Class<?> serviceClass){//借助JDK動態代理生成代理對象return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//(1)調用初始化netty客戶端的方法if(userClientHandler == null){initClient();}//封裝RpcRequest request = new RpcRequest();String requestId = UUID.randomUUID().toString();System.out.println(requestId);String className = method.getDeclaringClass().getName();String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();request.setRequestId(requestId);request.setClassName(className);request.setMethodName(methodName);request.setParameterTypes(parameterTypes);request.setParameters(args);// 設置參數userClientHandler.setPara(request);System.out.println(request);System.out.println("設置參數完成");// 去服務端請求數據return executor.submit(userClientHandler).get();}});}//2.初始化netty客戶端public static void initClient() throws InterruptedException {userClientHandler = new UserClientHandler();EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));pipeline.addLast(new StringDecoder());pipeline.addLast(userClientHandler);}});bootstrap.connect("127.0.0.1",8990).sync();}}服務器的處理器類
@Component public class UserServerHandler extends ChannelInboundHandlerAdapter implements ApplicationContextAware {private static ApplicationContext applicationContext2;public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {UserServerHandler.applicationContext2 = applicationContext;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest msg1 = (RpcRequest) msg;Object handler = handler(msg1);ctx.writeAndFlush("success");// 判斷是否符合約定,符合則調用本地方法,返回數據// msg: UserService#sayHello#are you ok? // if(msg.toString().startsWith("UserService")){ // UserServiceImpl userService = new UserServiceImpl(); // String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); // ctx.writeAndFlush(result); // }}private Object handler(RpcRequest request) throws ClassNotFoundException, InvocationTargetException {//使用Class.forName進行加載Class文件Class<?> clazz = Class.forName(request.getClassName());Object serviceBean = applicationContext2.getBean(clazz);Class<?> serviceClass = serviceBean.getClass();String methodName = request.getMethodName();Class<?>[] parameterTypes = request.getParameterTypes();Object[] parameters = request.getParameters();//使用CGLB ReflectFastClass fastClass = FastClass.create(serviceClass);FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);return fastMethod.invoke(serviceBean, parameters);}}附錄
1.Java多線程學習之wait、notify/notifyAll 詳解
-
wait()、notify/notifyAll() 方法是Object的本地final方法,無法被重寫。
-
wait()使當前線程阻塞,前提是 必須先獲得鎖
-
只有當 notify/notifyAll() 被執行時候,才會喚醒一個或多個正處于等待狀態的線程,
單詞
Consistency 一致性
Availability 可用性
Partition tolerance 分區容錯性
Basically Available(基本可用)
Soft state(軟狀態)
Eventually consistent(最終一致性)
Proposer 提案發起者
Acceptor:決策者,可以批準提案
總結
以上是生活随笔為你收集整理的分布式理论、架构设计(自定义RPC)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: zookeeper笔记+源码刨析
- 下一篇: dubbo笔记+源码刨析