日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

elasticsearch索引创建create index集群matedata更新的方法

發布時間:2023/12/15 综合教程 18 生活家
生活随笔 收集整理的這篇文章主要介紹了 elasticsearch索引创建create index集群matedata更新的方法 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文小編為大家詳細介紹“elasticsearch索引創建createindex集群matedata更新的方法”,內容詳細,步驟清晰,細節處理妥當,希望這篇“elasticsearch索引創建createindex集群matedata更新的方法”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。

    更新集群index matedata

    創建索引需要創建索引并且更新集群index matedata,這一過程在MetaDataCreateIndexService的createIndex方法中完成。這里會提交一個高優先級,AckedClusterStateUpdateTask類型的task。索引創建需要即時得到反饋,異常這個task需要返回,會超時,而且這個任務的優先級也非常高。

    下面具體看一下它的execute方法,這個方法會在master執行任務時調用,這個方法非常長,主要完成以下三個功能:更新合并request,template中的mapping和setting,調用indiceService創建索引,對創建后的索引添加mapping。這一系列功能完成后,合并完成后生成新的matedata,并更新集群狀態,完成了索引的創建。

    首先創建index的create方法

    代碼如下所示:

    @Override
    publicClusterStateexecute(ClusterStatecurrentState)throwsException{
    booleanindexCreated=false;
    StringremovalReason=null;
    try{
                //檢查request的合法性,1.5版本主要檢查index名字是否合法,如不能含有某些字符,另外就是集群節點版本
    validate(request,currentState);
    for(Aliasalias:request.aliases()){//檢查別稱是否合法
    aliasValidator.validateAlias(alias,request.index(),currentState.metaData());
    }
    //查找索引模板
    List<IndexTemplateMetaData>templates=findTemplates(request,currentState,indexTemplateFilter);
    Map<String,Custom>customs=Maps.newHashMap();
    //addtherequestmapping
    Map<String,Map<String,Object>>mappings=Maps.newHashMap();
    Map<String,AliasMetaData>templatesAliases=Maps.newHashMap();
    List<String>templateNames=Lists.newArrayList();
                //取出request中的mapping配置,雖然mapping可以后面添加,多數情況創建索引的時候還是會附帶著mapping,在request中mapping是一個map
    for(Map.Entry<String,String>entry:request.mappings().entrySet()){
    mappings.put(entry.getKey(),parseMapping(entry.getValue()));
    }
                //一些預設如warm等
    for(Map.Entry<String,Custom>entry:request.customs().entrySet()){
    customs.put(entry.getKey(),entry.getValue());
    }
    //將找到的template和request中的mapping合并
    for(IndexTemplateMetaDatatemplate:templates){
    templateNames.add(template.getName());
    for(ObjectObjectCursor<String,CompressedString>cursor:template.mappings()){
    if(mappings.containsKey(cursor.key)){
    XContentHelper.mergeDefaults(mappings.get(cursor.key),parseMapping(cursor.value.string()));
    }else{
    mappings.put(cursor.key,parseMapping(cursor.value.string()));
    }
    }
    //合并custom
    for(ObjectObjectCursor<String,Custom>cursor:template.customs()){
    Stringtype=cursor.key;
    IndexMetaData.Customcustom=cursor.value;
    IndexMetaData.Customexisting=customs.get(type);
    if(existing==null){
    customs.put(type,custom);
    }else{
    IndexMetaData.Custommerged=IndexMetaData.lookupFactorySafe(type).merge(existing,custom);
    customs.put(type,merged);
    }
    }
    //處理合并別名
    for(ObjectObjectCursor<String,AliasMetaData>cursor:template.aliases()){
    AliasMetaDataaliasMetaData=cursor.value;
    //ifanaliaswithsamenamecamewiththecreateindexrequestitself,
    //ignorethisonetakenfromtheindextemplate
    if(request.aliases().contains(newAlias(aliasMetaData.alias()))){
    continue;
    }
    //ifanaliaswithsamenamewasalreadyprocessed,ignorethisone
    if(templatesAliases.containsKey(cursor.key)){
    continue;
    }
    //AllowtemplatesAliasestobetemplatedbyreplacingatokenwiththenameoftheindexthatweareapplyingitto
    if(aliasMetaData.alias().contains("{index}")){
    StringtemplatedAlias=aliasMetaData.alias().replace("{index}",request.index());
    aliasMetaData=AliasMetaData.newAliasMetaData(aliasMetaData,templatedAlias);
    }
    aliasValidator.validateAliasMetaData(aliasMetaData,request.index(),currentState.metaData());
    templatesAliases.put(aliasMetaData.alias(),aliasMetaData);
    }
    }
    //合并完template和request,現在開始處理配置基本的mapping,合并邏輯跟之前相同,只是mapping來源不同
    FilemappingsDir=newFile(environment.configFile(),"mappings");
    if(mappingsDir.isDirectory()){
    //firstindexlevel
    FileindexMappingsDir=newFile(mappingsDir,request.index());
    if(indexMappingsDir.isDirectory()){
    addMappings(mappings,indexMappingsDir);
    }
    //secondisthe_defaultmapping
    FiledefaultMappingsDir=newFile(mappingsDir,"_default");
    if(defaultMappingsDir.isDirectory()){
    addMappings(mappings,defaultMappingsDir);
    }
    }
                //處理index的配置(setting)
    ImmutableSettings.BuilderindexSettingsBuilder=settingsBuilder();
    //加入模板中的setting
    for(inti=templates.size()-1;i>=0;i--){
    indexSettingsBuilder.put(templates.get(i).settings());
    }
    //加入request中的mapping,request中設置會覆蓋模板中的設置
    indexSettingsBuilder.put(request.settings());
                //處理shard,shard數量不能小于1,因此這里需要特殊處理,如果沒有則要使用默認值
    if(request.index().equals(ScriptService.SCRIPT_INDEX)){
    indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS,settings.getAsInt(SETTING_NUMBER_OF_SHARDS,1));
    }else{
    if(indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS)==null){
    if(request.index().equals(riverIndexName)){
    indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS,settings.getAsInt(SETTING_NUMBER_OF_SHARDS,1));
    }else{
    indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS,settings.getAsInt(SETTING_NUMBER_OF_SHARDS,5));
    }
    }
    }
    if(request.index().equals(ScriptService.SCRIPT_INDEX)){
    indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS,settings.getAsInt(SETTING_NUMBER_OF_REPLICAS,0));
    indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS,"0-all");
    }
    else{
    if(indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS)==null){
    if(request.index().equals(riverIndexName)){
    indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS,settings.getAsInt(SETTING_NUMBER_OF_REPLICAS,1));
    }else{
    indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS,settings.getAsInt(SETTING_NUMBER_OF_REPLICAS,1));
    }
    }
    }
                //處理副本
    if(settings.get(SETTING_AUTO_EXPAND_REPLICAS)!=null&&indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS)==null){
    indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS,settings.get(SETTING_AUTO_EXPAND_REPLICAS));
    }
    if(indexSettingsBuilder.get(SETTING_VERSION_CREATED)==null){
    DiscoveryNodesnodes=currentState.nodes();
    finalVersioncreatedVersion=Version.smallest(version,nodes.smallestNonClientNodeVersion());
    indexSettingsBuilder.put(SETTING_VERSION_CREATED,createdVersion);
    }
    if(indexSettingsBuilder.get(SETTING_CREATION_DATE)==null){
    indexSettingsBuilder.put(SETTING_CREATION_DATE,System.currentTimeMillis());
    }
    indexSettingsBuilder.put(SETTING_UUID,Strings.randomBase64UUID());
                //創建setting
    SettingsactualIndexSettings=indexSettingsBuilder.build();
                //通過indiceservice創建索引
    indicesService.createIndex(request.index(),actualIndexSettings,clusterService.localNode().id());
    indexCreated=true;
    //如果創建成功這里就可以獲取到對應的indexservice,否則會拋出異常
    IndexServiceindexService=indicesService.indexServiceSafe(request.index());
                //獲取mappingService試圖放置mapping
    MapperServicemapperService=indexService.mapperService();
    //為索引添加mapping,首先是默認mapping
    if(mappings.containsKey(MapperService.DEFAULT_MAPPING)){
    try{
    mapperService.merge(MapperService.DEFAULT_MAPPING,newCompressedString(XContentFactory.jsonBuilder().map(mappings.get(MapperService.DEFAULT_MAPPING)).string()),false);
    }catch(Exceptione){
    removalReason="failedonparsingdefaultmappingonindexcreation";
    thrownewMapperParsingException("mapping["+MapperService.DEFAULT_MAPPING+"]",e);
    }
    }
    for(Map.Entry<String,Map<String,Object>>entry:mappings.entrySet()){
    if(entry.getKey().equals(MapperService.DEFAULT_MAPPING)){
    continue;
    }
    try{
    //applythedefaulthere,itsthefirsttimeweparseit
    mapperService.merge(entry.getKey(),newCompressedString(XContentFactory.jsonBuilder().map(entry.getValue()).string()),true);
    }catch(Exceptione){
    removalReason="failedonparsingmappingsonindexcreation";
    thrownewMapperParsingException("mapping["+entry.getKey()+"]",e);
    }
    }
                //添加request中的別稱
    IndexQueryParserServiceindexQueryParserService=indexService.queryParserService();
    for(Aliasalias:request.aliases()){
    if(Strings.hasLength(alias.filter())){
    aliasValidator.validateAliasFilter(alias.name(),alias.filter(),indexQueryParserService);
    }
    }
    for(AliasMetaDataaliasMetaData:templatesAliases.values()){
    if(aliasMetaData.filter()!=null){
    aliasValidator.validateAliasFilter(aliasMetaData.alias(),aliasMetaData.filter().uncompressed(),indexQueryParserService);
    }
    }
    //以下更新Index的matedata,
    Map<String,MappingMetaData>mappingsMetaData=Maps.newHashMap();
    for(DocumentMappermapper:mapperService.docMappers(true)){
    MappingMetaDatamappingMd=newMappingMetaData(mapper);
    mappingsMetaData.put(mapper.type(),mappingMd);
    }
    finalIndexMetaData.BuilderindexMetaDataBuilder=IndexMetaData.builder(request.index()).settings(actualIndexSettings);
    for(MappingMetaDatamappingMd:mappingsMetaData.values()){
    indexMetaDataBuilder.putMapping(mappingMd);
    }
    for(AliasMetaDataaliasMetaData:templatesAliases.values()){
    indexMetaDataBuilder.putAlias(aliasMetaData);
    }
    for(Aliasalias:request.aliases()){
    AliasMetaDataaliasMetaData=AliasMetaData.builder(alias.name()).filter(alias.filter())
    .indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();
    indexMetaDataBuilder.putAlias(aliasMetaData);
    }
    for(Map.Entry<String,Custom>customEntry:customs.entrySet()){
    indexMetaDataBuilder.putCustom(customEntry.getKey(),customEntry.getValue());
    }
    indexMetaDataBuilder.state(request.state());
                //matedata更新完畢,build新的matedata
    finalIndexMetaDataindexMetaData;
    try{
    indexMetaData=indexMetaDataBuilder.build();
    }catch(Exceptione){
    removalReason="failedtobuildindexmetadata";
    throwe;
    }
    indexService.indicesLifecycle().beforeIndexAddedToCluster(newIndex(request.index()),
    indexMetaData.settings());
                //更新集群的matedata,將新build的indexmatadata加入到metadata中
    MetaDatanewMetaData=MetaData.builder(currentState.metaData())
    .put(indexMetaData,false)
    .build();
    logger.info("[{}]creatingindex,cause[{}],templates{},shards[{}]/[{}],mappings{}",request.index(),request.cause(),templateNames,indexMetaData.numberOfShards(),indexMetaData.numberOfReplicas(),mappings.keySet());
                //阻塞集群,更新matadata
    ClusterBlocks.Builderblocks=ClusterBlocks.builder().blocks(currentState.blocks());
    if(!request.blocks().isEmpty()){
    for(ClusterBlockblock:request.blocks()){
    blocks.addIndexBlock(request.index(),block);
    }
    }
    if(request.state()==State.CLOSE){
    blocks.addIndexBlock(request.index(),MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
    }
    ClusterStateupdatedState=ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();
    if(request.state()==State.OPEN){
    RoutingTable.BuilderroutingTableBuilder=RoutingTable.builder(updatedState.routingTable())
    .addAsNew(updatedState.metaData().index(request.index()));
    RoutingAllocation.ResultroutingResult=allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());
    updatedState=ClusterState.builder(updatedState).routingResult(routingResult).build();
    }
    removalReason="cleaningupaftervalidatingindexonmaster";
    returnupdatedState;
    }finally{
    if(indexCreated){
    //Indexwasalreadypartiallycreated-needtocleanup
    indicesService.removeIndex(request.index(),removalReason!=null?removalReason:"failedtocreateindex");
    }
    }
    }
    });
    }

    以上就是創建index的create方法,方法中主要進行了兩個動作:合并更新index的matadata和創建index。更新合并matadata的過程都在上面的代碼中體現了。

    從indice中獲取對應的IndexService

    創建索引是調用indiceSerivice構建一個guice的injector,這個injector包含了Index的所有功能(如分詞,相似度等)。同時會將其存儲到indiceService中,以一個map的格式存儲Map<String, Tuple<IndexService, Injector>> indices。運行中的集群每次對某個索引的操作都首先從indice中獲取對應的IndexService。

    這一部分代碼如下所示:

    publicsynchronizedIndexServicecreateIndex(StringsIndexName,@IndexSettingsSettingssettings,StringlocalNodeId)throwsElasticsearchException{
    if(!lifecycle.started()){
    thrownewElasticsearchIllegalStateException("Can'tcreateanindex["+sIndexName+"],nodeisclosed");
    }
    Indexindex=newIndex(sIndexName);
        //檢測index是否已經存在
    if(indices.containsKey(index.name())){
    thrownewIndexAlreadyExistsException(index);
    }
    indicesLifecycle.beforeIndexCreated(index,settings);
    logger.debug("creatingIndex[{}],shards[{}]/[{}]",sIndexName,settings.get(SETTING_NUMBER_OF_SHARDS),settings.get(SETTING_NUMBER_OF_REPLICAS));
    SettingsindexSettings=settingsBuilder()
    .put(this.settings)
    .put(settings)
    .classLoader(settings.getClassLoader())
    .build();
        //構建index對應的injector
    ModulesBuildermodules=newModulesBuilder();
    modules.add(newIndexNameModule(index));
    modules.add(newLocalNodeIdModule(localNodeId));
    modules.add(newIndexSettingsModule(index,indexSettings));
    modules.add(newIndexPluginsModule(indexSettings,pluginsService));
    modules.add(newIndexStoreModule(indexSettings));
    modules.add(newIndexEngineModule(indexSettings));
    modules.add(newAnalysisModule(indexSettings,indicesAnalysisService));
    modules.add(newSimilarityModule(indexSettings));
    modules.add(newIndexCacheModule(indexSettings));
    modules.add(newIndexFieldDataModule(indexSettings));
    modules.add(newCodecModule(indexSettings));
    modules.add(newMapperServiceModule());
    modules.add(newIndexQueryParserModule(indexSettings));
    modules.add(newIndexAliasesServiceModule());
    modules.add(newIndexGatewayModule(indexSettings,injector.getInstance(Gateway.class)));
    modules.add(newIndexModule(indexSettings));
    InjectorindexInjector;
    try{
    indexInjector=modules.createChildInjector(injector);
    }catch(CreationExceptione){
    thrownewIndexCreationException(index,Injectors.getFirstErrorFailure(e));
    }catch(Throwablee){
    thrownewIndexCreationException(index,e);
    }
    IndexServiceindexService=indexInjector.getInstance(IndexService.class);
    indicesLifecycle.afterIndexCreated(indexService);
         //將Indexservice和IndexInjector加入到indicemap中
    indices=newMapBuilder(indices).put(index.name(),newTuple&lt;&gt;(indexService,indexInjector)).immutableMap();
    returnindexService;
    }

    以上方法就是具體創建索引的過程,它是在master上操作的,同時它是同步方法。這樣才能保證集群的Index創建一致性,因此這也會導致之前所說的大量創建創建索引時候的速度瓶頸。但是創建大量索引的動作是不常見的,需要盡量避免。創建一個索引對于一個集群來說就是開啟對于該索引的各種操作,因此這里通過guice將索引的各個功能模塊注入,并獲得index操作的接口類Indexservice。如果這個方法執行成功,則可以合并template及request中的mapping,并且向剛創建的索引添加合并后的mapping,最后構建新的matadata,并將集群新的matadata發送給各個節點完成索引創建。

    總結

    以上是生活随笔為你收集整理的elasticsearch索引创建create index集群matedata更新的方法的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。