Spark Doris Connector: resolving data type incompatibility errors when reading Doris through Spark

1. Versions:

  • Doris version: 1.2.8
  • Spark Connector for Apache Doris version: spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT
  • Spark version: spark-3.3.1

2. Spark Doris Connector

Documentation: Spark Doris Connector - Apache Doris

Latest release at the time of writing: Release Apache Doris Spark Connector 1.3.0 Release · apache/doris-spark-connector · GitHub

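2.1. Spark Doris Connector overview

Spark Doris Connector supports reading data stored in Doris through Spark, and also supports writing data to Doris through Spark.

Code repository: GitHub - apache/doris-spark-connector: Spark Connector for Apache Doris

  • Supports reading data from Doris
  • Supports batch/streaming writes from a Spark DataFrame to Doris
  • A Doris table can be mapped to a DataFrame or an RDD; DataFrame is recommended
  • Supports filtering data on the Doris side, which reduces the amount of data transferred.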

2.2. Column type mapping between Doris and Spark

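Doris Type |Spark Type               |
-----------|-------------------------|
NULL_TYPE  |DataTypes.NullType       |
BOOLEAN    |DataTypes.BooleanType    |
TINYINT    |DataTypes.ByteType       |
SMALLINT   |DataTypes.ShortType      |
INT        |DataTypes.IntegerType    |
BIGINT     |DataTypes.LongType       |
FLOAT      |DataTypes.FloatType      |
DOUBLE     |DataTypes.DoubleType     |
DATE       |DataTypes.DateType       |
DATETIME   |DataTypes.StringType [1] |
DECIMAL    |DecimalType              |
CHAR       |DataTypes.StringType     |
LARGEINT   |DecimalType              |
VARCHAR    |DataTypes.StringType     |
TIME       |DataTypes.DoubleType     |
HLL        |Unsupported datatype     |
Bitmap     |Unsupported datatype     |

  • Note [1]: The Connector maps DATETIME to String. Because of the processing logic of Doris's underlying storage engine, the time range covered when a time type is used directly cannot meet the requirements, so the String type is used to return the corresponding human-readable time text.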
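The mapping above can be written down as a small lookup, which makes it easy to see which column types have no entry. This is only a sketch transcribed from the table: the names `DORIS_TO_SPARK`, `UNSUPPORTED`, and `spark_type_for` are hypothetical and are not part of the connector. Note that the table lists no entry for `datev2` or `decimalv3`, the two column types used in the test table in section 3.

```python
# Doris -> Spark type names, transcribed from the mapping table above.
# This dictionary is an illustration, not the connector's own code.
DORIS_TO_SPARK = {
    "NULL_TYPE": "DataTypes.NullType",
    "BOOLEAN": "DataTypes.BooleanType",
    "TINYINT": "DataTypes.ByteType",
    "SMALLINT": "DataTypes.ShortType",
    "INT": "DataTypes.IntegerType",
    "BIGINT": "DataTypes.LongType",
    "FLOAT": "DataTypes.FloatType",
    "DOUBLE": "DataTypes.DoubleType",
    "DATE": "DataTypes.DateType",
    "DATETIME": "DataTypes.StringType",
    "DECIMAL": "DecimalType",
    "CHAR": "DataTypes.StringType",
    "LARGEINT": "DecimalType",
    "VARCHAR": "DataTypes.StringType",
    "TIME": "DataTypes.DoubleType",
}

# Types the table marks as "Unsupported datatype".
UNSUPPORTED = {"HLL", "BITMAP"}


def spark_type_for(doris_type):
    """Return the Spark type name for a Doris type, or None if the table has no mapping."""
    # Drop any length/precision suffix such as "(25)" or "(15, 5)".
    base = doris_type.split("(", 1)[0].strip().upper()
    if base in UNSUPPORTED:
        return None
    return DORIS_TO_SPARK.get(base)


for doris_type in ("varchar(25)", "datetime", "datev2", "decimalv3(15, 5)"):
    print(doris_type, "->", spark_type_for(doris_type))
```

A `None` result only means the table in this article has no row for that type; it says nothing about what a particular connector build does internally.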

3. Testing all Doris types and resolving the error

3.1. The Doris CREATE TABLE and INSERT statements are as follows:

CREATE TABLE spark_connector_test_decimal_v1 (
c1 int NOT NULL, 
c2 VARCHAR(25) NOT NULL, 
c3 VARCHAR(152),
c4 boolean,
c5 tinyint,
c6 smallint,
c7 bigint,
c8 float,
c9 double,
c10 datev2,
c11 datetime,
c12 char,
c13 largeint,
c14 varchar,
c15 decimalv3(15, 5)
)
DUPLICATE KEY(c1)
COMMENT "OLAP"
DISTRIBUTED BY HASH(c1) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);

insert into spark_connector_test_decimal_v1 values(10000,'aaa','abc',true, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal_v1 values(10001,'aaa','abc',false, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal_v1 values(10002,'aaa','abc',True, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);
insert into spark_connector_test_decimal_v1 values(10003,'aaa','abc',False, 100, 3000, 100000, 1234.567, 12345.678, '2022-12-01','2022-12-01 12:00:00', 'a', 200000, 'g', 1000.12345);

3.2. Create a temporary view in spark-sql and read the data

Create the view in spark-sql:

CREATE TEMPORARY VIEW spark_connector_test_decimal_v1
USING doris
OPTIONS(
  "table.identifier"="ods.spark_connector_test_decimal_v1",
  "fenodes"="172.xxx.99.199:8030",
  "user"="syncxxx",
  "password"="xxxxx"
);

select * from spark_connector_test_decimal_v1;

3.2.1. The query fails with the following error:
spark-sql (default)> select * from spark_connector_test_decimal_v1;
17:42:13.979 [task-result-getter-3] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 1.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7) (hadoop4 executor 1): org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: null

3.2.2. Tracing the error log through the YARN web UI shows the detailed error:

Log Type: stdout
Log Upload Time: Wed Jan 10 14:54:47 +0800 2024
Log Length: 200041
Showing 4096 bytes of 200041 total.

.spark.exception.ConnectedFailedException: Connect to Doris BE{host='172.xxx.yyyy.10', port=9060}failed.
    at org.apache.doris.spark.backend.BackendClient.openScanner(BackendClient.java:153) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]
    at org.apache.doris.spark.rdd.ScalaValueReader.$anonfun$openResult$1(ScalaValueReader.scala:138) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]
    at org.apache.doris.spark.rdd.ScalaValueReader.org$apache$doris$spark$rdd$ScalaValueReader$$lockClient(ScalaValueReader.scala:239) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]
    at org.apache.doris.spark.rdd.ScalaValueReader.<init>(ScalaValueReader.scala:138) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]
    at org.apache.doris.spark.sql.ScalaDorisRowValueReader.<init>(ScalaDorisRowValueReader.scala:32) ~[spark-doris-connector-3.3_2.12-1.3.0.jar:1.3.0-SNAPSHOT]
    ... 20 more
13:58:50.002 [Executor task launch worker for task 0.3 in stage 7.0 (TID 29)] ERROR org.apache.spark.executor.Executor - Exception in task 0.3 in stage 7.0 (TID 29)
org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: null

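3.3. Tracking down the error: a type problem when Spark reads Doris

Querying the view directly fails. The cause is a type conversion problem in the spark-doris-connector: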
https://github.com/apache/doris-spark-connector/issues/101
https://github.com/apache/doris-spark-connector/issues/101#issuecomment-1563765357

3.4. Inspect how the columns of the Doris table are stored:

select * FROM information_schema.`columns` where TABLE_NAME='spark_connector_test_decimal_v1'

TABLE_CATALOG |TABLE_SCHEMA |TABLE_NAME                   |COLUMN_NAME |ORDINAL_POSITION |COLUMN_DEFAULT |IS_NULLABLE |DATA_TYPE       |CHARACTER_MAXIMUM_LENGTH |CHARACTER_OCTET_LENGTH |NUMERIC_PRECISION |NUMERIC_SCALE |DATETIME_PRECISION |CHARACTER_SET_NAME |COLLATION_NAME |COLUMN_TYPE      |COLUMN_KEY |EXTRA |PRIVILEGES |COLUMN_COMMENT |COLUMN_SIZE |DECIMAL_DIGITS |GENERATION_EXPRESSION |SRS_ID |
--------------|-------------|-----------------------------|------------|-----------------|---------------|------------|----------------|-------------------------|-----------------------|------------------|--------------|-------------------|-------------------|---------------|-----------------|-----------|------|-----------|---------------|------------|---------------|----------------------|-------|
internal      |ods       |spark_connector_test_decimal_v1 |c1          |1                |               |NO          |int             |                         |                       |10                |0             |                   |                   |               |int(11)          |DUP        |      |           |               |10          |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c2          |2                |               |NO          |varchar         |25                       |100                    |                  |              |                   |                   |               |varchar(25)      |           |      |           |               |25          |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1  |c3          |3                |               |YES         |varchar         |152                      |608                    |                  |              |                   |                   |               |varchar(152)     |           |      |           |               |152         |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c4          |4                |               |YES         |tinyint         |                         |                       |                  |0             |                   |                   |               |tinyint(1)       |           |      |           |               |            |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c5          |5                |               |YES         |tinyint         |                         |                       |3                 |0             |                   |                   |               |tinyint(4)       |           |      |           |               |3           |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1  |c6          |6                |               |YES         |smallint        |                         |                       |5                 |0             |                   |                   |               |smallint(6)      |           |      |           |               |5           |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c7          |7                |               |YES         |bigint          |                         |                       |19                |0             |                   |                   |               |bigint(20)       |           |      |           |               |19          |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c8          |8                |               |YES         |float           |                         |                       |7                 |7             |                   |                   |               |float            |           |      |           |               |7           |7              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c9          |9                |               |YES         |double          |                         |                       |15                |15            |                   |                   |               |double           |           |      |           |               |15          |15             |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c10         |10               |               |YES         |date            |                         |                       |                  |              |                   |                   |               |datev2           |           |      |           |               |            |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c11         |11               |               |YES         |datetime        |                         |                       |                  |              |                   |                   |               |datetime         |           |      |           |               |            |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c12         |12               |               |YES         |char            |1                        |4                      |                  |              |                   |                   |               |char(1)          |           |      |           |               |1           |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c13         |13               |               |YES         |bigint unsigned |                         |                       |39                |              |                   |                   |               |largeint         |           |      |           |               |39          |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c14         |14               |               |YES         |varchar         |1                        |4                      |                  |              |                   |                   |               |varchar(1)       |           |      |           |               |1           |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c15         |15               |               |YES         |decimal         |                         |                       |15                |5             |                   |                   |               |decimalv3(15, 5) |           |      |           |               |15          |5              |                      |       |
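With fifteen columns it is easy to miss the relevant rows in this output. The sketch below parses pipe-delimited client output of this shape and lists the columns whose COLUMN_TYPE starts with `datev2` or `decimalv3`. The function names and the `SUSPECT_PREFIXES` tuple are hypothetical; the tuple is an assumption drawn from the two columns (c10 and c15) whose types are changed in section 3.5, not a list published by the connector.

```python
# Assumption: the prefixes below come from the two column types this article
# ends up changing (datev2 and decimalv3). Extend the tuple for other tables.
SUSPECT_PREFIXES = ("datev2", "decimalv3")

# A shortened excerpt of the information_schema output, keeping two columns.
SAMPLE = """
COLUMN_NAME |COLUMN_TYPE      |
------------|-----------------|
c1          |int(11)          |
c10         |datev2           |
c11         |datetime         |
c15         |decimalv3(15, 5) |
"""


def parse_pipe_table(text):
    """Parse pipe-delimited output (header, dashed separator, rows) into a list of dicts."""
    def cells(line):
        line = line.strip()
        if line.endswith("|"):
            line = line[:-1]  # drop the trailing delimiter
        return [cell.strip() for cell in line.split("|")]

    lines = [ln for ln in text.splitlines() if ln.strip()]
    header = cells(lines[0])
    rows = []
    for ln in lines[1:]:
        if set(ln.strip()) <= {"-", "|"}:
            continue  # skip the dashed separator row
        rows.append(dict(zip(header, cells(ln))))
    return rows


def suspect_columns(rows):
    """Return (column name, column type) pairs whose type starts with a suspect prefix."""
    return [
        (row["COLUMN_NAME"], row["COLUMN_TYPE"])
        for row in rows
        if row["COLUMN_TYPE"].lower().startswith(SUSPECT_PREFIXES)
    ]


print(suspect_columns(parse_pipe_table(SAMPLE)))
```

Note that DATA_TYPE reports plain `date` and `decimal` for these columns; only COLUMN_TYPE shows the `datev2` and `decimalv3` variants, so that is the field to check.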

3.5. Change the column types of the Doris table in DBeaver:

Change the column types of the Doris table:

alter table ods.spark_connector_test_decimal_v1 MODIFY COLUMN c15 DECIMAL(15,5);
alter table ods.spark_connector_test_decimal_v1 MODIFY COLUMN c10 datetime;

Since version 1.2.0, renaming a column is supported when the "light_schema_change"="true" option is enabled.

Once the table has been created, a column can be renamed with the following syntax:

alter table ods.spark_connector_test_decimal_v1 RENAME COLUMN c10 c100;
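For a table with many such columns, the MODIFY COLUMN statements can be generated rather than typed by hand. The following is a minimal sketch that reproduces the two conversions used in this article (`decimalv3(p, s)` to `DECIMAL(p,s)`, and `datev2` to `datetime`); the function name `modify_statement` is hypothetical, and whether these target types suit your data is something to decide per column.

```python
import re


def modify_statement(table, column, column_type):
    """Return an ALTER TABLE ... MODIFY COLUMN statement, or None if no change applies.

    Only the two conversions performed in this article are handled.
    """
    normalized = column_type.strip().lower()

    # decimalv3(p, s) -> DECIMAL(p,s), keeping precision and scale.
    match = re.fullmatch(r"decimalv3\(\s*(\d+)\s*,\s*(\d+)\s*\)", normalized)
    if match:
        precision, scale = match.groups()
        return f"alter table {table} MODIFY COLUMN {column} DECIMAL({precision},{scale});"

    # datev2 -> datetime, as done for column c10 above.
    if normalized == "datev2":
        return f"alter table {table} MODIFY COLUMN {column} datetime;"

    return None


table = "ods.spark_connector_test_decimal_v1"
for column, column_type in [("c1", "int(11)"), ("c10", "datev2"), ("c15", "decimalv3(15, 5)")]:
    statement = modify_statement(table, column, column_type)
    if statement:
        print(statement)
```

The generated statements are plain strings; review them before running them against Doris.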

3.6. COLUMN_TYPE values of the Doris table after changing the column types

select * FROM information_schema.`columns` where TABLE_NAME='spark_connector_test_decimal_v1'

TABLE_CATALOG |TABLE_SCHEMA |TABLE_NAME                      |COLUMN_NAME |ORDINAL_POSITION |COLUMN_DEFAULT |IS_NULLABLE |DATA_TYPE       |CHARACTER_MAXIMUM_LENGTH |CHARACTER_OCTET_LENGTH |NUMERIC_PRECISION |NUMERIC_SCALE |DATETIME_PRECISION |CHARACTER_SET_NAME |COLLATION_NAME |COLUMN_TYPE   |COLUMN_KEY |EXTRA |PRIVILEGES |COLUMN_COMMENT |COLUMN_SIZE |DECIMAL_DIGITS |GENERATION_EXPRESSION |SRS_ID |
--------------|-------------|--------------------------------|------------|-----------------|---------------|------------|----------------|-------------------------|-----------------------|------------------|--------------|-------------------|-------------------|---------------|--------------|-----------|------|-----------|---------------|------------|---------------|----------------------|-------|
internal      |ods       |spark_connector_test_decimal_v1 |c1          |1                |               |NO          |int             |                         |                       |10                |0             |                   |                   |               |int(11)       |DUP        |      |           |               |10          |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c2          |2                |               |NO          |varchar         |25                       |100                    |                  |              |                   |                   |               |varchar(25)   |           |      |           |               |25          |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c3          |3                |               |YES         |varchar         |152                      |608                    |                  |              |                   |                   |               |varchar(152)  |           |      |           |               |152         |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c4          |4                |               |YES         |tinyint         |                         |                       |                  |0             |                   |                   |               |tinyint(1)    |           |      |           |               |            |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c5          |5                |               |YES         |tinyint         |                         |                       |3                 |0             |                   |                   |               |tinyint(4)    |           |      |           |               |3           |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c6          |6                |               |YES         |smallint        |                         |                       |5                 |0             |                   |                   |               |smallint(6)   |           |      |           |               |5           |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c7          |7                |               |YES         |bigint          |                         |                       |19                |0             |                   |                   |               |bigint(20)    |           |      |           |               |19          |0              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c8          |8                |               |YES         |float           |                         |                       |7                 |7             |                   |                   |               |float         |           |      |           |               |7           |7              |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c9          |9                |               |YES         |double          |                         |                       |15                |15            |                   |                   |               |double        |           |      |           |               |15          |15             |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c10         |10               |               |YES         |datetime        |                         |                       |                  |              |                   |                   |               |datetime      |           |      |           |               |            |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c11         |11               |               |YES         |datetime        |                         |                       |                  |              |                   |                   |               |datetime      |           |      |           |               |            |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c12         |12               |               |YES         |char            |1                        |4                      |                  |              |                   |                   |               |char(1)       |           |      |           |               |1           |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c13         |13               |               |YES         |bigint unsigned |                         |                       |39                |              |                   |                   |               |largeint      |           |      |           |               |39          |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c14         |14               |               |YES         |varchar         |1                        |4                      |                  |              |                   |                   |               |varchar(1)    |           |      |           |               |1           |               |                      |       |
internal      |ods       |spark_connector_test_decimal_v1 |c15         |15               |               |YES         |decimal         |                         |                       |15                |5             |                   |                   |               |decimal(15,5) |           |      |           |               |15          |5              |                      |       |
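Comparing this output with the one from section 3.4, only two COLUMN_TYPE values differ. A small sketch of that comparison is shown below; the `changed_types` helper is hypothetical, and the two dictionaries contain only a few of the columns, copied from the two outputs.

```python
def changed_types(before, after):
    """Return {column: (old_type, new_type)} for columns whose COLUMN_TYPE differs."""
    return {
        column: (before[column], after[column])
        for column in before
        if column in after and before[column] != after[column]
    }


# COLUMN_TYPE values copied from the outputs in sections 3.4 and 3.6 (excerpt).
BEFORE = {"c1": "int(11)", "c10": "datev2", "c11": "datetime", "c15": "decimalv3(15, 5)"}
AFTER = {"c1": "int(11)", "c10": "datetime", "c11": "datetime", "c15": "decimal(15,5)"}

for column, (old, new) in changed_types(BEFORE, AFTER).items():
    print(f"{column}: {old} -> {new}")
```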

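3.7. After dropping the view and creating it again, the query succeeds

After changing the column types, the view must be dropped and recreated. Otherwise, querying it directly reports the following error: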

Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of date

spark-sql (default)> drop view spark_connector_test_decimal_v1;
Response code
Time taken: 0.039 seconds
spark-sql (default)> select * from spark_connector_test_decimal_v1;
c1    c2    c3    c4    c5    c6    c7    c8    c9    c10    c11    c12    c13    c14    c15
10000    aaa    abc    true    100    3000    100000    1234.567    12345.678    2022-12-01 00:00:00    2022-12-01 12:00:00    a    200000    g    1000.12345
10001    aaa    abc    false    100    3000    100000    1234.567    12345.678    2022-12-01 00:00:00    2022-12-01 12:00:00    a    200000    g    1000.12345
10002    aaa    abc    true    100    3000    100000    1234.567    12345.678    2022-12-01 00:00:00    2022-12-01 12:00:00    a    200000    g    1000.12345
10003    aaa    abc    false    100    3000    100000    1234.567    12345.678    2022-12-01 00:00:00    2022-12-01 12:00:00    a    200000    g    1000.12345
Time taken: 0.233 seconds, Fetched 4 row(s)
spark-sql (default)> 
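The session above shows the drop and the final query; between them the view is created again with the same statement as in section 3.2. The sketch below assembles both statements from the connection options so the refresh can be repeated after each schema change. The function name `refresh_view_statements` is hypothetical, and the masked host, user, and password are the placeholders used in this article.

```python
def refresh_view_statements(view, table_identifier, fenodes, user, password):
    """Return the DROP VIEW and CREATE TEMPORARY VIEW statements for a Doris-backed view."""
    options = {
        "table.identifier": table_identifier,
        "fenodes": fenodes,
        "user": user,
        "password": password,
    }
    option_lines = ",\n".join(f'  "{key}"="{value}"' for key, value in options.items())
    return [
        f"DROP VIEW {view};",
        f"CREATE TEMPORARY VIEW {view}\nUSING doris\nOPTIONS(\n{option_lines}\n);",
    ]


statements = refresh_view_statements(
    view="spark_connector_test_decimal_v1",
    table_identifier="ods.spark_connector_test_decimal_v1",
    fenodes="172.xxx.99.199:8030",  # masked placeholder from the article
    user="syncxxx",                 # masked placeholder from the article
    password="xxxxx",               # masked placeholder from the article
)
print("\n\n".join(statements))
```

The output can be pasted into spark-sql; values containing double quotes would need escaping, which this sketch does not handle.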

References:

Spark Doris Connector - Apache Doris

Release Apache Doris Spark Connector 1.3.0 Release · apache/doris-spark-connector · GitHub

[Bug] ConnectedFailedException: Connect to Doris BE{host='xxx', port=9060}failed · Issue #101 · apache/doris-spark-connector · GitHub
