当前位置: 首页 > news >正文

streamlit 实现 flink SQL运行界面

实现效果

在这里插入图片描述

streamlit

flink-playground.py 文件如下:

import streamlit as st
import io
import contextlib
import sys
import os
import uuid
import subprocess
from jinja2 import Templatest.set_page_config(layout="wide")# 设置页面标题
st.title("Flink SQL")# 初始化会话状态
if 'user_id' not in st.session_state:st.session_state.user_id = str(uuid.uuid4())# 创建一个输入框用于配置 JobManager 地址
st.session_state.jobmanager_address = st.text_input("JobManager 地址", value="10.50.108.7:48085")
# 创建一个文本框用于输入配置项
default_config = """
execution.checkpointing.interval=10s
execution.runtime-mode=batch
sql-client.execution.result-mode=table
sql-client.execution.max-table-result.rows=10000
pipeline.auto-watermark-interval=200
pipeline.max-parallelism=10
table.exec.state.ttl=1000
restart-strategy.type=fixed-delay
table.optimizer.join-reorder-enabled=true
table.exec.spill-compression.enabled=true
table.exec.spill-compression.block-size=128kb
""".strip()
st.session_state.config_input = st.text_area("输入配置项 (格式: key=value)", height=300, value=default_config)# 创建一个大的文本框用于输入代码
st.session_state.sql_input = st.text_area("输入你的 Flink SQL 代码", height=500)# 创建一个按钮
if st.button("执行 Flink Job"):try:# 读取模板文件with open("/work/template/pyflink-job.py.template", "r") as template_file:template_content = template_file.read()# 解析配置项config_dict = {}for line in st.session_state.config_input.splitlines():if '=' in line:key, value = line.split('=', 1)config_dict[key.strip()] = value.strip()# 使用 jinja2 模板引擎渲染模板template = Template(template_content)job_content = template.render(sqls=st.session_state.sql_input, config_items=config_dict)st.text("完整pyflink任务代码")st.code(job_content, language='python')# 将替换后的内容保存到临时文件file_name = f"flink_job_{st.session_state.user_id}.py"with open(file_name, "w") as job_file:job_file.write(job_content)# 使用 subprocess 执行 flink run 命令,并传递 JobManager 地址command = f"flink run -m {st.session_state.jobmanager_address} -py {file_name}"result = subprocess.run(command, shell=True, capture_output=True, text=True)# 获取捕获的输出captured_output = result.stdout# 显示输出结果st.text_area("执行结果", value=captured_output, height=200)except Exception as e:# 如果代码执行出错,打印错误信息st.error(f"代码执行出错: {e}")finally:# 删除临时文件if file_name and os.path.exists(file_name):os.remove(file_name)

运行:

nohup streamlit run /work/flink-playground.py --server.port 9999 2>&1  > .streamlit.log &

模板文件

模板文件根据用户输入动态更新任务配置和SQL

import re
from pyflink.table import EnvironmentSettings, TableEnvironmentdef remove_comments(sql):# 使用正则表达式删除单行注释和多行注释sql = re.sub(r'--.*$', '', sql, flags=re.MULTILINE)  # 删除单行注释sql = re.sub(r'/\*.*?\*/', '', sql, flags=re.DOTALL)  # 删除多行注释return sqldef execute_sql_file(table_env, sql_statements):sql_statements = sql_statements.split(';')for sql in sql_statements:# 删除注释sql = remove_comments(sql)sql = sql.strip()if sql:print(f"Executing SQL: {sql}")result = table_env.execute_sql(sql)# if result:#     result.print()def main():# 创建 TableEnvironmentenv_settings = EnvironmentSettings.new_instance().in_batch_mode().build()table_env = TableEnvironment.create(env_settings)table_config = table_env.get_config(){% for key, value in config_items.items() %}table_config.get_configuration().set_string("{{ key }}", "{{ value }}"){% endfor %}sqls = """{{ sqls }}"""# 读取 SQL 文件并执行execute_sql_file(table_env, sqls)if __name__ == "__main__":main()
http://www.lryc.cn/news/470204.html

相关文章:

  • 鲸鱼优化算法(Whale Optimization Algorithm, WOA)原理与MATLAB例程
  • MFC七段码显示实例
  • 【日常知识点】到底推不推荐用JWT?
  • 网络编程项目之FTP服务器
  • SpringBoot02:第一个springboot程序
  • 快速入门HTML
  • RabbitMQ是一个开源的消息代理和队列服务器
  • 经典算法思想--并查集
  • 挑战Java面试题复习第2天,百折不挠
  • 【vue之道】
  • 基于麻雀优化算法SSA的CEEMDAN-BiLSTM-Attention的预测模型
  • Linux:指令再认识
  • PHP如何抛出和接收错误
  • 计算机网络:网络层 —— IPv4 地址的应用规划
  • Mongodb命令大全
  • 宇视设备视频平台EasyCVR视频融合平台果园/鱼塘/养殖场/菜园有电没网视频监控方案
  • 面试题:ABCD四个线程,A线程最后执行
  • 代码随想录算法训练营第46期Day43
  • 前端处理API接口故障:多接口自动切换的实现方案
  • 多租户架构的全景分析(是什么?基本概念、实现策略、资源管理和隔离、数据安全与隔离、性能优化、扩展性与升级、案例研究)
  • Git使用问题汇总附带解决方法(持续更新)
  • Spring Boot驱动的植物健康监测革命
  • element 中 el-dialog 在不同的文件中使用
  • QT中采用QCustomPlot 实现将buffer中的数据绘制成折线图,并且图形随着数据更新而更新
  • 1688API商品详情接口如何获取
  • pytorch + d2l环境配置
  • Go使用exec.Command() 执行脚本时出现:file or directory not found
  • 细节性知识(宏定义解析与宏的外部引用)
  • 面试中的JVM:结合经典书籍的深度解读
  • 使用语音模块的开发智能家居产品(使用雷龙LSYT201B 语音模块)