当前位置: 首页 > news >正文

C# .Net Core通过StreamLoad向Doris写入CSV数据

以下代码可以只关注StreamLoad具体实现。

1.创建StreamLoad返回值Model

public class StreamLoadResponse
{public long TxnId { get; set; }public string Label { get; set; }public string Comment { get; set; }public string TwoPhaseCommit { get; set; }public string Status { get; set; }public string Message { get; set; }public long NumberTotalRows { get; set; }public long NumberLoadedRows { get; set; }public long NumberFilteredRows { get; set; }public long NumberUnselectedRows { get; set; }public long LoadBytes { get; set; }public long LoadTimeMs { get; set; }public long BeginTxnTimeMs { get; set; }public long StreamLoadPutTimeMs { get; set; }public long ReadDataTimeMs { get; set; }public long WriteDataTimeMs { get; set; }public long CommitAndPublishTimeMs { get; set; }
}

2.创建Doris StreamLoad接口

public interface IDorisApiService
{/// <summary>/// /// </summary>/// <param name="database">数据库</param>/// <param name="table">表</param>/// <param name="authorization">认证信息,格式 username:pwd</param>/// <param name="content">csv格式的字符串</param>/// <returns></returns>StreamLoadResponse StreamLoad(string database, string table, string authorization, string content);
}

3.实现接口,核心代码,逻辑并不复杂,组装一个http请求所需的内容。

需要注意的是:(1)示例csv格式的字符串分割符为‘\t’,而不是常用的逗号,这也是官方默认的分割方式,如果你想用其他的分隔符,需要在header里配置column_separator。建议不要用逗号,因为涉及到复杂的json字符串的时候,里面的逗号会导致解析异常,即便官方文档里有相关的处理方式(enclose),似乎仍然存在问题。(2)我们请求了两次,第一次请求会重定向到BE节点的地址,然后用此地址再次请求。这是正常的。(3)我们采用的format是csv_with_names,第一行是列明,请确保跟数据库table列顺序和数量保持一致

public class DorisApiService : IDorisApiService
{private readonly HttpClient _httpClient;public DorisApiService(HttpClient httpClient){_httpClient = httpClient;}public StreamLoadResponse StreamLoad(string database, string table, string authorization, string content){var url = $"/api/{database}/{table}/_stream_load";var request = new HttpRequestMessage(HttpMethod.Put, url);request.Headers.Add("Expect", "100-continue");request.Headers.Add("format", "csv_with_names");request.Headers.Add("column_separator", "\t");request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", authorization);var response = _httpClient.Send(request, HttpCompletionOption.ResponseHeadersRead);if (response.StatusCode == HttpStatusCode.TemporaryRedirect || response.StatusCode == HttpStatusCode.RedirectKeepVerb){var redirectUrl = response.Headers.Location.ToString();request = new HttpRequestMessage(HttpMethod.Put, redirectUrl){Content = new StringContent(content, Encoding.UTF8, "text/plain")};request.Headers.Add("Expect", "100-continue");request.Headers.Add("format", "csv_with_names");request.Headers.Add("column_separator", "\t");request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", authorization);response = _httpClient.Send(request, HttpCompletionOption.ResponseHeadersRead);}string responseBody = response.Content.ReadAsStringAsync().Result;           if (response.IsSuccessStatusCode){var streamLoadResponse = JsonSerializer.Deserialize<StreamLoadResponse>(responseBody);if (streamLoadResponse.Status == "Success"){return streamLoadResponse;}else{throw new Exception(responseBody);}}else{throw new Exception(responseBody);}}
}

4.Program配置

services.AddHttpClient<IDorisApiService, DorisApiService>(client =>
{//从配置文件获取Doris的请求地址和端口:settings.ApiHostclient.BaseAddress = new Uri(settings.ApiHost);client.Timeout = TimeSpan.FromSeconds(300);
}).ConfigurePrimaryHttpMessageHandler(() => new HttpClientHandler
{AllowAutoRedirect = false
});

http://www.lryc.cn/news/489268.html

相关文章:

  • React-自定义Hook与逻辑共享
  • 蓝桥杯每日真题 - 第17天
  • 游戏开发实现简易实用的ui框架
  • vue3的attr透传属性详解和使用法方式。以及在css样式的伪元素中实现
  • 【仿真建模-MESA】框架简介
  • Linux环境基础开发工具的使用(yum、vim、gcc、g++、gdb、make/Makefile)
  • VSCode 间距太小
  • 【K8S系列】imagePullSecrets配置正确,但docker pull仍然失败,进一步排查详细步骤
  • 【ARM Coresight OpenOCD 系列 5.1 -- OpenOCD 无法识别CPUID 问题: xxx is unrecognized】
  • 如何实现点击目录跳转到指定位置?【vue】
  • SQL 通配符
  • ubuntu显示管理器_显示导航栏
  • 黑芝麻嵌入式面试题及参考答案
  • 使用 PyTorch-BigGraph 构建和部署大规模图嵌入的完整教程
  • 系统性能优化方法论详解:从理解系统到验证迭代
  • 使用Tengine 对负载均衡进行状态检查(day028)
  • 网站推广实战案例:杭州翔胜科技有限公司如何为中小企业打开市场大门
  • 视频修复技术和实时在线处理
  • 文心一言 VS 讯飞星火 VS chatgpt (396)-- 算法导论25.2 1题
  • 如何使用本地大模型做数据分析
  • 【Nginx从入门到精通】04-安装部署-使用XShell给虚拟机配置静态ip
  • C# 面向对象的接口
  • 使用IDEA+Maven实现MapReduced的WordCount
  • go语言示例代码
  • 华为云容器监控平台
  • 阿里短信发送报错 InvalidTimeStamp.Expired
  • Ubuntu问题 -- 设置ubuntu的IP为静态IP (图形化界面设置) 小白友好
  • Sigrity SPEED2000 TDR TDT Simulation模式如何进行时域阻抗仿真分析操作指导-差分信号
  • Cesium 加载B3DM模型
  • 阿里巴巴官方「SpringCloudAlibaba全彩学习手册」限时开源!