日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

dubbo笔记+源码刨析

發布時間:2024/4/19 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 dubbo笔记+源码刨析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

會不斷更新!沖沖沖!跳轉連接

https://blog.csdn.net/qq_35349982/category_10317485.html

dubbo筆記

1.概念

RPC全稱為remote procedure call,即遠程過程調用。

借助RPC可以做到像本地調用一樣調用遠程服務,是一種進程間的通信方式。

Java RMI 指的是遠程方法調用 (Remote Method Invocation),是java原生支持的遠程調用 ,采用JRMP(JavaRemote Messageing protocol)作為通信協議,可以認為是純java版本的分布式遠程調用解決方案, RMI主要用于不同虛擬機之間的通信

JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關于面向消息中間件(MOM)

官網: http://dubbo.apache.org/zh-cn/docs/user/quick-start.html

2.配置方式

xml配置: http://dubbo.apache.org/zh-cn/docs/user/quick-start.html

注解配置:http://dubbo.apache.org/zh-cn/docs/user/configuration/annotation.html

@EnableDubbo(scanBasePackages = "com.lagou.service.impl") //類似于啟動類 //config文件 //加載報掃描 //生命周期 @EnableDubboConfig @DubboComponentScan @EnableDubboLifecycle public @interface EnableDubbo { }

3. SPI簡介

SPI 全稱為 (Service Provider Interface) ,是JDK內置的一種服務提供發現機制。 目前有不少框架用它來做服務的擴展發現,簡單來說,它就是一種動態替換發現的機制。使用SPI機制的優勢是實現解耦,使得第三方服務模塊的裝配控制邏輯與調用者的業務代碼分離。

1.java的SPI

1.約定

1、當服務提供者提供了接口的一種具體實現后,在META-INF/services目錄下創建一個以“接口全限定名”為命名的文件,內容為實現類的全限定名;

2、接口實現類所在的jar包放在主程序的classpath中;

3、主程序通過java.util.ServiceLoader動態裝載實現模塊,它通過掃描META-INF/services目錄下的配置文件找到實現類的全限定名,把類加載到JVM;

4、SPI的實現類必須攜帶一個無參構造方法

2.例子

  • 數據庫驅動加載接口實現類的加載 JDBC加載不同類型數據庫的驅動

  • 日志門面接口實現類加載 SLF4J加載不同提供商的日志實現類

  • Spring Spring中大量使用了SPI,比如:對servlet3.0規范對ServletContainerInitializer的實現、自動類型轉換Type Conversion SPI(Converter SPI、Formatter SPI)等

  • Dubbo中也大量使用SPI的方式實現框架的擴展, 不過它對Java提供的原生SPI做了封裝,允許用戶擴展實Filter接口

3.code

1.api層

public interface HelloService {String sayHello(); }

2.impl層

public class DogHelloService implements HelloService {@Overridepublic String sayHello() {return "wang wang";} } public class HumanHelloService implements HelloService {@Overridepublic String sayHello() {return "hello 你好";} }

resources層

新建 META-INF/services/ 目錄

后面跟上接口的包名

META-INF/services/com.lagou.service.HelloService

com.lagou.service.impl.DogHelloService com.lagou.service.impl.HumanHelloService

3.調用層

public static void main(String[] args) {final ServiceLoader<HelloService> helloServices = ServiceLoader.load(HelloService.class);for (HelloService helloService : helloServices){System.out.println(helloService.getClass().getName() + ":" + helloService.sayHello());}}

2.dubbo的SPI

1.約定

使用注解 @SPI

@Adaptive 主要解決的問題是如何動態的選擇具體的擴展點。通過getAdaptiveExtension 統一對指定接口對應的所有擴展點進行封裝

配置到 META-INF/dubbo/下 + com.lagou.service.HelloService(接口路徑)

  • dubbo自己做SPI的目的
1. JDK 標準的 SPI 會一次性實例化擴展點所有實現,如果有擴展實現初始化很耗時,但如果沒用上也加 載,會很浪費資源 2. 如果有擴展點加載失敗,則所有擴展點無法使用 3. 提供了對擴展點包裝的功能(Adaptive),并且還支持通過set的方式對其他的擴展點進行注入

2.code

1.api層

@SPI("human") public interface HelloService {String sayHello();@AdaptiveString sayHello(URL url); }

2.impi層

//================================== public class DogHelloService implements HelloService{@Overridepublic String sayHello() {return "wang wang";}@Overridepublic String sayHello(URL url) {return "wang url";} } //================================== public class HumanHelloService implements HelloService{@Overridepublic String sayHello() {return "hello 你好";}@Overridepublic String sayHello(URL url) {return "hello url";} }

在Resources的META-INF/dubbo/com.lagou.service.HelloService目錄下

human=com.lagou.service.impl.HumanHelloService dog=com.lagou.service.impl.DogHelloService

3.調用層

  • 調用全部拓展加載器
public static void main(String[] args) {// 獲取擴展加載器ExtensionLoader<HelloService> extensionLoader = ExtensionLoader.getExtensionLoader(HelloService.class);// 遍歷所有的支持的擴展點 META-INF.dubboSet<String> extensions = extensionLoader.getSupportedExtensions();for (String extension : extensions){String result = extensionLoader.getExtension(extension).sayHello();System.out.println(result);}}
  • 調用指定的
public static void main(String[] args) {//test://localhost/hello? 可以先隨便寫// hello.service=dog 表示 HelloService類的 配置的key為dog的接口URL url = URL.valueOf("test://localhost/hello?hello.service=dog");HelloService adaptiveExtension = ExtensionLoader.getExtensionLoader(HelloService.class).getAdaptiveExtension();String msg = adaptiveExtension.sayHello(url);System.out.println(msg);}

4.dubbo使用

1.api模塊

//公共模塊 service-api public interface HelloService {String sayHello(String name); }

2.provider模塊

//package com.lagou.service.impl; //dubbo的service注解 @Service public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String name) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "hello:"+name;} } //====Main方法 public class DubboPureMain {public static void main(String[] args) throws Exception{AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);context.start();System.in.read();}@Configuration@EnableDubbo(scanBasePackages = "com.lagou.service.impl")@PropertySource("classpath:/dubbo-provider.properties")static class ProviderConfiguration{@Beanpublic RegistryConfig registryConfig(){RegistryConfig registryConfig = new RegistryConfig();registryConfig.setAddress("zookeeper://127.0.0.1:2181?timeout=10000");//registryConfig.setTimeout(10000);return registryConfig;}}}

resources中

dubbo-consumer.properties

dubbo.application.name=service-consumer dubbo.registry.address=zookeeper://127.0.0.1:2181 dubbo.consumer.timeout=4000 ##運維命令 dubbo.application.qosEnable=true dubbo.application.qosPort=33333 dubbo.application.qosAcceptForeignIp=false

3.consumer模塊

@Component public class ComsumerComponet {//dubbo的reference注解@Referenceprivate HelloService helloService;public String sayHello(String name){return helloService.sayHello(name);}} //================================= //Main方法 public class AnnotationConsumerMain {public static void main(String[] args) throws Exception {System.out.println("-------------");AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);context.start();// 獲取消費者組件ComsumerComponet service = context.getBean(ComsumerComponet.class);while(true){System.in.read();String hello = service.sayHello("world");System.out.println("result:"+hello);}}@Configuration@PropertySource("classpath:/dubbo-consumer.properties")@ComponentScan(basePackages = "com.lagou.bean")@EnableDubbostatic class ConsumerConfiguration{} }

#=============

3.過濾器配置

@Activate(group = {CommonConstants.CONSUMER,CommonConstants.PROVIDER}) public class DubboInvokeFilter implements Filter {@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {long startTime = System.currentTimeMillis();try {// 執行方法return invoker.invoke(invocation);} finally {System.out.println("invoke time:"+(System.currentTimeMillis()-startTime) + "毫秒");}} }

在Resources配置文件中配置

META-INF/dubbo/org.apache.dubbo.rpc.Filter

timeFilter=com.lagou.filter.DubboInvokeFilter

在使用的模塊中 添加依賴即可

4.負載均衡

public class OnlyFirstLoadbalancer implements LoadBalance {@Overridepublic <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException {// 所有的服務提供者 按照IP + 端口排序 選擇第一個return list.stream().sorted((i1,i2)->{final int ipCompare = i1.getUrl().getIp().compareTo(i2.getUrl().getIp());if(ipCompare == 0){return Integer.compare(i1.getUrl().getPort(),i2.getUrl().getPort());}return ipCompare;}).findFirst().get();} }

在Resources配置文件中配置

META-INF/dubbo/org.apache.dubbo.rpc.cluster.LoadBalance

onlyFirst=com.laogu.loadbalance.OnlyFirstLoadbalancer

在Counsumer中使用

@Component public class ConsumerComponent {//在dubbo的注解這塊 添加負載均衡@Reference(loadbalance = "onlyFirst")private HelloService helloService;public String sayHello(String name, int timeToWait) {return helloService.sayHello(name, timeToWait);}}

5.線程池

官網 : http://dubbo.apache.org/zh-cn/docs/user/demos/thread-model.html

public class WachingThreadPool extends FixedThreadPool implements Runnable{private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);// 定義線程池使用的閥值private static final double ALARM_PERCENT = 0.90;private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>();public WachingThreadPool(){// 每隔3秒打印線程使用情況Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS);}// 通過父類創建線程池@Overridepublic Executor getExecutor(URL url) {final Executor executor = super.getExecutor(url);if(executor instanceof ThreadPoolExecutor){THREAD_POOLS.put(url,(ThreadPoolExecutor)executor);}return executor;}@Overridepublic void run() {// 遍歷線程池for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){final URL url = entry.getKey();final ThreadPoolExecutor executor = entry.getValue();// 計算相關指標final int activeCount = executor.getActiveCount();final int poolSize = executor.getCorePoolSize();double usedPercent = activeCount / (poolSize*1.0);LOGGER.info("線程池執行狀態:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100);if (usedPercent > ALARM_PERCENT){LOGGER.error("超出警戒線! host:{} 當前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url);}}} } public class WachingThreadPool extends FixedThreadPool implements Runnable{private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);// 定義線程池使用的閥值private static final double ALARM_PERCENT = 0.90;private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>();public WachingThreadPool(){// 每隔3秒打印線程使用情況Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS);}// 通過父類創建線程池@Overridepublic Executor getExecutor(URL url) {final Executor executor = super.getExecutor(url);if(executor instanceof ThreadPoolExecutor){THREAD_POOLS.put(url,(ThreadPoolExecutor)executor);}return executor;}@Overridepublic void run() {// 遍歷線程池for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){final URL url = entry.getKey();final ThreadPoolExecutor executor = entry.getValue();// 計算相關指標final int activeCount = executor.getActiveCount();final int poolSize = executor.getCorePoolSize();double usedPercent = activeCount / (poolSize*1.0);LOGGER.info("線程池執行狀態:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100);if (usedPercent > ALARM_PERCENT){LOGGER.error("超出警戒線! host:{} 當前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url);}}} }

在provider中配置文件中添加線程池

dubbo.application.name=dubbo-demo-annotation-provider dubbo.protocol.name=dubbo dubbo.protocol.port=20885 #dubbo.protocol.host=192.168.1.109 dubbo.provider.threadpool=watching#dubbo.protocol.telnet=clear,exit,help,status,log,ls,ps,cd,pwd,invoke,trace,count,select,shutdown

6.服務降級

服務降級,當服務器壓力劇增的情況下,根據當前業務情況及流量對一些服務有策略的降低服務級別,以釋放服務器資源,保證核心任務的正常運行。

  • 在 dubbo 管理控制臺配置服務降級

屏蔽和容錯

mock=force:return+null

表示消費方對該服務的方法調用都直接返回 null 值,不發起遠程調用。用來屏蔽不重要服務不可用時對調用方的影響。

mock=fail:return+null

表示消費方對該服務的方法調用在失敗后,再返回 null 值,不拋異常。用來容忍不重要服務不穩定時對調用方的影響。

  • 指定返回簡單值或者null
<dubbo:reference id="xxService" check="false" interface="com.xx.XxService" timeout="3000" mock="return null" /> <dubbo:reference id="xxService2" check="false" interface="com.xx.XxService2" timeout="3000" mock="return 1234" />
  • 使用java代碼 動態寫入配置中心
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension() ; Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://IP:端 口")); registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null"));

dubbo的控制臺客戶端

1.從git 上下載項目 https://github.com/apache/dubbo-admin 2.修改項目下的dubbo.properties文件 注意 dubbo.registry.address對應的值需要對應當前使用的Zookeeper的ip地址和端口號 dubbo.registry.address=zookeeper://zk所在機器ip:zk端口 dubbo.admin.root.password=root dubbo.admin.guest.password=guest 3.切換到項目所在的路徑 使用mvn 打包 mvn clean package -Dmaven.test.skip=true 4.java 命令運行 java -jar 對應的jar包

dubbo的補充

1.dubbo中的Url

  • protocol:一般是 dubbo 中的各種協議 如:dubbo thrift http zk
  • username/password:用戶名/密碼
  • host/port:主機/端口
  • path:接口名稱
  • parameters:參數鍵值對

任意的一個領域中的一個實現都可以認為是一類 URL,dubbo 使用 URL 來統一描述了元數據,配置信息,貫穿在整個框架之中。

http://dubbo.apache.org/zh-cn/blog/introduction-to-dubbo-url.html

dubbo的設計

http://dubbo.apache.org/zh-cn/docs/dev/design.html

作業思路

作業二

服務端:

? 定義一個類,三個方法,用隨機數100秒 隨機

過濾器:

? 方法執行時,記錄時間存儲到Map

? key是 方法名

? value是一個Map

? key是方法最后的執行時間

? value是方法的執行消耗時間

消費端:

? 開啟兩個線程

? 1.循環執行三個方法

? 2.讀取Map,key

? 根據key剔除掉一分鐘之前的數據

? Map按照時間戳排序

? T90取size()*90的數據

消費端

//消費端public class AnnotationConsumerMain {public static void main(String[] args) throws Exception {AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);context.start();ConsumerComponent service = context.getBean(ConsumerComponent.class);//定義線程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,100, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10));//定義三個方法執行的方法Runnable consumer = new Thread(() -> {try {service.consumer();} catch (InterruptedException e) {e.printStackTrace();}});threadPoolExecutor.execute(consumer);//定義監控的方法Runnable consumer1 = new Thread(() -> {PrintMethodCost.print();});threadPoolExecutor.execute(consumer1);}@Configuration@PropertySource("classpath:/dubbo-consumer.properties")//@EnableDubbo(scanBasePackages = "com.lagou.bean")@ComponentScan("com.lagou.bean")@EnableDubbostatic class ConsumerConfiguration {} }///監控的方法 public class PrintMethodCost {public static void print() {while (true) {//取執行方法的集合//key 方法名//value 是 方法的最后執行時間-執行毫秒數Map<String, HashMap<Long, Long>> methodCount = MethodCostTimeCount.getMethodCount();long currentTimeMillis = System.currentTimeMillis();//現在的時間-60秒就是 開始的時間;long oneMinuteBefore = currentTimeMillis - 60000;for (Map.Entry<String, HashMap<Long, Long>> entry : methodCount.entrySet()) {//移除Iterator iterator = entry.getValue().entrySet().iterator();while (iterator.hasNext()) {Map.Entry<Long, Long> key = (Map.Entry<Long, Long>) iterator.next();if (key.getKey() < oneMinuteBefore) {iterator.remove();}}HashMap<Long, Long> oneMinuteMap = entry.getValue();//按照value值進行升序排序List<Map.Entry<Long, Long>> list = new ArrayList<>(oneMinuteMap.entrySet());/* Collections.sort(list, new Comparator<Map.Entry<Long, Long>>() {public int compare(Map.Entry<Long, Long> o1, Map.Entry<Long, Long> o2) {return o1.getValue().compareTo(o2.getValue());//利用String類的compareTo方法}});*/Collections.sort(list, Comparator.comparing(Map.Entry::getValue));int index90 = (int) (list.size() * 0.9);int index99 = (int) (list.size() * 0.99);Long tp90Time = list.get(index90).getKey();Long tp90Cost = list.get(index90).getValue();System.out.println("T90方法名="+entry.getKey() + "毫秒數=" + tp90Cost + " ms,方法執行時間 : " + tp90Time);Long tp99Time = list.get(index99).getKey();Long tp99Cost = list.get(index99).getValue();System.out.println("T99方法名="+entry.getKey() + "毫秒數=" + tp99Cost + " ms,方法執行時間 : " + tp99Time);}try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {System.out.println("Thread sleep exception!");}System.out.println("==============================");}} } //執行方法的 @Component public class ConsumerComponent {@Referenceprivate HelloService helloService;private volatile Integer num =0;private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8,8,100, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000));public void consumer() throws InterruptedException {Runnable runnableA = new Thread(() -> helloService.sayHello1());Runnable runnableB = new Thread(() -> helloService.sayHello2());Runnable runnableC = new Thread(() -> helloService.sayHello3());while (true){TimeUnit.MILLISECONDS.sleep(70);try{threadPoolExecutor.execute(runnableA);threadPoolExecutor.execute(runnableB);threadPoolExecutor.execute(runnableC);} catch (RejectedExecutionException e){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException ex) {System.out.println("Thread sleep exception!");}}}}}

過濾器

//過濾器存儲類 public class MethodCostTimeCount {public static Map<String, HashMap<Long, Long>> methodCount = new ConcurrentHashMap<>();public static void put(String method, Long time, Long cost){HashMap<Long, Long> hashMap = methodCount.get(method);if(hashMap == null){hashMap = new HashMap<>();methodCount.put(method, hashMap);}hashMap.put(time, cost);}public static Map<String, HashMap<Long, Long>> getMethodCount() {return methodCount;} } @Activate(group = {CommonConstants.CONSUMER}) public class TPMonitorFilter implements Filter {@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {String methodName = invocation.getMethodName();long startTime = System.currentTimeMillis();try {// 執行方法return invoker.invoke(invocation);} finally {long endTime = System.currentTimeMillis();long costTime = endTime - startTime;//添加到集合MethodCostTimeCount.put(methodName, endTime, costTime);System.out.println("invoke time:"+costTime + "毫秒");}} }

附錄

1.spring注解開發AnnotationConfigApplicationContext的使用

使用AnnotationConfigApplicationContext可以實現基于Java的配置類(包括各種注解)加載Spring的應用上下文。避免使用application.xml進行配置。相比XML配置,更加便捷。

2.spring的上下文

Spring有兩個核心接口:BeanFactory和ApplicationContext,其中ApplicationContext是BeanFactory的子接口。他們都可代表Spring容器,Spring容器是生成Bean實例的工廠,并且管理容器中的Bean

Spring容器最基本的接口就是BeanFactor。BeanFactory負責配置、創建、管理Bean,

他有一個子接口:ApplicationContext,因此也稱之為Spring上下文。

Spring容器負責管理Bean與Bean之間的依賴關系。

https://www.cnblogs.com/chenssy/archive/2012/11/15/2772287.html

3.java8的Stream流編程

Stream 是 Java8 中處理集合的關鍵抽象概念

中間操作

? filter:過濾流中的某些元素
? limit(n):獲取n個元素
? skip(n):跳過n元素,配合limit(n)可實現分頁
? distinct:通過流中元素的 hashCode() 和 equals() 去除重復元素

Stream<Integer> stream = Stream.of(6, 4, 6, 7, 3, 9, 8, 10, 12, 14, 14);Stream<Integer> newStream = stream.filter(s -> s > 5) //6 6 7 9 8 10 12 14 14.distinct() //6 7 9 8 10 12 14.skip(2) //9 8 10 12 14.limit(2); //9 8 newStream.forEach(System.out::println);

https://blog.csdn.net/y_k_y/article/details/84633001

4.Thread.sleep()和TimeUnit.SECONDS.sleep()的區別與聯系

TimeUnit對Thread.sleep方法的包裝,實現是一樣的,只是多了時間單位轉換和驗證

源碼底層

//TimeUnit源碼 public void sleep(long timeout) throws InterruptedException {if (timeout > 0) {long ms = toMillis(timeout);int ns = excessNanos(timeout, ms);Thread.sleep(ms, ns);}}

5.Future是什么?使用?

https://www.cnblogs.com/cz123/p/7693064.html

https://www.jianshu.com/p/b8952f07ee5d

6.TP90、TP99耗時監控設計與實現

https://blog.csdn.net/u012472945/article/details/105611155

TP50:指在一個時間段內(如5分鐘),統計該方法每次調用所消耗的時間,并將這些時間按從小到大的順序進行排序,取第50%的那個值作為TP50的值;配置此監控指標對應的報警閥值后,需要保證在這個時間段內該方法所有調用的消耗時間至少有50%的值要小于此閥值,否則系統將會報警

7.粘包與沾包

8.線程池OOM

https://mp.weixin.qq.com/s/1ZRkW4ND8y1KqRSFY_7_-A

作業

dubbo定義攔截器獲取 :Ip獲取白名單 :https://www.jianshu.com/p/98d68d57f62a

單詞

manual 手動的

Extension 拓展

retention 保留

Policy 政策

exhaust 廢棄,疲憊

excess超過

convenience 方便,適宜

Scheduled 安排

business 商業,業務

cluster 群,聚集

protocol 協議

attachment附件

總結

以上是生活随笔為你收集整理的dubbo笔记+源码刨析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。