Elasticsearch新增一个字段并赋值
先吐槽幾句,最近本博主一直在做數(shù)據(jù)平臺的事,越發(fā)覺得做數(shù)據(jù)平臺難,尤其數(shù)據(jù)量很大的情況下,然而一旦問題解決,又馬上覺得峰回路轉(zhuǎn),蠻有成就感。
下面就介紹一下,在已經(jīng)存有大量數(shù)據(jù)的ES索引中(博主處理的大概在1億7千萬條),向一個type中添加一個新字段并賦給一個值。
說明,cimissgcdb是index,agmedays是type。
首先來查看一下原始的mapping:
GET /cimissgcdb/agmedays/_mapping查看一下該表數(shù)據(jù)量多大,心里有個底:
POST cimissgcdb/agmedays/_search/ {"query": {"match_all": {}} }然后添加一個新字段,并設置字段的數(shù)據(jù)類型(ES的數(shù)據(jù)類型比如Integer、String、Double、Date等),博主這里新增了一個date類型的時間字段TimeFormat,并進行了format:
PUT cimissgcdb/_mapping/agmedays {"properties": {"TimeFormat": {"type": "date","format": "yyyy-MM-dd HH:mm:ss"}} }接下來給新字段賦值,有2種方式:
1. 使用painless
ES 5.0版本后推出painless。何為painless,推薦參考這2篇博文:
elasticsearch painless最強教程
elasticsearch painless最強教程 二
由于博主的ES用的還是低版本2.3.2的,所以第一種方法用不了,那只能使用第二種方式了。
2. 自己通過代碼重寫,由于博主的新字段值需要通過已經(jīng)存在字段的值來計算,所以,自己動手寫java api來實現(xiàn)。
public static void updateHourByScroll(String Type) throws IOException, ExecutionException, InterruptedException {System.out.println("scroll 模式啟動!");Date begin = new Date();SearchResponse scrollResponse = client.prepareSearch(Index).setTypes(TYPE).setSearchType(SearchType.SCAN).setSize(5000).setScroll(TimeValue.timeValueMinutes(1)).execute().actionGet();long count = scrollResponse.getHits().getTotalHits();//第一次不返回數(shù)據(jù)for(int i=0,sum=0; sum<count; i++) {scrollResponse = client.prepareSearchScroll(scrollResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(8)).execute().actionGet();sum += scrollResponse.getHits().hits().length;SearchHits searchHits = scrollResponse.getHits();List<UpdateRequest> list = new ArrayList<UpdateRequest>();for (SearchHit hit : searchHits) {String id = hit.getId();Map<String, Object> source = hit.getSource();if (source.containsKey("TimeFormat")) { //這個很重要,如果中間過程失敗了,在執(zhí)行時,起到過濾作用,提高效率。System.out.println("TimeFormat已經(jīng)存在!");}else{Integer year = Integer.valueOf(source.get("Year").toString());Integer month = Integer.valueOf(source.get("Mon").toString());Integer day = Integer.valueOf(source.get("Day").toString());Integer hour = 0;if(source.containsKey(""Hour"")){ //處理Hour不存在的情況hour = Integer.valueOf(source.get("Hour").toString());}else{hour = 0;}String time = getyear_month_day_hour(year, month, day, hour); //這個方法自定義,用來生成新字段TimeFormat的值,按需修改即可。System.out.println(time);UpdateRequest uRequest = new UpdateRequest().index(Index).type(Type).id(id).doc(jsonBuilder().startObject().field("TimeFormat", time).endObject());list.add(uRequest); //client.update(uRequest).get(); //注釋上一行,就是單個提交,大數(shù)據(jù)量效率很低,用一個list來使用bulk,批量提高效率}}// 批量執(zhí)行BulkRequestBuilder bulkRequest = client.prepareBulk();for (UpdateRequest uprequest : list) {bulkRequest.add(uprequest);}BulkResponse bulkResponse = bulkRequest.execute().actionGet();if (bulkResponse.hasFailures()) {System.out.println("批量錯誤!");}System.out.println("總量" + count + " 已經(jīng)查到" + sum);}Date end = new Date();System.out.println("耗時: "+(end.getTime()-begin.getTime())); }上面方法就對1億7千萬條數(shù)據(jù)重新查詢?nèi)缓髮懥艘槐?#xff0c;用到Scroll游標查詢。
對于游標查詢和分頁查看文章:Elasticsearch分頁查詢From&Size VS scroll
這樣就完成了對一個已經(jīng)存在的ES表新增字段并賦值,但是切記,這樣的操作只是對表已經(jīng)存在的歷史記錄的改變,對于新來的數(shù)據(jù),要修改ES寫入新字段的過程。
總結(jié)
以上是生活随笔為你收集整理的Elasticsearch新增一个字段并赋值的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Elasticsearch分页查询Fro
- 下一篇: Hbase中的列式表映射到hive的外表