日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

elasticSearch6源码分析(2)模块化管理

發布時間:2025/4/5 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 elasticSearch6源码分析(2)模块化管理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

elasticsearch里面的組件基本都是用Guice的Injector進行注入與獲取實例方式進行模塊化管理。

在node的構造方法中

/*** Constructs a node** @param environment the environment for this node* @param classpathPlugins the plugins to be loaded from the classpath* @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the* test framework for tests that rely on being able to set private settings*/protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {logger = LogManager.getLogger(Node.class);final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an errorboolean success = false;try {originalSettings = environment.settings();Settings tmpSettings = Settings.builder().put(environment.settings()).put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();nodeEnvironment = new NodeEnvironment(tmpSettings, environment);resourcesToClose.add(nodeEnvironment);logger.info("node name [{}], node ID [{}]",NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId());final JvmInfo jvmInfo = JvmInfo.jvmInfo();logger.info("version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),jvmInfo.pid(),Build.CURRENT.flavor().displayName(),Build.CURRENT.type().displayName(),Build.CURRENT.shortHash(),Build.CURRENT.date(),Constants.OS_NAME,Constants.OS_VERSION,Constants.OS_ARCH,Constants.JVM_VENDOR,Constants.JVM_NAME,Constants.JAVA_VERSION,Constants.JVM_VERSION);logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);if (logger.isDebugEnabled()) {logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",environment.configFile(), Arrays.toString(environment.dataFiles()), environment.logsFile(), environment.pluginsFile());}this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);this.settings = pluginsService.updatedSettings();localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());// create the environment based on the finalized (processed) view of the settings// this is just to makes sure that people get the same settings, no matter where they ask them fromthis.environment = new Environment(this.settings, environment.configFile());Environment.assertEquivalent(environment, this.environment);final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));// adds the context to the DeprecationLogger so that it does not need to be injected everywhere DeprecationLogger.setThreadContext(threadPool.getThreadContext());resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());for (final ExecutorBuilder<?> builder : threadPool.builders()) {additionalSettings.addAll(builder.getRegisteredSettings());}client = new NodeClient(settings, threadPool);final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool// so we might be late here alreadyfinal Set<SettingUpgrader<?>> settingsUpgraders = pluginsService.filterPlugins(Plugin.class).stream().map(Plugin::getSettingUpgraders).flatMap(List::stream).collect(Collectors.toSet());final SettingsModule settingsModule =new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());resourcesToClose.add(resourceWatcherService);final NetworkService networkService = new NetworkService(getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);clusterService.addStateApplier(scriptModule.getScriptService());resourcesToClose.add(clusterService);final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,clusterService.getClusterSettings(), client);final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,listener::onNewInfo);final UsageService usageService = new UsageService(settings);ModulesBuilder modules = new ModulesBuilder();// plugin modules must be added here, before others or we can get crazy injection errors...for (Module pluginModule : pluginsService.createGuiceModules()) {modules.add(pluginModule);}final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);modules.add(clusterModule);IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));modules.add(indicesModule);SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),settingsModule.getClusterSettings());resourcesToClose.add(circuitBreakerService);modules.add(new GatewayModule());PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);resourcesToClose.add(bigArrays);modules.add(settingsModule);List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(NetworkModule.getNamedWriteables().stream(),indicesModule.getNamedWriteables().stream(),searchModule.getNamedWriteables().stream(),pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> p.getNamedWriteables().stream()),ClusterModule.getNamedWriteables().stream()).flatMap(Function.identity()).collect(Collectors.toList());final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(NetworkModule.getNamedXContents().stream(),indicesModule.getNamedXContents().stream(),searchModule.getNamedXContents().stream(),pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> p.getNamedXContent().stream()),ClusterModule.getNamedXWriteables().stream()).flatMap(Function.identity()).collect(toList()));modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);// collect engine factory providers from server and from pluginsfinal Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =Stream.concat(indicesModule.getEngineFactories().stream(),enginePlugins.stream().map(plugin -> plugin::getEngineFactory)).collect(Collectors.toList());final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories =pluginsService.filterPlugins(IndexStorePlugin.class).stream().map(IndexStorePlugin::getIndexStoreFactories).flatMap(m -> m.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));final IndicesService indicesService =new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders, indexStoreFactories);final AliasValidator aliasValidator = new AliasValidator(settings);final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings,clusterService,indicesService,clusterModule.getAllocationService(),aliasValidator,environment,settingsModule.getIndexScopedSettings(),threadPool,xContentRegistry,forbidPrivateIndexSettings);Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,namedWriteableRegistry).stream()).collect(Collectors.toList());ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);modules.add(actionModule);final RestController restController = actionModule.getRestController();final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,networkService, restController);Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =pluginsService.filterPlugins(Plugin.class).stream().map(Plugin::getCustomMetaDataUpgrader).collect(Collectors.toList());Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =pluginsService.filterPlugins(Plugin.class).stream().map(Plugin::getIndexTemplateMetaDataUpgrader).collect(Collectors.toList());Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream().map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders);final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,metaDataIndexUpgradeService, metaDataUpgrader);new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);final Transport transport = networkModule.getTransportSupplier().get();Set<String> taskHeaders = Stream.concat(pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),Stream.of(Task.X_OPAQUE_ID)).collect(Collectors.toSet());final TransportService transportService = newTransportService(settings, transport, threadPool,networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,SearchExecutionStatsCollector.makeWrapper(responseCollectorService));final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry,networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),clusterModule.getAllocationService(), environment.configFile());this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,searchTransportService);final SearchService searchService = newSearchService(clusterService, indicesService,threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),responseCollectorService);final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class).stream().map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client)).flatMap(List::stream).collect(toList());final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);final PersistentTasksClusterService persistentTasksClusterService =new PersistentTasksClusterService(settings, registry, clusterService);final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);modules.add(b -> {b.bind(Node.class).toInstance(this);b.bind(NodeService.class).toInstance(nodeService);b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);b.bind(PluginsService.class).toInstance(pluginsService);b.bind(Client.class).toInstance(client);b.bind(NodeClient.class).toInstance(client);b.bind(Environment.class).toInstance(this.environment);b.bind(ThreadPool.class).toInstance(threadPool);b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);b.bind(BigArrays.class).toInstance(bigArrays);b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());b.bind(IngestService.class).toInstance(ingestService);b.bind(UsageService.class).toInstance(usageService);b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);b.bind(MetaStateService.class).toInstance(metaStateService);b.bind(IndicesService.class).toInstance(indicesService);b.bind(AliasValidator.class).toInstance(aliasValidator);b.bind(MetaDataCreateIndexService.class).toInstance(metaDataCreateIndexService);b.bind(SearchService.class).toInstance(searchService);b.bind(SearchTransportService.class).toInstance(searchTransportService);b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings,searchService::createReduceContext));b.bind(Transport.class).toInstance(transport);b.bind(TransportService.class).toInstance(transportService);b.bind(NetworkService.class).toInstance(networkService);b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);b.bind(ClusterInfoService.class).toInstance(clusterInfoService);b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());{RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,indicesService, recoverySettings));b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,transportService, recoverySettings, clusterService));}b.bind(HttpServerTransport.class).toInstance(httpServerTransport);pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));b.bind(PersistentTasksService.class).toInstance(persistentTasksService);b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);});injector = modules.createInjector();// TODO hack around circular dependencies problems in AllocationServiceclusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream().filter(p -> p instanceof LifecycleComponent).map(p -> (LifecycleComponent) p).collect(Collectors.toList());pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream().map(injector::getInstance).collect(Collectors.toList()));resourcesToClose.addAll(pluginLifecycleComponents);this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);client.initialize(injector.getInstance(new Key<Map<Action, TransportAction>>() {}),() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());logger.debug("initializing HTTP handlers ...");actionModule.initRestHandlers(() -> clusterService.state().nodes());logger.info("initialized");success = true;} catch (IOException ex) {throw new ElasticsearchException("failed to bind service", ex);} finally {if (!success) {IOUtils.closeWhileHandlingException(resourcesToClose);}}}

涉及的主要模塊

?

上圖的文本如下;

ClusterModule
GatewayAllocator
AllocationService
ClusterService
NodeConnectionsService
MetaDataDeleteIndexService
MetaDataIndexStateService
MetaDataMappingService
MetaDataIndexAliasesService
MetaDataUpdateSettingsService
MetaDataIndexTemplateService
IndexNameExpressionResolver
RoutingService
DelayedAllocationService
ShardStateAction
NodeMappingRefreshAction
MappingUpdatedAction
TaskResultsService
AllocationDeciders
ShardsAllocator

IndicesModule
IndicesStore
IndicesClusterStateService
SyncedFlushService
TransportNodesListShardStoreMetaData
GlobalCheckpointSyncAction
TransportResyncReplicationAction
PrimaryReplicaSyncer

其他
Node
NodeService
NamedXContentRegistry
PluginsService
Client
NodeClient
Environment
ThreadPool
NodeEnvironment
ResourceWatcherService
CircuitBreakerService
BigArrays
ScriptService
AnalysisRegistry
IngestService
UsageService
NamedWriteableRegistry
MetaDataUpgrader
MetaStateService
IndicesService
AliasValidator
MetaDataCreateIndexService
SearchService
SearchTransportService
SearchPhaseController
Transport
TransportService
NetworkService
UpdateHelper
MetaDataIndexUpgradeService
ClusterInfoService
GatewayMetaState
Discovery
PeerRecoverySourceService
PeerRecoveryTargetService
HttpServerTransport
PersistentTasksService
PersistentTasksClusterService
PersistentTasksExecutorRegistry

pluginModule

GatewayModule
DanglingIndicesState
GatewayService
TransportNodesListGatewayMetaState
TransportNodesListGatewayStartedShards
LocalAllocateDangledIndices

SettingsModule
Settings
SettingsFilter
ClusterSettings
IndexScopedSettings

ActionModule
ActionFilters
DestructiveOperations
AutoCreateIndex
TransportLivenessAction
TransportAction
supportAction

RepositoriesModule
RepositoriesService
SnapshotsService
SnapshotShardsService
TransportNodesSnapshotsStatus
RestoreService

?

轉載于:https://www.cnblogs.com/davidwang456/p/10039399.html

總結

以上是生活随笔為你收集整理的elasticSearch6源码分析(2)模块化管理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。