【How to Implement Distributed Load-Testing Middleware】
The principles behind distributed load-testing middleware and how to implement it
- Principle
- Full-link tracing framework (Trace)
- MQ middleware
- Database
- Distributed cache middleware (Redis)
- Database sharding middleware
Principle
After reading a large amount of middleware source code and surveying the open-source community, the design principle works out as follows (a minimal sketch of the shadow-flag context holder follows this list):
(1) A load-test HTTP request is issued on the test link.
(2) The distributed tracing framework recognizes the shadow flag on the URL and puts it into the tracing Context.
(3) When the upstream application issues an RPC/MQ call, the middleware puts the test flag into its own Context and propagates it with the call.
(4) When the consumer handles the RPC call or MQ message, it reads the flag back out of the middleware Context.
(5) When the call reaches the sharding, cache, or database middleware, that middleware reads the shadow flag from the current Context.
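Every layer above needs a place to read and write that flag for the current call. Below is a minimal sketch of such a context holder. The class name ShadowTestUtil and the members isShadowTesLink()/PREFIX/prefix are taken from the code samples later in this article; everything else (the marking/clearing methods, the "test_" prefix value, the plain ThreadLocal) is an assumption for illustration, and a production version would also have to propagate the flag across thread pools and async boundaries.

/**
 * Minimal sketch of the shadow-flag context holder assumed by the samples below.
 */
public final class ShadowTestUtil {

    /** Prefix applied to shadow tables and shadow Redis keys; the concrete value is an assumption. */
    public static final String PREFIX = "test_";
    /** Lowercase alias kept because some samples in this article reference ShadowTestUtil.prefix. */
    public static final String prefix = PREFIX;

    /** Shadow flag for the current call; a plain ThreadLocal for simplicity. */
    private static final ThreadLocal<Boolean> SHADOW_FLAG = ThreadLocal.withInitial(() -> Boolean.FALSE);

    private ShadowTestUtil() {
    }

    /** Mark the current call as part of the shadow (load-test) link. */
    public static void markShadowTestLink() {
        SHADOW_FLAG.set(Boolean.TRUE);
    }

    /** @return true when the current call is on the shadow link. */
    public static boolean isShadowTesLink() {
        return SHADOW_FLAG.get();
    }

    /** Clear the flag when the request finishes so pooled threads do not leak it. */
    public static void clear() {
        SHADOW_FLAG.remove();
    }
}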
The middleware is packaged as a Maven artifact and introduced into projects directly:
- Pluggable; business code is not aware of it.
- Supports complex SQL handling, full-link testing, and full-link tracing.
- Greatly improves load-testing efficiency.
Full-link tracing framework (Trace)
Recognize a specific key on the HTTP request link. For example:
append the load-test flag test=true to the URL, then put that flag into the Context of the tracing framework.
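As an illustration of this step, a minimal servlet filter (an assumption, not part of the original middleware) can recognize the test=true parameter and store the flag in the hypothetical ShadowTestUtil holder sketched above; a real deployment would usually do this inside the tracing framework's own web instrumentation.

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import java.io.IOException;

public class ShadowTestMarkFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) {
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        try {
            // e.g. http://host/order/create?test=true
            if ("true".equals(request.getParameter("test"))) {
                ShadowTestUtil.markShadowTestLink();
            }
            chain.doFilter(request, response);
        } finally {
            // always clear the flag so pooled worker threads do not carry it into other requests
            ShadowTestUtil.clear();
        }
    }

    @Override
    public void destroy() {
    }
}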
MQ middleware
Take RocketMQ as an example: com.alibaba.rocketmq.client.hook.SendMessageHook.
Implement the SendMessageHook interface to add trace-logging instrumentation; the distributed tracing component SOFATrace instruments messages through this same interface. It is the AOP extension point the MQ client officially leaves for implementers.
public class MetaQSendMessageHookImpl implements SendMessageHook, MetaQTraceConstants {

    public MetaQSendMessageHookImpl() {
    }

    @Override
    public String hookName() {
        return "EagleEyeSendMessageHook";
    }

    @Override
    public void sendMessageBefore(SendMessageContext context) {
        if (context != null && context.getMessage() != null
                && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {
            MetaQTraceContext mqTraceContext = new MetaQTraceContext();
            context.setMqTraceContext(mqTraceContext);
            mqTraceContext.setMetaQType(MetaQType.METAQ);
            mqTraceContext.setGroup(context.getProducerGroup());
            mqTraceContext.setAsync(CommunicationMode.ASYNC.equals(context.getCommunicationMode()));
            Message msg = context.getMessage();
            if (msg != null) {
                MetaQTraceBean traceBean = new MetaQTraceBean();
                traceBean.setTopic(msg.getTopic());
                traceBean.setOriginMsgId(MessageAccessor.getOriginMessageId(msg));
                traceBean.setTags(msg.getTags());
                traceBean.setKeys(msg.getKeys());
                traceBean.setBuyerId(msg.getBuyerId());
                traceBean.setTransferFlag(MessageAccessor.getTransferFlag(msg));
                traceBean.setCorrectionFlag(MessageAccessor.getCorrectionFlag(msg));
                traceBean.setBodyLength(msg.getBody().length);
                traceBean.setBornHost(context.getBornHost());
                traceBean.setStoreHost(context.getBrokerAddr());
                traceBean.setBrokerName(context.getMq().getBrokerName());
                traceBean.setProps(context.getProps());
                traceBean.setMsgType(context.getMsgType());
                List<MetaQTraceBean> beans = new ArrayList<>();
                beans.add(traceBean);
                mqTraceContext.setTraceBeans(beans);
                // reuse trace identifiers if the caller already stamped them onto the message
                if (StringUtils.isNotBlank(msg.getUserProperty("eagleTraceId"))) {
                    traceBean.setTraceId(msg.getUserProperty("eagleTraceId"));
                    traceBean.setRpcId(msg.getUserProperty("eagleRpcId"));
                    traceBean.setEagleEyeUserData(msg.getUserProperty("eagleData"));
                }
                MetaQSendMessageTraceLog.sendMessageBefore(mqTraceContext);
                // write the trace identifiers into the user properties so the consumer can pick them up
                if (StringUtils.isBlank(msg.getProperty("eagleTraceId"))
                        && StringUtils.isNotBlank(traceBean.getTraceId())) {
                    msg.putUserProperty("eagleTraceId", traceBean.getTraceId());
                    msg.putUserProperty("eagleRpcId", traceBean.getRpcId());
                    msg.putUserProperty("eagleData", traceBean.getEagleEyeUserData());
                }
            }
        }
    }

    @Override
    public void sendMessageAfter(SendMessageContext context) {
        if (context != null && context.getMessage() != null && context.getSendResult() != null
                && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {
            MetaQTraceContext mqTraceContext = (MetaQTraceContext) context.getMqTraceContext();
            mqTraceContext.setRegionId(context.getSendResult().getRegionId());
            MetaQTraceBean traceBean = (MetaQTraceBean) mqTraceContext.getTraceBeans().get(0);
            if (traceBean != null && context.getSendResult() != null) {
                traceBean.setQueueId(context.getMq().getQueueId());
                traceBean.setMsgId(context.getSendResult().getOffsetMsgId());
                traceBean.setOriginMsgId(context.getSendResult().getMsgId());
                traceBean.setOffset(context.getSendResult().getQueueOffset());
                mqTraceContext.setSuccess(true);
                mqTraceContext.setStatus(context.getSendResult().getSendStatus().toString());
            } else if (context.getException() != null) {
                String msg = context.getException().getMessage();
                mqTraceContext.setErrorMsg(StringUtils.substring(msg, 0, msg.indexOf("\n")));
            }
            MetaQSendMessageTraceLog.sendMessageAfter(mqTraceContext);
        }
    }
}
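Propagating the load-test flag follows the same pattern as the trace IDs above: a producer-side hook writes the flag into the message's user properties, and the consumer restores it into the context before the business listener runs. The sketch below assumes the property name test, the hypothetical ShadowTestUtil holder from earlier, and that the hook is registered through DefaultMQProducerImpl.registerSendMessageHook; the topic and group names are placeholders.

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.hook.SendMessageContext;
import com.alibaba.rocketmq.client.hook.SendMessageHook;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.message.MessageExt;

public class ShadowFlagMqPropagationDemo {

    public static void main(String[] args) throws Exception {
        // Producer side: stamp the shadow flag into the user properties of every outgoing message.
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
            @Override
            public String hookName() {
                return "ShadowTestSendMessageHook";
            }

            @Override
            public void sendMessageBefore(SendMessageContext context) {
                if (ShadowTestUtil.isShadowTesLink() && context.getMessage() != null) {
                    // "test" is an assumed property name agreed on by producer and consumer
                    context.getMessage().putUserProperty("test", "true");
                }
            }

            @Override
            public void sendMessageAfter(SendMessageContext context) {
                // nothing to do after sending
            }
        });
        producer.start();

        // Consumer side: restore the flag into the context before handling, clear it afterwards.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.subscribe("DEMO_TOPIC", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeContext) -> {
            for (MessageExt msg : msgs) {
                try {
                    if ("true".equals(msg.getUserProperty("test"))) {
                        ShadowTestUtil.markShadowTestLink();
                    }
                    // ... business processing of msg ...
                } finally {
                    ShadowTestUtil.clear();
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}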
Database
Based on the SQL parser that ships with the Druid database connection pool:
https://github.com/alibaba/druid/wiki/SQL-Parser
import com.alibaba.druid.filter.FilterAdapter;
import com.alibaba.druid.filter.FilterChain;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.proxy.jdbc.DataSourceProxy;
import com.alibaba.druid.proxy.jdbc.StatementProxy;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.util.JdbcConstants;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.util.List;

/**
 * Intercepts the Druid data-source (connection pool) to rewrite SQL on the shadow link.
 *
 * @author doge
 * @date 2021/10/19
 */
@Slf4j
public class DruidShadowTestFilter extends FilterAdapter {

    private final DruidShadowTestVisitor visitor = new DruidShadowTestVisitor();

    @Override
    public boolean statement_execute(FilterChain chain, StatementProxy statement, String sql) throws SQLException {
        try {
            List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL);
            sqlStatements.forEach(sqlStatement -> sqlStatement.accept(visitor));
            if (visitor.getRewriteStatus()) {
                // the SQL was rewritten, so execute the rewritten statement instead
                String newSql = SQLUtils.toSQLString(sqlStatements, JdbcConstants.MYSQL);
                log.debug("rewrite sql, origin sql: [{}], new sql: [{}]", sql, newSql);
                return super.statement_execute(chain, statement, newSql);
            }
            return super.statement_execute(chain, statement, sql);
        } finally {
            visitor.removeRewriteStatus();
        }
    }

    @Override
    public void init(DataSourceProxy dataSourceProxy) {
        if (!(dataSourceProxy instanceof DruidDataSource)) {
            log.error("DruidShadowTestFilter only supports DruidDataSource");
            return;
        }
        DruidDataSource dataSource = (DruidDataSource) dataSourceProxy;
        log.info("db configuration: url=" + dataSource.getUrl());
    }
}
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
import com.alibaba.ewtp.test.utils.ShadowTestUtil;
import org.apache.commons.lang3.StringUtils;

import java.util.Optional;

/**
 * AST visitor that rewrites table names onto shadow tables.
 *
 * @author doge
 * @date 2021/10/20
 */
public class DruidShadowTestVisitor extends MySqlASTVisitorAdapter {

    private static final ThreadLocal<Boolean> REWRITE_STATUS_CACHE = new ThreadLocal<>();

    @Override
    public boolean visit(SQLExprTableSource sqlExprTableSource) {
        // rewrite only when the current call carries the shadow (load-test) flag
        if (!ShadowTestUtil.isShadowTesLink()) {
            return true;
        }
        // keep the alias unchanged if one exists; otherwise use the table name itself as the alias,
        // e.g. c and d in: select c.id, d.name from c left join d on c.id = d.id
        String alias = StringUtils.isEmpty(sqlExprTableSource.getAlias())
                ? sqlExprTableSource.getExpr().toString()
                : sqlExprTableSource.getAlias();
        // prefix the table name only when it does not contain a dot (i.e. it is not schema-qualified)
        if (!sqlExprTableSource.getExpr().toString().contains(".")) {
            sqlExprTableSource.setExpr(ShadowTestUtil.PREFIX + sqlExprTableSource.getExpr());
        }
        sqlExprTableSource.setAlias(alias);
        REWRITE_STATUS_CACHE.set(true);
        return true;
    }

    /**
     * Return the rewrite status: {@code true} if the SQL has been rewritten, {@code false} otherwise.
     */
    public boolean getRewriteStatus() {
        return Optional.ofNullable(REWRITE_STATUS_CACHE.get()).orElse(Boolean.FALSE);
    }

    /**
     * Reset the rewrite status for the current thread.
     */
    public void removeRewriteStatus() {
        REWRITE_STATUS_CACHE.remove();
    }
}
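For the filter to take effect it has to be registered on the Druid pool. A minimal wiring sketch (the connection settings are placeholders) could look like this:

import com.alibaba.druid.filter.Filter;
import com.alibaba.druid.pool.DruidDataSource;

import java.util.ArrayList;
import java.util.List;

public class ShadowDataSourceConfig {

    public static DruidDataSource buildDataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        // placeholder connection settings
        dataSource.setUrl("jdbc:mysql://localhost:3306/demo");
        dataSource.setUsername("demo");
        dataSource.setPassword("demo");
        // register the shadow-table rewriting filter so statement_execute is intercepted
        List<Filter> filters = new ArrayList<>();
        filters.add(new DruidShadowTestFilter());
        dataSource.setProxyFilters(filters);
        return dataSource;
    }
}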
Distributed cache middleware (Redis)
You can refer to the SOFATrace approach:
https://www.sofastack.tech/blog/sofa-channel-15-retrospect/
- Add a Redis BeanPostProcessor (partial code).
- Implement a Redis connection factory (partial code).
- Implement a Redis connection wrapper that prefixes every Redis key with test when the call is on the shadow link.
import com.alibaba.ewtp.test.factory.TracingRedisConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
/**
 * Redis BeanPostProcessor: wraps the RedisConnectionFactory after initialization.
 *
 * @author doge
 * @date 2021/10/14
 */
public class TracingRedisBeanPostProcessor implements BeanPostProcessor {

    public TracingRedisBeanPostProcessor() {
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // wrap every RedisConnectionFactory so the connections it hands out can rewrite keys on the shadow link
        if (bean instanceof RedisConnectionFactory) {
            bean = new TracingRedisConnectionFactory((RedisConnectionFactory) bean);
        }
        return bean;
    }
}

import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.*;

/**
 * @author doge
 * @date 2021/10/14
 */
public class TracingRedisConnectionFactory implements RedisConnectionFactory {

    private final RedisConnectionFactory delegate;

    public TracingRedisConnectionFactory(RedisConnectionFactory delegate) {
        this.delegate = delegate;
    }

    @Override
    public RedisConnection getConnection() {
        // wrap the connection so that keys can be rewritten on the shadow link
        RedisConnection connection = this.delegate.getConnection();
        return new TracingRedisConnection(connection);
    }

    @Override
    public RedisClusterConnection getClusterConnection() {
        return delegate.getClusterConnection();
    }

    @Override
    public boolean getConvertPipelineAndTxResults() {
        return delegate.getConvertPipelineAndTxResults();
    }

    @Override
    public RedisSentinelConnection getSentinelConnection() {
        return delegate.getSentinelConnection();
    }

    @Override
    public DataAccessException translateExceptionIfPossible(RuntimeException e) {
        return delegate.translateExceptionIfPossible(e);
    }
}

import com.alibaba.ewtp.test.utils.ShadowTestUtil;
import org.springframework.dao.DataAccessException;
import org.springframework.data.geo.Distance;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.geo.Circle;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metric;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.core.types.RedisClientInfo;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Redis connection wrapper that prefixes keys when the call is on the shadow link.
 *
 * @author doge
 * @date 2021/10/14
 */
public class TracingRedisConnection implements RedisConnection {

    private final RedisConnection connection;

    public TracingRedisConnection(RedisConnection connection) {
        this.connection = connection;
    }

    @Override
    public Boolean expire(byte[] key, long seconds) {
        return connection.expire(handleByte(key), seconds);
    }

    @Override
    public Boolean set(byte[] key, byte[] value) {
        return connection.set(handleByte(key), value);
    }

    @Override
    public Boolean mSet(Map<byte[], byte[]> tuple) {
        return connection.mSet(handleByteMap(tuple));
    }

    /**
     * Prefix a single key with the shadow prefix when the current call is on the shadow link.
     */
    public byte[] handleByte(byte[] key) {
        if (ShadowTestUtil.isShadowTesLink()) {
            return (ShadowTestUtil.prefix + new String(key)).getBytes();
        }
        return key;
    }

    /**
     * Prefix a batch of keys in place.
     */
    public byte[][] handleBytes(byte[]... keys) {
        if (ShadowTestUtil.isShadowTesLink()) {
            for (int i = 0; i < keys.length; i++) {
                keys[i] = handleByte(keys[i]);
            }
        }
        return keys;
    }

    /**
     * Prefix the keys of a key/value map, preserving the original entry order.
     */
    public Map<byte[], byte[]> handleByteMap(Map<byte[], byte[]> tuple) {
        if (ShadowTestUtil.isShadowTesLink()) {
            Map<byte[], byte[]> prefixed = new LinkedHashMap<>(tuple.size());
            for (Map.Entry<byte[], byte[]> entry : tuple.entrySet()) {
                prefixed.put(handleByte(entry.getKey()), entry.getValue());
            }
            return prefixed;
        }
        return tuple;
    }

    // ... the remaining RedisConnection methods delegate to `connection` in the same way (omitted here)
}
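The post-processor only takes effect once it is registered with Spring. How the Maven artifact wires it in is not shown above, so the configuration class below is an assumption; in practice it would typically live in an auto-configuration so that business code does not change.

import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ShadowRedisConfiguration {

    /**
     * Declared static so the BeanPostProcessor is instantiated early enough to wrap
     * every RedisConnectionFactory before other beans start depending on it.
     */
    @Bean
    public static BeanPostProcessor tracingRedisBeanPostProcessor() {
        return new TracingRedisBeanPostProcessor();
    }
}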
Database sharding middleware
The open-source framework's solution:
https://shardingsphere.apache.org/document/current/cn/features/shadow
Approach: once the load-test flag has been obtained and the shadow link is enabled, turn on the ShardingSphere shadow-database switch so that the whole sharding link is wired end to end. Alternatively, the problem can be solved directly at the data-source (connection-pool) level, as sketched below.
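The ShardingSphere shadow rule covers the sharding case; as a sketch of the data-source-level alternative mentioned above (not ShardingSphere's own API), a routing DataSource can switch to a dedicated shadow database whenever the context carries the flag. The lookup keys and the shadow DataSource itself are assumptions, and ShadowTestUtil is the hypothetical holder sketched earlier.

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * Routes connections to the shadow database when the current call is on the
 * shadow (load-test) link, otherwise to the production database.
 */
public class ShadowRoutingDataSource extends AbstractRoutingDataSource {

    public ShadowRoutingDataSource(DataSource productionDataSource, DataSource shadowDataSource) {
        Map<Object, Object> targets = new HashMap<>();
        targets.put("prod", productionDataSource);
        targets.put("shadow", shadowDataSource);
        setTargetDataSources(targets);
        setDefaultTargetDataSource(productionDataSource);
        // outside a Spring container, afterPropertiesSet() must be called to resolve the targets
        afterPropertiesSet();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        // evaluated every time a connection is acquired
        return ShadowTestUtil.isShadowTesLink() ? "shadow" : "prod";
    }
}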