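Lessons from a project that downloads data from e-commerce platforms (part 1)
Table of contents
- Background
- Our company's design
- Business research
- Resource research
- Architecture design
- Scheduled task design
- HttpClient connection pool
- Retry mechanism
- Request convergence
- Dynamic allocation of time slices
- API call dashboard
- Alerting
- Manual fetch endpoint
- Speeding up with a thread pool
- Defining a business thread pool
- Rejection policies
- Handling tasks without dropping them
- Avoiding the rejection policy
- Handling after the rejection policy is triggered
- Distributed lock
- Handling platform API rate limits
- Extension points
- todo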
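Background
We operate many accounts on Douyin, Kuaishou, WeChat Channels (视频号) and other platforms, and each platform provides a back office for monitoring ad-spend performance. Logging in to every platform separately is tedious, and the company also wants to collect the ad-spend data and build an intelligent ad-placement system on top of it to improve ROI. That is why I recently worked on scheduled data downloads from Kuaishou and WeChat Channels. I also used to work in the platform data department of an ERP company, so this post summarizes how scheduled download tasks should be designed.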
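Our company's design
Business research
The company runs ads on Douyin, Kuaishou, WeChat Channels, Xiaohongshu, Tmall, JD.com, Pinduoduo, Dewu and Baidu, and each of these platforms exposes an API. The overall flow is:
step1 --> call the platform API and fetch the ad-spend data
step2 --> write the data to the MySQL database
step3 --> the data warehouse reads the data, then MySQL is purged periodically (the MySQL instance has only 500 GB and fills up quickly with business data if it is not purged)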
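Resource research
Constraints imposed by the company
- Initially only one 8-core / 16 GB / 200 GB server is available. New resources can be requested only once the shortage is evident.
- The business is not in the cloud, and there is no Kubernetes foundation.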
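Architecture design
A standard Spring Cloud Alibaba architecture. When a single instance can no longer handle the load, several platform instances can be deployed, which leaves room to scale.
platform: holds the concrete implementation of the scheduled tasks
system: system service
auth: authentication and authorization
job: scheduled-task triggering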
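Scheduled task design
- Scheduled triggering
The job service relies on Quartz to trigger the scheduled tasks.
Item | Quartz | XXL-Job |
---|---|---|
Distributed scheduling | ⚠️ Clustering only through the JDBC JobStore, no built-in sharding | ✅ Simple distributed scheduling supported |
UI management | ❌ No official UI, must be integrated | ✅ Web console provided |
Dynamic task configuration | ❌ Configured in code | ✅ Configured in the console |
Failure retry | ❌ Must be implemented by hand | ✅ Built in |
Timing precision | ✅ Millisecond level | ⚠️ Mostly second level |
Ease of use | ❌ Fairly complex | ✅ Simple to use |
Execution isolation | ✅ Custom thread pools | ⚠️ Isolation must be configured manually, none by default |
Community support | ✅ Fairly active community | ⚠️ Moderately active |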
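- The job service calls the platform service through Feign
Internal calls between microservices do not go through the gateway. The client looks up the instance list of ServiceNameConstants.PLATFORM_SERVICE in the service registry, picks an available instance with the load balancer, and then sends the HTTP request.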
@FeignClient(contextId = "remotePlatformService",value = ServiceNameConstants.PLATFORM_SERVICE,fallbackFactory = RemotePlatformFallbackFactory.class)
public interface RemotePlatformService {
}
- Main flow implemented with the factory and template patterns
3.1 The template implements the shared logic
3.1.1 Prepare the task units
public interface SphDownloadTaskBaseService {
    /**
     * Entry point of the download task.
     */
    void downloadJob(SphJobEnum jobEnum, String requestId);

    /**
     * Manually triggered download.
     *
     * @param reqVO manual download request
     */
    void downloadByManual(SphManualReqVO reqVO);
}

@Override
public void downloadJob(SphJobEnum jobEnum, String requestId) {
    // 0.0 This is a long-running task, so the lock is held for 1 hour
    try {
        // 1.0 Query the list of valid authorizations (validAuthList)
        // 2.0 Loop over the advertisers of each authorization and fetch their incremental ad campaigns
        loopDownloadJob(validAuthList, requestId, jobEnum);
    } finally {
        // 3.0 Release the lock and record the elapsed time
    }
}

private void loopDownloadJob(List<SphOauth2AccessTokenDO> validAuthList, String requestId, SphJobEnum jobEnum) {
    // 1.0 Shuffle the advertisers so that they are processed more fairly
    // 2.0 Group the advertisers according to the configuration, to feed the thread pool
}

private void dispatchTask(List<SphAuthAdvertiserInfoDO> advertiserList, SphOauth2AccessTokenDO tokenDO,
        String requestId, SphJobEnum jobEnum, AtomicInteger oauthCount, AtomicInteger oauthCurrentCounter,
        AtomicInteger taskTotalCounter, AtomicInteger taskCurrentCounter) {
    // Speed up the download with the thread pool; tasks are orchestrated so that the pool does not reject them.
    // authProcess and advertiserProcess are progress strings; how they are built is not shown in the original.
    List<CompletableFuture<String>> featureList = new ArrayList<>();
    for (SphAuthAdvertiserInfoDO advertiserInfoDO : advertiserList) {
        CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
            downloadJob(advertiserInfoDO, tokenDO, authProcess, advertiserProcess, requestId, jobEnum);
            return authProcess + advertiserProcess + " finished";
        }, sphThreadPoolExecutor).exceptionally((ex -> {
            return "Execution failed: " + ex.getCause().getMessage();
        }));
        featureList.add(feature);
    }
    // Block until every task of this batch has finished
    CompletableFuture.allOf(featureList.toArray(new CompletableFuture[0])).join();
}
3.1.2 Subtasks download the data and store it
public abstract class SphDownloadTaskBaseServiceImpl implements SphDownloadTaskBaseService {
    /**
     * Implemented by each concrete subtask:
     * 1. Query the results
     * 2. Insert them in batches
     *
     * @param lastRequestTimeDO
     * @param endTime
     * @param advertiserInfoDO
     * @param tokenDO
     */
    protected abstract void downloadJob(SysDictData lastRequestTimeDO, LocalDateTime endTime,
            SphAuthAdvertiserInfoDO advertiserInfoDO, SphOauth2AccessTokenDO tokenDO,
            String authProgress, String advertiserProgress);

    /**
     * Implemented by each concrete subtask: returns the time of the previous request.
     *
     * @return
     */
    protected abstract SysDictData getLastRequestTime(SphAuthAdvertiserInfoDO advertiserInfoDO);

    /**
     * Implemented by each concrete subtask: saves the request time.
     *
     * @param lastRequestTimeDO
     * @param endTime
     * @param advertiserInfoDO
     */
    protected abstract void saveLastRequestTime(SysDictData lastRequestTimeDO, LocalDateTime endTime,
            SphAuthAdvertiserInfoDO advertiserInfoDO);
}
3.1.3 The factory dispatches to each subtask
The platform service exposes the download function to the job service.
/**
 * Looks up the service for a job.
 *
 * @param jobEnum
 * @return
 */
private SphDownloadTaskBaseService getTaskService(SphJobEnum jobEnum) {
    return SpringUtils.getBean(jobEnum.getJobName());
}

private void downloadJob(SphJobEnum jobEnum, String requestId) {
    switch (jobEnum) {
        case SPH_ADGROUP_DAY_REPORT:
        case SPH_ADGROUP_HOUR_REPORT:
        case SPH_REFRESH_TOKEN:
        case SPH_DOWNLOAD_ADGROUP:
        case SPH_DOWNLOAD_VIDEO:
        case SPH_DOWNLOAD_PIC:
        case SPH_DOWNLOAD_WECHAT_AUTHORIZATION:
        case SPH_DOWNLOAD_CAMPAIGN:
        case SPH_DOWNLOAD_VIDEO_DAY_REPORT:
        case SPH_DOWNLOAD_IMAGE_DAY_REPORT:
            getTaskService(jobEnum).downloadJob(jobEnum, requestId);
            break;
        default: {
            log.error("Unsupported scheduled task type ServiceName:{} description:{}", jobEnum.getJobName(), jobEnum.getDesc());
            throw new ServiceException("Unsupported scheduled task type");
        }
    }
}
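The template-plus-factory flow above can be sketched in plain Java without Spring. This is only a minimal illustration: `DownloadTask`, `AbstractDownloadTask`, `DayReportTask`, `HourReportTask` and `TaskFactory` are hypothetical names, and the map stands in for the `SpringUtils.getBean` lookup.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the article's download-task interface. */
interface DownloadTask {
    List<String> run(String requestId);
}

/** Template: fixes the order of the shared steps, subclasses fill in the fetch. */
abstract class AbstractDownloadTask implements DownloadTask {
    @Override
    public final List<String> run(String requestId) {
        List<String> trace = new ArrayList<>();
        trace.add("lock");                 // step 0: take the long-running lock
        try {
            trace.add(fetch(requestId));   // steps 1-2: subclass-specific download and save
        } finally {
            trace.add("unlock");           // step 3: always release the lock
        }
        return trace;
    }

    protected abstract String fetch(String requestId);
}

class DayReportTask extends AbstractDownloadTask {
    @Override
    protected String fetch(String requestId) {
        return "day-report:" + requestId;
    }
}

class HourReportTask extends AbstractDownloadTask {
    @Override
    protected String fetch(String requestId) {
        return "hour-report:" + requestId;
    }
}

/** Factory: resolves a job name to its task; the map replaces the Spring bean lookup. */
class TaskFactory {
    private static final Map<String, DownloadTask> TASKS = new LinkedHashMap<>();

    static {
        TASKS.put("dayReport", new DayReportTask());
        TASKS.put("hourReport", new HourReportTask());
    }

    static DownloadTask get(String jobName) {
        DownloadTask task = TASKS.get(jobName);
        if (task == null) {
            throw new IllegalArgumentException("Unsupported job type: " + jobName);
        }
        return task;
    }
}
```

Calling `TaskFactory.get("dayReport").run("r1")` walks through lock, fetch and unlock in that order; adding a new job means adding one subclass and one map entry, while the shared steps stay in the template.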
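HttpClient connection pool
As the business flow shows, we send a large number of requests to the platforms, so an HttpClient connection pool is needed to manage the connections.
- Add the httpclient dependency
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
</dependency>
- Maintain the request utility class HttpUtils
2.1 HttpUtils.getCallWithPojo: GET request that takes a parameter object
2.2 HttpUtils.postCallWithToken: POST request that carries a token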
public class HttpUtils {private static final Logger log = LoggerFactory.getLogger(HttpUtils.class);public static RequestConfig requestConfig;private static CloseableHttpClient httpClient;private static PoolingHttpClientConnectionManager connMgr;private static IdleConnectionMonitorThread idleThread;static {HttpUtils.initClient();}/*** 向指定 URL 发送GET方法的请求** @param url 发送请求的 URL* @return 所代表远程资源的响应结果*/public static String sendGet(String url) {return sendGet(url, StringUtils.EMPTY);}/*** 向指定 URL 发送GET方法的请求** @param url 发送请求的 URL* @param param 请求参数,请求参数应该是 name1=value1&name2=value2 的形式。* @return 所代表远程资源的响应结果*/public static String sendGet(String url, String param) {return sendGet(url, param, Constants.UTF8);}/*** 向指定 URL 发送GET方法的请求** @param url 发送请求的 URL* @param param 请求参数,请求参数应该是 name1=value1&name2=value2 的形式。* @param contentType 编码类型* @return 所代表远程资源的响应结果*/public static String sendGet(String url, String param, String contentType) {StringBuilder result = new StringBuilder();BufferedReader in = null;try {String urlNameString = StringUtils.isNotBlank(param) ? url + "?" 
+ param : url;log.info("sendGet - {}", urlNameString);URL realUrl = new URL(urlNameString);URLConnection connection = realUrl.openConnection();connection.setRequestProperty("accept", "*/*");connection.setRequestProperty("connection", "Keep-Alive");connection.setRequestProperty("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64)");connection.connect();in = new BufferedReader(new InputStreamReader(connection.getInputStream(), contentType));String line;while ((line = in.readLine()) != null) {result.append(line);}log.info("recv - {}", result);} catch (ConnectException e) {log.error("调用HttpUtils.sendGet ConnectException, url=" + url + ",param=" + param, e);} catch (SocketTimeoutException e) {log.error("调用HttpUtils.sendGet SocketTimeoutException, url=" + url + ",param=" + param, e);} catch (IOException e) {log.error("调用HttpUtils.sendGet IOException, url=" + url + ",param=" + param, e);} catch (Exception e) {log.error("调用HttpsUtil.sendGet Exception, url=" + url + ",param=" + param, e);} finally {try {if (in != null) {in.close();}} catch (Exception ex) {log.error("调用in.close Exception, url=" + url + ",param=" + param, ex);}}return result.toString();}/*** 向指定 URL 发送POST方法的请求** @param url 发送请求的 URL* @param param 请求参数,请求参数应该是 name1=value1&name2=value2 的形式。* @return 所代表远程资源的响应结果*/public static String sendPost(String url, String param) {PrintWriter out = null;BufferedReader in = null;StringBuilder result = new StringBuilder();try {log.info("sendPost - {}", url);URL realUrl = new URL(url);URLConnection conn = realUrl.openConnection();conn.setRequestProperty("accept", "*/*");conn.setRequestProperty("connection", "Keep-Alive");conn.setRequestProperty("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64)");conn.setRequestProperty("Accept-Charset", "utf-8");conn.setRequestProperty("contentType", "utf-8");conn.setDoOutput(true);conn.setDoInput(true);out = new PrintWriter(conn.getOutputStream());out.print(param);out.flush();in = new BufferedReader(new 
InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));String line;while ((line = in.readLine()) != null) {result.append(line);}log.info("recv - {}", result);} catch (ConnectException e) {log.error("调用HttpUtils.sendPost ConnectException, url=" + url + ",param=" + param, e);} catch (SocketTimeoutException e) {log.error("调用HttpUtils.sendPost SocketTimeoutException, url=" + url + ",param=" + param, e);} catch (IOException e) {log.error("调用HttpUtils.sendPost IOException, url=" + url + ",param=" + param, e);} catch (Exception e) {log.error("调用HttpsUtil.sendPost Exception, url=" + url + ",param=" + param, e);} finally {try {if (out != null) {out.close();}if (in != null) {in.close();}} catch (IOException ex) {log.error("调用in.close Exception, url=" + url + ",param=" + param, ex);}}return result.toString();}public static String sendSSLPost(String url, String param) {StringBuilder result = new StringBuilder();String urlNameString = url + "?" + param;try {log.info("sendSSLPost - {}", urlNameString);SSLContext sc = SSLContext.getInstance("SSL");sc.init(null, new TrustManager[]{new TrustAnyTrustManager()}, new java.security.SecureRandom());URL console = new URL(urlNameString);HttpsURLConnection conn = (HttpsURLConnection) console.openConnection();conn.setRequestProperty("accept", "*/*");conn.setRequestProperty("connection", "Keep-Alive");conn.setRequestProperty("user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64)");conn.setRequestProperty("Accept-Charset", "utf-8");conn.setRequestProperty("contentType", "utf-8");conn.setDoOutput(true);conn.setDoInput(true);conn.setSSLSocketFactory(sc.getSocketFactory());conn.setHostnameVerifier(new TrustAnyHostnameVerifier());conn.connect();InputStream is = conn.getInputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is));String ret = "";while ((ret = br.readLine()) != null) {if (ret != null && !ret.trim().equals("")) {result.append(new String(ret.getBytes(StandardCharsets.ISO_8859_1), 
StandardCharsets.UTF_8));}}log.info("recv - {}", result);conn.disconnect();br.close();} catch (ConnectException e) {log.error("调用HttpUtils.sendSSLPost ConnectException, url=" + url + ",param=" + param, e);} catch (SocketTimeoutException e) {log.error("调用HttpUtils.sendSSLPost SocketTimeoutException, url=" + url + ",param=" + param, e);} catch (IOException e) {log.error("调用HttpUtils.sendSSLPost IOException, url=" + url + ",param=" + param, e);} catch (Exception e) {log.error("调用HttpsUtil.sendSSLPost Exception, url=" + url + ",param=" + param, e);}return result.toString();}private static class TrustAnyTrustManager implements X509TrustManager {@Overridepublic void checkClientTrusted(X509Certificate[] chain, String authType) {}@Overridepublic void checkServerTrusted(X509Certificate[] chain, String authType) {}@Overridepublic X509Certificate[] getAcceptedIssuers() {return new X509Certificate[]{};}}private static class TrustAnyHostnameVerifier implements HostnameVerifier {@Overridepublic boolean verify(String hostname, SSLSession session) {return true;}}/*** 获取httpClient** @return*/public static CloseableHttpClient getHttpClient() {if (httpClient != null) {return httpClient;} else {return HttpClients.createDefault();}}/*** 创建连接池管理器** @return*/private static PoolingHttpClientConnectionManager createConnectionManager() {PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager();// 将最大连接数增加到connMgr.setMaxTotal(HttpConf.MAX_TOTAL_CONN);// 将每个路由基础的连接增加到connMgr.setDefaultMaxPerRoute(HttpConf.MAX_ROUTE_CONN);return connMgr;}/*** 根据当前配置创建HTTP请求配置参数。** @return 返回HTTP请求配置。*/private static RequestConfig createRequestConfig() {Builder builder = RequestConfig.custom();builder.setConnectionRequestTimeout(StringUtils.nvl(HttpConf.WAIT_TIMEOUT, 10000));builder.setConnectTimeout(StringUtils.nvl(HttpConf.CONNECT_TIMEOUT, 10000));builder.setSocketTimeout(StringUtils.nvl(HttpConf.SO_TIMEOUT, 10000));return builder.build();}/*** 创建默认的HTTPS客户端,信任所有的证书。** @return 
返回HTTPS客户端,如果创建失败,返回HTTP客户端。*/private static CloseableHttpClient createHttpClient(HttpClientConnectionManager connMgr) {try {final SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {@Overridepublic boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {// 信任所有return true;}}).build();final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext);// 重试机制HttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler(HttpConf.RETRY_COUNT, true);ConnectionKeepAliveStrategy connectionKeepAliveStrategy = new ConnectionKeepAliveStrategy() {@Overridepublic long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {return HttpConf.KEEP_ALIVE_TIMEOUT; // tomcat默认keepAliveTimeout为20s}};httpClient = HttpClients.custom().setSSLSocketFactory(sslsf).setConnectionManager(connMgr).setDefaultRequestConfig(requestConfig).setRetryHandler(retryHandler).setKeepAliveStrategy(connectionKeepAliveStrategy).build();} catch (Exception e) {log.error("Create http client failed", e);httpClient = HttpClients.createDefault();}return httpClient;}/*** 初始化 只需调用一次*/public synchronized static CloseableHttpClient initClient() {if (httpClient == null) {connMgr = createConnectionManager();requestConfig = createRequestConfig();// 初始化httpClient连接池httpClient = createHttpClient(connMgr);// 清理连接池idleThread = new IdleConnectionMonitorThread(connMgr);idleThread.start();}return httpClient;}/*** 关闭HTTP客户端。** @param*/public synchronized static void shutdown() {try {if (idleThread != null) {idleThread.shutdown();idleThread = null;}} catch (Exception e) {log.error("httpclient connection manager close", e);}try {if (httpClient != null) {httpClient.close();httpClient = null;}} catch (IOException e) {log.error("httpclient close", e);}}/*** 请求上游 GET提交** @param uri* @throws IOException*/public static String getCall(final String uri) throws Exception {return getCall(uri, null, Constants.UTF8);}/*** 
请求上游 GET提交** @param uri* @param contentType* @throws IOException*/public static String getCall(final String uri, String contentType) throws Exception {return getCall(uri, contentType, Constants.UTF8);}/*** 携带入参的GET请求* @param baseUri 请求基础URL* @param pojo 请求参数对象* @param <K> 参数类型* @return 响应字符串(JSON格式)* @throws IOException 当HTTP请求或响应处理失败时抛出* @throws JSONException 当响应不是有效JSON时抛出*/public static <K> String getCallWithPojo(final String baseUri, K pojo) throws IOException, JSONException {// 参数校验if (baseUri == null || baseUri.trim().isEmpty()) {throw new IllegalArgumentException("Base URI cannot be null or empty");}// 构建URIUriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(baseUri);List<Field> fields = getAllFields(pojo.getClass());// 缓存反射结果以提高性能for (Field field : fields) {field.setAccessible(true);try {Object value = field.get(pojo);if (value != null) {String paramName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE,field.getName());// 处理不同类型参数if (value instanceof String){// URL编码普通参数值builder.queryParam(paramName, URLEncoder.encode(value.toString(), StandardCharsets.UTF_8.name()));} else {// 对JSON字符串进行URL编码String jsonValue = JsonUtils.toJsonString(value);builder.queryParam(paramName, URLEncoder.encode(jsonValue, StandardCharsets.UTF_8.name()));}}} catch (IllegalAccessException | UnsupportedEncodingException e) {log.warn("Failed to process field {}: {}", field.getName(), e.getMessage());}}String finalUrl = builder.build().toUriString();HttpGet httpGet = new HttpGet(finalUrl);httpGet.setConfig(requestConfig);// 使用try-with-resources确保资源释放try (CloseableHttpResponse httpRsp = getHttpClient().execute(httpGet)) {int statusCode = httpRsp.getStatusLine().getStatusCode();// 只处理成功或特定错误状态if (statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_FORBIDDEN) {HttpEntity entity = httpRsp.getEntity();try {String rspText = EntityUtils.toString(entity, StandardCharsets.UTF_8);// 提取跟踪IDHeader traceIdHeader = httpRsp.getFirstHeader("X-Tsa-Trace-Id");String traceId 
= traceIdHeader != null ? traceIdHeader.getValue() : null;// 构造响应JSONJSONObject responseJson = new JSONObject(rspText);if (traceId != null) {responseJson.putOpt("request_id", traceId);}return responseJson.toString();} finally {EntityUtils.consumeQuietly(entity);}} else {throw new IOException("HTTP request failed with status code: " + statusCode +", URL: " + finalUrl);}} catch (JSONException e) {throw new JSONException("Invalid JSON response from: " + finalUrl, e);} catch (Exception e) {throw new IOException("HTTP request failed for URL: " + finalUrl, e);}}/*** 递归获取类所有字段(包括父类)* @param type 目标类* @return 字段列表*/private static List<Field> getAllFields(Class<?> type) {List<Field> fields = new ArrayList<>();Class<?> currentClass = type;// 使用循环替代递归提高性能while (currentClass != null && currentClass != Object.class) {fields.addAll(Arrays.asList(currentClass.getDeclaredFields()));currentClass = currentClass.getSuperclass();}return fields;}/*** 请求上游 GET提交** @param uri* @param contentType* @param charsetName* @throws IOException*/public static String getCall(final String uri, String contentType, String charsetName) throws Exception {final String url = uri;final HttpGet httpGet = new HttpGet(url);httpGet.setConfig(requestConfig);if (!StringUtils.isEmpty(contentType)) {httpGet.addHeader("Content-Type", contentType);}final CloseableHttpResponse httpRsp = getHttpClient().execute(httpGet);try {if (httpRsp.getStatusLine().getStatusCode() == HttpStatus.SC_OK|| httpRsp.getStatusLine().getStatusCode() == HttpStatus.SC_FORBIDDEN) {final HttpEntity entity = httpRsp.getEntity();final String rspText = EntityUtils.toString(entity, charsetName);// 提取 X-Tsa-Trace-IdHeader traceIdHeader = httpRsp.getFirstHeader("X-Tsa-Trace-Id");String traceId = traceIdHeader != null ? 
traceIdHeader.getValue() : null;EntityUtils.consume(entity);// 构造返回的 JSON(包含响应体和 traceId)JSONObject responseJson = new JSONObject(rspText); // 解析原始JSONresponseJson.putOpt("request_id", traceId); // 直接添加到顶层return responseJson.toString(); // 返回修改后的JSON} else {throw new IOException("HTTP StatusCode=" + httpRsp.getStatusLine().getStatusCode());}} finally {try {httpRsp.close();} catch (Exception e) {log.error("关闭httpRsp异常", e);}}}/*** 请求上游 POST提交** @param uri* @param paramsMap* @throws IOException*/public static String postCall(final String uri, Map<String, Object> paramsMap) throws Exception {return postCall(uri, null, paramsMap, Constants.UTF8);}/*** 请求上游 POST提交** @param uri* @param contentType* @param paramsMap* @throws IOException*/public static String postCall(final String uri, String contentType, Map<String, Object> paramsMap) throws Exception {return postCall(uri, contentType, paramsMap, Constants.UTF8);}/*** 请求上游 POST提交** @param uri* @param contentType* @param paramsMap* @param charsetName* @throws IOException*/public static String postCall(final String uri, String contentType, Map<String, Object> paramsMap,String charsetName) throws Exception {final String url = uri;final HttpPost httpPost = new HttpPost(url);httpPost.setConfig(requestConfig);if (!StringUtils.isEmpty(contentType)) {httpPost.addHeader("Content-Type", contentType);}// 添加参数List<NameValuePair> list = new ArrayList<NameValuePair>();if (paramsMap != null) {for (Map.Entry<String, Object> entry : paramsMap.entrySet()) {list.add(new BasicNameValuePair(entry.getKey(), (String) entry.getValue()));}}httpPost.setEntity(new UrlEncodedFormEntity(list, charsetName));final CloseableHttpResponse httpRsp = getHttpClient().execute(httpPost);try {if (httpRsp.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {final HttpEntity entity = httpRsp.getEntity();final String rspText = EntityUtils.toString(entity, charsetName);EntityUtils.consume(entity);return rspText;} else {throw new IOException("HTTP StatusCode=" + 
httpRsp.getStatusLine().getStatusCode());}} finally {try {httpRsp.close();} catch (Exception e) {log.error("关闭httpRsp异常", e);}}}/*** 请求上游 POST提交** @param uri* @param param* @throws IOException*/public static String postCall(final String uri, String param) throws Exception {return postCall(uri, null, param, Constants.UTF8);}/*** 携带token的post请求** @param uri* @param param* @param token* @return* @throws Exception*/public static String postCallWithToken(final String uri, String param, String token) throws Exception {return postCall(uri, null, param, Constants.UTF8, token);}/*** 请求上游 POST提交** @param uri* @param contentType* @param param* @throws IOException*/public static String postCall(final String uri, String contentType, String param) throws Exception {return postCall(uri, contentType, param, Constants.UTF8);}/*** 请求上游 POST提交** @param uri* @param contentType* @param param* @param charsetName* @throws IOException*/public static String postCall(final String uri, String contentType, String param, String charsetName)throws Exception {return postCall(uri, contentType, param, charsetName, null);}/*** 请求上游 POST提交** @param uri* @param contentType* @param param* @param charsetName* @throws IOException*/public static String postCall(final String uri, String contentType, String param, String charsetName, String token)throws Exception {final String url = uri;final HttpPost httpPost = new HttpPost(url);httpPost.setConfig(requestConfig);if (!StringUtils.isEmpty(contentType)) {httpPost.addHeader("Content-Type", contentType);} else {httpPost.addHeader("Content-Type", "application/json");}// 设置tokenif (StrUtil.isNotEmpty(token)) {httpPost.addHeader("Access-Token", token);}// 添加参数StringEntity paramEntity = new StringEntity(param, charsetName);httpPost.setEntity(paramEntity);final CloseableHttpResponse httpRsp = getHttpClient().execute(httpPost);try {if (httpRsp.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {final HttpEntity entity = httpRsp.getEntity();final String rspText = 
EntityUtils.toString(entity, charsetName);EntityUtils.consume(entity);return rspText;} else {throw new IOException("HTTP StatusCode=" + httpRsp.getStatusLine().getStatusCode());}} finally {try {httpRsp.close();} catch (Exception e) {log.error("关闭httpRsp异常", e);}}}/*** 判断HTTP异常是否为读取超时。** @param e 异常对象。* @return 如果是读取引起的异常(而非连接),则返回true;否则返回false。*/public static boolean isReadTimeout(final Throwable e) {return (!isCausedBy(e, ConnectTimeoutException.class) && isCausedBy(e, SocketTimeoutException.class));}/*** 检测异常e被触发的原因是不是因为异常cause。检测被封装的异常。** @param e 捕获的异常。* @param cause 异常触发原因。* @return 如果异常e是由cause类异常触发,则返回true;否则返回false。*/public static boolean isCausedBy(final Throwable e, final Class<? extends Throwable> cause) {if (cause.isAssignableFrom(e.getClass())) {return true;} else {Throwable t = e.getCause();while (t != null && t != e) {if (cause.isAssignableFrom(t.getClass())) {return true;}t = t.getCause();}return false;}}
}
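- Shut down the HttpClient connection pool so that resources are released when the application restarts
@Component
public class ShutdownManagerConfig {
    private static final Logger logger = LoggerFactory.getLogger(ShutdownManagerConfig.class);

    @PreDestroy
    public void destroy() {
        // Shut down the HTTP connection pool
        HttpUtils.shutdown();
        logger.info("HTTP connection pool shut down");
    }
}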
Retry mechanism
Some HTTP requests inevitably need to be retried; spring-retry handles this.
- Add the dependencies
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
- Usage example
2.1 value: the exceptions that trigger a retry; not every failure should be retried
2.2 maxAttempts: the maximum number of attempts, including the first call
2.3 backoff: the retry back-off policy
2.3.1 backoff.delay: the delay before the first retry
2.3.2 backoff.multiplier: each subsequent delay is the previous one multiplied by multiplier
2.3.3 backoff.random: adds random jitter to the delay so that multiple clients do not retry at the same moment
@Retryable(value = {RetryException.class}, maxAttempts = 5, backoff = @Backoff(delay = 1000, multiplier = 2, random = true))
/**
 * Ad campaign data (real time).
 * Retry policy when the rate limit is hit:
 * 1. Maximum number of attempts (including the first call): 5
 * 2. Initial retry delay: 1000 ms
 * 3. multiplier: delay multiplier (next delay = current delay * multiplier)
 * 4. random: whether to add random jitter to the delay (true)
 *
 * @param reqVO
 * @param token
 * @return
 */
@Override
@Retryable(value = {RetryException.class}, maxAttempts = 5, backoff = @Backoff(delay = 1000, multiplier = 2, random = true))
// (the method these annotations belong to is missing from the source)

@Override
@Retryable(value = {RetryException.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000, multiplier = 2, random = true))
public SphApBaseRespVO<SphApiImageDailyReportsRespVO> getImageDailyReports(SphApiDailyReportsReqVO req) {
    SphApBaseRespVO<SphApiImageDailyReportsRespVO> apiResp =
            getApiResp(SphApiEnum.GET_DAILY_REPORTS, req, SphApiImageDailyReportsRespVO.class);
    apiResp.setData(JsonUtils.parseObject(JsonUtils.toJsonString(apiResp.getData()), SphApiImageDailyReportsRespVO.class));
    return apiResp;
}
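To make the effect of delay, multiplier and random concrete, here is a small sketch that computes the waits between attempts. It illustrates exponential back-off with one common jitter scheme; it is not spring-retry's own code, and the exact jitter formula the library applies may differ. `BackoffSketch` and its methods are hypothetical names.

```java
import java.util.Random;

/** Illustrative exponential back-off with jitter; not spring-retry's implementation. */
class BackoffSketch {
    /** Base delay before retry number `retry` (1-based): delay * multiplier^(retry - 1). */
    static long baseDelayMs(long delayMs, double multiplier, int retry) {
        return (long) (delayMs * Math.pow(multiplier, retry - 1));
    }

    /** Adds jitter: a value roughly between base and base * multiplier. */
    static long jitteredDelayMs(long delayMs, double multiplier, int retry, Random random) {
        long base = baseDelayMs(delayMs, multiplier, retry);
        return (long) (base * (1 + random.nextDouble() * (multiplier - 1)));
    }

    public static void main(String[] args) {
        Random random = new Random();
        int maxAttempts = 5; // includes the first call, so there are at most 4 waits
        for (int retry = 1; retry < maxAttempts; retry++) {
            System.out.printf("retry %d: base=%dms jittered=%dms%n",
                    retry, baseDelayMs(1000, 2, retry), jitteredDelayMs(1000, 2, retry, random));
        }
    }
}
```

With delay = 1000 and multiplier = 2, the base waits before the four retries are 1000, 2000, 4000 and 8000 ms; the jitter spreads each of them out so that clients that failed together do not all retry together.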
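Request convergence
All requests to the platform APIs are wrapped in a single function. What are the benefits?
- A single entry point that reduces complexity
1.1 Simpler calls: every API request goes through one function, so callers do not need to know how the SDK works and only deal with business parameters.
1.2 Less duplicated code: SDK initialization, authentication and similar logic are not repeated at every call site. For example, the function below sets a random nonce on every request.
- Centralized request logic
2.1 Parameter normalization: parameter validation, default values and format conversion (timestamps, enum values) are handled in one place.
2.2 Error handling: network exceptions and Kuaishou API error codes are caught centrally and converted into a consistent error format (for example a specific exception or a unified error object).
2.3 Logging and monitoring: request logs, performance monitoring (such as latency statistics) and tracking points can be added uniformly.
- Easier to extend
3.1 @Retryable can be applied at this layer (the self-invocation problem, which makes @Retryable ineffective, has to be dealt with)
3.2 Per-API call counts and error analysis are easy to implement at this layer with an aspect
private <K extends SphApiBaseReqVO, T> SphApBaseRespVO<T> getApiResp(SphApiEnum apiEnum, K params, Class<T> dataType) {
    String resp = "";
    try {
        // Every request gets a random nonce
        params.setNonce(UUID.randomUUID().toString());
        // Wait for a permit from the per-API rate limiter
        SphRateLimiterManager.getApiRateLimiter(apiEnum).acquire();
        resp = HttpUtils.getCallWithPojo(apiEnum.getUrl(), params);
        return getFormatApiResp(apiEnum, resp, dataType, params);
    } catch (ServiceException | RetryException e) {
        saveErrorResponse(apiEnum, params, resp, "");
        throw e;
    } catch (Exception e) {
        log.error("WeChat Channels API request failed, api:{} params:{} error:{}", apiEnum.getDesc(), params, e.getMessage());
        saveErrorResponse(apiEnum, params, resp, "");
        throw new ServiceException(e.getMessage(), SPH_API_REQUEST_ERROR.getCode());
    }
}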
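The benefits listed above (one place for the nonce, for error conversion and for counters) can be shown with a stripped-down sketch. All names here are hypothetical, and the `transport` function stands in for the real HTTP call; the counters are a simple substitute for the aspect-based statistics mentioned in 3.2.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Sketch of a single entry point for all platform calls; every name here is hypothetical. */
class ApiGatewaySketch {
    static final Map<String, AtomicInteger> CALLS = new ConcurrentHashMap<>();
    static final Map<String, AtomicInteger> ERRORS = new ConcurrentHashMap<>();

    /** Unified error type, so callers handle one exception instead of many. */
    static class ApiException extends RuntimeException {
        ApiException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static String call(String api, Map<String, String> params,
                       Function<Map<String, String>, String> transport) {
        Map<String, String> request = new HashMap<>(params);
        request.put("nonce", UUID.randomUUID().toString());   // set once, for every API
        CALLS.computeIfAbsent(api, k -> new AtomicInteger()).incrementAndGet();
        try {
            return transport.apply(request);                  // the actual HTTP call goes here
        } catch (RuntimeException e) {
            ERRORS.computeIfAbsent(api, k -> new AtomicInteger()).incrementAndGet();
            throw new ApiException("call to " + api + " failed: " + e.getMessage(), e);
        }
    }
}
```

Because every request passes through `call`, per-API call and error counts fall out for free, which is also the raw material an API dashboard or alerting rule would need.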
Dynamic allocation of time slices
Many APIs cap how much data a paginated query can return for a single request, which is a common way of avoiding deep pagination.
When that cap is hit, the time range has to be cut into smaller slices and requested again.
Below is a demo of recursive time slicing.
private List<SphImageDayReportDO> downloadDayReport(List<LocalDateTime[]> requestTimeList, SphAuthAdvertiserInfoDO advertiserInfoDO,
        SphOauth2AccessTokenDO tokenDO, String authProgress, String advertiserProgress, DateIntervalEnum dateIntervalEnum) {
    List<SphImageDayReportDO> detailsList = new ArrayList<>();
    for (LocalDateTime[] timeRange : requestTimeList) {
        try {
            // 1.0 Normal request
        } catch (ServiceException e) {
            // 2.0 The deep-pagination case: the platform's message asks to narrow the query range
            if (!e.getMessage().contains("请降低查询数据范围")) {
                throw e;
            }
            // 2.1 Recurse with smaller time slices
            detailsList.addAll(getRollbackDOList(timeRange, advertiserInfoDO, tokenDO,
                    authProgress, advertiserProgress, dateIntervalEnum));
        }
        if (detailsList.size() >= 100) {
            // Insert the current batch
        }
    }
    return detailsList;
}

private List<SphImageDayReportDO> getRollbackDOList(LocalDateTime[] timeRange, SphAuthAdvertiserInfoDO advertiserInfoDO,
        SphOauth2AccessTokenDO tokenDO, String authProgress, String advertiserProgress, DateIntervalEnum dateIntervalEnum) {
    DateIntervalEnum oldDateIntervalEnum = dateIntervalEnum;
    // 1.0 Switch to a finer slice granularity
    dateIntervalEnum = getRollbackDateIntervalEnum(dateIntervalEnum);
    // 2.0 Avoid infinite recursion: stop when the granularity is already the smallest
    if (oldDateIntervalEnum.equals(dateIntervalEnum)) {
        throw new ServiceException("Already at the smallest time slice, but the result set is still too large");
    }
    // New time slices, recurse
    return downloadDayReport(LocalDateTimeUtils.getDateRangeList(timeRange[0], timeRange[1], dateIntervalEnum),
            advertiserInfoDO, tokenDO, authProgress, advertiserProgress, dateIntervalEnum);
}

protected DateIntervalEnum getRollbackDateIntervalEnum(DateIntervalEnum dateIntervalEnum) {
    // Minute is the smallest granularity
    if (DateIntervalEnum.MINUTE.equals(dateIntervalEnum)) {
        return DateIntervalEnum.MINUTE;
    }
    int rollbackLevel = dateIntervalEnum.getInterval() - 1;
    return DateIntervalEnum.valueOf(rollbackLevel);
}
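The same idea can be run end to end with a fake API. In this sketch the "platform" is a local method that refuses any range containing more than 100 rows, and `ChronoUnit` replaces the article's `DateIntervalEnum`; the class name, the row cap and the data density are all assumptions made for illustration.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

/** Sketch of recursive time slicing; the "API" is a local fake with a row cap. */
class TimeSliceSketch {
    static final int MAX_ROWS_PER_QUERY = 100;   // hypothetical platform cap
    static final int ROWS_PER_MINUTE = 1;        // fake data density

    /** Fake platform call: rejects ranges that would return too many rows. */
    static int query(LocalDateTime start, LocalDateTime end) {
        long rows = ChronoUnit.MINUTES.between(start, end) * ROWS_PER_MINUTE;
        if (rows > MAX_ROWS_PER_QUERY) {
            throw new IllegalStateException("range too large");
        }
        return (int) rows;
    }

    /** Next finer granularity, or the same unit when already at the finest level. */
    static ChronoUnit finer(ChronoUnit unit) {
        switch (unit) {
            case DAYS:
                return ChronoUnit.HOURS;
            case HOURS:
                return ChronoUnit.MINUTES;
            default:
                return ChronoUnit.MINUTES;
        }
    }

    /** Splits [start, end) into consecutive slices of one `unit` each. */
    static List<LocalDateTime[]> split(LocalDateTime start, LocalDateTime end, ChronoUnit unit) {
        List<LocalDateTime[]> slices = new ArrayList<>();
        for (LocalDateTime s = start; s.isBefore(end); s = s.plus(1, unit)) {
            LocalDateTime e = s.plus(1, unit);
            slices.add(new LocalDateTime[]{s, e.isAfter(end) ? end : e});
        }
        return slices;
    }

    /** Queries each slice; on "range too large" it retries that slice at a finer unit. */
    static int download(List<LocalDateTime[]> slices, ChronoUnit unit) {
        int total = 0;
        for (LocalDateTime[] slice : slices) {
            try {
                total += query(slice[0], slice[1]);
            } catch (IllegalStateException e) {
                ChronoUnit next = finer(unit);
                if (next == unit) {
                    throw new IllegalStateException("finest slice is still too large", e);
                }
                total += download(split(slice[0], slice[1], next), next);
            }
        }
        return total;
    }
}
```

For a single day the first query covers 1,440 rows and is refused, so the day is split into 24 hourly slices of 60 rows each, and the total comes back as 1,440. Only the slices that actually overflow are split further, which keeps the number of extra requests small.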
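API call dashboard
This item is extremely important, but it has not been scheduled yet.
Alerting
This item is extremely important, but it has not been scheduled yet.
Only in combination with the API dashboard can the project be optimized continuously.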
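Manual fetch endpoint
This is a very common feature. When records are missing, or their status differs from what the platform shows, a specific record or time range can be fetched manually in order to:
- update the data
- provide a basis for comparing the platform API response, our stored data and the platform's own data
- re-fetch the data of a given period
public String downloadByManual(SphManualReqVO reqVO) {
    // 1.0 Resolve the job enum
    SphJobEnum jobEnum = SphJobEnum.of(reqVO.getJobName());
    String lockKey = buildManualLockKey(jobEnum.getJobName());
    // 2.0 Acquire the lock (assumes tryLock returns a boolean, as Redisson's RLock.tryLock does)
    if (!redisLock.tryLock(lockKey, 5, 1200, TimeUnit.SECONDS)) {
        throw new ServiceException("A manual download for this job is already running");
    }
    try {
        // 3.0 Validate the parameters
        validateManualParam(reqVO, jobEnum);
        // 4.0 Run the manual task
        getTaskService(jobEnum).downloadByManual(reqVO);
        // The original omits the return value; a status string is assumed here
        return jobEnum.getDesc() + " finished";
    } finally {
        // 5.0 Release the lock
        redisLock.unlock(lockKey);
    }
}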
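The lock-then-run pattern used by the manual endpoint is easy to get subtly wrong, so here is a local sketch of it. `ReentrantLock` is only a stand-in for the Redis lock, and `GuardedRunSketch` is a hypothetical name; the point is that the job runs, and the lock is released, only when the lock was actually acquired.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Lock-guarded manual run; ReentrantLock is a local stand-in for the Redis lock. */
class GuardedRunSketch {
    static String runExclusively(ReentrantLock lock, long waitSeconds, Supplier<String> job) {
        boolean locked;
        try {
            locked = lock.tryLock(waitSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        if (!locked) {
            // Someone else is running the job: do not run it and do not unlock
            return "skipped: another run holds the lock";
        }
        try {
            return job.get();
        } finally {
            lock.unlock();   // only reached when this thread owns the lock
        }
    }
}
```

If the result of `tryLock` were ignored, a second caller would both run the job concurrently and release a lock it never held.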
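Speeding up with a thread pool
WeChat Channels has 7,300 advertisers. At one minute per advertiser, a single pass over all of them takes about 5 days, which is clearly unacceptable, so a thread pool is needed.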
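The estimate is simple arithmetic: 7,300 minutes is about 121.7 hours, or roughly 5.07 days. The sketch below generalizes it to N worker threads. It is a best-case figure that assumes every job takes the same time and ignores platform rate limits; `ThroughputEstimate` is a hypothetical helper.

```java
/** Back-of-the-envelope estimate of one full pass; ignores rate limits and uneven job times. */
class ThroughputEstimate {
    /** Wall-clock minutes needed to run `jobs` jobs of `minutesPerJob` each on `threads` workers. */
    static long minutesNeeded(long jobs, long minutesPerJob, long threads) {
        long rounds = (jobs + threads - 1) / threads;  // ceiling division
        return rounds * minutesPerJob;
    }

    public static void main(String[] args) {
        System.out.println(minutesNeeded(7300, 1, 1));    // single thread
        System.out.println(minutesNeeded(7300, 1, 400));  // 400 worker threads
    }
}
```

With one thread the pass needs 7,300 minutes; with 400 threads it needs 19 rounds of one minute each, so in the best case the pass drops from days to well under half an hour.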
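Defining a business thread pool
The constructor of the thread pool takes the following parameters:
- corePoolSize: number of core threads, which the pool keeps alive
- maximumPoolSize: maximum number of threads in the pool (core threads plus temporary ones)
- keepAliveTime: idle time after which temporary threads are released
- unit: time unit of keepAliveTime
- workQueue: blocking queue; always use a bounded queue to avoid OOM
- threadFactory: thread factory; each business pool should use its own thread names, which the factory can provide
- RejectedExecutionHandler: rejection policy; my custom handler only logs the pool state before throwing, because the task orchestration shown later keeps the policy from being triggered
The pool first uses the core threads. Once they are all busy, new tasks go into the blocking queue; when the queue is full, temporary threads are created up to maximumPoolSize; and when those are exhausted too, the rejection policy is triggered.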
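The order "core threads, then queue, then temporary threads, then rejection" can be observed with a tiny pool. The sketch below uses 1 core thread, a maximum of 2 threads and a queue of 1, and submits tasks that block until released; `PoolFlowSketch` is a hypothetical name and the sizes are chosen only to keep the example readable.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows where each submitted task ends up: core thread, queue, temporary thread or rejection. */
class PoolFlowSketch {
    /** Returns {poolSize, queueSize, rejectedCount} after submitting `tasks` blocking tasks. */
    static int[] submitBlockingTasks(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> awaitQuietly(release)); // keeps the worker busy until released
            } catch (RejectedExecutionException e) {
                rejected++;                                // the default AbortPolicy throws
            }
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With `submitBlockingTasks(1, 2, 1, 4)` the first task takes the core thread, the second waits in the queue, the third forces a temporary thread and the fourth is rejected, so the snapshot is 2 threads, 1 queued task and 1 rejection.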
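Rejection policies
- AbortPolicy (the default)
  - Throws a RejectedExecutionException directly
  - Suitable when the caller must know that a task was rejected
- CallerRunsPolicy
  - The thread that submitted the task (the caller) runs it itself
  - Suitable when tasks must not be lost and slower execution is acceptable
- DiscardPolicy
  - Silently drops the rejected task without doing anything
  - Suitable when losing tasks is acceptable
- DiscardOldestPolicy
  - Drops the oldest task in the queue (the one at the head), then tries to submit the current task again
  - Suitable when old tasks may be dropped in favour of new ones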
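Of the four built-in policies, CallerRunsPolicy is the one that keeps every task, at the cost of slowing down the submitter. The sketch below saturates a one-thread pool and then submits one more task to show which thread ends up running it; `CallerRunsSketch` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Demonstrates that CallerRunsPolicy runs an overflowing task on the submitting thread. */
class CallerRunsSketch {
    /** Returns the name of the thread that ran the task submitted while the pool was saturated. */
    static String threadOfOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);   // occupies the only worker thread
        pool.execute(blocker);   // fills the queue
        AtomicReference<String> ranOn = new AtomicReference<>();
        // The pool is saturated, so this task is handed back and runs on the caller
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        return ranOn.get();
    }
}
```

The returned name is the caller's own thread name: the submitter is blocked for as long as the task runs, which acts as natural back-pressure.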
// Constructor of ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

/**
 * Creates the thread pool for WeChat Channels.
 *
 * @return
 */
@Bean("sphThreadPoolExecutor")
public ThreadPoolExecutor createSphThreadPoolExecutor() {
    return new ThreadPoolExecutor(
            sphThreadPoolConfig.getCoreSize(),
            sphThreadPoolConfig.getMaxSize(),
            sphThreadPoolConfig.getKeepAliveSeconds(),
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(sphThreadPoolConfig.getQueueCapacity()),
            createThreadFactory(sphThreadPoolConfig.getName()),
            createRejectedHandler(sphThreadPoolConfig.getName()));
}

/**
 * Creates the rejection policy.
 *
 * @param threadName
 * @return
 */
private RejectedExecutionHandler createRejectedHandler(String threadName) {
    return (r, executor) -> {
        log.error("Rejection policy of pool {} triggered, pool: {}, active threads: {}, queue size: {}, max pool size: {}", threadName,
                executor.toString(), executor.getActiveCount(), executor.getQueue().size(), executor.getMaximumPoolSize());
        throw new RejectedExecutionException("Rejection policy of pool " + threadName + " triggered");
    };
}

/**
 * Creates the thread factory.
 *
 * @param name
 * @return
 */
private ThreadFactory createThreadFactory(String name) {
    return new ThreadFactory() {
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, name + threadNumber.getAndIncrement());
        }
    };
}
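Handling tasks without dropping them
Avoiding the rejection policy
From how the thread pool works, as long as the total number of tasks in the pool at any moment stays within the maximum pool size plus the queue capacity, the rejection policy is not triggered. Here is a demo.
- Give each job a cap on how many tasks it may have in the pool at the same time
The WeChat Channels thread pool is configured as follows: at most 800 threads, a queue capacity of 800 and 400 core threads. As long as no more than 1,600 tasks are in the pool at the same moment, the rejection policy is not triggered. The enum below assigns each job the maximum number of tasks it may submit at once, 1,051 in total. At full load the 400 core threads are busy and the remaining 651 tasks do not fill the 800-slot queue, so the pool stays at its 400 core threads and never rejects.
platform-pool:
  sph:
    name: "WeChat Channels thread pool"
    coreSize: 400
    maxSize: 800
    queueCapacity: 800
    keepAliveSeconds: 60
@Getter
@AllArgsConstructor
public enum SphJobEnum {
    /**
     * Service list
     */
    SPH_ADGROUP_DAY_REPORT("sphDownloadAdgroupDayReportService", "WeChat Channels - pull ad report data (daily)", 100),
    SPH_ADGROUP_HOUR_REPORT("sphDownloadAdgroupHourlyReportService", "WeChat Channels - pull ad report data (hourly)", 300),
    SPH_REFRESH_TOKEN("sphRefreshTokenService", "WeChat Channels - refresh token", 1),
    SPH_DOWNLOAD_ADGROUP("sphDownloadAdgroupService", "WeChat Channels - download ad groups", 50),
    SPH_DOWNLOAD_VIDEO("sphDownloadVideoService", "WeChat Channels - download video assets", 100),
    SPH_DOWNLOAD_PIC("sphDownloadPicService", "WeChat Channels - download image assets", 100),
    SPH_DOWNLOAD_WECHAT_AUTHORIZATION("sphDownloadWechatChannelsAuthorizationService", "WeChat Channels - download authorization records", 100),
    SPH_DOWNLOAD_CAMPAIGN("sphDownloadCampaignsService", "WeChat Channels - download ad campaigns (to be retired)", 100),
    SPH_DOWNLOAD_VIDEO_DAY_REPORT("sphDownloadVideoDayReportService", "WeChat Channels - download video daily report", 100),
    SPH_DOWNLOAD_IMAGE_DAY_REPORT("sphDownloadImageDayReportService", "WeChat Channels - download image daily report", 100),
    ;

    private final String jobName;
    private final String desc;
    /**
     * Number of pool slots this job may occupy at once, so that no single job exhausts the pool.
     * Pool settings: 400 core threads, queue capacity 800, at most 800 threads.
     * Because the caller blocks until each batch has finished, the values above bound the tasks in flight.
     * They add up to 1,051: 400 tasks run on the core threads and the other 651 wait in the queue,
     * which therefore never fills up.
     */
    private final Integer dispatchThreadCount;

    public static SphJobEnum of(String serviceName) {
        for (SphJobEnum jobEnum : values()) {
            if (jobEnum.getJobName().equals(serviceName)) {
                return jobEnum;
            }
        }
        return null;
    }
}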
- Orchestrate the tasks and block until they finish
Scenario: every job has already been assigned its share of the pool, so all that is left is to
- submit at most the job's own cap of tasks at a time through task orchestration, with the calling thread blocking on CompletableFuture.join until they finish
- submit the next batch of at most the same size, and repeat until all tasks are done
private void loopDownloadJob(List<SphOauth2AccessTokenDO> validAuthList, String requestId, SphJobEnum jobEnum) {
    // 1.0 CollectionUtils.partition splits the advertisers into batches of the configured size
    for (List<SphAuthAdvertiserInfoDO> partList : CollectionUtils.partition(advertiserList, jobEnum.getDispatchThreadCount())) {
        dispatchTask(partList, tokenDO, requestId, jobEnum, oauthCount, oauthCurrentCounter, taskTotalCounter, taskCurrentCounter);
    }
}

private void dispatchTask(List<SphAuthAdvertiserInfoDO> advertiserList, SphOauth2AccessTokenDO tokenDO,
        String requestId, SphJobEnum jobEnum, AtomicInteger oauthCount, AtomicInteger oauthCurrentCounter,
        AtomicInteger taskTotalCounter, AtomicInteger taskCurrentCounter) {
    List<CompletableFuture<String>> featureList = new ArrayList<>();
    for (SphAuthAdvertiserInfoDO advertiserInfoDO : advertiserList) {
        CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
            downloadJob(advertiserInfoDO, tokenDO, authProcess, advertiserProcess, requestId, jobEnum);
            return authProcess + advertiserProcess + " finished";
        }, sphThreadPoolExecutor).exceptionally((ex -> {
            return "Execution failed: " + ex.getCause().getMessage();
        }));
        featureList.add(feature);
    }
    // Block the caller until the whole batch has finished
    CompletableFuture.allOf(featureList.toArray(new CompletableFuture[0])).join();
}
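The batch-and-join loop can be tried in isolation with the sketch below. `BatchDispatchSketch` and its `partition` helper are hypothetical stand-ins for the article's `CollectionUtils.partition` and `dispatchTask`. One assumption worth stating: in the usage shown, the batch size is kept within the queue capacity, so the outcome does not depend on how quickly idle workers pick up queued tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

/** Submits work in bounded batches and waits for each batch before sending the next. */
class BatchDispatchSketch {
    /** Splits `items` into consecutive batches of at most `size` elements. */
    static <T> List<List<T>> partition(List<T> items, int size) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            batches.add(items.subList(i, Math.min(i + size, items.size())));
        }
        return batches;
    }

    /** Runs one task per item on the pool, one batch at a time; returns how many tasks completed. */
    static int runInBatches(List<Integer> items, int batchSize, ThreadPoolExecutor pool) {
        AtomicInteger done = new AtomicInteger();
        for (List<Integer> batch : partition(items, batchSize)) {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (Integer item : batch) {
                // A real task would download the data for `item`; here it only counts
                futures.add(CompletableFuture.runAsync(done::incrementAndGet, pool));
            }
            // Block the caller until the whole batch is finished before submitting more
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
        return done.get();
    }
}
```

For example, 100 items with a batch size of 8 on a pool with 2 core threads, 4 maximum threads and a queue of 8 run as 13 batches without a single rejection, because at no point are more than 8 tasks in the pool.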
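Handling after the rejection policy is triggered
- Custom rejection policy: a task that gets rejected is serialized and written to the database
- Custom thread pool: extend ThreadPoolExecutor and override afterExecute (a hook that fires automatically after a task in the pool finishes, whether it succeeded or threw); read the stored tasks back from the database, deserialize them, and put them into the bounded queue
- Demo
3.1 Custom rejection policy and thread pool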
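import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReloadableThreadPoolExecutor extends ThreadPoolExecutor {

    // Mapper for the table that holds rejected tasks
    private final TaskMapper taskMapper;

    public ReloadableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                        long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue,
                                        TaskMapper taskMapper) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.taskMapper = taskMapper;
        this.setRejectedExecutionHandler(new DatabaseRejectedHandler(taskMapper));
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Read the persisted tasks from the database and put them back into the queue
        List<TaskEntity> pendingTasks = taskMapper.selectPendingTasks();
        pendingTasks.forEach(task -> {
            // deserializeTask is a helper not shown in this demo; it rebuilds a Runnable from the stored data
            Runnable runnable = deserializeTask(task.getTaskData());
            // Delete the row only when the queue accepted the task; otherwise it stays for a later attempt
            if (super.getQueue().offer(runnable)) {
                taskMapper.deleteById(task.getId());
            }
        });
    }

    private static class DatabaseRejectedHandler implements RejectedExecutionHandler {
        private final TaskMapper taskMapper;

        public DatabaseRejectedHandler(TaskMapper taskMapper) {
            this.taskMapper = taskMapper;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                TaskEntity task = new TaskEntity();
                // Demo only: toString() is not real serialization. A real implementation
                // has to store enough data to rebuild the task later.
                task.setTaskData(r.toString());
                taskMapper.insert(task);
            }
        }
    }
}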
3.2 Register the thread pool
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ReloadableThreadPoolExecutor taskExecutor(TaskMapper taskMapper) {
        return new ReloadableThreadPoolExecutor(
                5,                              // core pool size
                10,                             // maximum pool size
                60,                             // keep-alive time for idle threads
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5),   // deliberately small queue, to exercise the rejection policy
                taskMapper);
    }
}
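To watch the reject, park, reload cycle end to end without a database, here is a small runnable sketch. It is my own simplification, not the project code: an in-memory `ConcurrentLinkedQueue` stands in for the task table, serialization is skipped entirely, and the names `ReloadingPoolDemo` and `ReloadingExecutor` are hypothetical. The pool has one thread and one queue slot, so the rejections are easy to provoke.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ReloadingPoolDemo {

    /** Pool that parks rejected tasks in a store and re-queues them after each finished task. */
    static class ReloadingExecutor extends ThreadPoolExecutor {
        private final Queue<Runnable> store; // in-memory stand-in for the database table

        ReloadingExecutor(int core, int max, int queueSize, Queue<Runnable> store) {
            super(core, max, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
                    (task, executor) -> {
                        if (!executor.isShutdown()) {
                            store.add(task); // "persist" instead of dropping or throwing
                        }
                    });
            this.store = store;
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            Runnable parked;
            while ((parked = store.poll()) != null) {
                if (!getQueue().offer(parked)) {
                    store.add(parked); // queue is full again, retry after the next task
                    break;
                }
            }
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {number of tasks parked while the pool was saturated, number of tasks completed}. */
    static int[] runScenario() {
        Queue<Runnable> store = new ConcurrentLinkedQueue<>();
        ReloadingExecutor pool = new ReloadingExecutor(1, 1, 1, store);
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(4);
        AtomicInteger completed = new AtomicInteger();

        // The first task occupies the only worker until the gate opens.
        pool.execute(() -> { awaitQuietly(gate); completed.incrementAndGet(); done.countDown(); });
        // The next one fills the single queue slot; the other two are rejected and parked.
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { completed.incrementAndGet(); done.countDown(); });
        }
        int parked = store.size();

        gate.countDown();   // let the worker proceed; afterExecute drains the store step by step
        awaitQuietly(done);
        pool.shutdown();
        return new int[] {parked, completed.get()};
    }

    public static void main(String[] args) {
        int[] result = runScenario();
        System.out.println("parked=" + result[0] + ", completed=" + result[1]);
    }
}
```

One caveat that also applies to the database version: afterExecute runs on every worker thread, so the step that takes a parked task has to be atomic (here `poll()`), otherwise two workers can re-queue the same task.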
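Distributed lock
- Introduce Redisson
The same task must not run more than once at a time; otherwise its demand for resources has no upper bound. Redisson was introduced to solve this problem.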
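As a rough sketch of the locking pattern: Redisson's `RLock` extends `java.util.concurrent.locks.Lock`, so the guard can be written against the plain JDK interface and tried locally with a `ReentrantLock`. In the real service the lock would come from something like `redissonClient.getLock("sph_download_task_" + jobName)`; the key format here is my assumption based on the lock prefixes used later in this article, and `JobLockDemo` / `runExclusively` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class JobLockDemo {

    /** Runs the job only if the lock is free; returns false when another run already holds it. */
    static boolean runExclusively(Lock lock, Runnable job) {
        boolean acquired = false;
        try {
            // Do not wait: if a previous run is still going, skip this round.
            // With Redisson, RLock also offers tryLock(waitTime, leaseTime, unit) to cap the hold time.
            acquired = lock.tryLock(0, TimeUnit.SECONDS);
            if (!acquired) {
                return false;
            }
            job.run();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            if (acquired) {
                lock.unlock(); // always release, even when the job throws
            }
        }
    }

    /** Returns {first run succeeded, overlapping run from another thread succeeded}. */
    static boolean[] demo() {
        Lock lock = new ReentrantLock(); // local stand-in for a Redisson RLock
        boolean first = runExclusively(lock, () -> { });

        boolean[] overlapping = new boolean[1];
        runExclusively(lock, () -> {
            // While this run holds the lock, a second "instance" tries to start the same job.
            Thread other = new Thread(() -> overlapping[0] = runExclusively(lock, () -> { }));
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return new boolean[] {first, overlapping[0]};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("first=" + result[0] + ", overlapping=" + result[1]);
    }
}
```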
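- Release locks gracefully
If a long-running task is cut off by a deployment and its distributed lock is not released, the next scheduled run cannot start. That is clearly unacceptable, so the DisposableBean interface is used below to release the locks on shutdown.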
@Slf4j
@Component
public class DisposableBeanConfig implements DisposableBean {

    @Autowired
    protected RedisLock redisLock;

    @Override
    public void destroy() throws Exception {
        // Prefixes of every lock key this service may hold
        List<String> lockPrefixList = Arrays.asList("ks_download_task_", "sph_download_task_",
                "ks_manual_download_task_", "sph_manual_download_task_");
        for (String prefix : lockPrefixList) {
            redisLock.unlockByPrefix(prefix);
            log.info("JVM shutting down, released distributed locks with prefix {}", prefix);
        }
    }
}
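Handling platform API rate limits
Open platforms rate-limit their APIs. Guava was added to the project, and com.google.common.util.concurrent.RateLimiter is used to stay within the limits. Here is a demo.
- Define the number of tokens per minute for each API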
@Getter
@AllArgsConstructor
public enum SphApiEnum {
    /**
     * API enum
     */
    GET_TOKEN("https://api.e.qq.com//", "Get access_token", "https://developers.e.qq.com/v3.0/docs/api//token", 1000d),
    GET_ADVERTISER_LIST("https://api.e.qq.com/v3.0//get", "Query Tencent Ads advertiser info", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),
    GET_DAILY_REPORTS("https://api.e.qq.com/v3.0//get", "Query Tencent Ads daily report data", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),
    GET_HOURLY_REPORTS("https://api.e.qq.com/v3.0//get", "Query Tencent Ads hourly report data", "https://developers.e.qq.com/v3.0/docs/api//get#fdub1", 1000d),
    GET_AD_GROUPS("https://api.e.qq.com/v3.0//get", "Get ad groups", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),
    GET_VIDEOS("https://api.e.qq.com/v3.0//get", "Get video files", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),
    GET_PIC("https://api.e.qq.com/v3.0//get", "Get image info", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),
    GET_WECHAT_CHANNELS_AUTHORIZATION_LIST("https://api.e.qq.com/v3.0//get", "Get WeChat Channels authorization record list", "https://developers.e.qq.com/v3.0/docs/api//get", 1000d),
    GET_CAMPAIGNS("https://api.e.qq.com/v1.3//get", "Get campaigns (to be retired soon)", "https://developers.e.qq.com/docs/api//campaigns/campaigns_get?version=1.3&_preview=1#kew0v", 1000d),
    ;

    private final String url;
    private final String desc;
    /**
     * URL of the API documentation
     */
    private final String docUrl;
    /**
     * Rate limit per minute. WeChat Channels allows 1000 calls per minute per API by default,
     * which is 1440000 calls per day.
     */
    private final double qpm;
}
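- Define the rate-limiter manager
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

public class RateLimiterManager {
    /**
     * Holds one rate limiter per API
     */
    private static final ConcurrentMap<String, RateLimiter> API_RATE_LIMITER = new ConcurrentHashMap<>();

    /**
     * Returns the RateLimiter for the given API, creating it on first use.
     *
     * @param apiEnum the API to look up
     * @return the shared RateLimiter for that API
     */
    public static RateLimiter getApiRateLimiter(SphApiEnum apiEnum) {
        // qpm / 60 converts the per-minute quota into permits per second; the limiter warms up over 3 seconds
        return API_RATE_LIMITER.computeIfAbsent(apiEnum.name(),
                key -> RateLimiter.create(apiEnum.getQpm() / 60, 3, TimeUnit.SECONDS));
    }
}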
- Usage demo
private <K, T> SphApBaseRespVO<T> getApiResp(SphApiEnum apiEnum, K params, Class<T> dataType) {
    String resp = "";
    try {
        // Take a token first; this blocks until the limiter grants one
        RateLimiterManager.getApiRateLimiter(apiEnum).acquire();
        resp = HttpUtils.getCallWithPojo(apiEnum.getUrl(), params);
        return getFormatApiResp(apiEnum, resp, dataType, params);
    } catch (ServiceException | RetryException e) {
        // Business and retry errors: record the failed response and rethrow unchanged
        saveErrorResponse(apiEnum, params, resp, "");
        throw e;
    } catch (Exception e) {
        log.error("WeChat Channels API request failed. api: {} params: {} error: {}", apiEnum.getDesc(), params, e.getMessage());
        saveErrorResponse(apiEnum, params, resp, "");
        throw new ServiceException(e.getMessage(), SPH_API_REQUEST_ERROR.getCode());
    }
}
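To make the quota arithmetic concrete: 1000 calls per minute is roughly 16.67 permits per second, or one call about every 60 ms. The sketch below is a plain token bucket with an injected clock so it can be checked deterministically. It is only meant to illustrate the idea of spending and refilling permits; it is not Guava's RateLimiter algorithm (which also handles warm-up), and `TokenBucketSketch` is a hypothetical name.

```java
class TokenBucketSketch {
    private final double permitsPerNano;
    private final double capacity;
    private double tokens;
    private long lastRefillNanos;

    /**
     * @param permitsPerMinute quota per minute, e.g. 1000 for the default quota quoted above
     * @param capacity         maximum number of permits that can be saved up
     * @param startNanos       clock reading at creation time (injected so tests can control time)
     */
    TokenBucketSketch(double permitsPerMinute, double capacity, long startNanos) {
        this.permitsPerNano = permitsPerMinute / 60.0 / 1_000_000_000.0;
        this.capacity = capacity;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillNanos = startNanos;
    }

    /** Refills according to the elapsed time, then takes one permit if available. */
    synchronized boolean tryAcquire(long nowNanos) {
        long elapsed = Math.max(0, nowNanos - lastRefillNanos);
        tokens = Math.min(capacity, tokens + elapsed * permitsPerNano);
        lastRefillNanos = Math.max(lastRefillNanos, nowNanos);
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(1000, 1, 0L);
        System.out.println(bucket.tryAcquire(0L));          // bucket starts full
        System.out.println(bucket.tryAcquire(30_000_000L)); // 30 ms later: only about half a permit refilled
        System.out.println(bucket.tryAcquire(70_000_000L)); // 70 ms: a full permit is available again
    }
}
```

Unlike this sketch, `RateLimiter.acquire()` blocks the caller until a permit is available, which is why the usage demo above needs no retry loop around it.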
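Possible extensions
I previously worked at an ERP company, where I was mainly responsible for exchanging order, product, inventory, logistics, and WMS data with e-commerce platforms. Drawing on that experience, here is an analysis of what could still be upgraded.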