当前位置: 首页 > news >正文

StarRocks 中如何做到查询超时(QueryTimeout)

背景

本文基于 StarRocks 3.1.7
主要是分析以下两种超时设置的方式:

  • SESSION 级别
    SET query_timeout = 10;SELECT sleep(20);
  • SQL 级别
  select /*+ SET_VAR(query_timeout=10) */ sleep(20); 

通过本文的分析大致可以了解到在Starrocks的FE端是如何进行Command的交互以及数据流走向,其他的命令也是可以举一反三

分析

query_timeout 命令解析

和Spark以及hive等但是解析一样,StarRocks也是采用的Anltr4进行语法的解析,
对于StarRocks来说, 对应的语法解析文件为 StarRocks.g4文件,那么其set query_time在如下的位置

setStatement: SET setVar (',' setVar)*;setVar: (CHAR SET | CHARSET | CHARACTER SET) (identifierOrString | DEFAULT)                       #setNames| NAMES (charset = identifierOrString | DEFAULT)(COLLATE (collate = identifierOrString | DEFAULT))?                                     #setNames| PASSWORD '=' (string | PASSWORD '(' string ')')                                           #setPassword| PASSWORD FOR user '=' (string | PASSWORD '(' string ')')                                  #setPassword| userVariable '=' expression                                                               #setUserVar| varType? identifier '=' setExprOrDefault                                                  #setSystemVar| systemVariable '=' setExprOrDefault                                                       #setSystemVar| varType? TRANSACTION transaction_characteristics                                          #setTransaction;

继而可以找到对应的语法解析部分为 AstBuilder.java 中

 @Overridepublic ParseNode visitSetSystemVar(StarRocksParser.SetSystemVarContext context) {NodePosition pos = createPos(context);if (context.systemVariable() != null) {VariableExpr variableDesc = (VariableExpr) visit(context.systemVariable());Expr expr = (Expr) visit(context.setExprOrDefault());return new SystemVariable(variableDesc.getSetType(), variableDesc.getName(), expr, pos);} else {Expr expr = (Expr) visit(context.setExprOrDefault());String variable = ((Identifier) visit(context.identifier())).getValue();if (context.varType() != null) {return new SystemVariable(getVariableType(context.varType()), variable, expr, pos);} else {return new SystemVariable(SetType.SESSION, variable, expr, pos);}}}

从以上所示,SET query_timeout = 10; 就会在语法层面解析为 new SystemVariable(SetType.SESSION, variable, expr, pos)

数据流向

以上只是说到了 SET query_timeout = 10 只会被解析为SystemVariable对应的java数据结构,但是一条SQL从客户端发送过来,是怎么一个数据流呢?
我们大概的捋一下:

StarRocksFE中新建QeService对象||\/new NMysqlServer(port, scheduler, sslContext)||\/new AcceptListener(connectScheduler, sslContext)||\/AcceptListener.handleEvent||\/context.startAcceptQuery(processor)||\/NMysqlChannel.startAcceptQuery||\/conn.getSourceChannel().setReadListener(new ReadListener(nConnectContext, connectProcessor))||\/ReadListener.handleEvent||\/connectProcessor.processOnce()||\/connectProcessor.dispatch||\/connectProcessor.handleQuery||\/stmts = com.starrocks.sql.parser.SqlParser.parse(originStmt, ctx.getSessionVariable());||\/StmtExecutor.execute()||\/StatementPlanner.plan(parsedStmt, context)||\/StmtExecutor.handleSetStmt()||\/SetExecutor.execute // 会设置到变量的keyValue到`ConnectContext`的`SystemVariable`变量中,后续会或获取对应的值

query_timeout 怎么生效

还是定位到StarRocksFE.java中:

ExecuteEnv.setup();

该方法是整个执行环境的基础设置。其中会有ConnectScheduler的初始化:

public ConnectScheduler(int maxConnections) {this.maxConnections = new AtomicInteger(maxConnections);numberConnection = new AtomicInteger(0);nextConnectionId = new AtomicInteger(0);// Use a thread to check whether connection is timeout. Because// 1. If use a scheduler, the task maybe a huge number when query is messy.//    Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler.// 2. Use a thread to poll maybe lose some accurate, but is enough to us.ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1,"Connect-Scheduler-Check-Timer", true);checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS);}

这里有个定时线程池去进行timeout的检查,间隔是一秒。具体的检查机制在TimeoutChecker类中:

private class TimeoutChecker extends TimerTask {@Overridepublic void run() {try {long now = System.currentTimeMillis();synchronized (ConnectScheduler.this) {//Because unregisterConnection will be callback in NMysqlChannel's close,//unregisterConnection will remove connectionMap (in the same thread)//This will result in a concurrentModifyException.//So here we copied the connectionIds to avoid removing iterator during operate iteratorArrayList<Long> connectionIds = new ArrayList<>(connectionMap.keySet());for (Long connectId : connectionIds) {ConnectContext connectContext = connectionMap.get(connectId);connectContext.checkTimeout(now);}}} catch (Throwable e) {//Catch Exception to avoid thread exitLOG.warn("Timeout checker exception, Internal error : " + e.getMessage());}}}

主要逻辑就是从connectionMap中获取对应的ConnectContext,从而调用ConnectContext.checkTimeout方法检查是否超时。
checkTimeout方法主要是通过sessionVariable.getQueryTimeoutS()获取设置的超时时间,如果超时,则调用StmtExecutor.cancel,继而调用Coordinator.cancel
所以现在就存在一个问题: 当前连接的ConnectContext什么时候被集成到 connectionMap中去的?
还是回到流程 AcceptListener.handleEvent 中去:

    connectScheduler.submit(context);...if (connectScheduler.registerConnection(context)) {MysqlProto.sendResponsePacket(context);connection.setCloseListener(streamConnection -> connectScheduler.unregisterConnection(context));} else {...

这里的submit 方法会生成context的conectionId.
registerConnection方法会把当前 ConnectionContext 的id和 ConnectionContext 组成KeyValue对并放置到connectionMap

至此 SET query_timeout = 10 整体的数据流就结束了,待在同一个连接中进行select 操作的时候,就会根据执行的长短进行超时处理了。

注意:
对于 select /*+ SET_VAR(query_timeout=10) */ sleep(20); 这种情况的解析,是通过 HintCollector来解析的。
词法解析是在StarRocksLex.g4 中,

OPTIMIZER_HINT: '/*+' .*? '*/' -> channel(2);

对于获取hint是通过HintCollectorextractHintToRight获取的:

 private void extractHintToRight(ParserRuleContext ctx) {Token semi = ctx.start;int i = semi.getTokenIndex();List<Token> hintTokens = tokenStream.getHiddenTokensToRight(i, HINT_CHANNEL);if (hintTokens != null) {contextWithTokenMap.computeIfAbsent(ctx, e -> new ArrayList<>()).addAll(hintTokens);}}

对应的解析在:SqlParser.parseWithStarRocksDialect 方法中:

  HintCollector collector = new HintCollector((CommonTokenStream) parser.getTokenStream());collector.collect(singleStatementContexts.get(idx));AstBuilder astBuilder = new AstBuilder(sessionVariable.getSqlMode(), collector.getContextWithHintMap());

AstBuilder 中会存储 hint到 hintMap 变量中,而在 visitQuerySpecification方法中

        selectList.setOptHints(extractVarHints(hintMap.get(context)));

从而在StmtExecutor.execute中会调用 optHints = selectRelation.getSelectList().getOptHints();获取对应的hint,

 if (isQuery &&((QueryStatement) parsedStmt).getQueryRelation() instanceof SelectRelation) {SelectRelation selectRelation = (SelectRelation) ((QueryStatement) parsedStmt).getQueryRelation();optHints = selectRelation.getSelectList().getOptHints();}if (optHints != null) {LOG.error("optHints: parsedStmt:" + parsedStmt.getOrigStmt() +"  "+ optHints.size());});SessionVariable sessionVariable = (SessionVariable) sessionVariableBackup.clone();for (String key : optHints.keySet()) {VariableMgr.setSystemVariable(sessionVariable,new SystemVariable(key, new StringLiteral(optHints.get(key))), true);}context.setSessionVariable(sessionVariable);

这样 hint相关的变量就设置到ConnectContextSessionVariable中了,后续的流程和之前的一致。

http://www.lryc.cn/news/451322.html

相关文章:

  • Windows 开发工具使用技巧 Visual Studio使用安装和使用技巧 Visual Studio 快捷键
  • 计算机网络-系分(5)
  • React Native使用高德地图
  • 排序算法的理解
  • Yocto - 使用Yocto开发嵌入式Linux系统_04 使用Toaster来创建一个image
  • 【C#生态园】后端服务与网络库:选择适合你游戏开发的利器
  • 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-30
  • 【漏洞复现】JeecgBoot 积木报表 queryFieldBySql sql注入漏洞
  • Qt6 中相对于 Qt5 的新增特性及亮点
  • 超轻巧modbus调试助手使用说明
  • Percona Monitoring and Management
  • WarehouseController
  • 基于 STM32 单片机的温室物理无害生长系统
  • 新版pycharm如何导入自定义环境
  • 一文彻底搞懂多模态 - 多模态理解+视觉大模型+多模态检索
  • 提升效率的编程世界探索与体验
  • VMware tools菜单为灰色无法安装
  • 不相同的二叉搜索树
  • 毕业论文设计javaweb+VUE高校教师信息管理系统
  • L0-Python-关卡材料提交
  • 【unity进阶知识6】Resources的使用,如何封装一个Resources资源管理器
  • ThreadLocal内存泄漏分析
  • 第 30 章 XML
  • VMware下的ubuntu显示文字太小的自适应显示调整
  • 外贸网站怎么搭建对谷歌seo比较好?
  • 如何创建网络白名单
  • 前端动态创建svg不起效果?
  • 三、Drf request对象
  • CMIS5.2_光模块切应用(Application Selection and Instantiation)
  • 网络安全 DVWA通关指南 DVWA Weak Session IDs(弱会话)