Flowable多引擎架构搭建方案
1、多引擎架构分析
通常情况下,为了支持更高的并发量,可以将服务层设计为无状态服务,将存储层分库分表。无状态服务设计相对比较简单,难点在于数据库的分库分表。分库分表通常有两种方案。比较简单的方案是垂直拆分,即将不同的业务拆分到不同的表或库中。
垂直拆分有一个明显的缺点:当同一个业务的数据量比较大、并发比较高、单库单表无法承载时,无法分库分表。这个时候就需要用到第二种方案——水平拆分,即通过分片的方式存储数据。水平拆分可分为表的水平拆分和库的水平拆分。
1.1、水平分库分表的局限性
水平分库分表指将单表的数据切分到多个分片中,每个分片具有相同结构的库与表,只是库、表存储的数据不同。水平分库分表可解决单机单库的性能瓶颈,突破IO、连接数等限制。但在Flowable中采用常规的水平分库分表方案存在一定的难度和问题,具体如下。
1.1.1、没有合适的分库分表依据
要采用水平分库分表方案,首先要决定采用何种分库分表策略。这通常需要根据业务的特性来决策,常见的分库分表策略有按范围或Hash取模拆分等。
对于基于范围的分库分表策略,可以按时间范围或者流程定义范围进行分库分表,如按年或者月进行分库分表等。但在流程领域,往往需要按流程实例ID、任务实例ID和人员ID等场景查询流程和任务,无法确定时间范围,导致这些场景的查询无法满足。按时间范围分库分表的另一个问题是同一时间段的数据需要写入相同的库表中,无法通过多个数据库和表来分担并发压力。
按流程定义分库分表方案也存在类似的问题。首先,流量并非按流程定义均匀分布的,少量的流程定义占据了大部分的数据,易导致分库分表方案失效。其次,该方案无法实现按流程实例ID、任务实例ID和人员ID等查询流程和任务的场景。
对于基于Hash取模的分库分表策略,可以按人员ID的Hash取模进行分库分表,如按人的维度将流程实例分布在不同的库和表上,能解决按人查询的场景需求。但在该方案下,同一个流程会关联多人,同一个任务也可能与多人关联,如同一个用户任务存在多个候选人,这就导致按人分库分表时存在数据冗余。此外,该方案也无法实现基于流程实例ID或者任务ID进行数据查询的场景。当然,也可以按流程实例ID和任务ID进行分库分表,但这又无法实现按人员ID查询流程或任务的场景。
1.1.2、常规分库分表方案改造成本高
分库分表实现本身的复杂度比较高,尤其是SQL比较复杂时,要自己实现一套完整的水平分库分表方案成本非常高昂。因此,通常会使用开源组件实现分库分表。常见的水平分库分表实现方案有两种:第一种是客户端水平分库分表,比较常见的是sharding-jdbc
;第二种是通过代理进行分库分表,如Mycat
。
sharding-jdbc
以JAR包的形式嵌入代码,对代码具有一定的侵入性,但运维成本较低。Mycat
需要维护一套单独的集群,运维成本比较高,对代码无侵入性。无论采用哪种方式,都有部分功能实现起来比较复杂,对系统性能影响也比较大,甚至无法实现。例如,Flowable底层SQL中存在大量的join操作,一旦涉及多表的跨库操作,实现起来就非常困难。除join操作,排序、分组、分页等常用的SQL操作性能也非常低。总之,想在Flowable基础上实现一套分库分表方案,无论是设计的难度还是开发的成本都非常高。
1.1.3、常规分库分表方案扩容困难
分库分表的另一个难题是扩容。出于成本考虑,一开始不可能拆分过多的库表。但是随着业务的发展,已有的库表无法满足业务需求,这时就需要对原有的库表进行扩容。扩容往往涉及数据迁移,而数据迁移过程风险比较大,难以在不停服且用户无感知的情况下完成。虽然可以采用一致性哈希算法来减少迁移的数据量,但是无法彻底解决数据迁移问题,导致系统扩容比较困难。
1.2、多引擎架构设计方案
多引擎架构模式中,可以创建多个集群,每个集群中包含多个Flowable工作流引擎,同一个集群中的工作流引擎连接同一个数据库,集群与集群之间相互独立。该架构中路由网关起到的如下作用至关重要。
1、进行请求的负载均衡。这是针对发起新流程的请求的。网关可以将发起新流程的请求根据策略分发到不同的集群中,起到负载均衡的作用。
2、进行请求与集群的匹配和路由。这是针对已发起流程的各种操作请求的。同一个流程实例的所有数据都存在于同一个集群下,网关会将已发起流程的各种操作请求路由分发到该流程实例所在的集群中去处理。
多引擎架构的优势:
- 引擎改造成本低。从单集群的角度来说,Flowable是一个独立的工作单元,其业务逻辑与原来基本一致,也不存在跨库跨表操作等常见问题,因此,对于Flowable的底层逻辑,基本无须调整。
- 扩容简单。多引擎架构下,扩容时并不需要对历史数据进行迁移,因此扩容只需简单地增加一个集群,以及对应的数据库,且可在用户无感知的情况下快速实现。
- 数据均衡。多引擎架构模式下,流量的分发由网关决定,与业务没有直接的关系,不会产生热点数据,可以实现集群数据的均衡。此外,还可以灵活配置负载均衡算法,实现更复杂的路由策略。
- 差异化的引擎能力。这是多引擎架构最大的优势,也是常规分库分表方案无法实现的能力。所谓差异化,是指可以根据业务的不同提供不同的引擎能力。在实际业务中,不同业务对引擎能力的需求也不相同,如审批场景的数据量比较小,对性能要求也不高,但是业务复杂,与人员组织紧密相关;系统集成场景往往数据量大,对性能要求高,但是业务简单,查询条件少。
存在的问题:
- 不同流程实例的数据分散在多个集群中,但是流程模型信息必须是共享的;
- 网关需要知道已发起流程实例的数据存储于哪个集群中才能正确地进行路由。
2、多引擎建模服务实现
根据上文的架构设计方案可以知道,目前是还存在流程模型共享的问题。因此,在多引擎架构模式中,要实现流程模型在多个引擎之间的共享,可以创建一个独立的服务专门负责流程建模与部署,而其他工作流引擎则通过调用该服务获取流程模型及流程定义信息。
2.1、建模服务搭建
建模服务本身也是一个Flowable工作流引擎,可以结合Spring Boot对外提供HTTP服务。
多引擎架构下,工作流引擎服务所需的流程模型和流程定义信息不再通过数据库查询,而是通过调用建模服务接口获取。但是如果每次获取流程模型和流程定义信息都需要远程调用,必然会导致性能的急剧下降,因此这里采用共享Redis缓存的方式进行建模信息的同步。对于Redis中不存在的流程模型和流程定义息,调用建模服务将其写入Redis。因此,需要在建模服务中增加同步缓存的HTTP接口。其实现如下:
@Autowired
private ManagementService managementService;@PostMapping("/processDefinition/syncById/{processDefinitionId}")
public ResponseEntity<String> syncProcessDefinition(@PathVariable("processDefinitionId") String processDefinitionId) {//如果缓存中不存在,该方法会从数据库中重新加载流程定义到缓存中ProcessDefinition processDefinition = managementService.executeCommand(context ->CommandContextUtil.getProcessEngineConfiguration(context).getDeploymentManager()//当前方法会从Redis缓存中获取,前提是改造集成的 .findDeployedProcessDefinitionById(processDefinitionId));if (processDefinition != null) {return ResponseEntity.ok("success");} else {return ResponseEntity.notFound().build();}
}
2.2、工作流引擎服务缓存改造
Flowable查询流程定义的逻辑是先读取缓存中的流程定义缓存对象,如果其不存在,则从数据库中查询并将其加载到缓存。在多引擎架构下,工作流引擎服务对应的数据库中不再保存流程模型和流程定义信息,只能从建模服务中获取数据。为了提高工作流引擎性能,降低建模服务的负荷,这里采用共享Redis缓存机制来获取流程模型与流程定义信息。其实现逻辑如下:
@Component
@Slf4j
public class RedisProcessDefinitionCache implements DeploymentCache<ProcessDefinitionCacheEntry> {@Resource(name = "processDefinitionCacheRedisTemplate")private RedisTemplate<String, ProcessDefinitionCacheEntry> redisTemplate;@Value("${bpm.process-definitions.cache.key}")private String processDefinitionCacheKey;@Autowiredprivate ProcessDefinitionClient processDefinitionClient;@Overridepublic ProcessDefinitionCacheEntry get(String id) {log.info("Query cache from redis: id={}", id);Object obj = redisTemplate.opsForHash().get(processDefinitionCacheKey, id);if (obj == null) {log.info("Sync cache to redis. id={}", id);//缓存中没有,则从数据库中加载到缓存processDefinitionClient.syncProcessDefinition(id);obj = redisTemplate.opsForHash().get(processDefinitionCacheKey, id);}if (obj == null) {throw new FlowableObjectNotFoundException("流程定义ID:" + id + ";不存在");}return (ProcessDefinitionCacheEntry) obj;}@Overridepublic boolean contains(String id) {return redisTemplate.opsForHash().hasKey(processDefinitionCacheKey, id);}@Overridepublic void add(String id, ProcessDefinitionCacheEntry object) {throw new FlowableException("不支持的操作");}@Overridepublic void remove(String id) {throw new FlowableException("不支持的操作");}@Overridepublic void clear() {throw new FlowableException("不支持的操作");}@Overridepublic Collection < ProcessDefinitionCacheEntry > getAll() {return null;}@Overridepublic int size() {return 0;}
}
在Redis中不存在流程定义对象时调用建模服务来同步缓存。对建模服务的调用通过接口ProcessDefinitionClient
完成。ProcessDefinitionClient
是一个OpenFeign
客户端。OpenFeign
是Spring Cloud中的子项目,提供申明式的HTTP调用服务,可以像调用本地方法一样调用远程HTTP服务。ProcessDefinitionClient
的实现如下:
@FeignClient(name = "process-modeling-service", url = "${service.process-modeling-service.url}")
public interface ProcessDefinitionClient {@PostMapping("/processDefinition/syncById/{processDefinitionId}")public ResponseEntity<String> syncProcessDefinition(@PathVariable("processDefinitionId") String processDefinitionId);
}
在以上代码段中,注解@FeignClient
通过属性url
配置了远程调用地址,可以采用变量的形式引用配置文件中对应的属性配置项。具体的请求路径及HTTP方法通过Spring MVC注解来指定。@PostMapping
注解表示HTTP请求方式为POST,请求路径为/processDefinition/syncById/{processDefinitionId}
,其中{processDefinitionId}
为路径参数,实际调用时会替换为方法参数processDefinitionId
。使用OpenFeign
需要引入以下对应JAR包:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
由于OpenFeign
是Spring Cloud的子项目,需要引入其父POM文件spring-cloud-dependencies
。由于Maven的POM文件中只允许出现一个<parent>
标签,因此这里采用dependencyManagement
标签引入,并且指定其scope为import,其代码如下:
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
除了引入JAR包,还需要在Spring Boot启用类中通过注解@EnableFeignClients
注解启用OpenFeign
功能,其实现如下:
@SpringBootApplication
@EnableFeignClients
public class FlowableEngineApplication {public static void main(String[] args) {SpringApplication.run(FlowableEngineApplication.class, args);}
}
完成流程定义缓存类的修改后,还需要自定义流程定义数据管理器,将原有的数据库操作改为缓存操作,该类实现ProcessDefinitionDataManager
接口,实现如下:
@Service
public class CustomProcessDefinitionDataManagerImpl implements
ProcessDefinitionDataManager {@Autowiredprivate RedisProcessDefinitionCache processDefinitionCache;@Overridepublic ProcessDefinitionEntity findById(String entityId) {return (ProcessDefinitionEntity) processDefinitionCache.get(entityId).getProcessDefinition();}@Overridepublic ProcessDefinitionEntity findLatestProcessDefinitionByKey(String processDefinitionKey) {return null;}//省略其他未实现方法
}
这里只实现了findById()
方法。在流程执行过程中一般无须使用其他方法,如果有需要,读者可自行调用缓存类或者通过OpenFeign
调用建模服务实现其他方法。最后,还需要在工作流引擎配置类ProcessEngineConfigurationImpl
中指定自定义缓存与流程定义管理类:
@Configuration
public class FlowableEngineConfiguration {@Autowiredprivate PlatformTransactionManager transactionManager;@Autowiredprivate RedisProcessDefinitionCache processDefinitionCache;@Autowiredprivate CustomProcessDefinitionDataManagerImpl processDefinitionDataManager;@Bean(name = "processEngineConfiguration")public ProcessEngineConfigurationImpl processEngineConfiguration(DataSource dataSource) {SpringProcessEngineConfiguration configuration =new SpringProcessEngineConfiguration();configuration.setDataSource(dataSource);configuration.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);configuration.setTransactionManager(transactionManager);//自定义流程定义缓存类configuration.setProcessDefinitionCache(processDefinitionCache);//自定义流程定义数据管理类configuration.setProcessDefinitionDataManager(processDefinitionDataManager);return configuration;}
}
3、工作流引擎路由
针对工作流引擎路由,大致可以分为以下两种场景:
1、新流程发起。新发起流程的路由需要实现流程数据的负载均衡,可以采用轮询、随机等算法,将流程发起流量均衡地负载到不同的集群上(当然也可以根据业务需要实现其他负载策略)。新流程发起的路由与工作流引擎没有直接关系,其具体逻辑由网关层实现。
2、已有流程数据操作。流程实例一旦发起,关于该流程实例的所有操作,如流程实例查询、关联任务办理等,都必须路由到同一个集群上。这就需要记录该流程实例相关数据存储在哪个集群上。这里采用路由表来保存流程实例数据与引擎之间的关联关系。工作流引擎在发起流程时将路由信息写入路由表。对流程进行其他操作时,网关根据请求参数,读取路由表中的集群信息,再将请求路由到对应的集群上。
3.1、Pika与SpringBoot整合
根据上述描述可知,路由表中记录流程实例ID和任务ID对应的工作流引擎。路由表操作均为K-V操作,所以这里采用360公司推出的开源数据存储系统Pika存储路由信息。
Pika是360公司推出的一款开源类Redis存储系统。其底层使用RocksDB存储数据,且数据会直接持久化到磁盘,相对于Redis,其支持更大数据的存储,数据可靠性也更高。Pika完全支持Redis协议,可以直接通过Redis客户端操作。Spring Boot 2默认使用Lettuce作为Redis客户端,但由于新版本的Lettuce客户端与Pika可能存在兼容性问题,因此这里选择使用Jedis连接Pika。在Spring Boot中集成Jedis,需要在项目的pom.xml文件中引入相关依赖:
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId>
</dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency>
这里除了引入Jedis,还引入了commons-pool2,该JAR包的作用是配置Jedis连接池。引入相关依赖后,还需要在application.yml中增加Pika的连接信息:
pika:host: 127.0.0.1port: 9221timeout: 1000msconnect-timeout: 1000msjedis:pool:max-active: 50min-idle: 5max-idle: 10max-wait: 1000ms
上述配置信息指定了Pika服务器的地址和端口,并配置了连接池相关信息。最后,手动创建Pika客户端。其实现如下:
package cn.blnp.net.flowable.boot.config;import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.time.Duration;/*** <h3>Pika集成配置</h3>** @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @version 1.0* @since 2025/7/2 16:32*/
@Configuration
public class PikaConfig {@Value("${pika.host}")private String pickHost;@Value("${pika.port}")private int pickPort;@Value("${pika.timeout:1000ms}")private Duration readTimeout;@Value("${pika.connect-timeout:1000ms}")private Duration connectTimeout;@Value("${pika.jedis.pool.max-active:8}")private int maxActive;@Value("${pika.jedis.pool.max-idle:4}")private int minIdle;@Value("${pika.jedis.pool.min-idle:0}")private int maxIdle;@Value("${pika.jedis.pool.max-wait:1000ms}")private Duration maxWait;@Bean(name = "pikaTemplate")public RedisTemplate<String, String> pikaTemplate() {//配置服务器信息RedisStandaloneConfiguration configuration =new RedisStandaloneConfiguration(pickHost, pickPort);//配置客户端连接池信息GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();poolConfig.setMaxTotal(maxActive);poolConfig.setMinIdle(minIdle);poolConfig.setMaxIdle(maxIdle);poolConfig.setMaxWait(maxWait);JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder().readTimeout(readTimeout).connectTimeout(connectTimeout).usePooling().poolConfig(poolConfig).build();RedisTemplate<String, String> redisTemplate = getStringStringRedisTemplate(configuration, clientConfiguration);return redisTemplate;}private RedisTemplate<String, String> getStringStringRedisTemplate(RedisStandaloneConfiguration configuration, JedisClientConfiguration clientConfiguration) {JedisConnectionFactory connectionFactory = new JedisConnectionFactory(configuration, clientConfiguration);connectionFactory.afterPropertiesSet();//配置序列化方式RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setHashValueSerializer(stringRedisSerializer);redisTemplate.setValueSerializer(stringRedisSerializer);redisTemplate.setConnectionFactory(connectionFactory);return redisTemplate;}
}
因为路由表中key和value均为字符串,所以这里指定的key和value的序列化方式都是StringRedisSerializer。完成Pika的客户端配置后,即可在流程执行过程中通过RedisTemplate将路由信息写入Pika。
3.2、路由信息写入
package cn.blnp.net.flowable.boot.service.impl;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** <h3>工作流引擎路由助手</h3>** @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @version 1.0* @since 2025/7/2 16:42*/
@Slf4j
@Service
public class IdRouterService {private static final String PIKA_PREFIX_PROCESS_ID = "BPM#ENGINE#PROCESSID";private static final String PIKA_PREFIX_TASK_ID = "BPM#ENGINE#TASKID";@Resource(name = "pikaTemplate")private RedisTemplate<String, Object> pikaTemplate;@Value("${bpm.engine-name:default}")private String engineName;/*** <p><b>用途:流程实例路由注册<b></p>* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @since 16:45 2025/7/2* @params [processInstanceId]* @param processInstanceId* @return void**/public void addProcessId(String processInstanceId) {put(PIKA_PREFIX_PROCESS_ID, processInstanceId, engineName);}/*** <p><b>用途:任务路由注册<b></p>* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @since 16:45 2025/7/2* @params [taskId]* @param taskId* @return void**/public void addTaskId(String taskId) {put(PIKA_PREFIX_TASK_ID, taskId, engineName);}/*** <p><b>用途:添加路由信息<b></p>* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @since 16:45 2025/7/2* @params [key, hashKey, value]* @param key* @param hashKey* @param value* @return void**/private void put(String key, String hashKey, String value) {log.info("Insert Pika success.key={},hashKey={}", key, hashKey);pikaTemplate.opsForHash().put(key, hashKey, value);}/*** <p><b>用途:根据流程实例ID获取路由信息<b></p>* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @since 16:45 2025/7/2* @params [processInstanceId]* @param processInstanceId* @return java.lang.String**/public String getProcessEngineName(String processInstanceId) {Object value = pikaTemplate.opsForHash().get(PIKA_PREFIX_PROCESS_ID, processInstanceId);return value == null ? "" : value.toString();}/*** <p><b>用途:根据任务ID获取路由信息<b></p>* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @since 16:45 2025/7/2* @params [taskId]* @param taskId* @return java.lang.String**/public String getTaskEngineName(String taskId) {Object value = pikaTemplate.opsForHash().get(PIKA_PREFIX_TASK_ID, taskId);return value == null ? "" : value.toString();}/*** <p><b>用途:根据流程实例ID删除路由信息<b></p>* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @since 16:45 2025/7/2* @params [processInstanceId]* @param processInstanceId* @return**/public void deleteProcessId(String processInstanceId) {pikaTemplate.opsForHash().delete(PIKA_PREFIX_PROCESS_ID, processInstanceId);log.info("Delete Pika success.processInstanceId={}", processInstanceId);}/*** <p><b>用途:根据任务ID删除路由信息<b></p>* @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @since 16:45 2025/7/2* @params [taskId]* @param taskId* @return**/public void deleteTaskId(String taskId) {pikaTemplate.opsForHash().delete(PIKA_PREFIX_TASK_ID, taskId);}
}
以上代码实现了对流程实例ID和任务ID路由信息的增加、删除和查询操作,引擎名称在配置文件中通过bpm.engine-name
选项进行配置。路由信息写入的逻辑可以添加到不同的监听器中,使得在流程发起和任务创建时,可以分别将路由信息写入路由表。
3.2.1、流程发起监听器
流程发起时将路由信息写入路由表,需要对所有流程生效,因此使用全局监听器。流程发起监听器继承AbstractFlowableEngineEventListener
类,重写processCreated()
方法,并在该方法中将流程实例ID写入路由表,其实现如下:
package cn.blnp.net.flowable.boot.config.flowable.listener;import cn.blnp.net.flowable.boot.service.impl.IdRouterService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener;
import org.flowable.engine.delegate.event.FlowableProcessStartedEvent;
import org.springframework.stereotype.Component;/*** <h3>全局流程发起监听器</h3>** @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @version 1.0* @since 2025/7/2 16:52*/
@Slf4j
@Component
@AllArgsConstructor
public class GlobalProcessStartListener extends AbstractFlowableEngineEventListener {private final IdRouterService idRouterService;@Overrideprotected void processCreated(FlowableEngineEntityEvent event) {//将流程实例ID路由信息写入路由表idRouterService.addProcessId(event.getProcessInstanceId());}@Overridepublic boolean isFailOnException() {//表示一旦流程实例ID路由信息写入失败,则回滚整个事务,流程发起失败。return true;}
}
3.2.2、任务创建监听器
package cn.blnp.net.flowable.boot.config.flowable.listener;import cn.blnp.net.flowable.boot.service.impl.IdRouterService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
import org.flowable.engine.delegate.event.AbstractFlowableEngineEventListener;
import org.flowable.task.service.impl.persistence.entity.TaskEntity;
import org.springframework.stereotype.Component;/*** <h3>全局任务创建监听器</h3>** @author <a href="mailto:blnp.yibin@qq.com">lyb</a>* @version 1.0* @since 2025/7/2 16:57*/
@Slf4j
@Component
@AllArgsConstructor
public class GlobalTaskCreateListener extends AbstractFlowableEngineEventListener {private final IdRouterService idRouterService;@Overrideprotected void taskCreated(FlowableEngineEntityEvent event) {TaskEntity taskEntity = (TaskEntity) (event).getEntity();//将任务ID路由信息写入路由表idRouterService.addTaskId(taskEntity.getId());}@Overridepublic boolean isFailOnException() {//表示一旦流程实例ID路由信息写入失败,则回滚整个事务,任务创建失败。return true;}
}
3.2.3、注册监听器
最后,还需要将上述监听器注册到工作流引擎中。这可以通过调用工作流引擎配置类ProcessEngine ConfigurationImpl
的setTypedEventListeners()
实现。流程发起监听器ProcessStartListener
监听流程发起事件(PROCESS_STARTED
),任务创建监听器TaskCreateListener
监听任务创建事件(TASK_CREATED
),其实现如下:
@Configuration
public class FlowableEngineConfiguration {@Autowiredprivate DataSource dataSource;@Autowiredprivate PlatformTransactionManager transactionManager;@Autowiredprivate ProcessStartListener processStartListener;@Autowiredprivate TaskCreateListener taskCreateListener;@Autowiredprivate RedisProcessDefinitionCache processDefinitionCache;@Autowiredprivate CustomProcessDefinitionDataManagerImpl processDefinitionDataManager;@Bean(name = "processEngineConfiguration")public SpringProcessEngineConfiguration processEngineConfiguration() {SpringProcessEngineConfiguration configuration = newSpringProcessEngineConfiguration();configuration.setDataSource(dataSource);configuration.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);configuration.setTransactionManager(transactionManager);configuration.setProcessDefinitionCache(processDefinitionCache);configuration.setProcessDefinitionDataManager(processDefinitionDataManager);//自定义监听器注册配置Map<String, List<FlowableEventListener>> eventListeners = new HashMap<>();eventListeners.put(PROCESS_STARTED.name(),Arrays.asList(processStartListener));eventListeners.put(TASK_CREATED.name(), Arrays.asList(taskCreateListener));configuration.setTypedEventListeners(eventListeners);return configuration;}
}
4、建立服务网关
4.1、SpringCloud Gateway 简介
Spring Cloud Gateway
是Spring Cloud的一个子项目,用于取代Netflix Zuul
,是基于Spring 5
、Spring Boot 2
和Project Reactor
等技术实现的一个高性能网关服务。Spring Cloud Gateway中包含以下3个核心概念。
- 路由(route):网关最基础的组成部分,由一个唯一ID、一个目标URI、多个断言和过滤器组成,当断言为true时,表示路由匹配成功。
- 断言(predicate):即Java 8中的函数式编程接口
Predicate
,输入类型为org.springframework.web.server.ServerWebExchange
,通过断言可以匹配HTTP请求中的任何内容,包括请求参数、请求头等。 - 过滤器(filter):接口
org.springframework.cloud.gateway.filter.GatewayFilter
实现类的实例,多个过滤器形成过滤器链,可以在请求前对请求(request)进行修改,或在请求后对响应(response)进行修改。
客户端请求Spring Cloud Gateway后,Gateway通过HandlerMapping
找到与请求所匹配的路由,然后将请求发送给WebHandler
。WebHandler
会创建指定的过滤器链,并且将请求发送给第一个过滤器。经过过滤器链后,请求最终被转发给实际的服务器进行业务逻辑处理。
4.2、SpringCloud Gateway服务搭建
搭建Spring Cloud Gateway服务要先引入相应的JAR包。因为Spring Cloud Gateway是Spring Cloud的子项目,所以需要在POM文件中加入Spring Cloud的父依赖。因为Spring Cloud Gateway还依赖于Spring Boot,所以还需要加入Spring Boot的父依赖。最后,加入spring-cloud-starter-gateway。最终的项目pom.xml文件内容如下:
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><spring-boot.version>2.7.18</spring-boot.version><spring-cloud.version>2021.0.5</spring-cloud.version>
</properties><parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.7.18</version><relativePath />
</parent><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency>
</dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
添加依赖后,还需要编写一个Spring Boot启动类:
@SpringBootApplication
public class GatewayApplication {public static void main(String[] args) {SpringApplication.run(GatewayApplication.class, args);}
}
4.3、新发起流程路由配置(🧑💻重点)
在配置路由之前,先准备两个工作流引擎服务engine01、engine02。二者代码一致,但使用的配置文件不同。配置文件application-engine01.yml
的内容如下:
bpm:engine-name: engine01
server:port: 8101
spring:datasource:url: jdbc:mysql://localhost:3306/flowable_engine01?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=falseusername: rootpassword: 123456
配置文件application-engine02.yml的内容如下:
bpm:engine-name: engine02
server:port: 8102
spring:datasource:url: jdbc:mysql://localhost:3306/flowable_engine02?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=falseusername: rootpassword: 123456
此外,还需要在工作流引擎服务中添加流程发起接口,代码如下:
@RestController
@RequestMapping("/processInstance")
public class ProcessInstanceController {@Autowiredprivate RuntimeService runtimeService;@PostMapping("/startByProcessDefinitionId/{processDefinitionId}")public String startProcessInstance(@PathVariable("processDefinitionId") String processDefinitionId, @RequestBody Map<String, Object> variables) {ProcessInstance processInstance = runtimeService.startProcessInstanceById(processDefinitionId, variables);return processInstance.getProcessInstanceId();}
}
最后,通过启动命令分别指定虚拟机参数-Dspring.profiles.active=engine01
和-Dspring.profiles.active= engine02
,用于启动以下两个工作流引擎服务:
java -Dspring.profiles.active=engine01 -jar bpm-engine-1.0-SNAPSHOT.jar
java -Dspring.profiles.active=engine02 -jar bpm-engine-1.0-SNAPSHOT.jar
接下来,针对上面两个工作流引擎服务配置流程发起的路由规则。这里需要根据请求的路径进行转发,所以采用Path进行断言。此外,还需要将流量均衡地送到这两个工作流引擎,因此还需要增加Weight断言。最终,网关服务配置文件appplication.yml
中的路由配置如下:
spring:cloud:gateway:routes:- id: engine01-routeruri: http://localhost:8101predicates:- Path=/processInstance/startByProcessDefinitionId/**- Weight=engine-group,5- id: engine02-routeruri: http://localhost:8102predicates:- Path=/processInstance/startByProcessDefinitionId/**- Weight=engine-group,5
上述配置文件中配置了两个路由,其id分别为engine01-router和engine02-router,分别对应工作流引擎engine01和engine02,路径断言均为/processInstance/startByProcessDefinitionId/**
,权重断言指定分组均为engine-group,值均为5,表示路径为/processInstance/startByProcessDefinitionId/**
的请求会平均分发到engine01和engine02两个工作流引擎上。
4.4、已有流程路由配置(🧑💻重点)
接下来将介绍针对已发起流程的路由。已发起流程的路由与新发起流程不同:已发起流程的路由需要解析请求的参数,再根据参数获取对应的工作流引擎,最后由网关实现路由转发。首先,在工作流引擎服务中增加以下两个接口:
@RestController
@RequestMapping("/task")
public class TaskController {@Autowiredprivate TaskService taskService;/*** 根据流程实例ID查询待办任务*/@GetMapping("/processInstance/{processInstanceId}")public ResponseEntity<List<Map<String, Object>>> queryTasks(@PathVariable("processInstanceId") String processInstanceId) {List<Task> taskList = taskService.createTaskQuery().processInstanceId(processInstanceId).list();List<Map<String, Object>> ret = new ArrayList<>();for (Task task: taskList) {Map<String, Object> taskData = new HashMap<>();taskData.put("taskId", task.getId());taskData.put("name", task.getName());ret.add(taskData);}return ResponseEntity.ok(ret);}/*** 根据任务ID办理任务*/@PostMapping("/complete/{taskId}")public ResponseEntity<String> completeTask(@PathVariable("taskId") String taskId, Map<String, Object> variables) {taskService.complete(taskId, variables);return ResponseEntity.ok("success");}
}
接下来,在网关中实现这两个接口的路由。因为这两个接口的路由是根据路径匹配的,所以仍然采用Path断言。但是,最终转发的URI是动态获取的而非固定的。因此,在配置上可以先用占位符表示,并且通过不同的占位符来区分流程实例ID和任务ID。最终的路由配置如下:
spring:cloud:gateway:routes:- id: task-complete-routeruri: er://taskpredicates:- Path=/task/complete/*- id: task-query-routeruri: er://processpredicates:- Path=/task/processInstance/*
在以上配置中,er
部分的代码是两个自定义URI。er://task
表示schema为er,用于判断该URI是否需要动态查询;host为task,表示需要根据任务ID获取引擎信息。er://process
中的host为process,表示需要根据流程实例ID获取工作流引擎。此外,还需要增加工作流引擎配置信息和ID提取模式,代码如下:
engine-config:engines:- name: engine01url: http://localhost:8101- name: engine02url: http://localhost:8102pattens:- /task/complete/(.+)- /task/processInstance/(.+)
以上配置中包含了工作流引擎配置信息和正则表达式。工作流引擎配置指定了工作流引擎的名称和请求地址,当根据流程实例ID和任务ID读取到对应的工作流引擎名称后,能根据其名称查询到对应的地址。正则表达式则用于提取路径中的ID信息。可以通过自定义配置类来读取上述配置:
Configuration
@ConfigurationProperties(prefix = "engine-config")
@Data
public class EngineConfig {private List<EngineInfo> engines;private List<String> pattens;@Datapublic static class EngineInfo {private String name;private String url;}
}
完成上述配置后,即可通过Spring Cloud Gateway的全局过滤器实现最终请求地址的替换,具体实现如下:
@Component
public class EngineRouterGlobalFilter implements GlobalFilter, Ordered {@Autowiredprivate EngineConfig engineConfig;@Resource(name = "pikaTemplate")private RedisTemplate<String, String> redisTemplate;private static final String PIKA_PREFIX_PROCESS_ID = "BPM#ENGINE#PROCESSID";private static final String PIKA_PREFIX_TASK_ID = "BPM#ENGINE#TASKID";@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);if (url == null || !"er".equals(url.getScheme())) {return chain.filter(exchange);}String id = matchId(url.getPath());if (StringUtils.hasText(id)) {String prefix = url.getHost().equals("task") ? PIKA_PREFIX_TASK_ID :PIKA_PREFIX_PROCESS_ID;String engineName = String.valueOf(redisTemplate.opsForHash().get(prefix, id));for (EngineConfig.EngineInfo engine: engineConfig.getEngines()) {if (engine.getName().equals(engineName)) {URI requestUri = UriComponentsBuilder.fromUriString(engine.getUrl()).path(url.getPath()).build().toUri();exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUri);break;}}}return chain.filter(exchange);}@Overridepublic int getOrder() {return 10001;}private String matchId(String path) {for (String regex: engineConfig.getPattens()) {Pattern pattern = Pattern.compile(regex);Matcher matcher = pattern.matcher(path);if (matcher.matches()) {return matcher.group(1);}}return "";}
}
判断当前schema是否为er,如果是,则从请求路径中根据配置的正则表达获取ID信息;根据host信息判断是流程实例ID还是任务ID,并根据ID信息从Pika的路由表中获取工作流引擎名称;根据工作流引擎名称查询实际的URL地址,再替换ServerWebExchange
对象中属性GATEWAY_ REQUEST_URL_ATTR
的值,由Spring Cloud Gateway根据该属性值进行最终的路由转发。
需要注意的是,EngineRouterGlobalFilter
除了实现了GlobalFilter
接口,还实现了Ordered
接口。Ordered接口的作用是指定过滤器执行的顺序。这里getOrder()
方法返回的值为10001,主要原因是EngineRouterGlobalFilter
过滤器需要从ServerWebExchange
对象中获取属性GATEWAY_REQUEST_URL_ATTR
的值,该值通过全局过滤器RouteToRequestUrlFilter
设置。RouteToRequestUrlFilter
过滤器的getOrder()
方法返回的值为10000,因此为了保证EngineRouterGlobalFilter
过滤器在RouteToRequestUrlFilter
过滤器之后执行,getOrder()
方法的返回值就必须大于10000。
4.5、小结
多引擎架构模式可以通过多个引擎来提高系统的容量和性能,解决流程领域高并发、大数据量业务场景的需求。多引擎架构中,Flowable工作流引擎的底层逻辑不需要进行大的调整,整体改造成本比较低。另外,服务网关和路由表为多引擎架构模式提供了强大的扩容能力,使得在不进行任何数据迁移的情况下,实现快速扩容。此外,多引擎架构模式还提供了差异化的引擎能力,解决了众多流程领域中的复杂问题。但是还存在一些问题,主要包括:
- 工作流引擎集群都只包含一台服务器,存在单点风险,并且重启过程会导致服务不可用,因此需要采用集群部署模式;
- 路由信息是在配置文件中固定配置的,如果要增加或修改路由信息,则需要重新上线,成本较高;
- 通过多引擎差异化能力可以尽量避免跨库数据的查询,但是实际工作中难免会有这种场景,目前的架构无法完成跨库查询功能。
5、工作流引擎集群搭建
对于上文的架构设计,目前还存在几个需要解决的问题:引擎服务为单机模式,服务之间通过IP调用,而IP地址有可能会变动,尤其随着云原生应用的广泛使用,每次部署时都可能变更IP地址,从而影响服务的可用性;网关路由是静态配置的,无法支持动态路由配置;无法支持跨集群的数据查询。
工作流引擎应该以集群为工作单元对外提供流程服务,但是在实现中,工作流引擎服务是以单机模式提供服务的,这会影响工作流引擎的稳定性和容量。因此需实现工作流引擎服务的集群部署,但集群模式又会带来管理上的挑战,如服务器地址管理、服务上下线及服务健康状态监测等。可以通过服务注册中心来解决集群管理问题,这里使用Nacos
完成工作流引擎服务集群模式的实现。
5.1、Nacos服务搭建
Nacos是阿里巴巴推出的一个开源服务注册中心,同时也是一个配置中心。Nacos致力于发现、配置和管理微服务。Nacos提供了一组简单、易用的特性集,可以快速实现动态服务发现、服务配置、服务元数据及流量管理。Nacos主要提供以下功能。
- 服务注册与发现。Nacos支持基于DNS和基于RPC的服务发现。服务提供者使用原生SDK或OpenAPI向Nacos注册服务,服务消费者通过HTTP接口查找和发现服务。
- 服务健康监测。Nacos提供了对服务的实时健康检查,阻止向不健康的主机或服务实例发送请求。
- 动态配置服务。Nacos提供了配置中心管理能力,实现了对配置文件的动态修改,配置变更时无须重新部署应用和服务,让配置管理变得更加高效和敏捷。
本小节主要利用Nacos的服务注册与发现,以及服务健康监测功能实现工作流引擎集群的搭建。Nacos是一个独立服务,可以通过Git下载对应的二进制版本进行解压缩启动。Nacos与Spring Cloud、Spring Cloud Alibaba版本的对应如表所示。
Nacos 版本 | SpringCloud 版本 | SpringCloud Alibaba 版本 |
---|---|---|
2.2.1 | 2022.0.0 | 2022.0.0.0 |
2.2.0 | 2021.0.5 | 2021.0.5.0 |
2.1.0 | Hoxton.SR12 | 2.2.9.RELEASE |
2.0.4 | 2021.0.4 | 2021.0.4.0 |
2.0.3 | Hoxton.SR12 | 2.2.7.RELEASE |
1.4.2 | 2021.0.1 | 2021.0.1.0 |
1.4.1 | 2020.0.1 | 2021.1 |
1.4.2 | Hoxton.SR9 | 2.2.6.RELEASE |
1.2.1 | Hoxton.SR3 | 2.2.1.RELEASE |
1.1.4 | Hoxton.RELEASE | 2.2.0.RELEASE |
1.4.1 | Greenwich.SR6 | 2.1.4.RELEASE |
1.2.1 | Greenwich | 2.1.2.RELEASE |
由于Spring Cloud 2022.0.0
对应的是Spring Boot 3,需要JDK 17以上才能支持。而本文采用的Flowable 6.8是基于JDK 8实现的,因此本书使用支持SpringBoot 2x
的Spring Cloud版本。Nacos
默认使用嵌入式数据库derby
,为了方便管理和维护,可以改成MySQL。首先在MySQL中新建一个数据库,然后执行下载解压缩文件中conf目录下的nacos-mysql.sql
文件,并修改conf目录下的配置文件application.properties
,增加以下数据库配置:
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=root
db.password.0=123456
在以上配置中,spring.datasource.platform=mysql
表示使用数据库MySQL;db.num=1
表示使用一个数据库;Nacos
支持多数据源,db.url.0
、db.user.0
和db.password.0
分别表示第一个库的地址、用户名和密码。Nacos
支持单机和集群两种启动模式,这里采用单机模式启动,Linux环境下的启动命令如下:
sh startup.sh -m standalone
Windows环境下的启动命令如下:
startup.cmd -m standalone
Nacos
默认使用的端口号为8848,可以在application.properties
文件中修改配置项server.port
来指定其他端口号。启动成功后,可以通过浏览器访问http://locahost:8848/nacos,Nacos
默认用户名和密码均为nacos
,登录成功后的界面如图所示。
5.2、基于Nacos的引擎集群搭建
搭建完成Nacos
服务后,需要将工作流引擎服务注册到Nacos
中。Nacos
提供了注册客户端,可以通过调用HTTP接口完成服务注册。在Spring Cloud环境下,可以通过对应的starter
实现Nacos
客户端的整合。首先,在POM文件中引入Spring Cloud Alibaba的父依赖,根据上表中的对应关系,版本号应为2021.0.5.0
。因此,在dependencyManagement
标签中增加了以下内容:
<dependencies><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2021.0.5.0</version><type>pom</type><scope>import</scope></dependency>
</dependencies>
此外,还需要加入Nacos服务注册发现组件spring-cloud-starter-alibaba-nacos-discovery
。在POM文件中加入以下内容:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
添加以上依赖后配置服务注册信息。Nacos地址信息属于公共配置,因此可以配置在application.yml
文件中:
spring:cloud:nacos:discovery:server-addr: 127.0.0.1:8848username: nacospassword: nacos
各引擎中,服务名是不同的,所以将其配置在各引擎的配置文件中。在application-engine01.yml文件中增加以下内容:
spring:cloud:nacos:discovery:service: engine01-cluster
在application-engine02.yml文件中增加以下内容:
spring:cloud:nacos:discovery:service: engine02-cluster
Nacos默认使用spring.application.name
属性值作为服务注册名称,也可以通过spring.cloud.nacos.discovery. service
指定服务注册名称。如果这两个值都没设定,则会抛出如下异常
java.lang.IllegalArgumentException: Param 'serviceName' is illegal, serviceName is blank
最后,通过命令行来启动服务。服务都在同一台机器上,这里通过端口号来进行区分,集群engine01启动命令如下:
java -Dspring.profiles.active=engine01 -Dserver.port=8101 -jar bpm-engine-1.0-SNAPSHOT.jar
java -Dspring.profiles.active=engine01 -Dserver.port=8102 -jar bpm-engine-1.0-SNAPSHOT.jar
集群engine02启动命令如下:
java -Dspring.profiles.active=engine02 -Dserver.port=8201 -jar bpm-engine-1.0-SNAPSHOT.jar
java -Dspring.profiles.active=engine02 -Dserver.port=8202 -jar bpm-engine-1.0-SNAPSHOT.jar
启动完成后,可以通过Nacos
“服务管理”下的“服务列表”界面查看引擎集群状态,如图所示。
现在存在两个集群,服务名分别为engine01-cluster和engine02-cluster,默认分组名称为DEFAULT_ GROUP,分组名称也可以在配置文件中指定,两个集群中都有两个运行正常的实例。可以查看每个集群的详细信息,如图所示。
从集群详细信息中可以了解集群中每个实例的IP及端口,也可以操作实例的下线及设置每个实例的权重,权重值代表每个实例所占流量的比例。接下来,通过网关服务实现工作流引擎集群流量的分发。
5.3、引擎集群路由配置
在上文的架构设计中,工作流引擎为单机服务,网关通过IP地址调用工作流引擎服务。后面工作流引擎服务在Nacos
基础上实现了集群模式,因此网关服务也可以借助Nacos
的服务发现能力,实现工作流引擎集群的服务路由。网关服务需要先引入Spring Cloud Alibaba的父依赖及spring-cloud-starter- alibaba-nacos-discovery
。此外,为了实现负载均衡,这里还需要引入spring-cloud-starter-loadbalancer
:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
接下来,在网关服务的application.yml
中配置Nacos
服务地址及服务名:
spring:cloud:nacos:discovery:service: bpm-gatewayserver-addr: 127.0.0.1:8848username: nacospassword: nacos
完成Nacos
配置后,网关服务就可以通过Nacos
中注册的集群服务名调用工作流引擎服务了。为了实现同一个集群服务下多个实例的流量负载均衡,需要指定schema
为lb
(即LoaderBalance
的简写),因此原有的配置修改为如下形式:
spring:cloud:gateway:routes:- id: task-complete-routeruri: er://taskpredicates:- Path=/task/complete/*- id: task-query-routeruri: er://processpredicates:- Path=/task/processInstance/*- id: engine01-routeruri: lb://engine01-clusterpredicates:- Path=/processInstance/startByProcessDefinitionId/**- Weight=engine-group,5- id: engine02-routeruri: lb://engine02-clusterpredicates:- Path=/processInstance/startByProcessDefinitionId/**- Weight=engine-group,5
engine-config:engines:- name: engine01url: lb://engine01-cluster- name: engine02url: lb://engine02-clusterpattens:- /task/complete/(.+)- /task/processInstance/(.+)
在以上配置中,将原HTTP地址修改为lb://服务名
的形式,这样就实现了工作流引擎集群多实例负载均衡的调用模式。如果有实例重启或宕机,Nacos能及时监控到对应实例的状态,从而将其从有效实例列表中剔除,这样网关就不会将流量转发到该实例上,从而提高了整个系统的稳定性。
6、网关动态路由配置
由前文可知,网关路由信息配置在application.yml
文件中。一旦路由信息发生变动,就需要重新部署上线整个网关服务,效率比较低,因此需要支持路由的动态配置,实现路由信息的在线修改。网关路由信息分为两部分:引擎配置信息,包括引擎名称与URI;路由信息,包括路由ID、断言和过滤器。
6.1、引擎信息动态配置
在Spring Cloud项目中要使用Nacos配置中心,首先要引入对应的JAR包,除父依赖Spring Cloud Alibaba,还需要引入spring-cloud-starter-alibaba-nacos-config
。在项目的pom.xml文件中增加如下内容:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
此外,要在resouces
目录下增加bootstrap.yml
或bootstrap.properties
文件。该文件会优先于application.yml
和application.properties
配置文件加载,但是SprinCloud 2020
及其后续版本默认不加载bootstrap文件。要加载bootstrap文件,需要加入如下依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
接下来,在bootstrap.yml
中添加Nacos
配置信息:
spring:cloud:nacos:config:server-addr: 127.0.0.1:8848file-extension: yamlgroup: bpm-gatewaynamespace: publicname: engine-config
在以上配置中,server-addr
为Nacos
服务器地址;file-extension
表示Nacos支持的配置文件格式,Nacos支持TEXT、JSON、XML、YAML、HTML、Properties 6种文件格式;group表示配置文件的分组,默认分组为DEFAULT_GROUP;namespace表示命名空间,默认值为public,如果使用其他值,需要先在Nacos中创建对应名称的命名空间;name为配置文件的ID,同一分组下不能重复,默认取值为spring.application.name
的值,这里指定的值为engine-config,所以需要在Nacos的配置管理中增加数据ID为engine-config,分组为bpm-gateway,格式为YAML,配置内容为引擎信息的Nacos配置信息。具体如下所示:
engine-config:engines:- name: engine01url: lb://engine01-cluster- name: engine02url: lb://engine02-clusterpattens:- /task/complete/(.+)- /task/processInstance/(.+)
以上配置文件的内容与本地application.yml
文件中对应的部分一致,但是优先级高于本地配置文件。通过这种方式,即可实现基于Nacos配置中心的引擎信息动态变更。如果Nacos中的引擎配置信息发生变化,配置类EngineConfig
中的内容就会自动更新。
6.2、路由信息动态配置
路由配置信息与引擎配置信息不同。Spring Cloud Gateway
中的路由配置信息会转换为路由定义类org. springframework.cloud.gateway.route.RouteDefinitio
的实例,RouteDefinition
包含的属性如下所示:
public class RouteDefinition {private String id;@NotEmpty@Validprivate List<PredicateDefinition> predicates = new ArrayList<>();@Validprivate List<FilterDefinition> filters = new ArrayList<>();@NotNullprivate URI uri;private Map<String, Object> metadata = new HashMap<>();private int order = 0;
}
在以上代码中,RouteDefinition
的属性与application.yml
文件中的路由配置对应,Spring Cloud Gateway默认通过类org.springframework.cloud.gateway.route.InMemoryRouteDefinitionRepository
来管理路由定义,InMemoryRouteDefinitionRepository
会在内存中保存路由定义信息。
因此,路由信息动态配置除了感知配置信息变更,还需更新InMemoryRouteDefinitionRepository
中保存的路由信息,并且通知Spring Cloud Gatewa
路由发生了变化。Spring Cloud Gateway通过Spring事件机制来实现路由变化的通知,因此,路由信息动态更新类需要实现接口ApplicationEventPublisherAware
,代码如下:
@Service
public class DynamicEngineRouteService implements ApplicationEventPublisherAware {private ApplicationEventPublisher publisher;@Autowiredprivate RouteDefinitionWriter routeDefinitionWriter;public void addRouteDefinition(RouteDefinition definition) {routeDefinitionWriter.save(Mono.just(definition)).subscribe();this.publisher.publishEvent(new RefreshRoutesEvent(this));}public void updateRouteDefinition(RouteDefinition definition) {routeDefinitionWriter.delete(Mono.just(definition.getId())).subscribe();routeDefinitionWriter.save(Mono.just(definition)).subscribe();this.publisher.publishEvent(new RefreshRoutesEvent(this));}public void deleteRouteDefinition(String id) {routeDefinitionWriter.delete(Mono.just(id)).subscribe();this.publisher.publishEvent(new RefreshRoutesEvent(this));}@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.publisher = applicationEventPublisher;}
}
以上代码实现了路由信息的增、删、改操作,并结合Spring的ApplicationEventPublisherAware
接口和Spring Cloud Gateway提供的RefreshRoutesEvent
事件,实现了路由变更的通知。
路由信息变动的感知可以通过Nacos
配置中心实现,为了方便将路由配置信息转为RouteDefinition
对象,这里采用JSON格式来保存配置信息。application.ym
文件中的路由配置信息对应的JSON格式数据如下:
[{"id": "task-complete-router","uri": "er://task","predicates": [{"name": "Path","args": {"pattern": "/task/complete/*"}}]},{"id": "task-query-router","uri": "er://process","predicates": [{"name": "Path","args": {"pattern": "/task/processInstance/*"}}]},{"id": "engine01-router","uri": "lb://engine01-cluster","predicates": [{"name": "Path","args": {"pattern": "/processInstance/startByProcessDefinitionId/**"}},{"name": "Weight","args": {"weight.group": "engine-group","weight.weight": "5"}}]},{"id": "engine02-router","uri": "lb://engine02-cluster","predicates": [{"name": "Path","args": {"pattern": "/processInstance/startByProcessDefinitionId/**"}},{"name": "Weight","args": {"weight.group": "engine-group","weight.weight": "5"}}]}
]
将以上配置信息保存在Nacos的配置管理中,数据ID为router-definition.json
,分组为bpm-gateway
。接下来需要监听该配置文件的变化,并调用动态变更路由对应的方法,其实现如下:
@Component
public class RouterConfigListener {private Logger log = LoggerFactory.getLogger(RouterConfigListener.class);@Autowiredprivate DynamicEngineRouteService dynamicEngineRouteService;@Autowiredprivate NacosConfigManager nacosConfigManager;@Value("${router.config.dataId:router-definition.json}")private String dataId;@Value("${spring.cloud.nacos.config.group:bpm-gateway}")private String group;private final static Set<String> ROUTER_SET = new HashSet<>();@PostConstructpublic void dynamicRouteListener() throws NacosException {nacosConfigManager.getConfigService().addListener(dataId, group, new Listener() {@Overridepublic Executor getExecutor() {return null;}@Overridepublic void receiveConfigInfo(String configInfo) {ObjectMapper mapper = new ObjectMapper();try {List<RouteDefinition> definitions = mapper.readValue(configInfo,new TypeReference<List<RouteDefinition>>(){});Set<String> newRouterIds = definitions.stream().map(definition -> definition.getId()).collect(Collectors.toSet());Iterator<String> iterator = ROUTER_SET.iterator();while (iterator.hasNext()) {String next = iterator.next();if (!newRouterIds.contains(next)) {dynamicEngineRouteService.deleteRouteDefinition(next);}iterator.remove();}for (RouteDefinition definition: definitions) {if (ROUTER_SET.contains(definition.getId())) {dynamicEngineRouteService.updateRouteDefinition(definition);} else {ROUTER_SET.add(definition.getId());dynamicEngineRouteService.addRouteDefinition(definition);}}log.info("路由更新完成------------------------");} catch (JsonProcessingException e) {log.error("JsonProcessingException e", e);}}});}
}
在以上代码段中,注解@PostConstruct
表示Spring完成RouterConfigListener
类的初始化后调用dynamicRouteListener()
方法,在该方法中注册了对配置文件router-definition.json
变更的监听,一旦Nacos修改该文件并进行发布,就会执行监听器的receiveConfigInfo()
方法,自动更新Spring Cloud Gateway路由定义信息,实现路由的动态变更。为了判断路由是新增、修改还是删除,RouterConfigListener
中增加了一个类型为HashSet的成员属性ROUTER_SET,用于保存已存在的路由ID。
这里需要注意,网关服务启动时并不会触发变更逻辑,因此需要在服务器启动后主动从Nacos中获取路由配置信息,完成路由定义信息的初始化:
@PostConstruct
public void init() throws NacosException, JsonProcessingException {String configInfo = nacosConfigManager.getConfigService()getConfig(dataId, group, 10000);ObjectMapper mapper = new ObjectMapper();List<RouteDefinition> definitions = mapper.readValue(configInfo, new TypeReference<List<RouteDefinition>>(){});for (RouteDefinition definition: definitions) {ROUTER_SET.add(definition.getId());dynamicEngineRouteService.addRouteDefinition(definition);}log.info("路由初始化完成------------------------");
}
7、流程查询服务器搭建
多引擎架构的最大优势在于可以根据业务的不同提供差异化的工作流引擎能力,从而最大限度地避免跨数据库的查询,但是实际应用中,经常会遇到一些特殊的场景,不可避免地需要跨数据库查询流程或任务数据。对于这种情况,可以通过专门的查询服务来实现。本节将介绍如何基于Elasticsearch
来搭建综合查询服务。
7.1、Elasticsearch与SpringBoot的整合
Elasticsearch是一款分布式、高扩展、高实时的搜索与数据分析引擎。Elasticsearch底层使用Lucene建立倒排索引,支持分布式实时文件存储与搜索。Elasticsearch中索引可以被分成多个分片,每个分片又可能有若干个副本。Elasticsearch具备强大的水平扩展能力,支持PB级数据的存储和查询,能够满足大数据量、多条件的复杂查询场景需求。
对于Spring Boot与Elasticsearch的集成,Spring Boot官方提供了对应的starter,只需在项目的pom.xml文件中直接引入依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
这里使用的Spring Boot版本为2.7.18,该版本下默认使用的Elasticsearch客户端版本为7.17.15。引入JAR包后,还需要在application.yml
中配置Elasticsearch服务的地址,其配置如下:
spring:elasticsearch:uris:- 127.0.0.1:9200
完成上述配置后,就可以在Spring的IoC容器中获取ElasticsearchRestTemplate
实例,并通过该实例实现对Elasticsearch的增、删、改、查等操作了。
7.2、数据写入Elasticsearch
Elasticsearch中写入什么数据,需要根据业务的具体需求来确定。这里以任务数据为例来演示数据写入Elasticsearch的过程。首先,需要在Elasticsearch中创建索引,并设置mappings,内容如下:
{"mappings": {"properties": {"id": {"type": "keyword","index": true},"name": {"type": "keyword","index": true},"activityId": {"type": "keyword","index": true},"processInstanceId": {"type": "keyword","index": true},"processInstanceName": {"type": "keyword","index": true},"assignee": {"type": "keyword","index": true},"candidates": {"type": "keyword","index": true},"status": {"type": "integer","index": true},"createTime": {"type": "date","index": true},"completeTime": {"type": "date","index": true}}}
}
在以上代码中,type表示数据类型,支持text、keyword、integer、double、boolean、long、date等,也支持嵌套对象类型object和一些特殊的数据类型,如表示地理位置的geo_point、geo_shape等。keyword和text都表示字符串,区别在于keyword不进行分词,text需要进行分词。index表示是否需要为该字段创建索引,true表示创建索引,false表示不创建索引。接下来,在引擎服务中创建与上述索引对应的实体类:
@Data
@Document(indexName = "bpm_task")
public class TaskDoc {private String id; //任务IDprivate String engine; //对应的引擎名称private String name; //任务名称private String activityId; //任务对应节点的keyprivate String processInstanceId; //流程实例IDprivate String processInstanceName; //流程实例名称private String assignee; //任务办理人private int status; //任务状态private String[] candidates; //候选人private Date createTime; //任务创建时间private Date completeTime; //任务办理时间
}
将任务数据写入Elasticsearch,可以通过任务监听器实现。为了不影响流程执行效率,可以在事务提交后,通过异步方式写入Elasticsearch,监听器的实现如下:
@Component
@Slf4j
public class TaskToEsListener extends AbstractFlowableEventListener {@Value("${bpm.engine-name}")private String engineName;@Value("${es.task-index:bpm_task}")private String taskIndex;@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;ExecutorService executorService = Executors.newFixedThreadPool(20);@Overridepublic void onEvent(FlowableEvent event) {TaskEntity taskEntity = (TaskEntity)((FlowableEntityEventImpl) event).getEntity();if (event.getType() == TASK_CREATED || event.getType() == TASK_ASSIGNED) {TaskDoc taskDoc = toTaskDoc(taskEntity);execute(() -> elasticsearchRestTemplate.save(taskDoc));} else if (event.getType() == TASK_COMPLETED) {Document document = Document.create();document.put("status", 2);document.put("completeTime", new Date());UpdateQuery updateQuery = UpdateQuery.builder(taskEntity.getId()).withDocument(document).build();execute(() -> elasticsearchRestTemplate.update(updateQuery, IndexCoordinates.of(taskIndex)));}}private void execute(Runnable runnable) {//事务提交后再写入ElasticSearchTransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {@Overridepublic void afterCommit() {executorService.submit(runnable);}});}private TaskDoc toTaskDoc(TaskEntity taskEntity) {TaskDoc doc = new TaskDoc();doc.setId(taskEntity.getId());doc.setName(taskEntity.getName());doc.setActivityId(taskEntity.getTaskDefinitionKey());doc.setAssignee(taskEntity.getAssignee());if (taskEntity.getAssignee() == null && taskEntity.getCandidates() != null) {String[] candidates = taskEntity.getCandidates().toArray(new String[taskEntity.getCandidates().size()]);doc.setCandidates(candidates);}doc.setProcessInstanceId(taskEntity.getProcessInstanceId());HistoricProcessInstanceEntity processInstance = CommandContextUtil.getHistoricProcessInstanceEntityManager().findById(taskEntity.getProcessInstanceId());if (StringUtils.hasText(processInstance.getName())) {doc.setProcessInstanceName(processInstance.getName());} else {doc.setProcessInstanceName(processInstance.getProcessDefinitionName());}doc.setEngine(engineName);doc.setStatus(1);doc.setCreateTime(taskEntity.getCreateTime());return doc;}@Overridepublic boolean isFailOnException() {return false;}
}
注意事项:
- 为了保证不将中间过程数据写入Elasticsearch,需要在事务提交后再执行Elasticsearch写入逻辑。这里通过
TransactionSynchronizationManager.registerSynchronization
注册了回调接口TransactionSynchronization
,事务提交成功后会调用其中的afterCommit()
方法进行Elasticsearch数据的写入。如果流程执行过程中发生了异常,导致事务回滚,则无须将数据写入Elasticsearch。 - 数据在写入Elasticsearch的过程中可能会发生异常。为了保证数据的一致性,应该在异常处理机制中进行数据的补偿。补偿过程与异步历史数据的补偿逻辑类似,可以先将异常数据写入MQ,再消费MQ中的数据进行补偿。
- 以上代码同时监听了TASK_CREATED和TASK_ASSIGNED事件,并且两者处理逻辑一致。主要原因在于,在Flowable中,如果流程定义中已设置了任务办理人,则会生成TASK_ASSIGNED事件,并且该事件会先于TASK_CREATED发生;如果未设置任务办理人,则只生成TASK_CREATED事件。所以这里Elasticsearch写入的逻辑是,如果对应ID的数据已经存在,则进行数据的全量更新,否则插入一条新的数据。
最后,在工作流引擎启动时将监听器TaskToEsListener
注入引擎:
@Configuration
public class FlowableEngineConfiguration {@Autowiredprivate DataSource dataSource;@Autowiredprivate PlatformTransactionManager transactionManager;@Autowiredprivate ProcessStartListener processStartListener;@Autowiredprivate TaskCreateListener taskCreateListener;@Autowiredprivate TaskToEsListener taskToEsListener;@Autowiredprivate RedisProcessDefinitionCache processDefinitionCache;@Autowiredprivate CustomProcessDefinitionDataManagerImpl processDefinitionDataManager;@Beanpublic ProcessEngine createProcessEngine() {SpringProcessEngineConfiguration engineConf = new SpringProcessEngineConfiguration();engineConf.setDataSource(dataSource);engineConf.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);engineConf.setTransactionManager(transactionManager);engineConf.setProcessDefinitionCache(processDefinitionCache);engineConf.setProcessDefinitionDataManager(processDefinitionDataManager);engineConf.setIdGenerator(new SnowFlakeIdGenerator());Map<String, List<FlowableEventListener>> eventListeners = new HashMap<>();eventListeners.put(PROCESS_STARTED.name(), Arrays.asList(processStartListener));eventListeners.put(TASK_CREATED.name(), Arrays.asList(taskCreateListener, taskToEsListener));eventListeners.put(TASK_COMPLETED.name(), Arrays.asList(taskToEsListener));eventListeners.put(TASK_ASSIGNED.name(), Arrays.asList(taskToEsListener));engineConf.setTypedEventListeners(eventListeners);return engineConf.buildProcessEngine();}
}
7.3、创建查询服务
任务数据写入Elasticsearch后,即可创建查询服务来实现任务的综合查询。例如,实现按人查询待办任务的代码如下:
@Component
@Slf4j
public class ElasticSearchDocQuery {@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;public List<TaskDoc> queryTasksByUserId(String userId) {List<TaskDoc> ret = new ArrayList<>();try {QueryBuilder builder = QueryBuilders.termQuery("assignee", userId);Query query = new NativeSearchQuery(builder);query.addSort(Sort.by(Sort.Direction.DESC, "createTime"));SearchHits<TaskDoc> searchHits = elasticsearchRestTemplate.search(query, TaskDoc.class);for (SearchHit<TaskDoc> searchHit: searchHits.getSearchHits()) {ret.add(searchHit.getContent());}} catch (Exception ex) {log.error("Exception ex", ex);}for (TaskDoc taskDoc: ret) {log.info("任务ID:{},工作流引擎:{}", taskDoc.getId(), taskDoc.getEngine());}return ret;}
}
IdGenerator());Map<String, List<FlowableEventListener>> eventListeners = new HashMap<>();eventListeners.put(PROCESS_STARTED.name(), Arrays.asList(processStartListener));eventListeners.put(TASK_CREATED.name(), Arrays.asList(taskCreateListener, taskToEsListener));eventListeners.put(TASK_COMPLETED.name(), Arrays.asList(taskToEsListener));eventListeners.put(TASK_ASSIGNED.name(), Arrays.asList(taskToEsListener));engineConf.setTypedEventListeners(eventListeners);return engineConf.buildProcessEngine();}
}
7.3、创建查询服务
任务数据写入Elasticsearch后,即可创建查询服务来实现任务的综合查询。例如,实现按人查询待办任务的代码如下:
@Component
@Slf4j
public class ElasticSearchDocQuery {@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;public List<TaskDoc> queryTasksByUserId(String userId) {List<TaskDoc> ret = new ArrayList<>();try {QueryBuilder builder = QueryBuilders.termQuery("assignee", userId);Query query = new NativeSearchQuery(builder);query.addSort(Sort.by(Sort.Direction.DESC, "createTime"));SearchHits<TaskDoc> searchHits = elasticsearchRestTemplate.search(query, TaskDoc.class);for (SearchHit<TaskDoc> searchHit: searchHits.getSearchHits()) {ret.add(searchHit.getContent());}} catch (Exception ex) {log.error("Exception ex", ex);}for (TaskDoc taskDoc: ret) {log.info("任务ID:{},工作流引擎:{}", taskDoc.getId(), taskDoc.getEngine());}return ret;}
}
项目完整源码
源码地址