TDengine STMT2 API 使用指南
TDengine STMT2 API 使用指南
概述
TDengine STMT2 API 是 TDengine 3.x 版本引入的快速写入接口,它是对原有 STMT API 的升级和优化。STMT2 提供了更高效的批量数据写入能力,支持预编译 SQL 语句,可以有效防止 SQL 注入攻击。
主要特性
- 高性能:预编译 SQL 语句,无需重复解析
- 批量写入:支持多表批量数据写入,无单次写入条数限制
- 安全性:参数绑定方式防止 SQL 注入攻击
- 灵活性:支持同步和异步执行模式
- 兼容性:兼容原有 STMT API 的使用方式
快速开始
基本使用流程
调用顺序图解:
// 1. 初始化 STMT2 实例
TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};
TAOS_STMT2 *stmt = taos_stmt2_init(taos, &option);// 2. 准备 SQL 语句
taos_stmt2_prepare(stmt, "INSERT INTO ? USING stb TAGS(?,?) VALUES(?,?)", 0);// 3. 绑定参数
TAOS_STMT2_BINDV bindv = {count, tbnames, tags, bind_cols};
taos_stmt2_bind_param(stmt, &bindv, -1);// 4. 执行语句
int affected_rows;
taos_stmt2_exec(stmt, &affected_rows);// 5. 释放资源
taos_stmt2_close(stmt);
适用场景
- 多表批量写入:需要同时向多个表写入数据
- 高频写入:需要频繁执行相同结构的 SQL 语句
- 数据安全:对 SQL 注入防护有要求的场景
- 性能敏感:对写入性能有较高要求的应用
性能优化指南
写入模式选择
STMT2 提供了两种写入模式,适用于不同的数据特征:
高效写入模式(列转行绑定)
- 适用场景:多表少行写入
- 特点:在客户端进行列转行绑定
- 推荐条件:表数量 > 行数量
非高效写入模式(列模式绑定)
- 适用场景:少表多行写入
- 特点:采用列模式绑定
- 推荐条件:行数量 > 表数量
性能对比数据
场景配置 | 列存模式 | 行存模式 | 性能比例 |
---|---|---|---|
10000表×100行×100列 | bind:44.76s, exec:52.30s, total:97.08s | bind:105.10s, exec:37.07s, total:142.20s | 1:1.46 |
10000表×10行×1000列 | bind:55.17s, exec:105.55s, total:160.73s | bind:105.62s, exec:54.38s, total:160.01s | 1:1.00 |
10000表×1行×1000列 | bind:22.78s, exec:57.02s, total:79.80s | bind:15.74s, exec:2.55s, total:18.29s | 1:0.23 |
10000表×1000行×1列 | bind:21.69s, exec:42.36s, total:64.73s | bind:65.16s, exec:51.64s, total:117.19s | 1:1.81 |
绑定模式选择
行绑定(col_idx = -2)
- 适用场景:多列少行的数据写入
- 推荐条件:列数 > 行数 × 10
- 限制:只支持非高效写入模式
列绑定(col_idx = -1)
- 适用场景:少列多行的数据写入
- 特点:默认绑定模式
- 灵活性:支持分列绑定
注意:同一个 stmt 实例的行绑定和列绑定不能交叉混用。
数据结构定义
TAOS_STMT2_OPTION
初始化配置结构体,用于创建 STMT2 实例时指定各种选项。
typedef struct TAOS_STMT2_OPTION {int64_t reqid; // 请求IDbool singleStbInsert; // 单超级表插入模式bool singleTableBindOnce; // 单表单次绑定模式__taos_async_fn_t asyncExecFn; // 异步执行回调函数void *userdata; // 用户数据指针
} TAOS_STMT2_OPTION;
参数说明:
reqid
:请求ID,用于标识和跟踪特定的请求singleStbInsert
:是否向单个超级表插入数据singleTableBindOnce
:每个表执行前是否只绑定一次asyncExecFn
:异步执行回调函数,异步模式下必须设置userdata
:传递给回调函数的用户数据
异步回调函数定义:
typedef void (*__taos_async_fn_t)(void *userdata, TAOS_RES *res, int code);
userdata
:用户传入的数据指针res
:执行结果集,不可使用taos_free_result
释放code
:执行返回码,0 表示成功
TAOS_STMT2_BIND
数据绑定结构体,相比 STMT 版本优化了内存布局。
// STMT2 绑定参数(推荐使用)
typedef struct TAOS_STMT2_BIND {int buffer_type; // 数据类型void *buffer; // 数据缓冲区int32_t *length; // 数据长度数组char *is_null; // 空值标识数组int num; // 绑定行数
} TAOS_STMT2_BIND;// STMT 绑定参数(兼容性)
typedef struct TAOS_MULTI_BIND {int buffer_type; // 数据类型void *buffer; // 数据缓冲区uintptr_t buffer_length; // 缓冲区长度int32_t *length; // 数据长度数组char *is_null; // 空值标识数组int num; // 绑定行数
} TAOS_MULTI_BIND;
参数说明:
buffer_type
:数据类型,取值范围 0-21,详见数据类型定义buffer
:数据缓冲区指针length
:每行数据长度数组,长度为num
is_null
:空值标识数组,0=有值,1=NULL,2=NONEnum
:绑定的行数
内存分配差异:
- STMT:需要分配
num * buffer_length
大小的内存 - STMT2:变长数据只需分配实际长度,即
length[0] + length[1] + ... + length[num-1]
支持的数据类型
#define TSDB_DATA_TYPE_NULL 0 // 空值
#define TSDB_DATA_TYPE_BOOL 1 // 布尔型 (1 byte)
#define TSDB_DATA_TYPE_TINYINT 2 // 微整型 (1 byte)
#define TSDB_DATA_TYPE_SMALLINT 3 // 短整型 (2 bytes)
#define TSDB_DATA_TYPE_INT 4 // 整型 (4 bytes)
#define TSDB_DATA_TYPE_BIGINT 5 // 长整型 (8 bytes)
#define TSDB_DATA_TYPE_FLOAT 6 // 单精度浮点 (4 bytes)
#define TSDB_DATA_TYPE_DOUBLE 7 // 双精度浮点 (8 bytes)
#define TSDB_DATA_TYPE_VARCHAR 8 // 变长字符串
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 时间戳 (8 bytes)
#define TSDB_DATA_TYPE_NCHAR 10 // 宽字符串
#define TSDB_DATA_TYPE_UTINYINT 11 // 无符号微整型 (1 byte)
#define TSDB_DATA_TYPE_USMALLINT 12 // 无符号短整型 (2 bytes)
#define TSDB_DATA_TYPE_UINT 13 // 无符号整型 (4 bytes)
#define TSDB_DATA_TYPE_UBIGINT 14 // 无符号长整型 (8 bytes)
#define TSDB_DATA_TYPE_JSON 15 // JSON 数据
#define TSDB_DATA_TYPE_VARBINARY 16 // 二进制数据
#define TSDB_DATA_TYPE_DECIMAL 17 // 精确数值
#define TSDB_DATA_TYPE_BLOB 18 // 二进制大对象
#define TSDB_DATA_TYPE_MEDIUMBLOB 19 // 中等二进制大对象
#define TSDB_DATA_TYPE_GEOMETRY 20 // 几何数据 (WKB格式)
数据类型分类:
- 定长类型:除变长类型外的所有类型
- 变长类型:
VARCHAR
,VARBINARY
,NCHAR
,JSON
,GEOMETRY
TAOS_STMT2_BINDV
批量绑定结构体,支持多表多行数据批量绑定。
typedef struct TAOS_STMT2_BINDV {int count; // 表数量char **tbnames; // 表名数组TAOS_STMT2_BIND **tags; // 标签数据二维数组TAOS_STMT2_BIND **bind_cols; // 列数据二维数组
} TAOS_STMT2_BINDV;
参数说明:
count
:要绑定的表数量tbnames
:表名字符串数组,长度为count
tags
:标签数据二维数组[count][tag_count]
bind_cols
:列数据二维数组[count][col_count]
注意事项:
- 非 ASCII 字符表名需要用反引号包围
tags[*]->num
必须为 1,因为每个表只能绑定一行标签bind_cols
的行数可以大于 1
TAOS_FIELD_ALL
字段信息结构体,用于获取 SQL 中占位符的详细信息。
typedef struct TAOS_FIELD_ALL {char name[65]; // 字段名int8_t type; // 数据类型uint8_t precision; // 精度uint8_t scale; // 标度int32_t bytes; // 字节数uint8_t field_type; // 字段类型
} TAOS_FIELD_ALL;
参数说明:
name
:字段名称type
:数据类型,对应TSDB_DATA_TYPE_*
precision
:时间戳精度(ms/us/ns)scale
:保留字段,暂未使用bytes
:数据类型占用字节数field_type
:字段类型,1=列,2=标签,3=谓词,4=表名
API 函数参考
基础函数
taos_stmt2_init
TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option);
功能说明:
初始化 STMT2 实例,创建参数化查询的执行环境。
参数:
taos
:数据库连接句柄,由taos_connect
返回option
:初始化选项,可以为 NULL 使用默认配置
返回值:
- 成功:返回 STMT2 实例指针
- 失败:返回 NULL
示例:
TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};
TAOS_STMT2 *stmt = taos_stmt2_init(taos, &option);
taos_stmt2_prepare
int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length);
功能说明:
准备 SQL 语句,解析占位符并建立执行计划。
参数:
stmt
:STMT2 实例指针sql
:带占位符的 SQL 语句,如INSERT INTO ? USING stb TAGS(?,?) VALUES(?,?)
length
:SQL 语句长度,为 0 时自动计算
返回值:
- 成功:返回 0
- 失败:返回错误码
支持的 SQL 格式:
-- 插入语句
INSERT INTO ? USING ? TAGS(?,?) VALUES(?,?)
INSERT INTO ? VALUES(?,?)-- 查询语句
SELECT * FROM ? WHERE ? = ?
taos_stmt2_bind_param
int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx);
功能说明:
绑定参数数据,支持批量绑定多个表的数据。
参数:
stmt
:STMT2 实例指针bindv
:绑定数据结构体col_idx
:列索引,-1=全列绑定,-2=行绑定,>=0=单列绑定
返回值:
- 成功:返回 0
- 失败:返回错误码
绑定模式:
- 全列绑定(
col_idx = -1
):一次绑定所有列 - 行绑定(
col_idx = -2
):按行格式绑定,适用于多列少行 - 单列绑定(
col_idx >= 0
):逐列绑定,需要绑定所有列
taos_stmt2_exec
int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows);
功能说明:
执行已绑定数据的 SQL 语句。
参数:
stmt
:STMT2 实例指针affected_rows
:受影响行数(同步模式)
返回值:
- 成功:返回 0
- 失败:返回错误码
执行模式:
- 同步模式:函数阻塞直到执行完成
- 异步模式:函数立即返回,通过回调函数获取结果
taos_stmt2_close
int taos_stmt2_close(TAOS_STMT2 *stmt);
功能说明:
关闭 STMT2 实例,释放相关资源。
参数:
stmt
:STMT2 实例指针
返回值:
- 成功:返回 0
- 失败:返回错误码
辅助函数
taos_stmt2_is_insert
int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert);
功能说明:
判断 SQL 语句是否为插入语句。
参数:
stmt
:STMT2 实例指针insert
:输出参数,1=插入语句,0=非插入语句
返回值:
- 成功:返回 0
- 失败:返回错误码
taos_stmt2_get_fields
int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields);
功能说明:
获取 SQL 语句中占位符的详细信息。
参数:
stmt
:STMT2 实例指针count
:输出参数,占位符数量fields
:输出参数,字段信息数组
返回值:
- 成功:返回 0
- 失败:返回错误码
注意事项:
- 需要先绑定表名或在 SQL 中指定表名
- 非插入语句返回 NULL
taos_stmt2_free_fields
void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields);
功能说明:
释放 taos_stmt2_get_fields
返回的内存。
参数:
stmt
:STMT2 实例指针fields
:要释放的字段信息数组
taos_stmt2_result
TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt);
功能说明:
获取查询结果集句柄。
参数:
stmt
:STMT2 实例指针
返回值:
- 成功:返回结果集句柄
- 失败:返回 NULL
注意:
- 不能使用
taos_free_result
释放此结果集 - 主要用于查询语句的结果处理
taos_stmt2_error
char *taos_stmt2_error(TAOS_STMT2 *stmt);
功能说明:
获取最后一次错误的详细信息。
参数:
stmt
:STMT2 实例指针
返回值:
- 错误信息字符串
异步函数
taos_stmt2_bind_param_a
int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp, void *param);
功能说明:
异步绑定参数(不推荐使用)。
参数:
stmt
:STMT2 实例指针bindv
:绑定数据结构体col_idx
:列索引fp
:回调函数param
:用户数据指针
返回值:
- 成功:返回 0
- 失败:返回错误码
注意:绑定操作主要是内存复制,不涉及网络I/O,建议使用同步版本。
完整示例
以下是一个完整的使用示例,展示了如何使用 STMT2 API 进行批量数据写入:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taos.h"// 错误检查函数
void checkError(TAOS_STMT2* stmt, int code) {if (code != TSDB_CODE_SUCCESS) {printf("STMT2 错误: %s\n", taos_stmt2_error(stmt));exit(1);}
}// 执行查询
void executeQuery(TAOS* taos, const char* sql) {TAOS_RES* result = taos_query(taos, sql);int code = taos_errno(result);if (code != TSDB_CODE_SUCCESS) {printf("查询失败: %s\n", taos_errstr(result));exit(1);}taos_free_result(result);
}int main() {// 连接数据库TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 0);if (taos == NULL) {printf("数据库连接失败\n");return -1;}// 创建数据库和表executeQuery(taos, "DROP DATABASE IF EXISTS test_db");executeQuery(taos, "CREATE DATABASE test_db");executeQuery(taos, "USE test_db");executeQuery(taos, "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) ""TAGS (location BINARY(64), groupId INT)");// 初始化 STMT2TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);if (stmt == NULL) {printf("STMT2 初始化失败\n");taos_close(taos);return -1;}// 准备 SQLconst char* sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES(?,?,?,?)";int code = taos_stmt2_prepare(stmt, sql, 0);checkError(stmt, code);// 准备数据const int TABLE_COUNT = 3;const int ROW_COUNT = 5;// 表名char* tbnames[TABLE_COUNT];for (int i = 0; i < TABLE_COUNT; i++) {tbnames[i] = malloc(64);sprintf(tbnames[i], "d%d", i);}// 标签数据TAOS_STMT2_BIND** tags = malloc(TABLE_COUNT * sizeof(TAOS_STMT2_BIND*));for (int i = 0; i < TABLE_COUNT; i++) {tags[i] = malloc(2 * sizeof(TAOS_STMT2_BIND));// location 标签static char locations[3][64] = {"Beijing", "Shanghai", "Guangzhou"};int location_len = strlen(locations[i]);tags[i][0].buffer_type = TSDB_DATA_TYPE_BINARY;tags[i][0].buffer = locations[i];tags[i][0].length = &location_len;tags[i][0].is_null = NULL;tags[i][0].num = 1;// groupId 标签static int groupIds[3] = {1, 2, 3};int groupId_len = sizeof(int);tags[i][1].buffer_type = TSDB_DATA_TYPE_INT;tags[i][1].buffer = &groupIds[i];tags[i][1].length = &groupId_len;tags[i][1].is_null = NULL;tags[i][1].num = 1;}// 列数据TAOS_STMT2_BIND** bind_cols = malloc(TABLE_COUNT * sizeof(TAOS_STMT2_BIND*));for (int i = 0; i < TABLE_COUNT; i++) {bind_cols[i] = malloc(4 * sizeof(TAOS_STMT2_BIND));// 时间戳int64_t* ts = malloc(ROW_COUNT * sizeof(int64_t));int* ts_len = malloc(ROW_COUNT * sizeof(int));for (int j = 0; j < ROW_COUNT; j++) {ts[j] = 1591060628000 + j * 1000;ts_len[j] = sizeof(int64_t);}bind_cols[i][0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;bind_cols[i][0].buffer = ts;bind_cols[i][0].length = ts_len;bind_cols[i][0].is_null = NULL;bind_cols[i][0].num = ROW_COUNT;// currentfloat* current = malloc(ROW_COUNT * sizeof(float));int* current_len = malloc(ROW_COUNT * sizeof(int));for (int j = 0; j < ROW_COUNT; j++) {current[j] = 10.0 + i + j * 0.1;current_len[j] = sizeof(float);}bind_cols[i][1].buffer_type = TSDB_DATA_TYPE_FLOAT;bind_cols[i][1].buffer = current;bind_cols[i][1].length = current_len;bind_cols[i][1].is_null = NULL;bind_cols[i][1].num = ROW_COUNT;// voltageint* voltage = malloc(ROW_COUNT * sizeof(int));int* voltage_len = malloc(ROW_COUNT * sizeof(int));for (int j = 0; j < ROW_COUNT; j++) {voltage[j] = 220 + i + j;voltage_len[j] = sizeof(int);}bind_cols[i][2].buffer_type = TSDB_DATA_TYPE_INT;bind_cols[i][2].buffer = voltage;bind_cols[i][2].length = voltage_len;bind_cols[i][2].is_null = NULL;bind_cols[i][2].num = ROW_COUNT;// phasefloat* phase = malloc(ROW_COUNT * sizeof(float));int* phase_len = malloc(ROW_COUNT * sizeof(int));for (int j = 0; j < ROW_COUNT; j++) {phase[j] = 0.1 * i + j * 0.05;phase_len[j] = sizeof(float);}bind_cols[i][3].buffer_type = TSDB_DATA_TYPE_FLOAT;bind_cols[i][3].buffer = phase;bind_cols[i][3].length = phase_len;bind_cols[i][3].is_null = NULL;bind_cols[i][3].num = ROW_COUNT;}// 绑定参数TAOS_STMT2_BINDV bindv = {TABLE_COUNT, tbnames, tags, bind_cols};code = taos_stmt2_bind_param(stmt, &bindv, -1);checkError(stmt, code);// 执行int affected_rows;code = taos_stmt2_exec(stmt, &affected_rows);checkError(stmt, code);printf("成功插入 %d 行数据\n", affected_rows);// 释放资源for (int i = 0; i < TABLE_COUNT; i++) {free(tbnames[i]);free(tags[i]);free(bind_cols[i][0].buffer);free(bind_cols[i][0].length);free(bind_cols[i][1].buffer);free(bind_cols[i][1].length);free(bind_cols[i][2].buffer);free(bind_cols[i][2].length);free(bind_cols[i][3].buffer);free(bind_cols[i][3].length);free(bind_cols[i]);}free(tags);free(bind_cols);taos_stmt2_close(stmt);taos_close(taos);return 0;
}
最佳实践
错误处理
void checkError(TAOS_STMT2* stmt, int code) {if (code != TSDB_CODE_SUCCESS) {printf("STMT2 错误: %s\n", taos_stmt2_error(stmt));// 发生错误时,需要关闭并重新初始化 stmttaos_stmt2_close(stmt);// 重新初始化...}
}
资源管理
// 正确的资源管理示例
TAOS_STMT2 *stmt = taos_stmt2_init(taos, &option);
if (stmt == NULL) {printf("初始化失败\n");return -1;
}// 使用完毕后必须关闭
taos_stmt2_close(stmt);
性能优化技巧
-
选择合适的绑定模式
- 多列少行:使用行绑定(
col_idx = -2
) - 少列多行:使用列绑定(
col_idx = -1
) - 大量表:使用高效写入模式
- 多列少行:使用行绑定(
-
批量操作
- 尽可能批量绑定多个表的数据
- 避免频繁的单行插入
-
内存管理
- STMT2 优化了内存使用,变长数据只分配实际需要的空间
- 及时释放字段信息内存
常见问题与解决方案
错误处理
问题: STMT2 流程中出现错误后整个实例不可用
解决方案:
if (error_occurred) {taos_stmt2_close(stmt);stmt = taos_stmt2_init(taos, &option);// 重新开始流程
}
使用顺序
问题: 违反使用顺序导致报错
解决方案: 严格按照以下顺序使用
taos_stmt2_init
- 初始化taos_stmt2_prepare
- 准备SQLtaos_stmt2_bind_param
- 绑定参数taos_stmt2_exec
- 执行taos_stmt2_close
- 关闭
字段信息获取
问题: taos_stmt2_get_fields
无法返回字段信息
解决方案:
- 确保已绑定表名,或在 SQL 中指定完整表名
- 对于
INSERT INTO ? VALUES(?,?)
格式,必须先绑定表名
中文表名处理
问题: 包含中文字符的表名解析失败
解决方案:
// 使用反引号包围中文表名
char *tbname = "`数据库`.`中文表名`";
参数绑定限制
问题: 固定值和占位符混合使用
解决方案:
-- 错误:混合使用固定值和占位符
INSERT INTO ? USING stb TAGS(100, ?) VALUES(?, now())-- 正确:全部使用占位符
INSERT INTO ? USING stb TAGS(?, ?) VALUES(?, ?)
模式切换
问题: 需要在高效写入和普通模式间切换
解决方案:
// 需要重新初始化
taos_stmt2_close(stmt);
option.singleStbInsert = false; // 切换模式
stmt = taos_stmt2_init(taos, &option);
版本兼容性
STMT vs STMT2
特性 | STMT | STMT2 |
---|---|---|
内存使用 | 有内存空洞 | 优化内存布局 |
性能 | 较好 | 更好 |
接口复杂度 | 中等 | 略高 |
批量操作 | 支持 | 增强支持 |
异步支持 | 有限 | 完整支持 |
迁移建议
- 新项目:直接使用 STMT2 API
- 旧项目:逐步迁移,STMT2 提供了兼容性支持
- 性能要求高:优先使用 STMT2
故障排除
常见错误码
TSDB_CODE_SUCCESS (0)
:操作成功TSDB_CODE_INVALID_PARA
:参数错误TSDB_CODE_OUT_OF_MEMORY
:内存不足TSDB_CODE_INVALID_SQL
:SQL 语法错误
调试技巧
-
启用详细日志
// 在程序开始时设置 taos_options(TSDB_OPTION_DEBUGFLAG, "DEBUG");
-
检查错误信息
char *error = taos_stmt2_error(stmt); printf("详细错误: %s\n", error);
-
验证字段信息
int count; TAOS_FIELD_ALL *fields; taos_stmt2_get_fields(stmt, &count, &fields); for (int i = 0; i < count; i++) {printf("字段 %d: %s, 类型: %d\n", i, fields[i].name, fields[i].type); } taos_stmt2_free_fields(stmt, fields);
相关资源
- TDengine 官方文档
- STMT2 单元测试
- 性能测试报告
- 社区支持