dubbo笔记+源码刨析
會不斷更新!沖沖沖!跳轉連接
https://blog.csdn.net/qq_35349982/category_10317485.html
dubbo筆記
1.概念
RPC全稱為remote procedure call,即遠程過程調用。
借助RPC可以做到像本地調用一樣調用遠程服務,是一種進程間的通信方式。
Java RMI 指的是遠程方法調用 (Remote Method Invocation),是java原生支持的遠程調用 ,采用JRMP(JavaRemote Messageing protocol)作為通信協議,可以認為是純java版本的分布式遠程調用解決方案, RMI主要用于不同虛擬機之間的通信
JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關于面向消息中間件(MOM)
官網: http://dubbo.apache.org/zh-cn/docs/user/quick-start.html
2.配置方式
xml配置: http://dubbo.apache.org/zh-cn/docs/user/quick-start.html
注解配置:http://dubbo.apache.org/zh-cn/docs/user/configuration/annotation.html
@EnableDubbo(scanBasePackages = "com.lagou.service.impl") //類似于啟動類 //config文件 //加載報掃描 //生命周期 @EnableDubboConfig @DubboComponentScan @EnableDubboLifecycle public @interface EnableDubbo { }3. SPI簡介
SPI 全稱為 (Service Provider Interface) ,是JDK內置的一種服務提供發現機制。 目前有不少框架用它來做服務的擴展發現,簡單來說,它就是一種動態替換發現的機制。使用SPI機制的優勢是實現解耦,使得第三方服務模塊的裝配控制邏輯與調用者的業務代碼分離。
1.java的SPI
1.約定
1、當服務提供者提供了接口的一種具體實現后,在META-INF/services目錄下創建一個以“接口全限定名”為命名的文件,內容為實現類的全限定名;
2、接口實現類所在的jar包放在主程序的classpath中;
3、主程序通過java.util.ServiceLoader動態裝載實現模塊,它通過掃描META-INF/services目錄下的配置文件找到實現類的全限定名,把類加載到JVM;
4、SPI的實現類必須攜帶一個無參構造方法;
2.例子
-
數據庫驅動加載接口實現類的加載 JDBC加載不同類型數據庫的驅動
-
日志門面接口實現類加載 SLF4J加載不同提供商的日志實現類
-
Spring Spring中大量使用了SPI,比如:對servlet3.0規范對ServletContainerInitializer的實現、自動類型轉換Type Conversion SPI(Converter SPI、Formatter SPI)等
-
Dubbo中也大量使用SPI的方式實現框架的擴展, 不過它對Java提供的原生SPI做了封裝,允許用戶擴展實Filter接口
3.code
1.api層
public interface HelloService {String sayHello(); }2.impl層
public class DogHelloService implements HelloService {@Overridepublic String sayHello() {return "wang wang";} } public class HumanHelloService implements HelloService {@Overridepublic String sayHello() {return "hello 你好";} }resources層
新建 META-INF/services/ 目錄
后面跟上接口的包名
META-INF/services/com.lagou.service.HelloService
com.lagou.service.impl.DogHelloService com.lagou.service.impl.HumanHelloService3.調用層
public static void main(String[] args) {final ServiceLoader<HelloService> helloServices = ServiceLoader.load(HelloService.class);for (HelloService helloService : helloServices){System.out.println(helloService.getClass().getName() + ":" + helloService.sayHello());}}2.dubbo的SPI
1.約定
使用注解 @SPI
@Adaptive 主要解決的問題是如何動態的選擇具體的擴展點。通過getAdaptiveExtension 統一對指定接口對應的所有擴展點進行封裝
配置到 META-INF/dubbo/下 + com.lagou.service.HelloService(接口路徑)
- dubbo自己做SPI的目的
2.code
1.api層
@SPI("human") public interface HelloService {String sayHello();@AdaptiveString sayHello(URL url); }2.impi層
//================================== public class DogHelloService implements HelloService{@Overridepublic String sayHello() {return "wang wang";}@Overridepublic String sayHello(URL url) {return "wang url";} } //================================== public class HumanHelloService implements HelloService{@Overridepublic String sayHello() {return "hello 你好";}@Overridepublic String sayHello(URL url) {return "hello url";} }在Resources的META-INF/dubbo/com.lagou.service.HelloService目錄下
human=com.lagou.service.impl.HumanHelloService dog=com.lagou.service.impl.DogHelloService3.調用層
- 調用全部拓展加載器
- 調用指定的
4.dubbo使用
1.api模塊
//公共模塊 service-api public interface HelloService {String sayHello(String name); }2.provider模塊
//package com.lagou.service.impl; //dubbo的service注解 @Service public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String name) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "hello:"+name;} } //====Main方法 public class DubboPureMain {public static void main(String[] args) throws Exception{AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);context.start();System.in.read();}@Configuration@EnableDubbo(scanBasePackages = "com.lagou.service.impl")@PropertySource("classpath:/dubbo-provider.properties")static class ProviderConfiguration{@Beanpublic RegistryConfig registryConfig(){RegistryConfig registryConfig = new RegistryConfig();registryConfig.setAddress("zookeeper://127.0.0.1:2181?timeout=10000");//registryConfig.setTimeout(10000);return registryConfig;}}}resources中
dubbo-consumer.properties
dubbo.application.name=service-consumer dubbo.registry.address=zookeeper://127.0.0.1:2181 dubbo.consumer.timeout=4000 ##運維命令 dubbo.application.qosEnable=true dubbo.application.qosPort=33333 dubbo.application.qosAcceptForeignIp=false3.consumer模塊
@Component public class ComsumerComponet {//dubbo的reference注解@Referenceprivate HelloService helloService;public String sayHello(String name){return helloService.sayHello(name);}} //================================= //Main方法 public class AnnotationConsumerMain {public static void main(String[] args) throws Exception {System.out.println("-------------");AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);context.start();// 獲取消費者組件ComsumerComponet service = context.getBean(ComsumerComponet.class);while(true){System.in.read();String hello = service.sayHello("world");System.out.println("result:"+hello);}}@Configuration@PropertySource("classpath:/dubbo-consumer.properties")@ComponentScan(basePackages = "com.lagou.bean")@EnableDubbostatic class ConsumerConfiguration{} }#=============
3.過濾器配置
@Activate(group = {CommonConstants.CONSUMER,CommonConstants.PROVIDER}) public class DubboInvokeFilter implements Filter {@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {long startTime = System.currentTimeMillis();try {// 執行方法return invoker.invoke(invocation);} finally {System.out.println("invoke time:"+(System.currentTimeMillis()-startTime) + "毫秒");}} }在Resources配置文件中配置
META-INF/dubbo/org.apache.dubbo.rpc.Filter
timeFilter=com.lagou.filter.DubboInvokeFilter在使用的模塊中 添加依賴即可
4.負載均衡
public class OnlyFirstLoadbalancer implements LoadBalance {@Overridepublic <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException {// 所有的服務提供者 按照IP + 端口排序 選擇第一個return list.stream().sorted((i1,i2)->{final int ipCompare = i1.getUrl().getIp().compareTo(i2.getUrl().getIp());if(ipCompare == 0){return Integer.compare(i1.getUrl().getPort(),i2.getUrl().getPort());}return ipCompare;}).findFirst().get();} }在Resources配置文件中配置
META-INF/dubbo/org.apache.dubbo.rpc.cluster.LoadBalance
onlyFirst=com.laogu.loadbalance.OnlyFirstLoadbalancer在Counsumer中使用
@Component public class ConsumerComponent {//在dubbo的注解這塊 添加負載均衡@Reference(loadbalance = "onlyFirst")private HelloService helloService;public String sayHello(String name, int timeToWait) {return helloService.sayHello(name, timeToWait);}}5.線程池
官網 : http://dubbo.apache.org/zh-cn/docs/user/demos/thread-model.html
public class WachingThreadPool extends FixedThreadPool implements Runnable{private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);// 定義線程池使用的閥值private static final double ALARM_PERCENT = 0.90;private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>();public WachingThreadPool(){// 每隔3秒打印線程使用情況Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS);}// 通過父類創建線程池@Overridepublic Executor getExecutor(URL url) {final Executor executor = super.getExecutor(url);if(executor instanceof ThreadPoolExecutor){THREAD_POOLS.put(url,(ThreadPoolExecutor)executor);}return executor;}@Overridepublic void run() {// 遍歷線程池for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){final URL url = entry.getKey();final ThreadPoolExecutor executor = entry.getValue();// 計算相關指標final int activeCount = executor.getActiveCount();final int poolSize = executor.getCorePoolSize();double usedPercent = activeCount / (poolSize*1.0);LOGGER.info("線程池執行狀態:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100);if (usedPercent > ALARM_PERCENT){LOGGER.error("超出警戒線! host:{} 當前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url);}}} } public class WachingThreadPool extends FixedThreadPool implements Runnable{private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);// 定義線程池使用的閥值private static final double ALARM_PERCENT = 0.90;private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>();public WachingThreadPool(){// 每隔3秒打印線程使用情況Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS);}// 通過父類創建線程池@Overridepublic Executor getExecutor(URL url) {final Executor executor = super.getExecutor(url);if(executor instanceof ThreadPoolExecutor){THREAD_POOLS.put(url,(ThreadPoolExecutor)executor);}return executor;}@Overridepublic void run() {// 遍歷線程池for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){final URL url = entry.getKey();final ThreadPoolExecutor executor = entry.getValue();// 計算相關指標final int activeCount = executor.getActiveCount();final int poolSize = executor.getCorePoolSize();double usedPercent = activeCount / (poolSize*1.0);LOGGER.info("線程池執行狀態:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100);if (usedPercent > ALARM_PERCENT){LOGGER.error("超出警戒線! host:{} 當前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url);}}} }在provider中配置文件中添加線程池
dubbo.application.name=dubbo-demo-annotation-provider dubbo.protocol.name=dubbo dubbo.protocol.port=20885 #dubbo.protocol.host=192.168.1.109 dubbo.provider.threadpool=watching#dubbo.protocol.telnet=clear,exit,help,status,log,ls,ps,cd,pwd,invoke,trace,count,select,shutdown6.服務降級
服務降級,當服務器壓力劇增的情況下,根據當前業務情況及流量對一些服務有策略的降低服務級別,以釋放服務器資源,保證核心任務的正常運行。
- 在 dubbo 管理控制臺配置服務降級
屏蔽和容錯
mock=force:return+null
表示消費方對該服務的方法調用都直接返回 null 值,不發起遠程調用。用來屏蔽不重要服務不可用時對調用方的影響。
mock=fail:return+null
表示消費方對該服務的方法調用在失敗后,再返回 null 值,不拋異常。用來容忍不重要服務不穩定時對調用方的影響。
- 指定返回簡單值或者null
- 使用java代碼 動態寫入配置中心
dubbo的控制臺客戶端
1.從git 上下載項目 https://github.com/apache/dubbo-admin 2.修改項目下的dubbo.properties文件 注意 dubbo.registry.address對應的值需要對應當前使用的Zookeeper的ip地址和端口號 dubbo.registry.address=zookeeper://zk所在機器ip:zk端口 dubbo.admin.root.password=root dubbo.admin.guest.password=guest 3.切換到項目所在的路徑 使用mvn 打包 mvn clean package -Dmaven.test.skip=true 4.java 命令運行 java -jar 對應的jar包dubbo的補充
1.dubbo中的Url
- protocol:一般是 dubbo 中的各種協議 如:dubbo thrift http zk
- username/password:用戶名/密碼
- host/port:主機/端口
- path:接口名稱
- parameters:參數鍵值對
任意的一個領域中的一個實現都可以認為是一類 URL,dubbo 使用 URL 來統一描述了元數據,配置信息,貫穿在整個框架之中。
http://dubbo.apache.org/zh-cn/blog/introduction-to-dubbo-url.html
dubbo的設計
http://dubbo.apache.org/zh-cn/docs/dev/design.html
作業思路
作業二
服務端:
? 定義一個類,三個方法,用隨機數100秒 隨機
過濾器:
? 方法執行時,記錄時間存儲到Map
? key是 方法名
? value是一個Map
? key是方法最后的執行時間
? value是方法的執行消耗時間
消費端:
? 開啟兩個線程
? 1.循環執行三個方法
? 2.讀取Map,key
? 根據key剔除掉一分鐘之前的數據
? Map按照時間戳排序
? T90取size()*90的數據
消費端
//消費端public class AnnotationConsumerMain {public static void main(String[] args) throws Exception {AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);context.start();ConsumerComponent service = context.getBean(ConsumerComponent.class);//定義線程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,100, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10));//定義三個方法執行的方法Runnable consumer = new Thread(() -> {try {service.consumer();} catch (InterruptedException e) {e.printStackTrace();}});threadPoolExecutor.execute(consumer);//定義監控的方法Runnable consumer1 = new Thread(() -> {PrintMethodCost.print();});threadPoolExecutor.execute(consumer1);}@Configuration@PropertySource("classpath:/dubbo-consumer.properties")//@EnableDubbo(scanBasePackages = "com.lagou.bean")@ComponentScan("com.lagou.bean")@EnableDubbostatic class ConsumerConfiguration {} }///監控的方法 public class PrintMethodCost {public static void print() {while (true) {//取執行方法的集合//key 方法名//value 是 方法的最后執行時間-執行毫秒數Map<String, HashMap<Long, Long>> methodCount = MethodCostTimeCount.getMethodCount();long currentTimeMillis = System.currentTimeMillis();//現在的時間-60秒就是 開始的時間;long oneMinuteBefore = currentTimeMillis - 60000;for (Map.Entry<String, HashMap<Long, Long>> entry : methodCount.entrySet()) {//移除Iterator iterator = entry.getValue().entrySet().iterator();while (iterator.hasNext()) {Map.Entry<Long, Long> key = (Map.Entry<Long, Long>) iterator.next();if (key.getKey() < oneMinuteBefore) {iterator.remove();}}HashMap<Long, Long> oneMinuteMap = entry.getValue();//按照value值進行升序排序List<Map.Entry<Long, Long>> list = new ArrayList<>(oneMinuteMap.entrySet());/* Collections.sort(list, new Comparator<Map.Entry<Long, Long>>() {public int compare(Map.Entry<Long, Long> o1, Map.Entry<Long, Long> o2) {return o1.getValue().compareTo(o2.getValue());//利用String類的compareTo方法}});*/Collections.sort(list, Comparator.comparing(Map.Entry::getValue));int index90 = (int) (list.size() * 0.9);int index99 = (int) (list.size() * 0.99);Long tp90Time = list.get(index90).getKey();Long tp90Cost = list.get(index90).getValue();System.out.println("T90方法名="+entry.getKey() + "毫秒數=" + tp90Cost + " ms,方法執行時間 : " + tp90Time);Long tp99Time = list.get(index99).getKey();Long tp99Cost = list.get(index99).getValue();System.out.println("T99方法名="+entry.getKey() + "毫秒數=" + tp99Cost + " ms,方法執行時間 : " + tp99Time);}try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {System.out.println("Thread sleep exception!");}System.out.println("==============================");}} } //執行方法的 @Component public class ConsumerComponent {@Referenceprivate HelloService helloService;private volatile Integer num =0;private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,8,100, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000));public void consumer() throws InterruptedException {Runnable runnableA = new Thread(() -> helloService.sayHello1());Runnable runnableB = new Thread(() -> helloService.sayHello2());Runnable runnableC = new Thread(() -> helloService.sayHello3());while (true){TimeUnit.MILLISECONDS.sleep(70);try{threadPoolExecutor.execute(runnableA);threadPoolExecutor.execute(runnableB);threadPoolExecutor.execute(runnableC);} catch (RejectedExecutionException e){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException ex) {System.out.println("Thread sleep exception!");}}}}}過濾器
//過濾器存儲類 public class MethodCostTimeCount {public static Map<String, HashMap<Long, Long>> methodCount = new ConcurrentHashMap<>();public static void put(String method, Long time, Long cost){HashMap<Long, Long> hashMap = methodCount.get(method);if(hashMap == null){hashMap = new HashMap<>();methodCount.put(method, hashMap);}hashMap.put(time, cost);}public static Map<String, HashMap<Long, Long>> getMethodCount() {return methodCount;} } @Activate(group = {CommonConstants.CONSUMER}) public class TPMonitorFilter implements Filter {@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {String methodName = invocation.getMethodName();long startTime = System.currentTimeMillis();try {// 執行方法return invoker.invoke(invocation);} finally {long endTime = System.currentTimeMillis();long costTime = endTime - startTime;//添加到集合MethodCostTimeCount.put(methodName, endTime, costTime);System.out.println("invoke time:"+costTime + "毫秒");}} }附錄
1.spring注解開發AnnotationConfigApplicationContext的使用
使用AnnotationConfigApplicationContext可以實現基于Java的配置類(包括各種注解)加載Spring的應用上下文。避免使用application.xml進行配置。相比XML配置,更加便捷。
2.spring的上下文
Spring有兩個核心接口:BeanFactory和ApplicationContext,其中ApplicationContext是BeanFactory的子接口。他們都可代表Spring容器,Spring容器是生成Bean實例的工廠,并且管理容器中的Bean。
Spring容器最基本的接口就是BeanFactor。BeanFactory負責配置、創建、管理Bean,
他有一個子接口:ApplicationContext,因此也稱之為Spring上下文。
Spring容器負責管理Bean與Bean之間的依賴關系。
https://www.cnblogs.com/chenssy/archive/2012/11/15/2772287.html
3.java8的Stream流編程
Stream 是 Java8 中處理集合的關鍵抽象概念
中間操作
? filter:過濾流中的某些元素
? limit(n):獲取n個元素
? skip(n):跳過n元素,配合limit(n)可實現分頁
? distinct:通過流中元素的 hashCode() 和 equals() 去除重復元素
https://blog.csdn.net/y_k_y/article/details/84633001
4.Thread.sleep()和TimeUnit.SECONDS.sleep()的區別與聯系
TimeUnit對Thread.sleep方法的包裝,實現是一樣的,只是多了時間單位轉換和驗證
源碼底層
//TimeUnit源碼 public void sleep(long timeout) throws InterruptedException {if (timeout > 0) {long ms = toMillis(timeout);int ns = excessNanos(timeout, ms);Thread.sleep(ms, ns);}}5.Future是什么?使用?
https://www.cnblogs.com/cz123/p/7693064.html
https://www.jianshu.com/p/b8952f07ee5d
6.TP90、TP99耗時監控設計與實現
https://blog.csdn.net/u012472945/article/details/105611155
TP50:指在一個時間段內(如5分鐘),統計該方法每次調用所消耗的時間,并將這些時間按從小到大的順序進行排序,取第50%的那個值作為TP50的值;配置此監控指標對應的報警閥值后,需要保證在這個時間段內該方法所有調用的消耗時間至少有50%的值要小于此閥值,否則系統將會報警
7.粘包與沾包
8.線程池OOM
https://mp.weixin.qq.com/s/1ZRkW4ND8y1KqRSFY_7_-A
作業
dubbo定義攔截器獲取 :Ip獲取白名單 :https://www.jianshu.com/p/98d68d57f62a
單詞
manual 手動的
Extension 拓展
retention 保留
Policy 政策
exhaust 廢棄,疲憊
excess超過
convenience 方便,適宜
Scheduled 安排
business 商業,業務
cluster 群,聚集
protocol 協議
attachment附件
總結
以上是生活随笔為你收集整理的dubbo笔记+源码刨析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TomcatNginx源码笔记分析
- 下一篇: Cluster模式潜在问题及解决方案、W