(001)window 使用 OpenObserve
文章目录
- 安装
- 上传数据
- 报错
- 附录
安装
1.下载安装包:
2. window 设置环境变量:
ZO_ETCD_COMMAND_TIMEOUT = 600
ZO_ETCD_CONNECT_TIMEOUT = 600
ZO_ETCD_LOCK_WAIT_TIMEOUT = 600
ZO_INGEST_ALLOWED_UPTO = 10000
ZO_ROOT_USER_EMAIL = 422615924@qq.com
ZO_ROOT_USER_PASSWORD = 8R4VMmC1Su975e026Ln3
- 直接运行 openobserve.exe 启动程序:
上传数据
1.Gradle 需要的安装包:
// https://mvnrepository.com/artifact/cn.hutool/hutool-allimplementation group: 'cn.hutool', name: 'hutool-all', version: '5.8.23'// https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2implementation group: 'com.alibaba.fastjson2', name: 'fastjson2', version: '2.0.45'implementation group: 'org.slf4j', name: 'log4j-over-slf4j', version: '1.7.25'implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'implementation group: 'ch.qos.logback', name: 'logback-core', version: '1.2.3'implementation group: 'ch.qos.logback', name: 'logback-access', version: '1.2.3'// https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttpimplementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '5.0.0-alpha.12'implementation group: 'com.alibaba', name: 'druid', version: '1.1.9'
2.数据目录和内容格式:
3.上传代码:
package org.example;import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;/*** 008*/
public class UploadOpenObserve {private static final Logger logger = LoggerFactory.getLogger(UploadOpenObserve.class);private static String targetDirectory = "D:\\S3log\\unzip12\\";private static ConcurrentHashMap<String, String> failFile = new ConcurrentHashMap<>();private static String mail = "422615924@qq.com";private static String password = "4MHyN8BGMaCRyEen";private static String credential = Credentials.basic("422615924@qq.com", "4MHyN8BGMaCRyEen");private static volatile OkHttpClient okHttpClient;private static String buyStreamName = "buy105";private static String payStreamName = "pay105";public static void main(String[] args) {uploadBuyAndPay();}private static void upload_jpy20231216(){List<File> directories = listDirectory(targetDirectory);for (int i = 0; i < directories.size(); i++) {File file = directories.get(i);if (file.getName().startsWith("20231215_") || file.getName().startsWith("20231216_")){logger.debug(file.getName());uploadDirectory_jpy("jpy20231216_002", file);}}}private static void uploadBuyAndPay(){List<File> directories = listDirectory(targetDirectory);ExecutorService executorService = Executors.newFixedThreadPool(Math.max(2, Runtime.getRuntime().availableProcessors() / 2));CountDownLatch countDownLatch = new CountDownLatch(directories.size());for (int i = 0; i < directories.size(); i++) {final File file = directories.get(i);final int j = i;executorService.submit(() -> {try {uploadDirectory(file);}finally {countDownLatch.countDown();logger.debug("目录传输完成: {}, {}/{}", file.getName(), j, directories.size());}});}try {countDownLatch.await();} catch (Exception e) {logger.error("", e);} finally {executorService.shutdown();}logger.debug("任务执行完成.");}private static List<File> listDirectory(String targetDirectory) {File[] files = FileUtil.ls(targetDirectory);List<File> arrFiles = new ArrayList<>();for (int i = 0; i < files.length; i++) {if (files[i].isDirectory()) {arrFiles.add(files[i]);}}return arrFiles;}private static void checkDirectoryFile(File directory) {if (!directory.isDirectory()) {logger.error("not a directory {}", directory.getName());return;}File[] files = FileUtil.ls(directory.getPath());for (int i = 0; i < files.length; i++) {String name = files[i].getName();if (!name.startsWith("2023") && !name.startsWith("JPY")) {logger.error("error file {}", name);}}}private static void uploadDirectory_jpy(String streamName, File directory) {if (!directory.isDirectory()) {logger.error("not a directory {}", directory.getName());return;}File[] files = FileUtil.ls(directory.getPath());for (int i = 0; i < files.length; i++) {String name = files[i].getName();if (!name.startsWith("JPY")) {continue;}uploadFile(directory, streamName, files[i]);}}private static void uploadDirectory(File directory) {if (!directory.isDirectory()) {logger.error("not a directory {}", directory.getName());return;}
// if (FileUtil.exist(StrUtil.format("{}/upload", directory.getPath()))) {
// logger.debug("has upload {}", directory.getPath());
// return;
// }File[] files = FileUtil.ls(directory.getPath());for (int i = 0; i < files.length; i++) {String name = files[i].getName();if (name.startsWith("JPY")) {continue;}if (name.endsWith("Buy.log")) {uploadFile(directory, buyStreamName, files[i]);} else if (name.endsWith("Pay.log")) {uploadFile(directory, payStreamName, files[i]);}}}public static OkHttpClient getOkHttpInstance(){if (null == okHttpClient){synchronized (UploadOpenObserve.class){if (okHttpClient == null){okHttpClient = new OkHttpClient.Builder().callTimeout(7200, TimeUnit.SECONDS).connectTimeout(3600, TimeUnit.SECONDS).readTimeout(3600, TimeUnit.SECONDS).writeTimeout(3600, TimeUnit.SECONDS).connectionPool(new ConnectionPool(32, 5, TimeUnit.MINUTES)).build();return okHttpClient;}}}return okHttpClient;}private static void uploadFile(File directory, String streamName, File file) {String url = StrUtil.format("http://localhost:5080/api/default/{}/_json", streamName);List<String> lines = FileUtil.readLines(file, StandardCharsets.UTF_8);lines.removeIf(item -> !item.startsWith("{"));if (lines.isEmpty()) {return;}List<JSONObject> jsonObjects = new ArrayList<>();lines.forEach(item -> {try {JSONObject jsonObject = JSONObject.parseObject(item);if (jsonObject.containsKey("JSTDate")){String value = jsonObject.getString("JSTDate");jsonObject.put("jst_time", value.substring(0, 8));jsonObject.put("jst_dateday", value.substring(0, 6));jsonObject.put("jst_day", value.substring(6, 8));}jsonObjects.add(jsonObject);} catch (Exception e){logger.error("", e);}});if (jsonObjects.isEmpty())return;RequestBody requestBody = RequestBody.create(JSON.toJSONString(jsonObjects), MediaType.parse("application/x-www-form-urlencoded"));Request request = new Request.Builder().addHeader("Authorization", credential).url(url).post(requestBody).build();boolean success = false;try (Response response = getOkHttpInstance().newCall(request).execute()) {success = response.isSuccessful();} catch (Exception e) {e.printStackTrace();logger.debug("upload fail {}, reason {}", file.getName(), e.getMessage());} finally {if (success){FileUtil.touch(StrUtil.format("{}/upload", directory.getPath()));// String res = response.body().string();// logger.debug("upload success {}, {}", file.getName(), JSON.toJSONString(res));} else {failFile.put(file.getName(), "true");}}}/*** curl -u 422615924@qq.com:8R4VMmC1Su975e026Ln3 -k https://api.openobserve.ai/api/peilin_organization_3737_H87YxMBFXYifSaV/default/_json* -d '[{"level":"info","job":"test","log":"test message for openobserve","_timestamp": 1704958559370}]'*/private static void testUploadJson() {String url = "https://api.openobserve.ai/api/peilin_organization_3737_H87YxMBFXYifSaV/test1/_json";HttpRequest request = HttpUtil.createPost(url);request.basicAuth("422615924@qq.com", "8R4VMmC1Su975e026Ln3");request.body("[{\"level\":\"inf\",\"jo\":43212,\"log\":\"test message for openobserve\"}]", "application/json");HttpResponse response = request.execute();logger.info("{}", JSON.toJSON(response.body()));}
}
报错
一、 发送的数据格式错误:
二、 数据的太旧了:
附录
[1] Github openobserve/openobserve
[2] 官网手册 openobserve