ThingsBoard源码解析-数据订阅与规则链数据处理
前言
结合本篇对规则链的执行过程进行探讨
根据之前对 MQTT 源码的学习,我们由消息的处理入手
//org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {switch (msg.fixedHeader().messageType()) {case PUBLISH:processPublish(ctx, (MqttPublishMessage) msg);break;case SUBSCRIBE:processSubscribe(ctx, (MqttSubscribeMessage) msg);break;case UNSUBSCRIBE:processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);break;case PINGREQ:if (checkConnected(ctx, msg)) {ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));transportService.reportActivity(deviceSessionCtx.getSessionInfo());}break;case DISCONNECT:ctx.close();break;case PUBACK:int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);if (rpcRequest != null) {transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);}break;default:break;}}
接着看对发布消息的处理
//org.thingsboard.server.transport.mqtt.MqttTransportHandlerprivate void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {if (!checkConnected(ctx, mqttMsg)) {return;}String topicName = mqttMsg.variableHeader().topicName();int msgId = mqttMsg.variableHeader().packetId();log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {//消息来源为网关主题if (gatewaySessionHandler != null) {handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg);transportService.reportActivity(deviceSessionCtx.getSessionInfo());}} else {//处理设备的消息,重点processDevicePublish(ctx, mqttMsg, topicName, msgId);}
}
继续
//org.thingsboard.server.transport.mqtt.MqttTransportHandlerprivate void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {try {Matcher fwMatcher;MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));} else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));attrReqTopicType = TopicType.V1;} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, 
rpcRequestMsg));toServerRpcSubTopicType = TopicType.V1;} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));} else if ((fwMatcher = FW_REQUEST_PATTERN.matcher(topicName)).find()) {getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.FIRMWARE);} else if ((fwMatcher = SW_REQUEST_PATTERN.matcher(topicName)).find()) {getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));} else if 
(topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC)) {TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getProtoMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC)) {TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC)) {TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = 
context.getJsonMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));toServerRpcSubTopicType = TopicType.V2_JSON;} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC)) {TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getProtoMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));toServerRpcSubTopicType = TopicType.V2_PROTO;} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC)) {TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC);transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));toServerRpcSubTopicType = TopicType.V2;} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX)) {TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getJsonMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX);transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));attrReqTopicType = TopicType.V2_JSON;} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX)) {TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getProtoMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX);transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, 
getAttributeMsg));attrReqTopicType = TopicType.V2_PROTO;} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX)) {TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));attrReqTopicType = TopicType.V2;} else {transportService.reportActivity(deviceSessionCtx.getSessionInfo());ack(ctx, msgId);}} catch (AdaptorException e) {log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);ctx.close();}
}
这里根据消息来源主题的不同,进行对应的处理
顺着属性消息处理继续往下看
//org.thingsboard.server.common.transport.service.DefaultTransportService@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) {//更新活动时间reportActivityInternal(sessionInfo);TenantId tenantId = getTenantId(sessionInfo);DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));//获取键值对JsonObject json = JsonUtils.getJsonObject(msg.getKvList());//构造元数据TbMsgMetaData metaData = new TbMsgMetaData();metaData.putValue("deviceName", sessionInfo.getDeviceName());metaData.putValue("deviceType", sessionInfo.getDeviceType());metaData.putValue("notifyDevice", "false");CustomerId customerId = getCustomerId(sessionInfo);//发送至规则引擎sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));}
}
继续
//org.thingsboard.server.common.transport.service.DefaultTransportServiceprivate void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) {//创建设备配置标识DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));//从缓存中获取设备配置DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);RuleChainId ruleChainId;String queueName;if (deviceProfile == null) {//无设备配置,使用默认的规则链和队列名log.warn("[{}] Device profile is null!", deviceProfileId);ruleChainId = null;queueName = ServiceQueue.MAIN;} else {//获取规则链标识ruleChainId = deviceProfile.getDefaultRuleChainId();//获取队列名String defaultQueueName = deviceProfile.getDefaultQueueName();queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;}//创建消息TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null);//发送至规则引擎sendToRuleEngine(tenantId, tbMsg, callback);
}
继续
//org.thingsboard.server.common.transport.service.DefaultTransportServiceprivate void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {//获取分区信息TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator());if (log.isTraceEnabled()) {log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);}//创建消息ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg)).setTenantIdMSB(tenantId.getId().getMostSignificantBits()).setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();//统计数增加ruleEngineProducerStats.incrementTotal();//统计相关回调StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats);//发送至规则消息引擎队列ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
}
接下来,我们需要去消费端查看后续的处理
// Producer through which sendToRuleEngine(...) publishes ToRuleEngineMsg envelopes.
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
先找到TbQueueProducer的位置
接着从TbQueueConsumer入手
package org.thingsboard.server.queue;

import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;

import java.util.List;
import java.util.Set;

/**
 * Consumer side of a queue carrying messages of type {@code T}.
 */
public interface TbQueueConsumer<T extends TbQueueMsg> {

    // Presumably the name of the topic this consumer reads from; confirm against implementations.
    String getTopic();

    void subscribe();

    // Subscribes to the given set of topic partitions.
    void subscribe(Set<TopicPartitionInfo> partitions);

    void unsubscribe();

    // Fetches a batch of messages; the argument is a duration in milliseconds
    // (presumably the maximum wait; confirm against implementations).
    List<T> poll(long durationInMillis);

    // Called by the rule-engine consumer loop once a polled batch has been processed.
    void commit();

    // The rule-engine consumer loop exits when this returns true.
    boolean isStopped();
}
重点关注拉取方法 `List<T> poll(long durationInMillis);`,看看它在哪些地方被调用
发现目标DefaultTbRuleEngineConsumerService
//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerServicevoid consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {updateCurrentThreadName(threadSuffix);while (!stopped && !consumer.isStopped()) {try {//拉取消息List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);if (msgs.isEmpty()) {continue;}//获取提交策略final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration);//获取确认策略final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration);//初始化提交策略submitStrategy.init(msgs);while (!stopped) {//创建处理上下文TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs());//提交,重点为 submitMessage 方法submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> submitMessage(configuration, stats, ctx, id, msg)));//超时等待final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);//创建结果TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);if (timeout) {//超时处理printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");}if (!ctx.getFailedMap().isEmpty()) {//失败处理printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");}//打印统计信息ctx.printProfilerStats();//根据结果获取决策TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);if (statsEnabled) {//记录是否提交完成stats.log(result, decision.isCommit());}//清理上下文ctx.cleanup();//判断决策if (decision.isCommit()) {//已提交//停止提交策略submitStrategy.stop();//退出循环break;} else {//未提交完毕//将决策的重试消息集合更新至提交策略,继续提交submitStrategy.update(decision.getReprocessMap());}}//消费端提交确认consumer.commit();} catch (Exception e) {if (!stopped) {log.warn("Failed to process messages from queue.", e);try {Thread.sleep(pollDuration);} catch (InterruptedException e2) {log.trace("Failed to wait until the server 
has capacity to handle new requests", e2);}}}}log.info("TB Rule Engine Consumer stopped.");
}
查看关键方法
//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerServicevoid submitMessage(TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, TbMsgPackProcessingContext ctx, UUID id, TbProtoQueueMsg<ToRuleEngineMsg> msg) {log.trace("[{}] Creating callback for topic {} message: {}", id, configuration.getName(), msg.getValue());//获取原始消息ToRuleEngineMsg toRuleEngineMsg = msg.getValue();//获取租户标识TenantId tenantId = TenantId.fromUUID(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));//创建回调TbMsgCallback callback = prometheusStatsEnabled ? new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : new TbMsgPackCallback(id, tenantId, ctx);try {if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {//转发至规则引擎 ActorforwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);} else {//消息为空直接回调成功方法callback.onSuccess();}} catch (Exception e) {//回调失败方法callback.onFailure(new RuleEngineException(e.getMessage()));}
}
继续
//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerServiceprivate void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {//构建消息TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);QueueToRuleEngineMsg msg;//获取关联类型列表ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();Set<String> relationTypes = null;if (relationTypesList != null) {if (relationTypesList.size() == 1) {relationTypes = Collections.singleton(relationTypesList.get(0));} else {relationTypes = new HashSet<>(relationTypesList);}}//创建消息msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());//使用 Actor 系统上下文发送消息actorContext.tell(msg);
}
先看一下QueueToRuleEngineMsg
package org.thingsboard.server.common.msg.queue;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;

import java.util.Set;

/**
 * Actor message that carries a {@link TbMsg} taken from the queue into the rule engine,
 * together with the tenant id, optional relation types and an optional failure message.
 *
 * Created by ashvayka on 15.03.18.
 */
@ToString
@EqualsAndHashCode(callSuper = true)
public final class QueueToRuleEngineMsg extends TbRuleEngineActorMsg {

    @Getter
    private final TenantId tenantId;
    @Getter
    private final Set<String> relationTypes;
    @Getter
    private final String failureMessage;

    public QueueToRuleEngineMsg(TenantId tenantId, TbMsg tbMsg, Set<String> relationTypes, String failureMessage) {
        super(tbMsg);
        this.tenantId = tenantId;
        this.relationTypes = relationTypes;
        this.failureMessage = failureMessage;
    }

    @Override
    public MsgType getMsgType() {
        return MsgType.QUEUE_TO_RULE_ENGINE_MSG;
    }

    /**
     * Fails the message callback with a text describing why the target actor is gone.
     * The rule chain id is included when the message carries one.
     */
    @Override
    public void onTbActorStopped(TbActorStopReason reason) {
        final boolean stopped = reason == TbActorStopReason.STOPPED;
        final String message;
        if (msg.getRuleChainId() == null) {
            message = stopped ? "Rule chain stopped" : "Failed to initialize rule chain!";
        } else if (stopped) {
            message = String.format("Rule chain [%s] stopped", msg.getRuleChainId().getId());
        } else {
            message = String.format("Failed to initialize rule chain [%s]!", msg.getRuleChainId().getId());
        }
        msg.getCallback().onFailure(new RuleEngineException(message));
    }

    /** Returns {@code true} when at least one relation type is set. */
    public boolean isTellNext() {
        return !(relationTypes == null || relationTypes.isEmpty());
    }
}
得知消息类型为MsgType.QUEUE_TO_RULE_ENGINE_MSG,后面会用到
接着我们看Actor
系统对消息的处理
//org.thingsboard.server.actors.ActorSystemContextpublic void tell(TbActorMsg tbActorMsg) {appActor.tell(tbActorMsg);
}
appActor 为应用 Actor,是整个 Actor 系统的根 Actor,感兴趣可以自行阅读
根据之前的学习,我们了解到 Actor 的处理方法为 `boolean process(TbActorMsg msg)`
// org.thingsboard.server.actors.service.ContextAwareActor
/**
 * Logs the message, delegates to {@code doProcess} and warns when it was not handled.
 *
 * @param msg actor message to process
 * @return always {@code false}
 */
@Override
public boolean process(TbActorMsg msg) {
    if (log.isDebugEnabled()) {
        log.debug("Processing msg: {}", msg);
    }
    boolean handled = doProcess(msg);
    if (!handled) {
        log.warn("Unprocessed message: {}!", msg);
    }
    // NOTE(review): the result of doProcess is not propagated; confirm callers ignore this value.
    return false;
}
可见,真正执行的方法为 `protected abstract boolean doProcess(TbActorMsg msg)`
//org.thingsboard.server.actors.app.AppActor@Override
protected boolean doProcess(TbActorMsg msg) {if (!ruleChainsInitialized) {//规则链未初始化//初始化租户 ActorinitTenantActors();ruleChainsInitialized = true;if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {log.warn("Rule Chains initialized by unexpected message: {}", msg);}}//判断消息类型switch (msg.getMsgType()) {case APP_INIT_MSG:break;case PARTITION_CHANGE_MSG:ctx.broadcastToChildren(msg);break;case COMPONENT_LIFE_CYCLE_MSG:onComponentLifecycleMsg((ComponentLifecycleMsg) msg);break;case QUEUE_TO_RULE_ENGINE_MSG:onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);break;case TRANSPORT_TO_DEVICE_ACTOR_MSG:onToDeviceActorMsg((TenantAwareMsg) msg, false);break;case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:onToDeviceActorMsg((TenantAwareMsg) msg, true);break;case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:onToTenantActorMsg((EdgeEventUpdateMsg) msg);break;case SESSION_TIMEOUT_MSG:ctx.broadcastToChildrenByType(msg, EntityType.TENANT);break;default:return false;}return true;
}
回忆之前消息类型为MsgType.QUEUE_TO_RULE_ENGINE_MSG
//org.thingsboard.server.actors.app.AppActorprivate void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {//消息来自系统,视为异常msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));} else {if (!deletedTenants.contains(msg.getTenantId())) {//租户未被删除//获取或创建租户 Actor 并发送消息getOrCreateTenantActor(msg.getTenantId()).tell(msg);} else {//租户已删除,直接回调成功方法msg.getMsg().getCallback().onSuccess();}}
}
deletedTenants 用于记录已删除的租户标识
//org.thingsboard.server.actors.app.AppActorprivate TbActorRef getOrCreateTenantActor(TenantId tenantId) {return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), () -> DefaultActorService.TENANT_DISPATCHER_NAME, () -> new TenantActor.ActorCreator(systemContext, tenantId));
}
直接查看TenantActor的doProcess
方法
//org.thingsboard.server.actors.tenant.TenantActor@Override
protected boolean doProcess(TbActorMsg msg) {if (cantFindTenant) {//找不到租户log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;//直接回调成功方法queueMsg.getMsg().getCallback().onSuccess();} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;//直接回调成功方法transportMsg.getCallback().onSuccess();}return true;}switch (msg.getMsgType()) {case PARTITION_CHANGE_MSG:PartitionChangeMsg partitionChangeMsg = (PartitionChangeMsg) msg;ServiceType serviceType = partitionChangeMsg.getServiceQueueKey().getServiceType();if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {//To Rule Chain Actorsbroadcast(msg);} else if (ServiceType.TB_CORE.equals(serviceType)) {List<TbActorId> deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) {@Overrideprotected boolean testEntityId(EntityId entityId) {return super.testEntityId(entityId) && !isMyPartition(entityId);}});deviceActorIds.forEach(id -> ctx.stop(id));}break;case COMPONENT_LIFE_CYCLE_MSG:onComponentLifecycleMsg((ComponentLifecycleMsg) msg);break;case QUEUE_TO_RULE_ENGINE_MSG:onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);break;case TRANSPORT_TO_DEVICE_ACTOR_MSG:onToDeviceActorMsg((DeviceAwareMsg) msg, false);break;case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:onToDeviceActorMsg((DeviceAwareMsg) msg, true);break;case SESSION_TIMEOUT_MSG:ctx.broadcastToChildrenByType(msg, EntityType.DEVICE);break;case RULE_CHAIN_INPUT_MSG:case RULE_CHAIN_OUTPUT_MSG:case 
RULE_CHAIN_TO_RULE_CHAIN_MSG:onRuleChainMsg((RuleChainAwareMsg) msg);break;case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:onToEdgeSessionMsg((EdgeEventUpdateMsg) msg);break;default:return false;}return true;
}
查看MsgType.QUEUE_TO_RULE_ENGINE_MSG类型的处理方法
//org.thingsboard.server.actors.tenant.TenantActorprivate void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {//检查当前服务是否为规则引擎服务if (!isRuleEngine) {log.warn("RECEIVED INVALID MESSAGE: {}", msg);return;}TbMsg tbMsg = msg.getMsg();//检查规则引擎是否启用状态if (getApiUsageState().isReExecEnabled()) {//判断规则链标识是否为空if (tbMsg.getRuleChainId() == null) {//获取根链 Actor 并判断是否为空if (getRootChainActor() != null) {//向根链 Actor 发送消息getRootChainActor().tell(msg);} else {//无根链 Actor ,回调失败方法tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));log.info("[{}] No Root Chain: {}", tenantId, msg);}} else {try {//向指定规则链发送消息ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);} catch (TbActorNotRegisteredException ex) {log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());//TODO: 3.1 Log it to dead letters queue;tbMsg.getCallback().onSuccess();}}} else {log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);tbMsg.getCallback().onSuccess();}
}
跳过中间的步骤,直接看规则链Actor
的doProcess
方法即可
// org.thingsboard.server.actors.ruleChain.RuleChainActor
/**
 * Message dispatch of the rule chain actor. Most message types are delegated to the
 * message processor.
 *
 * @return {@code false} for message types this actor does not handle
 */
@Override
protected boolean doProcess(TbActorMsg msg) {
    switch (msg.getMsgType()) {
        case COMPONENT_LIFE_CYCLE_MSG:
            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
            break;
        case QUEUE_TO_RULE_ENGINE_MSG:
            processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
            break;
        case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
            processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
            break;
        case RULE_CHAIN_TO_RULE_CHAIN_MSG:
            processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
            break;
        case RULE_CHAIN_INPUT_MSG:
            processor.onRuleChainInputMsg((RuleChainInputMsg) msg);
            break;
        case RULE_CHAIN_OUTPUT_MSG:
            processor.onRuleChainOutputMsg((RuleChainOutputMsg) msg);
            break;
        case PARTITION_CHANGE_MSG:
            processor.onPartitionChangeMsg((PartitionChangeMsg) msg);
            break;
        case STATS_PERSIST_TICK_MSG:
            onStatsPersistTick(id);
            break;
        default:
            return false;
    }
    return true;
}
查看MsgType.QUEUE_TO_RULE_ENGINE_MSG类型的处理方法
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorvoid onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {TbMsg msg = envelope.getMsg();//验证消息if (!checkMsgValid(msg)) {return;}log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);//判断是否包含关联类型if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {onTellNext(msg, true);} else {onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());}
}
此处的关联类型是什么?
结合规则链的结构联想一下,规则链本质是从上个节点传递到下个节点,节点传递之间有什么是可有可无的?
显然关联类型就是节点流转的条件
由于首节点是没有条件的,因此在构造消息时没有设置关联类型
注:每条规则链仅有一个首节点,且除首节点外的其他节点至少存在一个关联类型(TbRelationTypes.FAILURE),这部分感兴趣自行研究
先看没有关联类型的处理
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorprivate void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) {try {//检查组件(节点)状态是否正常checkComponentStateActive(msg);//获取节点标识RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null;RuleNodeCtx targetCtx;if (targetId == null) {//未指定目标节点//将当前规则链的首节点作为目标targetCtx = firstNode;//拷贝消息,entityId即当前规则链的标识msg = msg.copyWithRuleChainId(entityId);} else {//已指定目标节点,获取节点上下文targetCtx = nodeActors.get(targetId);}//判断上下文是否存在if (targetCtx != null) {log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);//推送至节点pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE);} else {log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);msg.getCallback().onSuccess();}} catch (RuleNodeException rne) {msg.getCallback().onFailure(rne);} catch (Exception e) {msg.getCallback().onFailure(new RuleEngineException(e.getMessage()));}
}
查看下一个方法前,我们先了解一下节点上下文的结构
package org.thingsboard.server.actors.ruleChain;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;

/**
 * Context of a single rule node: the tenant id, the rule chain actor, the node's own actor
 * and the node itself.
 *
 * Created by ashvayka on 19.03.18.
 */
@Data
@AllArgsConstructor
final class RuleNodeCtx {
    // Tenant identifier.
    private final TenantId tenantId;
    // Rule chain actor; the node context tells it where the message goes next.
    private final TbActorRef chainActor;
    // Actor of this rule node; the chain pushes messages to it.
    private final TbActorRef selfActor;
    // The rule node itself; the only non-final field.
    private RuleNode self;
}
很简单的结构,记录了租户标识,规则链Actor
,自身节点Actor
和自身节点
继续
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessorprivate void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {if (nodeCtx != null) {//创建消息并告知自身节点nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));} else {log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty"));}
}
与QueueToRuleEngineMsg类似,查看可知RuleChainToRuleNodeMsg的消息类型为MsgType.RULE_CHAIN_TO_RULE_MSG,后面同样以此为处理条件
终于到节点的Actor
了
// org.thingsboard.server.actors.ruleChain.RuleNodeActor
/**
 * Message dispatch of the rule node actor.
 *
 * @return {@code false} for message types this actor does not handle
 */
@Override
protected boolean doProcess(TbActorMsg msg) {
    switch (msg.getMsgType()) {
        case COMPONENT_LIFE_CYCLE_MSG:
        case RULE_NODE_UPDATED_MSG:
            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
            break;
        case RULE_CHAIN_TO_RULE_MSG:
            onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
            break;
        case RULE_TO_SELF_MSG:
            onRuleNodeToSelfMsg((RuleNodeToSelfMsg) msg);
            break;
        case STATS_PERSIST_TICK_MSG:
            onStatsPersistTick(id);
            break;
        case PARTITION_CHANGE_MSG:
            onClusterEventMsg((PartitionChangeMsg) msg);
            break;
        default:
            return false;
    }
    return true;
}
查看MsgType.RULE_CHAIN_TO_RULE_MSG处理方法
//org.thingsboard.server.actors.ruleChain.RuleNodeActorprivate void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg envelope) {TbMsg msg = envelope.getMsg();//验证消息if (!msg.isValid()) {if (log.isTraceEnabled()) {log.trace("Skip processing of message: {} because it is no longer valid!", msg);}return;}if (log.isDebugEnabled()) {log.debug("[{}][{}][{}] Going to process rule engine msg: {}", ruleChainId, id, processor.getComponentName(), msg);}try {//处理消息processor.onRuleChainToRuleNodeMsg(envelope);//增加处理计数increaseMessagesProcessedCount();} catch (Exception e) {logAndPersist("onRuleMsg", e);}
}
继续
//org.thingsboard.server.actors.ruleChain.RuleNodeActorMessageProcessor
/**
 * Runs the rule node's onMsg for a message, after notifying the callback that
 * processing started, checking the component state, and enforcing the configured
 * maximum number of rule node executions per message.
 *
 * @param msg envelope carrying the message, the node context and the relation type
 * @throws Exception declared by the signature; exceptions thrown by tbNode.onMsg
 *                   itself are caught below and reported through tellFailure
 */
void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
    // Notify the callback that processing has started.
    msg.getMsg().getCallback().onProcessingStart(info);
    // Check that the component state is active.
    checkComponentStateActive(msg.getMsg());
    TbMsg tbMsg = msg.getMsg();
    // Read the message's rule node counter and increment it.
    int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();
    // Maximum number of rule node executions allowed per message (from the tenant profile configuration).
    int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();
    // Check the execution limit; a configured value of 0 bypasses the check.
    if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {
        // Report a rule engine execution to the API usage client.
        apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT);
        if (ruleNode.isDebugMode()) {
            systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
        }
        try {
            // Invoke the node's own processing method.
            tbNode.onMsg(msg.getCtx(), msg.getMsg());
        } catch (Exception e) {
            // A failure inside the node is reported through the context.
            msg.getCtx().tellFailure(msg.getMsg(), e);
        }
    } else {
        // Limit exceeded: fail the message callback.
        tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));
    }
}
这里仅调用了 tbNode 的 onMsg 方法,那么节点的流转呢?
猜想所有节点继承自一个公共的父抽象类,该类中实现了节点的流转
那么看一下继承关系
这些节点并没有继承公共的父类,流转方法没有抽离出来?
随便找一个节点看看,这里我看的是TbMsgTypeFilterNode
//org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode
/**
 * Routes the message onward with relation type "True" when the configured message
 * types contain the message's type, and "False" otherwise.
 *
 * @param ctx context used to pass the message to the next node(s)
 * @param msg the message being filtered
 */
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
    ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False");
}
验证了猜想,接着看下去
//org.thingsboard.server.actors.ruleChain.DefaultTbContext
/**
 * Forwards the message with the single relation type TbRelationTypes.SUCCESS
 * and no Throwable.
 */
@Override
public void tellSuccess(TbMsg msg) {
    tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null);
}

/**
 * Forwards the message with a single relation type and no Throwable.
 */
@Override
public void tellNext(TbMsg msg, String relationType) {
    tellNext(msg, Collections.singleton(relationType), null);
}

/**
 * Forwards the message with a set of relation types and no Throwable.
 */
@Override
public void tellNext(TbMsg msg, Set<String> relationTypes) {
    tellNext(msg, relationTypes, null);
}
这里贴出了 tellSuccess 方法和另一个 tellNext 方法,它们有被其他节点使用
//org.thingsboard.server.actors.ruleChain.DefaultTbContext
/**
 * Common implementation behind the public tell* methods: optionally persists debug
 * output, notifies the callback that this node finished processing, and sends a
 * RuleNodeToRuleChainTellNextMsg to the rule chain actor.
 *
 * @param msg           the message to pass on
 * @param relationTypes relation types to pass on to the rule chain
 * @param th            failure cause, or null; only its message is forwarded
 */
private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
    if (nodeCtx.getSelf().isDebugMode()) {
        // In debug mode, persist one debug-output record per relation type.
        relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
    }
    // Notify the callback that processing by this node has ended.
    msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
    // Send the message to the rule chain actor.
    nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
}
和前面一样,查看RuleNodeToRuleChainTellNextMsg的消息类型为MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG
接下来又回到了规则链 Actor 的 doProcess 方法,找到对应的处理方法
//org.thingsboard.server.actors.ruleChain.RuleChainActorcase RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);break;
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
/**
 * Entry point for a tell-next message coming from a rule node: unwraps the envelope
 * and, if checkMsgValid accepts the message, routes it by originator and relation types.
 *
 * @param envelope message sent by a rule node to the rule chain
 */
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
    var msg = envelope.getMsg();
    // Messages rejected by checkMsgValid are not routed any further here.
    if (checkMsgValid(msg)) {
        onTellNext(msg, envelope.getOriginator(), envelope.getRelationTypes(), envelope.getFailureMessage());
    }
}
这里调用的 onTellNext 方法即前面 onQueueToRuleEngineMsg 方法中根据关联类型通知的方法
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
/**
 * Routes a message to the node(s) that follow the originator node for the given
 * relation types. Three cases are handled:
 * no matching relation (the callback is completed with success or failure),
 * exactly one matching relation (pushed via pushToTarget), and several matching
 * relations (each target gets the message via putToQueue with a shared callback wrapper).
 *
 * @param msg              the message to route
 * @param originatorNodeId id of the rule node the message comes from
 * @param relationTypes    relation types to follow
 * @param failureMessage   failure text used when reporting a FAILURE with no outbound relation
 */
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
    try {
        checkComponentStateActive(msg);
        EntityId entityId = msg.getOriginator();
        // Resolve the topic partition info for the message.
        TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
        // Look up the relations (links to the following nodes) of the originator node.
        List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId);
        if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore
            log.warn("[{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message.", tenantId, entityId, msg.getId());
            ruleNodeRelations = Collections.emptyList();
        }
        // Keep only the relations whose type matches one of the requested relation types.
        List<RuleNodeRelation> relationsByTypes = ruleNodeRelations.stream().filter(r -> contains(relationTypes, r.getType())).collect(Collectors.toList());
        // Number of matching relations.
        int relationsCount = relationsByTypes.size();
        if (relationsCount == 0) {
            // There is no following node.
            log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
            // Check whether the relation types include FAILURE (i.e. the previous node reported a failure).
            if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                // Look up the context of the originator rule node.
                RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                if (ruleNodeCtx != null) {
                    // Fail the message callback, attaching the originator rule node.
                    msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                } else {
                    log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                    // Fail the message callback; no node context is available, so only the node id is reported.
                    msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                }
            } else {
                // Complete the message callback successfully.
                msg.getCallback().onSuccess();
            }
        } else if (relationsCount == 1) {
            // Exactly one following node.
            // This loop runs exactly once.
            for (RuleNodeRelation relation : relationsByTypes) {
                log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                // Push to the target.
                pushToTarget(tpi, msg, relation.getOut(), relation.getType());
            }
        } else {
            // Several following nodes: one callback wrapper, built with the relation count, is shared by all of them.
            MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
            log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relationsByTypes);
            // Iterate over the matching relations.
            for (RuleNodeRelation relation : relationsByTypes) {
                EntityId target = relation.getOut();
                // Put the message on the queue for this target.
                putToQueue(tpi, msg, callbackWrapper, target);
            }
        }
    } catch (RuleNodeException rne) {
        msg.getCallback().onFailure(rne);
    } catch (Exception e) {
        // NOTE(review): the local entityId declared inside try is out of scope here, so this
        // entityId resolves to an outer-scope name — presumably the processor's own field; confirm.
        log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e);
        msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
    }
}
我们先看后续多个节点的推送
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
/**
 * Copies the message for the given target (rule node or rule chain) with a newly
 * generated random UUID and puts the copy on the queue.
 *
 * @param tpi             topic partition to send to
 * @param msg             the message to copy
 * @param callbackWrapper callback passed on to the queue call
 * @param target          the target entity; only RULE_NODE and RULE_CHAIN are handled,
 *                        any other entity type falls through the switch and nothing is sent
 */
private void putToQueue(TopicPartitionInfo tpi, TbMsg msg, TbQueueCallback callbackWrapper, EntityId target) {
    switch (target.getEntityType()) {
        case RULE_NODE:
            // Copy addressed to a rule node; entityId and the target node id are passed to the copy.
            putToQueue(tpi, msg.copyWithRuleNodeId(entityId, new RuleNodeId(target.getId()), UUID.randomUUID()), callbackWrapper);
            break;
        case RULE_CHAIN:
            // Copy addressed to a rule chain.
            putToQueue(tpi, msg.copyWithRuleChainId(new RuleChainId(target.getId()), UUID.randomUUID()), callbackWrapper);
            break;
    }
}
根据实体类型拷贝消息,发送至队列
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
/**
 * Wraps the message in a ToRuleEngineMsg (tenant id split into its most/least
 * significant bits, message serialized via TbMsg.toByteString) and hands it to the
 * cluster service.
 *
 * @param tpi             topic partition to send to
 * @param newMsg          the message to send
 * @param callbackWrapper callback passed to the cluster service
 */
private void putToQueue(TopicPartitionInfo tpi, TbMsg newMsg, TbQueueCallback callbackWrapper) {
    // Build the queue message.
    ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder()
            .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
            .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
            .setTbMsg(TbMsg.toByteString(newMsg))
            .build();
    // Push it to the rule engine.
    clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper);
}
继续
//org.thingsboard.server.service.queue.DefaultTbClusterService
/**
 * Sends a rule engine message to the given topic partition through the rule engine
 * message producer and increments the toRuleEngineMsgs counter.
 *
 * @param tpi      topic partition to send to
 * @param msgId    id used for the queue message
 * @param msg      the rule engine message payload
 * @param callback callback passed to the producer's send call
 */
@Override
public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {
    log.trace("PUSHING msg: {} to:{}", msg, tpi);
    // Obtain the rule engine message producer and send the message.
    producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
    toRuleEngineMsgs.incrementAndGet();
}
消息发送至队列,消费端接收到消息后,根据关联类型再次调用 onTellNext 方法处理,此时的消息仅指向单个后续节点
最后查看单个后续节点的处理
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
/**
 * Delivers a message to a single target. When the partition is reported as this
 * service's own, the message is delivered directly (to the rule node's actor, or to
 * the parent actor for a rule chain target); otherwise it is put on the queue.
 *
 * @param tpi              topic partition resolved for the message
 * @param msg              the message to deliver
 * @param target           target entity; only RULE_NODE and RULE_CHAIN are handled locally
 * @param fromRelationType relation type passed along with the message
 */
private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) {
    // Check whether this partition is reported as belonging to the current service.
    if (tpi.isMyPartition()) {
        // Dispatch on the target entity type.
        switch (target.getEntityType()) {
            case RULE_NODE:
                // Push to the rule node.
                pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
                break;
            case RULE_CHAIN:
                // Tell the parent actor, which forwards to the target rule chain.
                parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
                break;
        }
    } else {
        // Not our partition: put the message on the queue for the responsible service.
        putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target);
    }
}
注:集群部署下,每个服务负责若干分区
pushMsgToNode 方法和 putToQueue 方法前面都已经看过了,重点看跳转规则链
和前面一样,查看RuleChainToRuleChainMsg的消息类型为MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG
接着查看租户 Actor 中对应的处理方法
//org.thingsboard.server.actors.tenant.TenantActor
/**
 * Forwards a rule-chain-addressed message to the actor of the target rule chain.
 *
 * @param msg message carrying the id of the target rule chain
 */
private void onRuleChainMsg(RuleChainAwareMsg msg) {
    // Forward only when the API usage state reports rule engine execution as enabled;
    // otherwise nothing is done with the message in this method.
    if (getApiUsageState().isReExecEnabled()) {
        getOrCreateActor(msg.getRuleChainId()).tell(msg);
    }
}
再查看规则链 Actor 中对应的处理方法
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
/**
 * Handles a message forwarded from another rule chain: after validating the message
 * and checking the component state, pushes it to this chain's first node, or completes
 * the callback successfully when there is no first node.
 *
 * @param envelope wrapper carrying the message and the relation type it arrived with
 */
void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
    var msg = envelope.getMsg();
    // Messages rejected by checkMsgValid are not processed.
    if (!checkMsgValid(msg)) {
        return;
    }
    try {
        checkComponentStateActive(envelope.getMsg());
        if (firstNode != null) {
            // Start processing at the first node of this chain.
            pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
        } else {
            // No first node: nothing to run, complete the callback successfully.
            envelope.getMsg().getCallback().onSuccess();
        }
    } catch (RuleNodeException e) {
        // NOTE(review): this branch only logs; the message callback is not invoked in the visible code.
        log.debug("Rule Chain is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId);
    }
}
pushMsgToNode 方法前面已经看过了,至此,我们已阅读完整个规则链的执行逻辑
总结
最后,画一下规则链的执行逻辑