使用Java客户端操作elasticsearch
常見的配置
前面已介紹過,RestClientBuilder支持同時(shí)提供一個(gè)RequestConfigCallback和一個(gè)HttpClientConfigCallback,你可以定制 the Apache Async Http Client 公開的配置。這兩個(gè)回調(diào)函數(shù)可以修改某些特定的行為,而不會(huì)覆蓋RestClient初始化的所有其他默認(rèn)配置。 本節(jié)介紹一些需要為客戶端進(jìn)行額外配置的常見場景。
Timeouts
啥都不說了,直接上代碼。該例子演示了連接超時(shí)(默認(rèn)為1秒)和套接字超時(shí)(默認(rèn)為30秒)。 也相應(yīng)地調(diào)整最大重試超時(shí)時(shí)間(默認(rèn)為30秒)。
?
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200)).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {//該方法接收一個(gè)RequestConfig.Builder對(duì)象,對(duì)該對(duì)象進(jìn)行修改后然后返回。 @Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {return requestConfigBuilder.setConnectTimeout(5000) //連接超時(shí)(默認(rèn)為1秒).setSocketTimeout(60000);//套接字超時(shí)(默認(rèn)為30秒)}}).setMaxRetryTimeoutMillis(60000);//調(diào)整最大重試超時(shí)時(shí)間(默認(rèn)為30秒)?
線程數(shù)
The Apache Http Async Client默認(rèn)啟動(dòng)一個(gè)dispatcher線程和供連接管理器使用的多個(gè)worker線程,與本地檢測(cè)到的處理器數(shù)量一樣多(取決于Runtime.getRuntime().availableProcessors()的返回值)。 修改線程數(shù)可以如下操作:
?
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200)).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {return httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());}});?
基本認(rèn)證
同樣直接上代碼
?
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials("user", "password"));RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200)).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {// 該方法接收HttpAsyncClientBuilder的實(shí)例作為參數(shù),對(duì)其修改后進(jìn)行返回@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);//提供一個(gè)默認(rèn)憑據(jù)}});?
搶占式認(rèn)證可以被禁用,這意味著每個(gè)請(qǐng)求都將被發(fā)送,不用去看授權(quán)請(qǐng)求頭,在收到HTTP 401響應(yīng)后,會(huì)再次發(fā)送相同的請(qǐng)求,這次會(huì)帶上基本的身份認(rèn)證頭,如果你想這樣做,那么你可以通過HttpAsyncClientBuilder來禁用它:
?
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials("user", "password"));RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200)).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {httpClientBuilder.disableAuthCaching(); //禁用搶占式身份驗(yàn)證return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);}});?
加密通信
加密通信也可以通過HttpClientConfigCallback進(jìn)行配置。 參數(shù)HttpAsyncClientBuilder公開了配置加密通信的多種方法:
?
setSSLContext,setSSLSessionStrategy和setConnectionManager,重要性依次增加。 以下是一個(gè)例子: KeyStore truststore = KeyStore.getInstance("jks"); try (InputStream is = Files.newInputStream(keyStorePath)) {truststore.load(is, keyStorePass.toCharArray()); } SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null); final SSLContext sslContext = sslBuilder.build(); RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "https")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {return httpClientBuilder.setSSLContext(sslContext);}});?
如果未提供明確的配置,則將使用系統(tǒng)默認(rèn)配置。
其他
如果對(duì)其他配置需要修改,可以參考Apache HttpAsyncClient文檔:https://hc.apache.org/httpcomponents-asyncclient-4.1.x/?
如果您的應(yīng)用程序在安全管理器下運(yùn)行,可能會(huì)采用JVM默認(rèn)緩存策略:
- A. 域名能夠正確解析的IP地址將會(huì)永久緩存;
- B. 域名解析出錯(cuò)的IP地址會(huì)默認(rèn)緩存10S;
如果客戶端連接到的主機(jī)的地址隨時(shí)間變化,那么可能你想修改默認(rèn)的JVM行為。這些可以通過添加?networkaddress.cache.ttl=<timeout>?和?networkaddress.cache.negative.ttl=<timeout>??到您的Java安全策略進(jìn)行修改。
嗅探器
從運(yùn)行的Elasticsearch集群中自動(dòng)發(fā)現(xiàn)節(jié)點(diǎn),并將其設(shè)置到現(xiàn)有的RestClient實(shí)例。 默認(rèn)情況下,它將使用Nodes Info api來檢索屬于集群的節(jié)點(diǎn),并使用jackson解析響應(yīng)的json數(shù)據(jù)。看完之后還是不清除說的個(gè)啥?那就仔細(xì)說說吧。之前我們都是像下面這樣創(chuàng)建客戶端實(shí)例的。
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"),new HttpHost("localhost", 9201, "http")).build();假設(shè)一個(gè)集群有100個(gè)節(jié)點(diǎn),如果手動(dòng)?new?HttpHost("localhost", 9200, "http")?這樣創(chuàng)建100個(gè)HttpHost也可以,但是可能會(huì)寫到手軟,出錯(cuò)概率極大。所以就出現(xiàn)了sniffer,它來幫你做這件事。
the REST client sniffer 兼容Elasticsearch 2.x及以上版本。可以在這里找到它的javadoc。
Maven倉庫
The REST client sniffer?與Elasticsearch的發(fā)布周期相同。 發(fā)布的第一版為5.0.0-alpha4。同樣,你也可以自由替換成你想要的版本。 它和通訊的Elasticsearch版本之間沒有關(guān)聯(lián)。sniffer?支持從Elasticsearch 2.x及其以后的版本獲取節(jié)點(diǎn)列表。
Maven配置
以下是使用maven作為依賴管理器。 將以下內(nèi)容添加到您的pom.xml文件中:
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client-sniffer</artifactId><version>6.2.3</version> </dependency>Gradle配置
以下是使用gradle作為依賴管理器。 將以下內(nèi)容添加到您的build.gradle文件中:
dependencies {compile 'org.elasticsearch.client:elasticsearch-rest-client-sniffer:6.2.3' }用法
?RestClient實(shí)例創(chuàng)建后,就可以將一個(gè)Sniffer關(guān)聯(lián)到它。Sniffer使用RestClient定期(默認(rèn)每5分鐘)從集群中獲取當(dāng)前所有節(jié)點(diǎn)的列表,并通過調(diào)用RestClient的setHosts方法來更新。
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build(); Sniffer sniffer = Sniffer.builder(restClient).build();關(guān)閉Sniffer是非常重要的,這樣它的后臺(tái)線程才能正常關(guān)閉并釋放所有資源。 Sniffer對(duì)象的生命周期應(yīng)與RestClient相同,并在客戶端之前關(guān)閉:
sniffer.close(); restClient.close();Sniffer默認(rèn)每5分鐘更新一次節(jié)點(diǎn)。 該時(shí)間間隔也可以自定義(以毫秒為單位),如下所示:
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build(); Sniffer sniffer = Sniffer.builder(restClient).setSniffIntervalMillis(60000).build();也可以在失敗時(shí)啟用嗅探,這意味著在每次失敗后,節(jié)點(diǎn)列表將被直接更新。 此種方式需要首先創(chuàng)建SniffOnFailureListener,并在創(chuàng)建RestClient時(shí)提供。 同樣,一旦Sniffer被創(chuàng)建,它需要與同一個(gè)SniffOnFailureListener實(shí)例相關(guān)聯(lián),SniffOnFailureListener實(shí)例將在每次失敗時(shí)通知,并且會(huì)使用該Sniffer再執(zhí)行一輪嗅探。
Elasticsearch Nodes Info api在連接到節(jié)點(diǎn)時(shí)不會(huì)返回使用的協(xié)議,而只會(huì)返回它們的host:port鍵對(duì),因此默認(rèn)情況下使用http。 如果想使用https,則必須手動(dòng)創(chuàng)建ElasticsearchHostsSniffer實(shí)例,可按如下方式:
?
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build(); HostsSniffer hostsSniffer = new ElasticsearchHostsSniffer(restClient,ElasticsearchHostsSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,ElasticsearchHostsSniffer.Scheme.HTTPS); Sniffer sniffer = Sniffer.builder(restClient).setHostsSniffer(hostsSniffer).build();?
同樣也可以自定義sniffRequestTimeout,默認(rèn)為1秒。 在調(diào)用the Nodes Info api時(shí)timeout參數(shù)使用querystring方式傳遞,以便當(dāng)服務(wù)器端的超時(shí)時(shí),仍然會(huì)返回有效的響應(yīng),盡管它可能只包含群集一部分的節(jié)點(diǎn) 。
?
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build(); HostsSniffer hostsSniffer = new ElasticsearchHostsSniffer(restClient,TimeUnit.SECONDS.toMillis(5),ElasticsearchHostsSniffer.Scheme.HTTP); Sniffer sniffer = Sniffer.builder(restClient).setHostsSniffer(hostsSniffer).build();?
此外,我們可能需要從外部源獲取主機(jī),而不是從Elasticsearch獲取主機(jī)。則可以自定義實(shí)現(xiàn)HostsSniffer。
?
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();HostsSniffer hostsSniffer = new HostsSniffer() {@Overridepublic List<HttpHost> sniffHosts() throws IOException {return null;//從外部獲取主機(jī)}};Sniffer sniffer = Sniffer.builder(restClient).setHostsSniffer(hostsSniffer).build();?
?
官方文檔:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_common_configuration.html
總結(jié)
以上是生活随笔為你收集整理的使用Java客户端操作elasticsearch的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ElasticSearch 实践过程中遇
- 下一篇: RxJava 2.x 教程