当前位置: 首页 > news >正文

从电商平台下载数据的项目经验分享 (part 1)

文章目录

  • 背景
  • 本公司的设计
    • 业务调研
    • 资源调研
    • 架构设计
    • 定时任务设计
      • httpclient连接池
      • 重试机制
      • 请求收敛
      • 时间切片的动态分配
      • API调用大盘
      • 告警
      • 手动抓取接口
      • 线程池提速
        • 定义业务线程池
          • 拒绝策略
      • 不丢弃任务的处理方案
        • 规避触发拒绝策略
        • 触发拒绝策略之后的处理
      • 分布式锁
      • 平台接口限频处理
  • 待扩展点
  • todo

背景

我们在抖音,快手,视频等平台运营者很多账号, 各个平台也提供了后台服务供观察投流效果。 但是每个平台都需要登录很麻烦,公司也想收集投流的数据,整合一套投流的智能系统,提高roi收益。所以最近经手了从快手,视频号定时下载数据的工作。 另外曾供职某erp公司的平台数据部门,所以这里总结下定时任务下载任务要怎么设计。

本公司的设计

业务调研

公司在抖音,快手,视频号,小红书,天猫,京东,拼多多,得物,百度平台投流,这些平台在各自的平台提供了api。所以整体的模式是
step1 --> 请求平台的接口,拿到投流数据
step2 --> 落到mysql数据库
step3 --> 数仓读数据, 然后定期清理mysql数据(mysql只有500G 不清理的情况下很容易被业务数据灌满)

资源调研

公司客观限制

  1. 前期只能提供一台8c 16G 200G的服务器。资源明确不足时 才可以申请新资源
  2. 业务没有上云, 缺少k8s的基础

架构设计

标准的springcloud alibaba架构, 在单实例无法应该业务的时候, 部署多台platform实例, 预留扩展空间
platform维护定时任务的具体实现
system 系统服务
auth 鉴权相关
job 定时任务

在这里插入图片描述

定时任务设计

  1. 定时触发

job依赖quartz完成定时任务的触发

对比项QuartzXXL-Job
分布式调度❌ 原生不支持✅ 支持简单分布式调度
UI 管理❌ 无官方 UI,需集成✅ 提供 Web 控制台
动态任务配置❌ 靠代码配置✅ 控制台配置
失败重试❌ 需自定义✅ 内建支持
定时精度✅ 毫秒级⚠️ 秒级为主
易用性❌ 较复杂✅ 简单易用
执行隔离✅ 自定义线程池⚠️ 需手动配置隔离,默认不隔离
社区支持✅ 社区较活跃⚠️ 活跃度一般
  1. job服务通过feign调用platform服务

微服务内部调用不通过gateway, 在服务注册与发现中心查找ServiceNameConstants.PLATFORM_SERVICE服务列表,使用负载均衡器选择一个可用实例, 然后发起http请求

@FeignClient(contextId = "remotePlatformService",value = ServiceNameConstants.PLATFORM_SERVICE,fallbackFactory = RemotePlatformFallbackFactory.class)
public interface RemotePlatformService {
}
  1. 工厂,模版模式实现主流程
    3.1 模版实现公共逻辑
    3.1.1 将任务单元准备好
public interface DownloadTaskBaseService {/*** 下载任务的入口*/void downloadJob(SphJobEnum jobEnum, String requestId);/*** 手动下载任务* @param reqVO*/void downloadByManual(SphManualReqVO reqVO);
}@Overridepublic void downloadJob(SphJobEnum jobEnum, String requestId) {// 0.0 这是长耗时任务, 加锁1小时try {// 1.0 查询可用的授权列表// 2.0 轮训各个授权的对应的广告主的增量广告计划loopDownloadJob()} finally {// 3.0 解锁,统计耗时}}private void loopDownloadJob(List<SphOauth2AccessTokenDO> validAuthList, String requestId, SphJobEnum jobEnum) {// 1.0 乱序广告主,让广告主的执行更公平// 2.0 按照配置对广告主分组,为线程池提供数据支撑}private void dispatchTask(List<SphAuthAdvertiserInfoDO> advertiserList, SphOauth2AccessTokenDO tokenDO,String requestId, SphJobEnum jobEnum, AtomicInteger oauthCount, AtomicInteger oauthCurrentCounter,AtomicInteger taskTotalCounter, AtomicInteger taskCurrentCounter) {// 线程池加速下载, 编排任务 规避线程池拒绝List<CompletableFuture<String>> featureList = new ArrayList<>();for (SphAuthAdvertiserInfoDO advertiserInfoDO : advertiserList) {CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {downloadJob(advertiserInfoDO, tokenDO,authProcess,advertiserProcess,requestId, jobEnum);return authProcess + advertiserProcess + "执行完毕";}, sphThreadPoolExecutor).exceptionally((ex -> {return "执行失败:" + ex.getCause().getMessage();}));featureList.add(feature);}CompletableFuture.allOf(featureList.toArray(new CompletableFuture[0])).join();}

3.1.2 子任务完成数据下载以及数据入库

public abstract class SphDownloadTaskBaseServiceImpl implements SphDownloadTaskBaseService {/*** 需要具体的子任务实现* 1. 查询查询结果* 2. 批量入库* @param lastRequestTimeDO* @param endTime* @param advertiserInfoDO* @param tokenDO*/protected abstract void downloadJob(SysDictData lastRequestTimeDO, LocalDateTime endTime, SphAuthAdvertiserInfoDO advertiserInfoDO, SphOauth2AccessTokenDO tokenDO, String authProgress, String advertiserProgress);/*** 需要具体的子任务实现* 查询的上次查询时间* @return*/protected abstract SysDictData getLastRequestTime(SphAuthAdvertiserInfoDO advertiserInfoDO);/*** 子任务实现** 保存请求时间* @param lastRequestTimeDO* @param endTime* @param advertiserInfoDO*/protected abstract void saveLastRequestTime(SysDictData lastRequestTimeDO, LocalDateTime endTime, SphAuthAdvertiserInfoDO advertiserInfoDO);
}

3.1.3 工厂模式调用各个子任务

platform服务对外job服务暴漏下载功能

    /*** 查询服务* @param jobEnum* @return*/private SphDownloadTaskBaseService getTaskService(SphJobEnum jobEnum) {return SpringUtils.getBean(jobEnum.getJobName());}private void downloadJob(SphJobEnum jobEnum, String requestId) {switch (jobEnum) {case SPH_ADGROUP_DAY_REPORT:case SPH_ADGROUP_HOUR_REPORT:case SPH_REFRESH_TOKEN:case SPH_DOWNLOAD_ADGROUP:case SPH_DOWNLOAD_VIDEO:case SPH_DOWNLOAD_PIC:case SPH_DOWNLOAD_WECHAT_AUTHORIZATION:case SPH_DOWNLOAD_CAMPAIGN:case SPH_DOWNLOAD_VIDEO_DAY_REPORT:case SPH_DOWNLOAD_IMAGE_DAY_REPORT:getTaskService(jobEnum).downloadJob(jobEnum, requestId);break;default: {log.error("不支持该该类型的定时任务 ServiceName:{} 任务描述:{}", jobEnum.getJobName(), jobEnum.getDesc());throw new ServiceException("不支持该该类型的定时任务");}}}

httpclient连接池

从业务模式可以看出需要对平台发起大量的请求, 所以很有必要引入httpclient连接池来管理连接

  1. 引入httpclient依赖
        <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId></dependency>
  1. 维护工具类 - 请求工具类 HttpUtils
    2.1 HttpUtils.getCallWithPojo 请求入参的GET请求
    2.2 HttpUtils.postCallWithToken POST请求
public class HttpUtils {private static final Logger log = LoggerFactory.getLogger(HttpUtils.class);public static RequestConfig requestConfig;private static CloseableHttpClient httpClient;private static PoolingHttpClientConnectionManager connMgr;private static IdleConnectionMonitorThread idleThread;static {HttpUtils.initClient();}/*** 向指定 URL 发送GET方法的请求** @param url 发送请求的 URL* @return 所代表远程资源的响应结果*/public static String sendGet(String url) {return sendGet(url, StringUtils.EMPTY);}/*** 向指定 URL 发送GET方法的请求** @param url   发送请求的 URL* @param param 请求参数,请求参数应该是 name1=value1&name2=value2 的形式。* @return 所代表远程资源的响应结果*/public static String sendGet(String url, String param) {return sendGet(url, param, Constants.UTF8);}/*** 向指定 URL 发送GET方法的请求** @param url         发送请求的 URL* @param param       请求参数,请求参数应该是 name1=value1&name2=value2 的形式。* @param contentType 编码类型* @return 所代表远程资源的响应结果*/public static String sendGet(String url, String param, String contentType) {StringBuilder result = new StringBuilder();BufferedReader in = null;try {String urlNameString = StringUtils.isNotBlank(param) ? url + "?" + param : url;log.info("sendGet - {}", urlNameString);URL realUrl = new URL(urlNameString);URLConnection connection = realUrl.openConnection();connection.setRequestProperty("accept", "*/*");connection.setRequestProperty("connection", "Keep-Alive");connection.setRequestProperty("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64)");connection.connect();in = new BufferedReader(new InputStreamReader(connection.getInputStream(), contentType));String line;while ((line = in.readLine()) != null) {result.append(line);}log.info("recv - {}", result);} catch (ConnectException e) {log.error("调用HttpUtils.sendGet ConnectException, url=" + url + ",param=" + param, e);} catch (SocketTimeoutException e) {log.error("调用HttpUtils.sendGet SocketTimeoutException, url=" + url + ",param=" + param, e);} catch (IOException e) {log.error("调用HttpUtils.sendGet IOException, url=" + url + ",param=" + param, e);} catch (Exception e) {log.error("调用HttpsUtil.sendGet Exception, url=" + url + ",param=" + param, e);} finally {try {if (in != null) {in.close();}} catch (Exception ex) {log.error("调用in.close Exception, url=" + url + ",param=" + param, ex);}}return result.toString();}/*** 向指定 URL 发送POST方法的请求** @param url   发送请求的 URL* @param param 请求参数,请求参数应该是 name1=value1&name2=value2 的形式。* @return 所代表远程资源的响应结果*/public static String sendPost(String url, String param) {PrintWriter out = null;BufferedReader in = null;StringBuilder result = new StringBuilder();try {log.info("sendPost - {}", url);URL realUrl = new URL(url);URLConnection conn = realUrl.openConnection();conn.setRequestProperty("accept", "*/*");conn.setRequestProperty("connection", "Keep-Alive");conn.setRequestProperty("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64)");conn.setRequestProperty("Accept-Charset", "utf-8");conn.setRequestProperty("contentType", "utf-8");conn.setDoOutput(true);conn.setDoInput(true);out = new PrintWriter(conn.getOutputStream());out.print(param);out.flush();in = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));String line;while ((line = in.readLine()) != null) {result.append(line);}log.info("recv - {}", result);} catch (ConnectException e) {log.error("调用HttpUtils.sendPost ConnectException, url=" + url + ",param=" + param, e);} catch (SocketTimeoutException e) {log.error("调用HttpUtils.sendPost SocketTimeoutException, url=" + url + ",param=" + param, e);} catch (IOException e) {log.error("调用HttpUtils.sendPost IOException, url=" + url + ",param=" + param, e);} catch (Exception e) {log.error("调用HttpsUtil.sendPost Exception, url=" + url + ",param=" + param, e);} finally {try {if (out != null) {out.close();}if (in != null) {in.close();}} catch (IOException ex) {log.error("调用in.close Exception, url=" + url + ",param=" + param, ex);}}return result.toString();}public static String sendSSLPost(String url, String param) {StringBuilder result = new StringBuilder();String urlNameString = url + "?" + param;try {log.info("sendSSLPost - {}", urlNameString);SSLContext sc = SSLContext.getInstance("SSL");sc.init(null, new TrustManager[]{new TrustAnyTrustManager()}, new java.security.SecureRandom());URL console = new URL(urlNameString);HttpsURLConnection conn = (HttpsURLConnection) console.openConnection();conn.setRequestProperty("accept", "*/*");conn.setRequestProperty("connection", "Keep-Alive");conn.setRequestProperty("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64)");conn.setRequestProperty("Accept-Charset", "utf-8");conn.setRequestProperty("contentType", "utf-8");conn.setDoOutput(true);conn.setDoInput(true);conn.setSSLSocketFactory(sc.getSocketFactory());conn.setHostnameVerifier(new TrustAnyHostnameVerifier());conn.connect();InputStream is = conn.getInputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is));String ret = "";while ((ret = br.readLine()) != null) {if (ret != null && !ret.trim().equals("")) {result.append(new String(ret.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8));}}log.info("recv - {}", result);conn.disconnect();br.close();} catch (ConnectException e) {log.error("调用HttpUtils.sendSSLPost ConnectException, url=" + url + ",param=" + param, e);} catch (SocketTimeoutException e) {log.error("调用HttpUtils.sendSSLPost SocketTimeoutException, url=" + url + ",param=" + param, e);} catch (IOException e) {log.error("调用HttpUtils.sendSSLPost IOException, url=" + url + ",param=" + param, e);} catch (Exception e) {log.error("调用HttpsUtil.sendSSLPost Exception, url=" + url + ",param=" + param, e);}return result.toString();}private static class TrustAnyTrustManager implements X509TrustManager {@Overridepublic void checkClientTrusted(X509Certificate[] chain, String authType) {}@Overridepublic void checkServerTrusted(X509Certificate[] chain, String authType) {}@Overridepublic X509Certificate[] getAcceptedIssuers() {return new X509Certificate[]{};}}private static class TrustAnyHostnameVerifier implements HostnameVerifier {@Overridepublic boolean verify(String hostname, SSLSession session) {return true;}}/*** 获取httpClient** @return*/public static CloseableHttpClient getHttpClient() {if (httpClient != null) {return httpClient;} else {return HttpClients.createDefault();}}/*** 创建连接池管理器** @return*/private static PoolingHttpClientConnectionManager createConnectionManager() {PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager();// 将最大连接数增加到connMgr.setMaxTotal(HttpConf.MAX_TOTAL_CONN);// 将每个路由基础的连接增加到connMgr.setDefaultMaxPerRoute(HttpConf.MAX_ROUTE_CONN);return connMgr;}/*** 根据当前配置创建HTTP请求配置参数。** @return 返回HTTP请求配置。*/private static RequestConfig createRequestConfig() {Builder builder = RequestConfig.custom();builder.setConnectionRequestTimeout(StringUtils.nvl(HttpConf.WAIT_TIMEOUT, 10000));builder.setConnectTimeout(StringUtils.nvl(HttpConf.CONNECT_TIMEOUT, 10000));builder.setSocketTimeout(StringUtils.nvl(HttpConf.SO_TIMEOUT, 10000));return builder.build();}/*** 创建默认的HTTPS客户端,信任所有的证书。** @return 返回HTTPS客户端,如果创建失败,返回HTTP客户端。*/private static CloseableHttpClient createHttpClient(HttpClientConnectionManager connMgr) {try {final SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {@Overridepublic boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {// 信任所有return true;}}).build();final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext);// 重试机制HttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler(HttpConf.RETRY_COUNT, true);ConnectionKeepAliveStrategy connectionKeepAliveStrategy = new ConnectionKeepAliveStrategy() {@Overridepublic long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {return HttpConf.KEEP_ALIVE_TIMEOUT; // tomcat默认keepAliveTimeout为20s}};httpClient = HttpClients.custom().setSSLSocketFactory(sslsf).setConnectionManager(connMgr).setDefaultRequestConfig(requestConfig).setRetryHandler(retryHandler).setKeepAliveStrategy(connectionKeepAliveStrategy).build();} catch (Exception e) {log.error("Create http client failed", e);httpClient = HttpClients.createDefault();}return httpClient;}/*** 初始化 只需调用一次*/public synchronized static CloseableHttpClient initClient() {if (httpClient == null) {connMgr = createConnectionManager();requestConfig = createRequestConfig();// 初始化httpClient连接池httpClient = createHttpClient(connMgr);// 清理连接池idleThread = new IdleConnectionMonitorThread(connMgr);idleThread.start();}return httpClient;}/*** 关闭HTTP客户端。** @param*/public synchronized static void shutdown() {try {if (idleThread != null) {idleThread.shutdown();idleThread = null;}} catch (Exception e) {log.error("httpclient connection manager close", e);}try {if (httpClient != null) {httpClient.close();httpClient = null;}} catch (IOException e) {log.error("httpclient close", e);}}/*** 请求上游 GET提交** @param uri* @throws IOException*/public static String getCall(final String uri) throws Exception {return getCall(uri, null, Constants.UTF8);}/*** 请求上游 GET提交** @param uri* @param contentType* @throws IOException*/public static String getCall(final String uri, String contentType) throws Exception {return getCall(uri, contentType, Constants.UTF8);}/*** 携带入参的GET请求* @param baseUri 请求基础URL* @param pojo 请求参数对象* @param <K> 参数类型* @return 响应字符串(JSON格式)* @throws IOException 当HTTP请求或响应处理失败时抛出* @throws JSONException 当响应不是有效JSON时抛出*/public static <K> String getCallWithPojo(final String baseUri, K pojo) throws IOException, JSONException {// 参数校验if (baseUri == null || baseUri.trim().isEmpty()) {throw new IllegalArgumentException("Base URI cannot be null or empty");}// 构建URIUriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(baseUri);List<Field> fields = getAllFields(pojo.getClass());// 缓存反射结果以提高性能for (Field field : fields) {field.setAccessible(true);try {Object value = field.get(pojo);if (value != null) {String paramName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE,field.getName());// 处理不同类型参数if (value instanceof String){// URL编码普通参数值builder.queryParam(paramName, URLEncoder.encode(value.toString(), StandardCharsets.UTF_8.name()));} else {// 对JSON字符串进行URL编码String jsonValue = JsonUtils.toJsonString(value);builder.queryParam(paramName, URLEncoder.encode(jsonValue, StandardCharsets.UTF_8.name()));}}} catch (IllegalAccessException | UnsupportedEncodingException e) {log.warn("Failed to process field {}: {}", field.getName(), e.getMessage());}}String finalUrl = builder.build().toUriString();HttpGet httpGet = new HttpGet(finalUrl);httpGet.setConfig(requestConfig);// 使用try-with-resources确保资源释放try (CloseableHttpResponse httpRsp = getHttpClient().execute(httpGet)) {int statusCode = httpRsp.getStatusLine().getStatusCode();// 只处理成功或特定错误状态if (statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_FORBIDDEN) {HttpEntity entity = httpRsp.getEntity();try {String rspText = EntityUtils.toString(entity, StandardCharsets.UTF_8);// 提取跟踪IDHeader traceIdHeader = httpRsp.getFirstHeader("X-Tsa-Trace-Id");String traceId = traceIdHeader != null ? traceIdHeader.getValue() : null;// 构造响应JSONJSONObject responseJson = new JSONObject(rspText);if (traceId != null) {responseJson.putOpt("request_id", traceId);}return responseJson.toString();} finally {EntityUtils.consumeQuietly(entity);}} else {throw new IOException("HTTP request failed with status code: " + statusCode +", URL: " + finalUrl);}} catch (JSONException e) {throw new JSONException("Invalid JSON response from: " + finalUrl, e);} catch (Exception e) {throw new IOException("HTTP request failed for URL: " + finalUrl, e);}}/*** 递归获取类所有字段(包括父类)* @param type 目标类* @return 字段列表*/private static List<Field> getAllFields(Class<?> type) {List<Field> fields = new ArrayList<>();Class<?> currentClass = type;// 使用循环替代递归提高性能while (currentClass != null && currentClass != Object.class) {fields.addAll(Arrays.asList(currentClass.getDeclaredFields()));currentClass = currentClass.getSuperclass();}return fields;}/*** 请求上游 GET提交** @param uri* @param contentType* @param charsetName* @throws IOException*/public static String getCall(final String uri, String contentType, String charsetName) throws Exception {final String url = uri;final HttpGet httpGet = new HttpGet(url);httpGet.setConfig(requestConfig);if (!StringUtils.isEmpty(contentType)) {httpGet.addHeader("Content-Type", contentType);}final CloseableHttpResponse httpRsp = getHttpClient().execute(httpGet);try {if (httpRsp.getStatusLine().getStatusCode() == HttpStatus.SC_OK|| httpRsp.getStatusLine().getStatusCode() == HttpStatus.SC_FORBIDDEN) {final HttpEntity entity = httpRsp.getEntity();final String rspText = EntityUtils.toString(entity, charsetName);// 提取 X-Tsa-Trace-IdHeader traceIdHeader = httpRsp.getFirstHeader("X-Tsa-Trace-Id");String traceId = traceIdHeader != null ? traceIdHeader.getValue() : null;EntityUtils.consume(entity);// 构造返回的 JSON(包含响应体和 traceId)JSONObject responseJson = new JSONObject(rspText); // 解析原始JSONresponseJson.putOpt("request_id", traceId); // 直接添加到顶层return responseJson.toString(); // 返回修改后的JSON} else {throw new IOException("HTTP StatusCode=" + httpRsp.getStatusLine().getStatusCode());}} finally {try {httpRsp.close();} catch (Exception e) {log.error("关闭httpRsp异常", e);}}}/*** 请求上游 POST提交** @param uri* @param paramsMap* @throws IOException*/public static String postCall(final String uri, Map<String, Object> paramsMap) throws Exception {return postCall(uri, null, paramsMap, Constants.UTF8);}/*** 请求上游 POST提交** @param uri* @param contentType* @param paramsMap* @throws IOException*/public static String postCall(final String uri, String contentType, Map<String, Object> paramsMap) throws Exception {return postCall(uri, contentType, paramsMap, Constants.UTF8);}/*** 请求上游 POST提交** @param uri* @param contentType* @param paramsMap* @param charsetName* @throws IOException*/public static String postCall(final String uri, String contentType, Map<String, Object> paramsMap,String charsetName) throws Exception {final String url = uri;final HttpPost httpPost = new HttpPost(url);httpPost.setConfig(requestConfig);if (!StringUtils.isEmpty(contentType)) {httpPost.addHeader("Content-Type", contentType);}// 添加参数List<NameValuePair> list = new ArrayList<NameValuePair>();if (paramsMap != null) {for (Map.Entry<String, Object> entry : paramsMap.entrySet()) {list.add(new BasicNameValuePair(entry.getKey(), (String) entry.getValue()));}}httpPost.setEntity(new UrlEncodedFormEntity(list, charsetName));final CloseableHttpResponse httpRsp = getHttpClient().execute(httpPost);try {if (httpRsp.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {final HttpEntity entity = httpRsp.getEntity();final String rspText = EntityUtils.toString(entity, charsetName);EntityUtils.consume(entity);return rspText;} else {throw new IOException("HTTP StatusCode=" + httpRsp.getStatusLine().getStatusCode());}} finally {try {httpRsp.close();} catch (Exception e) {log.error("关闭httpRsp异常", e);}}}/*** 请求上游 POST提交** @param uri* @param param* @throws IOException*/public static String postCall(final String uri, String param) throws Exception {return postCall(uri, null, param, Constants.UTF8);}/*** 携带token的post请求** @param uri* @param param* @param token* @return* @throws Exception*/public static String postCallWithToken(final String uri, String param, String token) throws Exception {return postCall(uri, null, param, Constants.UTF8, token);}/*** 请求上游 POST提交** @param uri* @param contentType* @param param* @throws IOException*/public static String postCall(final String uri, String contentType, String param) throws Exception {return postCall(uri, contentType, param, Constants.UTF8);}/*** 请求上游 POST提交** @param uri* @param contentType* @param param* @param charsetName* @throws IOException*/public static String postCall(final String uri, String contentType, String param, String charsetName)throws Exception {return postCall(uri, contentType, param, charsetName, null);}/*** 请求上游 POST提交** @param uri* @param contentType* @param param* @param charsetName* @throws IOException*/public static String postCall(final String uri, String contentType, String param, String charsetName, String token)throws Exception {final String url = uri;final HttpPost httpPost = new HttpPost(url);httpPost.setConfig(requestConfig);if (!StringUtils.isEmpty(contentType)) {httpPost.addHeader("Content-Type", contentType);} else {httpPost.addHeader("Content-Type", "application/json");}// 设置tokenif (StrUtil.isNotEmpty(token)) {httpPost.addHeader("Access-Token", token);}// 添加参数StringEntity paramEntity = new StringEntity(param, charsetName);httpPost.setEntity(paramEntity);final CloseableHttpResponse httpRsp = getHttpClient().execute(httpPost);try {if (httpRsp.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {final HttpEntity entity = httpRsp.getEntity();final String rspText = EntityUtils.toString(entity, charsetName);EntityUtils.consume(entity);return rspText;} else {throw new IOException("HTTP StatusCode=" + httpRsp.getStatusLine().getStatusCode());}} finally {try {httpRsp.close();} catch (Exception e) {log.error("关闭httpRsp异常", e);}}}/*** 判断HTTP异常是否为读取超时。** @param e 异常对象。* @return 如果是读取引起的异常(而非连接),则返回true;否则返回false。*/public static boolean isReadTimeout(final Throwable e) {return (!isCausedBy(e, ConnectTimeoutException.class) && isCausedBy(e, SocketTimeoutException.class));}/*** 检测异常e被触发的原因是不是因为异常cause。检测被封装的异常。** @param e     捕获的异常。* @param cause 异常触发原因。* @return 如果异常e是由cause类异常触发,则返回true;否则返回false。*/public static boolean isCausedBy(final Throwable e, final Class<? extends Throwable> cause) {if (cause.isAssignableFrom(e.getClass())) {return true;} else {Throwable t = e.getCause();while (t != null && t != e) {if (cause.isAssignableFrom(t.getClass())) {return true;}t = t.getCause();}return false;}}
}
  1. 关闭httpclient连接池,在项目重启时释放资源
@Component
public class ShutdownManagerConfig {private static final Logger logger = LoggerFactory.getLogger(ShutdownManagerConfig.class);@PreDestroypublic void destroy(){// 关闭线程池HttpUtils.shutdown();logger.info("关闭http连接线程池");}
}

重试机制

http请求难免有些需要重试的, 这里引入spring-retry 解决

  1. 引入依赖
        <dependency><groupId>org.springframework.retry</groupId><artifactId>spring-retry</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency>
  1. 使用例子
    2.1 value 指定要触发重试的异常, 不是所有场景都需要重试的
    2.2 maxAttempts 指定重试的最大次数
    2.3 backoff 重试次略
    2.3.1 backoff.delay 首次延迟时间
    2.3.2 backoff.multiplier 之后的每次重试延迟时间乘multiplier
    2.3.3 backoff.random 延迟时间随机抖动,避免多个客户端同时重试
@Retryable(value = {RetryException.class}, maxAttempts = 5, backoff = @Backoff(delay = 1000, multiplier = 2, random = true))
    /*** 广告计划数据(实时)*  触发限频,重试策略*  1. 最大重试次数(包括第一次调用) 5次*  2. 重试间隔(ms) 1000ms*  3. multiplier 延迟乘数(下次延迟 = 当前延迟 * multiplier)*  4. random 是否在延迟时间上添加随机抖动 true* @param reqVO* @param token* @return*/@Override@Retryable(value = {RetryException.class}, maxAttempts = 5, backoff = @Backoff(delay = 1000, multiplier = 2, random = true))@Override@Retryable(value = {RetryException.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000, multiplier = 2, random = true))public SphApBaseRespVO<SphApiImageDailyReportsRespVO> getImageDailyReports(SphApiDailyReportsReqVO req) {SphApBaseRespVO<SphApiImageDailyReportsRespVO> apiResp = getApiResp(SphApiEnum.GET_DAILY_REPORTS, req, SphApiImageDailyReportsRespVO.class);apiResp.setData(JsonUtils.parseObject(JsonUtils.toJsonString(apiResp.getData()), SphApiImageDailyReportsRespVO.class));return apiResp;}

请求收敛

将对平台接口的请求封装到一个函数中,这么做有什么好处呢?

  1. 统一入口,降低复杂度
    1.1 简化调用:所有接口请求通过单一函数处理,调用方无需关心SDK的具体实现,只需关注业务参数。
    1.2 减少重复代码:避免在每个调用处重复初始化SDK、处理认证等逻辑。 如下图每次请求参数都统一设置一个随机数。
  2. 集中管理请求逻辑
    2.1 参数标准化:统一处理参数校验、默认值、格式转换(如时间戳、枚举值)。
    2.2 错误处理:集中捕获网络异常、快手API错误码,并转换为一致的错误格式(例如抛出特定异常或返回统一错误对象)。
    2.3 日志与监控:方便统一添加请求日志、性能监控(如耗时统计)和埋点。
    3 扩展性优化
    3.1 接口重试Retryable 可以用在这一层 (需要处理自调用导致Retryable失效的问题)
    3.2 各个接口的统计记数, 错误分析 都可以在这一层通过切面很方便的完成
    private <K extends SphApiBaseReqVO, T> SphApBaseRespVO<T> getApiResp(SphApiEnum apiEnum, K params, Class<T> dataType) {String resp = "";try {params.setNonce(UUID.randomUUID().toString());SphRateLimiterManager.getApiRateLimiter(apiEnum).acquire();resp = HttpUtils.getCallWithPojo(apiEnum.getUrl(), params);return getFormatApiResp(apiEnum, resp, dataType, params);} catch (ServiceException | RetryException e) {saveErrorResponse(apiEnum, params, resp, "");throw e;} catch (Exception e) {log.error("视频号api请求失败 {}接口 入参:{} 异常信息:{}", apiEnum.getDesc(), params, e.getMessage());saveErrorResponse(apiEnum, params, resp, "");throw new ServiceException(e.getMessage(), SPH_API_REQUEST_ERROR.getCode());}}

时间切片的动态分配

很多接口会限制单次分页请求可以抓到数据, 这也是常见的解决深度分页问题一个方案了。
那这时候 就需要在发生这种情况时,把时间参数切成更小的粒度然后再次请求。

下面是递归时间切片的demo

    private List<SphImageDayReportDO> downloadDayReport(List<LocalDateTime[]> requestTimeList, SphAuthAdvertiserInfoDO advertiserInfoDO,SphOauth2AccessTokenDO tokenDO,String authProgress, String advertiserProgress, DateIntervalEnum dateIntervalEnum) {List<SphImageDayReportDO> detailsList = new ArrayList<>();for (LocalDateTime[] timeRange : requestTimeList) {try {// 1.0 常规请求} catch (ServiceException e) {// 2.0 触发深度分页的场景if (!e.getMessage().contains("请降低查询数据范围")) {throw e;}// 2.1 递归切成更小的时间片detailsList.addAll(getRollbackDOList(timeRange, advertiserInfoDO, tokenDO,authProgress, advertiserProgress, dateIntervalEnum));}if (detailsList.size() >= 100) {// 插入本批次的数据}}return detailsList;}private List<SphImageDayReportDO> getRollbackDOList(LocalDateTime[] timeRange, SphAuthAdvertiserInfoDO advertiserInfoDO,SphOauth2AccessTokenDO tokenDO,String authProgress, String advertiserProgress, DateIntervalEnum dateIntervalEnum) {DateIntervalEnum oldDateIntervalEnum = dateIntervalEnum;// 1.0 换成更细粒度的切片骨子额dateIntervalEnum = getRollbackDateIntervalEnum(dateIntervalEnum);// 2.0 避免死循环,判断是否已经是最小粒度if (oldDateIntervalEnum.equals(dateIntervalEnum)) throw new ServiceException("已经进入了最小时间切片颗粒, 但是还是太多了");}// 新时间切片, 递归return downloadDayReport(LocalDateTimeUtils.getDateRangeList(timeRange[0], timeRange[1], dateIntervalEnum), advertiserInfoDO, tokenDO, authProgress, advertiserProgress, dateIntervalEnum);}protected DateIntervalEnum getRollbackDateIntervalEnum(DateIntervalEnum dateIntervalEnum) {// 分钟为最小维度if (DateIntervalEnum.MINUTE.equals(dateIntervalEnum)) {return DateIntervalEnum.MINUTE;}int rollbackLevel = dateIntervalEnum.getInterval() - 1;return DateIntervalEnum.valueOf(rollbackLevel);}

API调用大盘

这一项是非常非常重要,但是没排期做的。

告警

这一项是非常非常重要,但是没排期做的。
这个跟API大盘搭配起来才能不断地优化项目。

手动抓取接口

这个功能是非常常见的功能,如果出现漏单或者单据状态与平台不一致的情况,可以手动抓某个单据或者时间段,
以达成

更新数据

用来核验平台接口响应,我们入库数据,平台数据三方有什么异同的依据

重抓某个时段的数据

    public String downloadByManual(SphManualReqVO reqVO) {// 1.0 获取任务枚举类SphJobEnum jobEnum = SphJobEnum.of(reqVO.getJobName());// 2.0 加锁redisLock.tryLock(buildManualLockKey(jobEnum.getJobName()), 5, 1200, TimeUnit.SECONDS);try {// 3.0 验参validateManualParam(reqVO, jobEnum);// 4.0 执行手动任务getTaskService(jobEnum).downloadByManual(reqVO);} finally {// 5.0 释放锁redisLock.unlock(buildManualLockKey(jobEnum.getJobName()));}}

线程池提速

视频号有7300个广告主, 每个广告主执行1分钟 那么任务轮训一遍需要5天, 那这个效率自然是不能接受的, 这时候自然要引入线程池

定义业务线程池

从线程池的构造函数 可以看到

  1. corePoolSize 核心线数,池子中会一直持有的线程数
  2. maximumPoolSize 最大临时线程数
  3. keepAliveTime 临时线程在指定空闲时间之后会被释放
  4. unit 空闲时间单位
  5. workQueue 阻塞队列, 为避免oom一定要设置成有界阻塞队列
    threadFactory 线程工厂, 各个业务要有不同的名字,可以通过该工厂实现
    RejectedExecutionHandler 拒绝策略, 自定义拒绝策略 我这里只记录了日志,因为下面会展示通过编排任务 来规避触发拒绝策略

线程池优先使用核心线程池处理,核心线程池打满之后, 丢入

拒绝策略
  1. AbortPolicy(默认策略)

    • 直接抛出RejectedExecutionException异常
    • 适用于需要明确知道任务被拒绝的场景
  2. CallerRunsPolicy

    • 由提交任务的线程(调用者线程)直接执行该任务
    • 适用于不希望丢失任务,且可以接受任务执行变慢的场景
  3. DiscardPolicy

    • 直接静默丢弃被拒绝的任务,不做任何处理
    • 适用于允许丢失任务的场景
  4. DiscardOldestPolicy

    • 丢弃队列中最老的任务(队列头部的任务),然后尝试重新提交当前任务
    • 适用于允许丢弃老任务,保留新任务的场景
# 线程池的构造函数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) 
    /*** 创建视频号的线程池* @return*/@Bean("sphThreadPoolExecutor")public ThreadPoolExecutor createSphThreadPoolExecutor() {return new ThreadPoolExecutor(sphThreadPoolConfig.getCoreSize(),sphThreadPoolConfig.getMaxSize(),sphThreadPoolConfig.getKeepAliveSeconds(),TimeUnit.SECONDS,new LinkedBlockingQueue<>(sphThreadPoolConfig.getQueueCapacity()),createThreadFactory(sphThreadPoolConfig.getName()),createRejectedHandler(sphThreadPoolConfig.getName()));}/*** 创建拒绝策略** @param threadName* @return*/private RejectedExecutionHandler createRejectedHandler(String threadName) {return (r, executor) -> {log.error("触发{}线程池的拒绝策略 线程池: {}, 活跃线程数: {}, 队列大小: {} 最大线程池数:{}", threadName,executor.toString(), executor.getActiveCount(), executor.getQueue().size(), executor.getMaximumPoolSize());throw new RejectedExecutionException("触发" + threadName + "线程池的拒绝策略");};}/*** 创建工厂** @param name* @return*/private ThreadFactory createThreadFactory(String name) {return new ThreadFactory() {private final AtomicInteger threadNumber = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, name + threadNumber.getAndIncrement());}};}

不丢弃任务的处理方案

规避触发拒绝策略

从线程池工作原理可以知道, 只要把统一时间的任务总数控制在最大核心线程数 + 阻塞队列长度之内,就可以规避触发拒绝策略, 下面是个demo

  1. 给各个任务分配同一时间的最大线程数

视频号连接池配置如下, 最大线程数800, 队列长度800, 核心线程400, 那么只要控制同一时刻丢入线程池的任务总数少于1600个, 那么就不会触发拒绝策略。 下面的枚举类分配了各个任务同一时刻丢入队列最大数, 累计1050个线程。满负荷运行时堆不满有界队列, 所以线程池只会有400个工作线程,也不会触发拒绝。

platform-pool:sph:name: "视频号线程池"coreSize: 400maxSize: 800queueCapacity: 800keepAliveSeconds: 60  
@Getter
@AllArgsConstructor
public enum SphJobEnum {/*** 服务列表*/SPH_ADGROUP_DAY_REPORT("sphDownloadAdgroupDayReportService", "视频号 - 拉取广告报表数据(天级别)", 100),SPH_ADGROUP_HOUR_REPORT("sphDownloadAdgroupHourlyReportService", "视频号 - 拉取广告报表数据(小时级别)", 300),SPH_REFRESH_TOKEN("sphRefreshTokenService", "视频号 - 刷新token", 1),SPH_DOWNLOAD_ADGROUP("sphDownloadAdgroupService", "视频号 - 下载广告组", 50),SPH_DOWNLOAD_VIDEO("sphDownloadVideoService", "视频号 - 下载视频素材", 100),SPH_DOWNLOAD_PIC("sphDownloadPicService", "视频号 - 下载图片素材", 100),SPH_DOWNLOAD_WECHAT_AUTHORIZATION("sphDownloadWechatChannelsAuthorizationService", "视频号 - 下载获取授权记录列表", 100),SPH_DOWNLOAD_CAMPAIGN("sphDownloadCampaignsService", "视频号 - 下载广告计划(即将下架)", 100),SPH_DOWNLOAD_VIDEO_DAY_REPORT("sphDownloadVideoDayReportService", "视频号 - 下载视频日报", 100),SPH_DOWNLOAD_IMAGE_DAY_REPORT("sphDownloadImageDayReportService", "视频号 - 下载图片日报", 100),;private final String jobName;private final String desc;/*** 使用线程池处理, 设置占用的线程数, 避免线程池耗尽* 线程池的设置是: 400个核心线程, 800个队列长度, 600个最大线程数.因为采用的是阻塞等待执行完的方式,所以可以设置的最大长度是为 1200个,即填满队列** 300 + 300 + 400 + 50 = 1050线程, 现成核心线程数400个** */private final Integer dispatchThreadCount;public static SphJobEnum of(String serviceName) {for (SphJobEnum jobEnum : values()) {if (jobEnum.getJobName().equals(serviceName)) {return jobEnum;}}return null;}
}
  1. 编排任务,阻塞执行

场景: 各个任务已经分配了线程资源数,那么只需要

  1. 通过任务编排,每次丢入各个任务的最大任务数, 调用者线程阻塞,等待任务完成 CompletableFuture.join
  2. 再次丢入线程池自己的最大任务数, 循环直到任务全部完成
    private void loopDownloadJob(List<SphOauth2AccessTokenDO> validAuthList, String requestId, SphJobEnum jobEnum) {// 1.0 CollectionUtils.partition按照数量分批for (List<SphAuthAdvertiserInfoDO> partList : CollectionUtils.partition(advertiserList, jobEnum.getDispatchThreadCount())) {dispatchTask(partList, tokenDO, requestId, jobEnum, oauthCount, oauthCurrentCounter, taskTotalCounter, taskCurrentCounter);}}private void dispatchTask(List<SphAuthAdvertiserInfoDO> advertiserList, SphOauth2AccessTokenDO tokenDO,String requestId, SphJobEnum jobEnum, AtomicInteger oauthCount, AtomicInteger oauthCurrentCounter,AtomicInteger taskTotalCounter, AtomicInteger taskCurrentCounter) {List<CompletableFuture<String>> featureList = new ArrayList<>();for (SphAuthAdvertiserInfoDO advertiserInfoDO : advertiserList) {CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {downloadJob(advertiserInfoDO, tokenDO,authProcess,advertiserProcess,requestId, jobEnum);return authProcess + advertiserProcess + "执行完毕";}, sphThreadPoolExecutor).exceptionally((ex -> {return "执行失败:" + ex.getCause().getMessage();}));featureList.add(feature);}CompletableFuture.allOf(featureList.toArray(new CompletableFuture[0])).join();}
触发拒绝策略之后的处理
  1. 自定义拒绝策略, 触发拒绝的任务序列化之后入库

  2. 自定义线程池:继承ThreadPoolExecutor, 重写afterExecute方法(钩子方法 它会在线程池中的某个任务执行完成(无论成功或异常)后自动触发), 从数据库中读取数据,反序列化之后,放入有界队列中

  3. demo
    3.1 维护自定义拒绝策略,线程池

public class ReloadableThreadPoolExecutor extends ThreadPoolExecutor {public ReloadableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,TaskMapper taskMapper) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.taskMapper = taskMapper;this.setRejectedExecutionHandler(new DatabaseRejectedHandler(taskMapper));}@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);//  从数据库读取持久化的数据, 并加入队列List<TaskEntity> pendingTasks = taskMapper.selectPendingTasks();pendingTasks.forEach(task -> {Runnable runnable = deserializeTask(task.getTaskData());if (super.getQueue().offer(runnable)) {taskMapper.deleteById(task.getId());}});}private static class DatabaseRejectedHandler implements RejectedExecutionHandler {private final TaskMapper taskMapper;public DatabaseRejectedHandler(TaskMapper taskMapper) {this.taskMapper = taskMapper;}@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {TaskEntity task = new TaskEntity();task.setTaskData(r.toString());taskMapper.insert(task);}}}
}

3.2 注册线程池

@Configuration
public class ThreadPoolConfig {@Beanpublic ReloadableThreadPoolExecutor taskExecutor(TaskMapper taskMapper) {return new ReloadableThreadPoolExecutor(5,  // 核心线程数10, // 最大线程数60, // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(5), // 故意设置小队列测试拒绝策略taskMapper);}
}

分布式锁

  1. 引入 redisson

规避任务重复执行, 否则对资源的需求是无上限的。为了进入这个问题引入了redisson; 用法如下

在这里插入图片描述

  1. 优雅关锁

长耗时任务在遇到发版的时候,如果没有销毁分布式锁,会导致无法进行下一波次的执行。这显然是不合理的,下面引入DisposableBean接口解决

@Slf4j
@Component
public class DisposableBeanConfig implements DisposableBean {@Autowiredprotected RedisLock redisLock;@Overridepublic void destroy() throws Exception {List<String> lockPrefixList = Arrays.asList("ks_download_task_", "sph_download_task_", "ks_manual_download_task_", "sph_manual_download_task_");for (String prefix : lockPrefixList) {redisLock.unlockByPrefix(prefix);log.info("jvm销毁, 释放" + prefix + "为前缀的分布式锁");}}
}

平台接口限频处理

开放平台会对api进行限频, 这里引入了guava, 使用com.google.common.util.concurrent.RateLimiter解决限频问题。下面是一个demo

  1. 定义各个接口的每分钟令牌数
@Getter
@AllArgsConstructor
public enum SphApiEnum {/*** api枚举* */GET_TOKEN("https://api.e.qq.com//", "获取access_token", "https://developers.e.qq.com/v3.0/docs/api//token", 1000d),GET_ADVERTISER_LIST("https://api.e.qq.com/v3.0//get", "查询腾讯广告广告主信息", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),GET_DAILY_REPORTS("https://api.e.qq.com/v3.0//get", "查询腾讯广告日报表数据", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),GET_HOURLY_REPORTS("https://api.e.qq.com/v3.0//get", "查询腾讯广告小时报表数据", "https://developers.e.qq.com/v3.0/docs/api//get#fdub1", 1000d),GET_AD_GROUPS("https://api.e.qq.com/v3.0//get", "获取广告组", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),GET_VIDEOS("https://api.e.qq.com/v3.0//get", "获取视频文件", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),GET_PIC("https://api.e.qq.com/v3.0//get", "获取图片信息", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),GET_WECHAT_CHANNELS_AUTHORIZATION_LIST("https://api.e.qq.com/v3.0//get", "获取视频号授权记录列表", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),GET_CAMPAIGNS("https://api.e.qq.com/v1.3//get", "获取推广计划(即将下线)", "https://developers.e.qq.com/docs/api//campaigns/campaigns_get?version=1.3&_preview=1#kew0v", 1000d),;private final String url;private final String desc;/*** 接口文档的url* */private final String docUrl;/*** 每分钟的限频次数, 其实视频号每个接口每分钟默认1000次调用, 每天的限频次数:1440000* */private final double qpm;
}
  1. 定义限频管理器
public class RateLimiterManager {/*** 存储限流器*/private static final ConcurrentMap<String, RateLimiter> API_RATE_LIMITER = new ConcurrentHashMap<>();/*** 查询当前APi的RateLimiter* @param apiEnum* @return*/public static RateLimiter getApiRateLimiter(SphApiEnum apiEnum) {return API_RATE_LIMITER.computeIfAbsent(apiEnum.name().intern(), key -> RateLimiter.create(apiEnum.getQpm()/60, 3, TimeUnit.SECONDS));}
}
  1. 使用demo
    private <K, T> SphApBaseRespVO<T> getApiResp(SphApiEnum apiEnum, K params, Class<T> dataType) {String resp = "";try {// 获取令牌RateLimiterManager.getApiRateLimiter(apiEnum).acquire();resp = HttpUtils.getCallWithPojo(apiEnum.getUrl(), params);return getFormatApiResp(apiEnum, resp, dataType, params);} catch (ServiceException | RetryException e) {saveErrorResponse(apiEnum, params, resp, "");throw e;} catch (Exception e) {log.error("视频号api请求失败 {}接口 入参:{} 异常信息:{}", apiEnum.getDesc(), params, e.getMessage());saveErrorResponse(apiEnum, params, resp, "");throw new ServiceException(e.getMessage(), SPH_API_REQUEST_ERROR.getCode());}}

待扩展点

曾供职于某个erp公司, 在任职期间主要负责跟电商平台进行订单,商品,库存,物流,wms进行数据交付, 那么现在基于之前的经验分析下 还可以做哪些升级

todo

http://www.lryc.cn/news/571189.html

相关文章:

  • 前后端拦截器+MDC实现纯数字 traceId 全链路日志追踪(axios + Spring Boot 超详细实战)
  • DeepSeek 大型 MoE 模型大规模部署压测学习
  • FlinkCDC-Hudi数据实时入湖原理篇
  • JVM监控的挑战:Applications Manager如何提供帮助
  • Spring Boot集成Kafka全攻略:从基础配置到高级实践
  • 多模态大语言模型演进:从视觉理解到具身智能的技术突破
  • Linux运维新人自用笔记(部署 ​​LAMP:Linux + Apache + MySQL + PHP、部署discuz论坛)
  • 5.安装IK分词器
  • ELK在Java的使用
  • Selenium(选择元素,浏览器/元素操作,等待,页面交互)
  • Windows Python 环境管理终极对比:极简方案 VS 传统方案(仅需 2 个软件实现全流程自动化)
  • Selenium(多窗口,frame,验证码,截图,PO模式)
  • rockx读取单张图片并检测图片内人脸的矩形
  • vite的常用配置
  • 「动态规划::数位DP」统计数字递推 / LeetCode 3352|1012(C++)
  • 线程池(Thread Pool)详解
  • 基于Cesium移动的天空云
  • 【Docker基础】Docker核心概念:命名空间(Namespace)之IPC详解
  • 根据Python模块的完整路径import动态导入
  • 05_MinIO+Java SpringBoot 实现透传代理下载
  • 如何确定驱动480x320分辨率的显示屏所需的MCU主频
  • 为何前馈3DGS的边界总是“一碰就碎”?PM-Loss用“3D几何先验”来解
  • Mac 安装JD-GUI
  • 低轨导航 | 低轨卫星导航PNT模型,原理,公式,matlab代码
  • 软件工程:流程图如何画?
  • Python 爬虫入门 Day 5 - 使用 XPath 进行网页解析(lxml + XPath)
  • springboot使用kafka
  • Jmeter的三种参数化方式详解
  • web前端开发核心基础:Html结构分析,head,body,不同标签的作用
  • Java内存模型与线程