flink-1.13.6 例子
--------------------------------------------------------------
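flink版本 (Flink version):
flink-1.13.6 (集群 / cluster)；PyFlink 客户端 (client): apache-flink 1.13.0（见下方 pip3 输出 / see the pip3 output below；建议客户端版本与集群版本保持一致 / keeping the client version in line with the cluster is recommended）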
[root@master bin]# pip3 list | grep flink
WARNING: Ignoring invalid distribution -andas (/usr/local/python38/lib/python3.8/site-packages)
apache-flink 1.13.0
apache-flink-libraries 1.13.0
flink 1.0
pyflink 1.0
[root@master bin]# cat /root/flink/t3.py
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
# Batch-mode word count over a local text file (one word per line).
# NOTE(review): TableEnvironment.connect() and the OldCsv/FileSystem
# descriptors are marked deprecated in the Flink 1.13 docs; CREATE TABLE DDL
# via t_env.execute_sql(...) is the suggested replacement. Migrating may change
# how /tmp/output is laid out on disk (file vs. directory) — verify before
# switching.
batch_settings = (
    EnvironmentSettings.new_instance()
    .in_batch_mode()
    .use_blink_planner()
    .build()
)
t_env = TableEnvironment.create(batch_settings)

# Run with parallelism 1 so the sink produces a single output file.
job_config = t_env.get_config().get_configuration()
job_config.set_string("parallelism.default", "1")

# Source: /tmp/input, read as CSV with one STRING column named `word`.
(
    t_env.connect(FileSystem().path('/tmp/input'))
    .with_format(OldCsv().field('word', DataTypes.STRING()))
    .with_schema(Schema().field('word', DataTypes.STRING()))
    .create_temporary_table('mySource')
)

# Sink: /tmp/output, tab-separated rows of (word STRING, count BIGINT).
sink_format = (
    OldCsv()
    .field_delimiter('\t')
    .field('word', DataTypes.STRING())
    .field('count', DataTypes.BIGINT())
)
sink_schema = (
    Schema()
    .field('word', DataTypes.STRING())
    .field('count', DataTypes.BIGINT())
)
(
    t_env.connect(FileSystem().path('/tmp/output'))
    .with_format(sink_format)
    .with_schema(sink_schema)
    .create_temporary_table('mySink')
)

# Count rows per distinct word, then write the result to the sink.
words = t_env.from_path('mySource')
word_counts = words.group_by(words.word).select(words.word, lit(1).count)
# wait() blocks until the insert job completes, so the script does not exit
# before the output has been written.
word_counts.execute_insert('mySink').wait()
[root@master bin]# ./flink run -m localhost:8081 -py /root/flink/t3.py
Job has been submitted with JobID f3f2b3d8b5c2e05cdbc31d7f5488aedf
[root@master bin]#
[root@master bin]# cat /tmp/output
flink 2
pyflink 1
lucy 1
jcw 1
m2m 2
[root@master bin]# cat /tmp/input
flink
pyflink
flink
lucy
jcw
m2m
m2m