当前位置: 首页 > news >正文

flink-1.13.6 例子

--------------------------------------------------------------
flink版本:
flink-1.13.6

[root@master bin]# pip3 list | grep flink
WARNING: Ignoring invalid distribution -andas (/usr/local/python38/lib/python3.8/site-packages)
apache-flink           1.13.0
apache-flink-libraries 1.13.0
flink                  1.0
pyflink                1.0


[root@master bin]# cat /root/flink/t3.py
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv().field('word', DataTypes.STRING())) \
    .with_schema(Schema().field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv().field_delimiter('\t') \
    .field('word', DataTypes.STRING()) \
    .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema() \
    .field('word', DataTypes.STRING()) \
    .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word).select(tab.word, lit(1).count).execute_insert('mySink').wait()


[root@master bin]# ./flink run -m localhost:8081 -py /root/flink/t3.py 
Job has been submitted with JobID f3f2b3d8b5c2e05cdbc31d7f5488aedf
[root@master bin]# 


[root@master bin]# cat /tmp/output 
flink    2
pyflink    1
lucy    1
jcw    1
m2m    2
[root@master bin]# cat /tmp/input 
flink
pyflink
flink
lucy
jcw
m2m
m2m

http://www.lryc.cn/news/69273.html

相关文章:

  • Go语音基于zap的日志封装
  • 可持续能源技术具有改变世界的潜力,并且已经在多个方面展现出积极的影响。
  • Java常用工具之StringUtils类
  • MyBatis-plus的批量插入方式对比分析
  • 【系分论文】论软件开发模型及应用
  • 渗透测试--5.3.使用john破解密码
  • Go中的变量类型
  • 基于STM32的NRF24L01 2.4G通讯模块的驱动实验(HAL库)
  • DJ5-3 多路访问链路和协议
  • 技术领导力?
  • 计算机的基本工作原理
  • 【论文简述】Cross-Attentional Flow Transformer for Robust Optical Flow(CVPR 2022)
  • 【JAVA】Java中方法的使用,理解方法重载和递归
  • 高级网络计算模式复习
  • 【笔试强训选择题】Day15.习题(错题)解析
  • 图论专题(一)
  • 新星计划2023【网络应用领域基础】————————Day4
  • [CTF/网络安全] 攻防世界 view_source 解题详析
  • 目前流行的9大前端框架
  • 【mysql】explain执行计划之select_type列
  • 网易云音乐开发--音乐播放暂停切换上下首功能实现
  • 如何学习网络安全?
  • 软件测试适合女生吗?
  • 华为云——代码托管的使用
  • ChatGPT从⼊⻔到精通
  • node + alipay-sdk 沙箱环境简单测试电脑网站支付
  • 卷积神经网络详解
  • API架构的选择,RESTful、GraphQL还是gRPC
  • 人机融合智能的测量、计算与评价
  • 虹科新品 | 高可靠性、可适用于高磁/压的线性传感器!