日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

es数据频繁的更新_es之文档更新过程中并发冲突问题

發布時間:2023/12/2 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 es数据频繁的更新_es之文档更新过程中并发冲突问题 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1:樂觀鎖控制

ES是分布式的,也是異步并發的,我們的復制請求是并行發送的;這就意味著請求到達目的地的順序是不可控制的,是亂序的;

如果是亂序的方式,很有可能出現這樣的一個問題,新version的文檔被舊version的文檔覆蓋掉—-數據丟失,或者直接拋異常;

TransportClient client = null;

?

@Before

public void testConn(){

?

try {

Settings settings = Settings.builder()

.put("cluster.name", "cluster_es").build();

client = new PreBuiltTransportClient(settings)

.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("hadoop01"), 9300));

System.out.println("========連接成功=============");

} catch (UnknownHostException e) {

e.printStackTrace();

}

}

?

?

/**

* upsert

* */

@Test

public void upsertDocument2() throws InterruptedException {

?

?

ExecutorService executorService = Executors.newFixedThreadPool(3);

for (int i = 0; i < 10; i++){

executorService.execute(new Thread1());

?

}

Thread.sleep(10000);

executorService.shutdown();

?

}

?

class Thread1 implements Runnable {

?

public void run() {

System.out.println("*************" + Thread.currentThread().getName() + " *************");

// 設置查詢條件, 查找不到則添加

IndexRequest indexRequest = null;

try {

indexRequest = new IndexRequest("website", "blog", "1")

.source(XContentFactory.jsonBuilder()

.startObject()

.field("id", "1")

.endObject());

// 設置更新, 查找到更新下面的設置

UpdateRequest upsert = new UpdateRequest("website", "blog", "1")

.doc(XContentFactory.jsonBuilder()

.startObject()

.field("process_id", Thread.currentThread().getId())

.endObject())

.upsert(indexRequest);

?

client.update(upsert).get();

} catch (Exception e) {

e.printStackTrace();

}

?

}

}

?

?

@After

public void close(){

client.close();

}

?

?

所以在分布式異步并發場景中,需要一種方式:新版本的文檔不會被舊版本的文檔覆蓋——【樂觀鎖】

Elasticsearch使用這個 _version 號來確保變更以正確順序得到執行。如果舊版本的文檔在新版本之后到達,它可以被簡單的忽略。

我們可以利用 _version 號來確保 應用中相互沖突的變更不會導致數據丟失。我們通過指定想要修改文檔的 version 號來達到這個目的。 如果該版本不是當前版本號,我們的請求將會失敗。

新建一個文檔,這個時候我們可以看到新文檔的版本號_version=1:

PUT /website/blog/1/_create

{

"title" : "this is title" ,

"txt" : "just do it"

}

?

現在嘗試通過重建文檔索引來保存修改數據:

請求成功,并且響應體告訴我們 _version 已經遞增到 2

PUT /website/blog/1?version=1

{

"title" : "this is test" ,

"txt" : "just do it"

}

?

然而,如果我們重新運行相同的索引請求,仍然指定 version=1 , Elasticsearch 返回 409 ConflictHTTP 響應碼,和一個如下所示的響應體:

以上通過version的控制,可以讓es在并行情況下操作而不出現丟失數據的現象,這種樂觀鎖的操作是比較常用的;

2:通過外部系統進行版本控制

上面我們講到的是基于version進行版本的控制。在分布式環境下,只要version不同,那么修改就會報錯;

而通過外部系統進行控制:version_type=external,只有當你提供的version比es中的_version大的時候,才能完成修改

_versionversion_type=external

只有_versioin相同,才會執行修改

只有當你提供的version比es中的_version大的時候,才能完成修改

例如,要創建一個新的具有外部版本號 5 的博客文章,我們可以按以下方法進行:

PUT /website/blog/2?version=5&version_type=external

{

"title": "My first external blog entry",

"text": ?"Starting to get the hang of this..."

}

?

現在我們更新這個文檔,指定一個新的 version 號是 10 :

PUT /website/blog/2?version=10&version_type=external

{

"title": "My first external blog entry",

"text": ?"This is a piece of cake..."

}

?

version_type=external能夠修改的條件就是:提供的版本號必須比_version大

如果此時插入版本號比現在的_version小的,就會報錯:

3:重復提交retry_on_conflict

elasticsearch設計的目的就是多用戶的海量數據操作;

那么可能存在這樣場景:A進程接收到請求嘗試去檢索(retrieve)和重建索引(reindex)某個文檔C,B進程也接收到請求檢索(retrieve)和重建索引(reindex)文檔C;

那么這個時候就會出現:其中一個進程提前修改了文檔C,然后另一個進程在做檢索的時候,因為_version改變了,所以匹配不到文檔C,操作就會失敗,然后數據丟失

這就是在并發操作的時候經常出現的現象;

解決:

對于多用戶的更新操作,文檔被修改了并不要緊,如果出現了匹配不到的現象,我們只要重新在操作一遍就可以了;所以需要使用關鍵字retry_on_conflict(默認0)

POST /website/pageviews/1/_update?retry_on_conflict=5

{

"script" : "ctx._source.views+=1",

"upsert": {

"views": 0

}

}

?

retry_on_conflict=5 代表如果出現失敗,最大可以重復五次的update操作

5.7.6:悲觀鎖控制【無用】

類似傳統數據庫————mysql,在處理并發的時候,為了防止出現沖突的問題,就會使用悲觀鎖;

這種方法被關系型數據庫廣泛使用,它假定有變更沖突可能發生,因此阻塞訪問資源以防止沖突。

一個典型的例子是讀取一行數據之前先將其鎖住,確保只有放置鎖的線程能夠對這行數據進行修改(想想java中的synchronize)。

5.7.6.1:全局鎖(無用)

只允許一個線程進行執行更新操作,這樣能夠避免并發性問題,在es中,全局鎖是將一份文檔是否存在作為依據

獲取一個全局鎖:

PUT website/blog/1/_create

{}

這樣就上鎖了,然后使用java的多線程做測試,在里面修改數據

TransportClient client = null;

?

@Before

public void testConn(){

?

try {

Settings settings = Settings.builder()

.put("cluster.name", "cluster").build();

client = new PreBuiltTransportClient(settings)

.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("hadoop01"), 9300));

System.out.println("========連接成功=============");

} catch (UnknownHostException e) {

e.printStackTrace();

}

}

?

?

/**

* upsert

* */

@Test

public void upsertDocument2() throws InterruptedException {

?

?

ExecutorService executorService = Executors.newFixedThreadPool(1);//線程數為1是全局鎖

for (int i = 0; i < 10; i++){

executorService.execute(new Thread1());

?

}

Thread.sleep(10000);

executorService.shutdown();

?

}

?

class Thread1 implements Runnable {

?

public void run() {

System.out.println("*************" + Thread.currentThread().getName() + " *************");

// 設置查詢條件, 查找不到則添加

IndexRequest indexRequest = null;

try {

indexRequest = new IndexRequest("website", "blog", "1")

.source(XContentFactory.jsonBuilder()

.startObject()

.field("id", "1")

.endObject());

// 設置更新, 查找到更新下面的設置

UpdateRequest upsert = new UpdateRequest("website", "blog", "1")

.doc(XContentFactory.jsonBuilder()

.startObject()

.field("process_id", Thread.currentThread().getId())

.endObject())

.upsert(indexRequest);

?

client.update(upsert).get();

} catch (Exception e) {

e.printStackTrace();

}

?

}

}

?

?

@After

public void close(){

client.close();

}

?

?

如果另一個進行想同時在創建一個website/blog/1 就會拋異常

釋放全局鎖:

全局鎖必須通過刪除來釋放:

DELETE website/blog/1

優點:操作非常簡單,非常容易使用,成本低缺點:你直接就把整個index給上鎖了,這個時候對index中所有的doc的操作,都會被block住,導致整個系統的并發能力很低

5.7.6.2:document文檔鎖(無用)

這種鎖比全局鎖的粒度小,因為全局鎖是鎖定整個index,那么文檔所就是針對單個文檔完成鎖定

上鎖的方式依賴groovy腳本:/config/scripts

vim documentLock.groovy 【腳本需要上傳到所有節點】

if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';

腳本的意思:

如果當前傳入的process_id和設定的process_id不一致,就拋異常assert false

如果一致的,返回'noop'

插入一個文檔:

PUT website/blog/1

{

"id" : 1,

"process_id" : 234

}

?

對當前文檔上文檔鎖:

POST /website/blog/1/_update

{

"upsert": { "process_id": 234 },

"script": {

"lang": "groovy",

"file": "documentLock",

"params": {

"process_id": 234

}

}

}

?

注意,當前設定的"process_id": 234,如果此時換一個"process_id" : 123,那么就會拋異常:assert false

比如:

POST /website/blog/1/_update

{

"upsert": { "process_id": 123 },

"script": {

"lang": "groovy",

"file": "documentLock",

"params": {

"process_id": 123

}

}

}

?

注意:如果傳入的是"process_id": 234,傳入正確參數,直接返回ctx.op = 'noop'

POST /website/blog/1/_update

{

"upsert": { "process_id": 234 },

"script": {

"lang": "groovy",

"file": "documentLock",

"params": {

"process_id": 234

}

}

}

?

如何釋放悲觀鎖 , 刪除對應的process_id數據即可:

DELETE website/blog/1

{

"query": {

"term": {

"process_id": 234

}

}

}

?

文檔級鎖可以實現細粒度的訪問控制,但是當文檔數量達到百分甚至上千萬的時候,這種方式開銷是比較昂貴的

5.7.6.3:共享鎖和排它鎖(無用)

共享鎖:數據是共享的,多個線程可以獲取同一個數據的共享鎖,然后對這個數據執行讀操作 排它鎖:只能有一個線程獲取排它鎖,然后執行更新操作

在config/scripts下 vim gongxiang_paita.groovy

if (ctx._source.lock_type == 'exclusive') {

assert false

} else {

ctx._source.lock_count++

}

?

腳本意思:

如果其他線程共享:ctx._source.lock_count++

POST /website/blog/1/_update

{

"upsert": {

"lock_type": ?"shared",

"lock_count": 1

},

"script": {

"lang": "groovy",

"file": "gongxiang_paita"

}

}

?

如果其他線程添加排他鎖'exclusive',那么拋異常:

(1):將共享share標記修改成排他exclusive標記

POST /website/blog/1/_update

{

"doc" : {

"lock_type": "exclusive"

}

}

?

(2):修改成排他標記后,在嘗試共享修改操作,報錯

POST /website/blog/1/_update

{

"upsert": {

"lock_type": ?"shared",

"lock_count": 1

},

"script": {

"lang": "groovy",

"file": "gongxiang_paita"

}

}

?

如何釋放鎖:

Vim unlock.groovy

if (ctx._source.lock_type == "shared") {ctx._source.lock_count --};

if (ctx._source.lock_count == 0) { ctx.op = 'delete' };

?

腳本意思:

ctx._source.lock_type == "shared" 則lock_count—

當lock_count == 0,那么刪除/website/blog/1

(1):GET website/blog/1 查看一下,當前是共享鎖還是排它鎖;

(2): 如果是排他鎖,需要修改會共享鎖

POST /website/blog/1/_update

{

"doc" : {

"lock_type": "shared"

}

}

(3):釋放共享鎖

POST /website/blog/1/_update

{

"upsert": {

"lock_type": ?"shared",

"lock_count": 1

},

"script": {

"lang": "groovy",

"file": "unlock"

}

}

?

這樣就釋放了共享鎖;

總結

以上是生活随笔為你收集整理的es数据频繁的更新_es之文档更新过程中并发冲突问题的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。