Apache Camel 简介
相关文档地址
https://camel.apache.org/components/next/index.htmlhttps://camel.apache.org/components/4.10.x/languages/simple-language.htmlhttps://camel.apache.org/manual/exception-clause.htmlhttps://camel.apache.org/manual/index.htmlhttps://camel.apache.org/components/next/http-component.html
null-safe 运算符算
Apache Camel中的null安全导航操作
在Apache Camel的Simple语言中,处理可能为null的属性或嵌套属性时,有几种方法可以避免空指针异常:
- 使用空安全操作符(?.)
Apache Camel的Simple语言支持null安全操作符(?.),可以防止在访问嵌套属性时出现空指针异常:
${exchangeProperty.allData?[${exchangeProperty.loopCount}]?.['业务二级产出线ID']?[0]?.['number']}
这种语法确保当任何中间值为null时,表达式会安全地返回null而不是抛出异常。
2. 使用空值合并操作符(??)
当你想在遇到null值时提供默认值时,可以使用null合并操作符(??):
${exchangeProperty.allData?[${exchangeProperty.loopCount}]?.['业务二级产出线ID']?[0]?.['number'] ?? '默认值'}
这样如果表达式计算结果为null,则会返回指定的默认值。
3. 使用isEmpty或exists函数
在某些情况下,你可能想先检查一个值是否存在,然后再尝试访问它的属性:
${header.foo} == null
${header.foo} != null
- 使用条件表达式
可以使用iif函数来实现条件逻辑:
${iif(exchangeProperty.allData?[${exchangeProperty.loopCount}] != null, exchangeProperty.allData?[${exchangeProperty.loopCount}]?.['业务二级产出线ID']?[0]?.['number'], '默认值')}
- 处理Map和List类型
对于Map或List类型的数据,Camel支持索引和键访问,同样可以使用null安全操作符:
${body.address?.lines?[0]}
${header.foo?[bar]}
最佳实践
尽量使用空安全操作符:在访问嵌套属性时,始终使用?.操作符来避免空指针异常。
提供默认值:在可能的情况下,使用??操作符为可能为null的表达式提供默认值。
简化复杂表达式:对于非常复杂的表达式,可以考虑将其分解为多个步骤,或使用Bean组件在Java代码中处理复杂的null处理逻辑。
使用变量存储中间结果:对于复杂的表达式,可以使用变量存储中间结果,然后在后续步骤中使用这些变量。
在你的具体案例中,可以这样修改表达式:
${exchangeProperty.allData?[${exchangeProperty.loopCount}]?.['业务二级产出线ID']?[0]?.['number'] ?? ''}
这样当任何嵌套属性为null时,表达式会安全地返回空字符串而不是抛出异常。
setVariable、setProperty、setHeader
- setVariable
变量范围:变量(Variable)是当前交换(Exchange)的一部分,但与Header和Property不同,它不会被复制到消息中
生命周期:变量的生命周期仅限于当前Exchange的处理过程
访问方式:通过${variable.xxx}访问
用途:适合作为路由内部使用的临时数据,不会影响到消息本身
- setProperty(或setExchangeProperty)
属性范围:属性(Property)是附加到Exchange上的,在整个消息处理过程中都可用
生命周期:属性在整个Exchange生命周期内持续存在,即使跨多个路由和端点
访问方式:通过${exchangeProperty.xxx}访问
用途:适合存储需要在整个消息处理流程中使用的数据,如跨路由共享的上下文信息
- setHeader
作用范围:Header是消息(Message)级别的,附加在消息上而不是Exchange上
生命周期:Header随消息一起传递,当消息被传递到另一个端点时,Header通常也会被传递(取决于端点实现)
访问方式:通过${header.xxx}访问
端点交互:许多组件(如HTTP、JMS、Kafka等)会读取特定的Header来控制行为
重要特性:当使用EIP(如split、multicast)时,新创建的Exchange会从原始Exchange中复制Header
特性 | setHeader | setVariable | setProperty |
---|---|---|---|
作用范围 | 消息级别 | Exchange级别,但不复制 | Exchange级别 |
生命周期 | 随消息传递,可能被端点消费 | 仅当前Exchange处理过程 | 整个Exchange生命周期 |
是否传递给目标系统 | 是(通常) | 否 | 否 |
跨路由可见性 | 可能会变化(消息转换时) | 仅在当前Exchange中可见 | 在整个Exchange中可见 |
当前时间
date:now如果不指定格式模式,{date:now}
如果不指定格式模式,date:now如果不指定格式模式,{date:now}会返回一个默认格式的日期时间字符串。
默认情况下,Apache Camel会使用java.util.Date对象的toString()方法的格式,即类似于:
Wed Apr 24 10:15:30 CST 2024
${date:now:yyyy-MM-dd}
可以根据需要调整日期时间格式。
例如,如果只需要日期部分,可以使用yyyy-MM-dd格式。
Simple语言中的时间偏移支持的单位:
h:小时
m:分钟
s:秒
注意事项:
Simple语言的时间偏移语法使用+和-操作符表示时间的增减
可以组合多个时间单位,如now+1h30m(当前时间加1小时30分钟) date:now+1h30m在路由中使用时,需要将表达式放在{date:now+1h30m}
在路由中使用时,需要将表达式放在date:now+1h30m在路由中使用时,需要将表达式放在{}内部
Apache Camel onException 完整使用指南
概述
在Apache Camel中,onException
、onWhen
、retryWhile
和redeliveryPolicy
可以组合使用来实现复杂的异常处理逻辑。本文档详细说明如何同时使用这些功能。
核心组件说明
1. onException
- 作用: 定义异常处理器,捕获特定类型的异常
- 语法:
<onException><exception>异常类型</exception></onException>
2. onWhen
- 作用: 条件判断,决定是否处理特定的异常
- 语法:
<onWhen><simple>条件表达式</simple></onWhen>
3. retryWhile
- 作用: 定义重试条件,决定是否继续重试
- 语法:
<retryWhile><simple>重试条件</simple></retryWhile>
4. redeliveryPolicy
- 作用: 定义重试策略,包括重试次数、延迟时间等
- 语法:
<redeliveryPolicy>重试策略配置</redeliveryPolicy>
组合使用示例
基本语法结构
<onException><!-- 1. 定义异常类型 --><exception>java.lang.Exception</exception><!-- 2. 使用onWhen条件判断 --><onWhen><simple>${exception.message} contains 'retryable'</simple></onWhen><!-- 3. 使用retryWhile定义重试条件 --><retryWhile><simple>${header.CamelRedeliveryCounter} < 3</simple></retryWhile><!-- 4. 应用重试策略 --><redeliveryPolicy><maximumRedeliveries>3</maximumRedeliveries><redeliveryDelay>1000</redeliveryDelay><backOffMultiplier>2.0</backOffMultiplier><useExponentialBackOff>true</useExponentialBackOff></redeliveryPolicy><!-- 5. 异常处理逻辑 --><setHeader headerName="ErrorType"><simple>${exception.class.simpleName}</simple></setHeader><handled><constant>true</constant></handled>
</onException>
详细配置说明
1. 异常类型捕获
<!-- 捕获单个异常类型 -->
<exception>java.lang.RuntimeException</exception><!-- 捕获多个异常类型 -->
<exception>java.net.ConnectException</exception>
<exception>java.net.SocketTimeoutException</exception>
<exception>java.net.UnknownHostException</exception>
2. onWhen条件判断
<!-- 基于异常消息内容判断 -->
<onWhen><simple>${exception.message} contains 'retryable'</simple>
</onWhen><!-- 基于异常类型判断 -->
<onWhen><simple>${exception.class.simpleName} == 'BusinessException'</simple>
</onWhen><!-- 复合条件判断 -->
<onWhen><simple>${exception.message} contains 'business' or ${exception.message} contains 'validation'</simple>
</onWhen><!-- 总是处理 -->
<onWhen><constant>true</constant>
</onWhen>
3. retryWhile重试条件
<!-- 基于重试次数判断 -->
<retryWhile><simple>${header.CamelRedeliveryCounter} < 3</simple>
</retryWhile><!-- 基于异常消息和重试次数判断 -->
<retryWhile><simple>${header.CamelRedeliveryCounter} < 5 and ${exception.message} contains 'business'</simple>
</retryWhile><!-- 基于自定义逻辑判断 -->
<retryWhile><simple>${header.CamelRedeliveryCounter} < 10 and ${exception.message} contains 'connection'</simple>
</retryWhile>
4. redeliveryPolicy重试策略
<redeliveryPolicy><!-- 最大重试次数 --><maximumRedeliveries>3</maximumRedeliveries><!-- 重试延迟时间(毫秒) --><redeliveryDelay>1000</redeliveryDelay><!-- 退避倍数 --><backOffMultiplier>2.0</backOffMultiplier><!-- 使用指数退避 --><useExponentialBackOff>true</useExponentialBackOff><!-- 日志配置 --><logRetryAttempted>true</logRetryAttempted><logRetryStackTrace>true</logRetryStackTrace><logHandled>true</logHandled><logNewException>true</logNewException><logExhausted>true</logExhausted><logExhaustedMessageHistory>true</logExhaustedMessageHistory>
</redeliveryPolicy>
实际应用场景
1. 全局异常处理
<onException><exception>java.lang.Exception</exception><onWhen><simple>${exception.message} contains 'retryable' or ${exception.message} contains 'temporary'</simple></onWhen><retryWhile><simple>${header.CamelRedeliveryCounter} < 3 and (${exception.message} contains 'retryable' or ${exception.message} contains 'temporary')</simple></retryWhile><redeliveryPolicy><maximumRedeliveries>3</maximumRedeliveries><redeliveryDelay>1000</redeliveryDelay><backOffMultiplier>2.0</backOffMultiplier><useExponentialBackOff>true</useExponentialBackOff></redeliveryPolicy><handled><constant>true</constant></handled>
</onException>
2. 业务异常处理
<onException><exception>java.lang.RuntimeException</exception><onWhen><simple>${exception.message} contains 'business' or ${exception.message} contains 'validation'</simple></onWhen><redeliveryPolicy><maximumRedeliveries>5</maximumRedeliveries><redeliveryDelay>2000</redeliveryDelay><backOffMultiplier>1.5</backOffMultiplier><useExponentialBackOff>true</useExponentialBackOff></redeliveryPolicy><retryWhile><simple>${header.CamelRedeliveryCounter} < 5 and ${exception.message} contains 'business'</simple></retryWhile><to uri="activemq:dead.letter.queue"/><handled><constant>true</constant></handled>
</onException>
3. 网络异常处理
<onException><exception>java.net.ConnectException</exception><exception>java.net.SocketTimeoutException</exception><exception>java.net.UnknownHostException</exception><onWhen><constant>true</constant></onWhen><redeliveryPolicy><maximumRedeliveries>10</maximumRedeliveries><redeliveryDelay>5000</redeliveryDelay><backOffMultiplier>2.0</backOffMultiplier><useExponentialBackOff>true</useExponentialBackOff></redeliveryPolicy><retryWhile><simple>${header.CamelRedeliveryCounter} < 10</simple></retryWhile><handled><constant>true</constant></handled>
</onException>
重要变量说明
异常相关变量
${exception}
- 当前异常对象${exception.message}
- 异常消息${exception.class.simpleName}
- 异常类名
重试相关变量
${header.CamelRedeliveryCounter}
- 当前重试次数${header.CamelRedeliveryMaxCounter}
- 最大重试次数${header.CamelFailureEndpoint}
- 失败的端点
消息相关变量
${body}
- 消息体${headers}
- 消息头${exchangeId}
- 交换ID
最佳实践
1. 异常处理顺序
- 将具体的异常处理放在前面
- 将通用的异常处理放在后面
2. 重试策略设计
- 根据异常类型设置不同的重试策略
- 使用指数退避避免系统过载
- 设置合理的最大重试次数
3. 日志记录
- 启用重试日志记录
- 记录异常堆栈信息
- 记录重试历史
4. 死信队列
- 对于无法重试的异常,发送到死信队列
- 实现死信队列处理逻辑
测试示例
<route id="testRoute"><from uri="direct:test"/><choice><when><simple>${body} == 'retryable-error'</simple><throwException exceptionType="java.lang.RuntimeException" message="模拟可重试的临时故障"/></when><when><simple>${body} == 'business-error'</simple><throwException exceptionType="java.lang.RuntimeException" message="模拟业务异常"/></when><when><simple>${body} == 'network-error'</simple><throwException exceptionType="java.net.ConnectException" message="模拟网络连接异常"/></when><otherwise><setBody><simple>处理成功: ${body}</simple></setBody></otherwise></choice><to uri="mock:result"/>
</route>
总结
通过组合使用onException
、onWhen
、retryWhile
和redeliveryPolicy
,可以实现:
- 精确的异常捕获 - 通过
onWhen
条件判断 - 灵活的重试逻辑 - 通过
retryWhile
自定义重试条件 - 可控的重试策略 - 通过
redeliveryPolicy
配置重试参数 - 完善的异常处理 - 通过异常处理器处理捕获的异常
这种组合使用方式能够满足复杂业务场景下的异常处理需求,提高系统的可靠性和稳定性。
onWhen的作用机制
- onWhen=true时
异常会被onException处理器捕获
会执行redeliveryPolicy定义的重试策略
会执行retryWhile定义的重试条件判断
会执行异常处理逻辑(如setHeader、process等) - onWhen=false时
异常不会被onException处理器捕获
不会执行重试逻辑
不会执行异常处理逻辑
异常会继续向上传播,可能被其他异常处理器捕获或导致路由失败
maximumRedeliveries 和 retryWhile
- maximumRedeliveries=0 - 无论retryWhile如何设置,都不会重试
- retryWhile=false - 无论maximumRedeliveries如何设置,都不会重试
- 优先级关系 - 两个条件是"与"的关系,不是"或"的关系
- 两个条件都必须满足 - 只有maximumRedeliveries>0且retryWhile=true时才会重试
重试逻辑的优先级
Apache Camel的重试逻辑遵循以下优先级:
- onWhen - 首先判断是否处理此异常
- maximumRedeliveries - 检查是否允许重试
- retryWhile - 检查是否满足重试条件
这种设计确保了重试行为的可控性和可预测性,避免了意外的重试行为。
Apache Camel异常处理器注册机制:
- 处理器注册时机:onException处理器在路由定义时就会注册到异常处理链中,与它们在路由中的位置无关
- 匹配顺序:Camel会按照onException在路由中定义的顺序进行检查
- 第一个匹配的处理器生效:找到匹配的处理器后立即执行并停止查找
Apache Camel onCompletion 使用说明
概述
onCompletion
是Apache Camel中用于处理路由完成事件的机制,类似于onException
,但处理的是路由正常完成或异常完成后的清理工作。
onCompletion vs onException 对比
相似点
- 注册机制:两者都在路由定义时注册到处理链中
- 位置无关性:在路由中的定义位置不影响其生效
- 支持条件判断:都可以使用
onWhen
条件 - 处理器引用:都可以使用
<process ref="xxx">
引用处理器
不同点
特性 | onException | onCompletion |
---|---|---|
触发时机 | 异常发生时 | 路由完成时(成功或失败) |
处理对象 | 异常对象 | 完成事件 |
重试机制 | 支持重试策略 | 不支持重试 |
条件类型 | 异常类型匹配 | 完成状态匹配 |
使用场景 | 异常处理 | 清理工作、日志记录 |
onCompletion 基本语法
<onCompletion><process ref="completionProcessor"/>
</onCompletion><!-- 带条件的onCompletion -->
<onCompletion><onWhen><simple>${exchangeProperty.CamelRouteStop} == true</simple></onWhen><process ref="stopCompletionProcessor"/>
</onCompletion><!-- 成功完成时的处理 -->
<onCompletion><onCompleteOnly><process ref="successCompletionProcessor"/></onCompleteOnly>
</onCompletion><!-- 失败完成时的处理 -->
<onCompletion><onFailureOnly><process ref="failureCompletionProcessor"/></onFailureOnly>
</onCompletion>
位置影响分析
关键结论:位置不影响功能
与onException
类似,onCompletion
在路由中的位置不会影响其功能,因为:
- 注册时机:在路由启动时就已经注册到完成处理链中
- 执行时机:在路由完成时触发,与定义位置无关
- 作用范围:对整个路由生效
示例验证
<route id="testRoute"><from uri="direct:start"/><!-- 业务逻辑 --><log message="开始处理"/><process ref="businessProcessor"/><!-- onCompletion定义在中间 --><onCompletion><process ref="completionProcessor"/></onCompletion><!-- 更多业务逻辑 --><to uri="mock:result"/><!-- 路由结束<!-- 即使onCompletion定义在中间,仍然会在路由完成时执行 -->
</route>
常用配置模式
1. 基本完成处理
<onCompletion><log message="路由执行完成: ${exchangeId}"/><process ref="cleanupProcessor"/>
</onCompletion>
2. 条件完成处理
<onCompletion><onWhen><simple>${header.ProcessType} == 'batch'</simple></onWhen><log message="批处理完成,执行清理"/><process ref="batchCleanupProcessor"/>
</onCompletion>
3. 成功/失败分别处理
<!-- 成功完成处理 -->
<onCompletion><onCompleteOnly><log message="路由执行成功"/><setHeader headerName="CompletionStatus"><constant>SUCCESS</constant></setHeader><process ref="successProcessor"/></onCompleteOnly>
</onCompletion><!-- 失败完成处理 -->
<onCompletion><onFailureOnly><log message="路由执行失败"/><setHeader headerName="CompletionStatus"><constant>FAILURE</constant></setHeader><process ref="failureProcessor"/></onFailureOnly>
</onCompletion>
4. 资源清理
<onCompletion><log message="开始清理资源"/><!-- 清理临时文件 --><setHeader headerName="TempFile"><simple>${exchangeProperty.tempFile}</simple></setHeader><process ref="fileCleanupProcessor"/><!-- 清理数据库连接 --><process ref="dbCleanupProcessor"/><!-- 记录完成时间 --><setHeader headerName="CompletionTime"><simple>${date:now:yyyy-MM-dd HH:mm:ss}</simple></setHeader><log message="资源清理完成"/>
</onCompletion>
实际应用示例
1. 工作流完成处理
<route id="workflowRoute"><from uri="direct:workflow"/><onCompletion><process ref="workflowCompletionProcessor"/></onCompletion><!-- 工作流逻辑 --><process ref="workflowProcessor"/><to uri="mock:result"/>
</route>
2. 批处理完成处理
<route id="batchProcessRoute"><from uri="direct:batch"/><onCompletion><onWhen><simple>${header.BatchSize} > 1000</simple></onWhen><log message="大批量处理完成,执行特殊清理"/><process ref="largeBatchCleanupProcessor"/></onCompletion><!-- 批处理逻辑 --><process ref="batchProcessor"/><to uri="mock:result"/>
</route>
3. 事务完成处理
<route id="transactionRoute"><from uri="direct:transaction"/><onCompletion><onCompleteOnly><log message="事务提交成功"/><process ref="commitProcessor"/></onCompleteOnly></onCompletion><onCompletion><onFailureOnly><log message="事务回滚"/><process ref="rollbackProcessor"/></onFailureOnly></onCompletion><!-- 事务逻辑 --><transacted ref="transactionPolicy"/><process ref="transactionProcessor"/><to uri="mock:result"/>
</route>
注意事项
1. 执行顺序
- 多个
onCompletion
处理器按定义顺序执行 - 与
onException
不同,onCompletion
不会停止后续处理器的执行
2. 异常处理
onCompletion
中的异常不会触发onException
处理器- 建议在
onCompletion
中使用try-catch处理异常
3. 性能考虑
onCompletion
会在每次路由完成时执行- 避免在
onCompletion
中执行耗时的操作
4. 资源管理
- 确保在
onCompletion
中正确清理资源 - 避免资源泄漏
总结
onCompletion
是Apache Camel中重要的完成处理机制,与onException
类似,在路由中的位置不影响其功能。它主要用于:
- 资源清理:清理临时文件、数据库连接等
- 日志记录:记录路由执行结果
- 状态更新:更新处理状态
- 通知机制:发送完成通知
通过合理使用onCompletion
,可以确保路由执行后的清理工作得到正确处理,提高系统的稳定性和可维护性。
开发流程
方式一
如果对 Apache Camel 有一定的了解,熟悉 Apache Camel 相关的 xml 语法,可以使用此方式进行开发。
- (非必需)在 processor 中定义相关的处理器,只处理核心的逻辑,并在 CamelConfig 中注册此处理器。
- 在 parse 中对前端传过来的json进行解析,拼接成 Apache Camel 的 xml 代码,如果 xml 不能直接完成需求,则引用步骤1中定义的处理器。
方式二
如果对 Apache Camel 了解程度有限或需要快速完成需求,可以使用此方式进行开发。
- 在 processor 中定义相关的处理器,处理所有的逻辑,并在 CamelConfig 中注册此处理器。
- 在 parse 中对前端传过来的json进行解析,只进行核心的校验,并引用步骤1中定义的处理器。