当前位置: 首页 > news >正文

ImportError: cannot import name ‘OldCsv‘ from ‘pyflink.table.descriptors‘

我最近开始使用flink用于数据处理。

当我尝试执行table api 用于计数时

我不能导入OldCsv and FileSystem from pyflink.table.descriptors.

I have also downloaded apache-flink using: pip install apache-flink

[root@master flink]# pip3 list | grep flink
apache-flink           1.17.0
apache-flink-libraries 1.17.0
flink                  1.0
pyflink                1.0

Libraries imported:


from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv , FileSystem
from pyflink.table.expressions import lit


    

1
maybe it works in newest version and you have to update module. OR maybe it was avaliable in old version and removed in new version. You links show message that it is old documentation. OR maybe it needs to install other module to get this function. – 
furas
 Nov 23, 2021 at 7:21 
here is link to newest documentation for version 1.15 and I can't find OldCsv in this version 

but I could find in documentation for 1.13 – 
furas
 Nov 23, 2021 at 7:26 
 
 pip3 install apache-flink==1.13
 
 [root@master ~]# pip3 list | grep flink
WARNING: Ignoring invalid distribution -andas (/usr/local/python38/lib/python3.8/site-packages)
apache-flink           1.13.0
apache-flink-libraries 1.13.0
flink                  1.0
pyflink                1.0
[root@master ~]# 


[root@master flink]# cat t3.py 
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv().field('word', DataTypes.STRING())) \
    .with_schema(Schema().field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv().field_delimiter('\t') \
    .field('word', DataTypes.STRING()) \
    .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema() \
    .field('word', DataTypes.STRING()) \
    .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word).select(tab.word, lit(1).count).execute_insert('mySink').wait()

[root@master flink]# python3 t3.py 
[root@master flink]# cat /tmp/output 
flink    2
pyflink    1

http://www.lryc.cn/news/69211.html

相关文章:

  • YouCompleteMe(YCM)安装
  • day33_css
  • 10个最流行的向量数据库【AI】
  • vite3+vue3 项目打包优化二 —— 依赖分包策略
  • 中国社科院与美国杜兰大学金融管理硕士——与时间赛跑,充分利用每一分钟
  • 什么是Dirichlet分布?
  • web前端开发需要哪些技术?学前端顺序千万千万不要搞错啦!
  • 【AFNetWorking源码(二)AFURLSessionManger和AFHTTPSessionManager】
  • 编程不头秃,Google「AI程序员」来了,聊天就能敲代码
  • 【数据结构与算法】基础数据结构
  • k8s系列(四)——资源对象
  • JavaScript如何使用for循环
  • (浙大陈越版)数据结构 第三章 树(上) 3.1 树和树的表示
  • 平抑风电波动的电-氢混合储能容量优化配置(Matlab代码实现)
  • #机器学习--重新看待线性回归
  • 亚马逊,shopee,lazada卖家如何组建自己的测评团队
  • flink cdc 用mybatis-plus写到mysql5.6
  • 【C++】模板的一点简单介绍
  • SpringCloud概述
  • Metal入门学习:GPU并行计算大数组相加
  • 关于在spyder,jupyter notebook下创建虚拟环境(pytorch,tensorflow)均有效
  • oracle 闪回恢复
  • LeetCode 322 零钱兑换
  • 面试篇SpringMVC是什么以及工作原理
  • jQuery-层级选择器
  • 【Java数据结构】——第十节(下).选择排序与堆排序
  • 45道SQL题目陆续更新
  • 在线PS软件有哪些不错的推荐
  • Java实现天气预报功能
  • python循环语句