
[Original] Big Data Basics: Hive (2) Hive SQL Execution: The SQL Parsing Process


The Hive SQL Parsing Process

SQL -> AST (Abstract Syntax Tree) -> Task (MapRedTask, FetchTask) -> QueryPlan (a collection of Tasks) -> Job (YARN)

SQL parsing takes place in two places:

  • once before the SQL is executed, in the compile step, specifically in Driver.compile, in order to create a QueryPlan;
  • once for explain, specifically in ExplainSemanticAnalyzer.analyzeInternal, in order to create an ExplainTask;

The SQL Execution Process

1 The compile step (SQL -> AST (Abstract Syntax Tree) -> QueryPlan)

org.apache.hadoop.hive.ql.Driver

public int compile(String command, boolean resetTaskIds, boolean deferClose) {
    ...
    ParseDriver pd = new ParseDriver();
    ASTNode tree = pd.parse(command, ctx);
    tree = ParseUtils.findRootNonNullToken(tree);
    ...
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
    ...
    sem.analyze(tree, ctx);
    ...
    // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
    // them later.
    acidSinks = sem.getAcidFileSinks();

    LOG.info("Semantic Analysis Completed");

    // validate the plan
    sem.validate();
    acidInQuery = sem.hasAcidInQuery();
    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);

    if (isInterrupted()) {
      return handleInterruption("after analyzing query.");
    }

    // get the output schema
    schema = getSchema(sem, conf);
    plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
        queryState.getHiveOperation(), schema);
    ...

In the compile step, ParseDriver first converts the SQL text into an ASTNode; a BaseSemanticAnalyzer then analyzes the ASTNode; finally the BaseSemanticAnalyzer is passed into the QueryPlan constructor to create the QueryPlan;

1) Converting SQL into an ASTNode (SQL -> AST (Abstract Syntax Tree))

org.apache.hadoop.hive.ql.parse.ParseDriver

public ASTNode parse(String command, Context ctx, boolean setTokenRewriteStream)
      throws ParseException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Parsing command: " + command);
    }

    HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
    TokenRewriteStream tokens = new TokenRewriteStream(lexer);
    if (ctx != null) {
      if ( setTokenRewriteStream) {
        ctx.setTokenRewriteStream(tokens);
      }
      lexer.setHiveConf(ctx.getConf());
    }
    HiveParser parser = new HiveParser(tokens);
    if (ctx != null) {
      parser.setHiveConf(ctx.getConf());
    }
    parser.setTreeAdaptor(adaptor);
    HiveParser.statement_return r = null;
    try {
      r = parser.statement();
    } catch (RecognitionException e) {
      e.printStackTrace();
      throw new ParseException(parser.errors);
    }

    if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
      LOG.debug("Parse Completed");
    } else if (lexer.getErrors().size() != 0) {
      throw new ParseException(lexer.getErrors());
    } else {
      throw new ParseException(parser.errors);
    }

    ASTNode tree = (ASTNode) r.getTree();
    tree.setUnknownTokenBoundaries();
    return tree;
  }
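
To make the output of this step concrete, here is a minimal standalone sketch (my own example, not part of the original walkthrough) that feeds a SQL string into the same ParseDriver and prints the resulting AST. It assumes hive-exec and its ANTLR dependencies are on the classpath; the table name t is made up, and no metastore or Context is needed because this stage is purely syntactic:

// Hypothetical demo class, a sketch only; ParseDriver and ASTNode are the same classes
// that Driver.compile goes through above.
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

public class ParseDemo {
  public static void main(String[] args) throws ParseException {
    ParseDriver pd = new ParseDriver();
    // Purely lexical/syntactic: no SessionState, metastore or Context is required here
    ASTNode tree = pd.parse("SELECT id, count(1) FROM t GROUP BY id");
    // Prints the TOK_QUERY / TOK_FROM / TOK_INSERT ... node hierarchy
    System.out.println(tree.dump());
  }
}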

2) The analyze step (AST (Abstract Syntax Tree) -> Task)

org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer

public void analyze(ASTNode ast, Context ctx) throws SemanticException {
    initCtx(ctx);
    init(true);
    analyzeInternal(ast);
  }

analyzeInternal is an abstract method implemented by different subclasses, such as DDLSemanticAnalyzer, SemanticAnalyzer, UpdateDeleteSemanticAnalyzer and ExplainSemanticAnalyzer;
its main job is to turn the ASTNode into Tasks, including any optimization passes. The process is fairly involved, so the code is not reproduced here; a rough sketch of how the analyzer subclass is chosen follows below.
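
For orientation, the following is an illustrative, heavily abridged sketch (my own simplification, not the verbatim Hive source) of how SemanticAnalyzerFactory.get picks the analyzer subclass from the type of the root AST token; the real method handles many more token types and also records the corresponding HiveOperation:

// Abridged sketch of the dispatch idea behind SemanticAnalyzerFactory.get; token and
// class names are real Hive identifiers, but the case list is far from complete.
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.parse.*;

public final class AnalyzerDispatchSketch {
  static BaseSemanticAnalyzer pick(QueryState queryState, ASTNode tree) throws SemanticException {
    switch (tree.getType()) {
      case HiveParser.TOK_EXPLAIN:          // EXPLAIN ... -> ExplainSemanticAnalyzer
        return new ExplainSemanticAnalyzer(queryState);
      case HiveParser.TOK_SHOWTABLES:       // two of the many DDL-style statements
      case HiveParser.TOK_DROPTABLE:
        return new DDLSemanticAnalyzer(queryState);
      case HiveParser.TOK_UPDATE_TABLE:     // ACID UPDATE / DELETE
      case HiveParser.TOK_DELETE_FROM:
        return new UpdateDeleteSemanticAnalyzer(queryState);
      default:                              // plain queries, INSERT, CTAS, ...
        return new SemanticAnalyzer(queryState);
    }
  }
}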

3) Creating the QueryPlan (Task -> QueryPlan)

org.apache.hadoop.hive.ql.QueryPlan

public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
      HiveOperation operation, Schema resultSchema) {
    this.queryString = queryString;

    rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
    reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
    fetchTask = sem.getFetchTask();
    // Note that inputs and outputs can be changed when the query gets executed
    inputs = sem.getAllInputs();
    outputs = sem.getAllOutputs();
    linfo = sem.getLineageInfo();
    tableAccessInfo = sem.getTableAccessInfo();
    columnAccessInfo = sem.getColumnAccessInfo();
    idToTableNameMap = new HashMap<String, String>(sem.getIdToTableNameMap());

    this.queryId = queryId == null ? makeQueryId() : queryId;
    query = new org.apache.hadoop.hive.ql.plan.api.Query();
    query.setQueryId(this.queryId);
    query.putToQueryAttributes("queryString", this.queryString);

    queryProperties = sem.getQueryProperties();
    queryStartTime = startTime;
    this.operation = operation;
    this.autoCommitValue = sem.getAutoCommitValue();
    this.resultSchema = resultSchema;
  }

As you can see, the constructor simply copies content out of the BaseSemanticAnalyzer; the most important pieces are sem.getAllRootTasks and sem.getFetchTask;

2 The execute step (QueryPlan -> Job)

org.apache.hadoop.hive.ql.Driver

public int execute(boolean deferClose) throws CommandNeedRetryException {
    ...
    // Add root Tasks to runnable
    for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
      // This should never happen, if it does, it's a bug with the potential to produce
      // incorrect results.
      assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
      driverCxt.addToRunnable(tsk);
    }
    ...
    // Loop while you either have tasks running, or tasks queued up
    while (driverCxt.isRunning()) {

      // Launch upto maxthreads tasks
      Task<? extends Serializable> task;
      while ((task = driverCxt.getRunnable(maxthreads)) != null) {
        TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
        if (!runner.isRunning()) {
          break;
        }
      }
    ...

  private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
      String jobname, int jobs, DriverContext cxt) throws HiveException {
    ...
    TaskRunner tskRun = new TaskRunner(tsk, tskRes);
    ...
    tskRun.start();
    ...
    tskRun.runSequential();
    ...

Driver.execute (invoked from Driver.run) takes the Tasks out of the QueryPlan and launches them one by one via launchTask; launchTask wraps each Task in a TaskRunner and finally calls TaskRunner.runSequential. Next, look at TaskRunner:

org.apache.hadoop.hive.ql.exec.TaskRunner

public void runSequential() {
    int exitVal = -101;
    try {
      exitVal = tsk.executeTask();
    ...

This in turn calls Task.executeTask:

org.apache.hadoop.hive.ql.exec.Task

public int executeTask() {
    ...
    int retval = execute(driverContext);
    ...

Here execute is an abstract method implemented by subclasses such as DDLTask and MapRedTask. We focus on MapRedTask, since most Tasks are MapRedTasks:

org.apache.hadoop.hive.ql.exec.mr.MapRedTask

public int execute(DriverContext driverContext) {
    ...
    if (!runningViaChild) {
      // we are not running this mapred task via child jvm
      // so directly invoke ExecDriver
      return super.execute(driverContext);
    }
    ...

This simply delegates to the parent class (MapRedTask extends ExecDriver), namely ExecDriver.execute:

org.apache.hadoop.hive.ql.exec.mr.ExecDriver

protected transient JobConf job;
  ...
  public int execute(DriverContext driverContext) {
    ...
    JobClient jc = null;

    MapWork mWork = work.getMapWork();
    ReduceWork rWork = work.getReduceWork();
    ...
    if (mWork.getNumMapTasks() != null) {
      job.setNumMapTasks(mWork.getNumMapTasks().intValue());
    }
    ...
    job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
    job.setReducerClass(ExecReducer.class);
    ...
    jc = new JobClient(job);
    ...
    rj = jc.submitJob(job);
    this.jobID = rj.getJobID();
    ...

This is where the Task is turned into a MapReduce Job and submitted to YARN for execution;

The SQL Explain Process

The other place where SQL parsing happens is explain; ExplainSemanticAnalyzer converts the ASTNode into an ExplainTask:

org.apache.hadoop.hive.ql.parse.ExplainSemanticAnalyzer

public void analyzeInternal(ASTNode ast) throws SemanticException {
    ...
    ctx.setExplain(true);
    ctx.setExplainLogical(logical);

    // Create a semantic analyzer for the query
    ASTNode input = (ASTNode) ast.getChild(0);
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, input);
    sem.analyze(input, ctx);
    sem.validate();

    ctx.setResFile(ctx.getLocalTmpPath());
    List<Task<? extends Serializable>> tasks = sem.getAllRootTasks();
    if (tasks == null) {
      tasks = Collections.emptyList();
    }

    FetchTask fetchTask = sem.getFetchTask();
    if (fetchTask != null) {
      // Initialize fetch work such that operator tree will be constructed.
      fetchTask.getWork().initializeForFetch(ctx.getOpContext());
    }

    ParseContext pCtx = null;
    if (sem instanceof SemanticAnalyzer) {
      pCtx = ((SemanticAnalyzer)sem).getParseContext();
    }

    boolean userLevelExplain = !extended
        && !formatted
        && !dependency
        && !logical
        && !authorize
        && (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_EXPLAIN_USER)
            && HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"));
    ExplainWork work = new ExplainWork(ctx.getResFile(),
        pCtx,
        tasks,
        fetchTask,
        sem,
        extended,
        formatted,
        dependency,
        logical,
        authorize,
        userLevelExplain,
        ctx.getCboInfo());

    work.setAppendTaskType(
        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES));

    ExplainTask explTask = (ExplainTask) TaskFactory.get(work, conf);

    fieldList = explTask.getResultSchema();

    rootTasks.add(explTask);
  }
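
As a rough usage sketch (my own example, not from the original post): the whole path described in this article, including the explain branch, can be driven programmatically through Driver.run. The snippet below assumes a properly configured HiveConf (hive-site.xml on the classpath) and a reachable metastore; the table name t is made up, and exact signatures such as getResults vary slightly between Hive versions:

// Hypothetical end-to-end sketch: Driver.run triggers compile() (parse + analyze + QueryPlan)
// and then execute(); for an EXPLAIN statement the root task is an ExplainTask whose
// output rows can afterwards be fetched with getResults.
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class ExplainDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);                 // Driver requires an active SessionState
    Driver driver = new Driver(conf);
    driver.run("EXPLAIN SELECT id, count(1) FROM t GROUP BY id");
    List<String> plan = new ArrayList<>();
    driver.getResults(plan);                  // rows written by the ExplainTask
    plan.forEach(System.out::println);
  }
}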


Reposted from: https://www.cnblogs.com/barneywill/p/10186644.html
