当前位置: 首页 > news >正文

ActiveMQ(三)

协议配置

ActiveMQ 支持的协议有 TCP 、 UDP、NIO、SSL、HTTP(S) 、VM 这是activemq 的activemq.xml 中配置文件设置协议的地方

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumCon    nections=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnect    ions=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConn    ections=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnect    ions=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnection    s=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

默认是使用 openwire 也就是 tcp 连接
默认的Broker 配置,TCP 的Client 监听端口 61616 ,在网络上传输数据,必须序列化数据,消息是通过一个 write protocol 来序列化为字节流。默认情况 ActiveMQ 会把 wire protocol 叫做 Open Wire ,它的目的是促使网络上的效率和数据快速交互
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  1. NIO 协议为ActiveMQ 提供更好的性能
    适合NIO 使用的场景:
    1 当有大量的Client 连接到Broker 上 , 使用NIO 比使用 tcp 需要更少的线程数量,所以使用 NIO
    2 可能对于 Broker 有一个很迟钝的网络传输, NIO 的性能高于 TCP
    连接形式:
    nio://hostname:port?key=value
    在这里插入图片描述
https://activemq.apache.org/configuring-version-5-transports.html

修改 activemq.xml 使之支持 NIO 协议:

<transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/><transportConnector name="auto+nio+ssl" uri="auto+nio+ssl://0.0.0.0:5671?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/></transportConnectors>

ActiveMQ 的可持久化

将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。
官网 : http://activemq.apache.org/persistence
ActiveMQ 支持的消息持久化机制: 带赋值功能的 LeavelDB 、 KahaDB 、 AMQ 、 JDBC
持久化就是高可用的机制,即使服务器宕机了,消息也不会丢失

AMQ 是文件存储形式,写入快、易恢复 默认 32M 在 ActiveMQ 5.3 之后不再适用
KahaDB : 5.4 之后基于日志文件的持久化插件,默认持久化插件,提高了性能和恢复能力
KahaDB 的属性配置 : http://activemq.apache.org/kahadb
它使用一个事务日志和 索引文件来存储所有的地址
在这里插入图片描述
db-<数字>.log 存储数据,一个存满会再次创建 db-2 db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档
db.data 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-.log 里的消息
一点题外话:就像mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。
db.free 存储空闲页 ID 有时会被清除
db.redo 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引
lock 顾名思义就是锁
四类文件+一把锁 ==》 KahaDB

LeavelDB : 希望作为以后的存储引擎,5.8 以后引进,也是基于文件的本地数据存储形式,但是比 KahaDB 更快
它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自带的 LeavelDB 索引
题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的LeavelDB 比LeavelDB 更性能更优越,但需要基于 Zookeeper 所以这些官方还没有定论,任就使用 KahaDB

JDBC : 有一部分数据会真实的存储到数据库中
使用JDBC 的持久化,
①修改配置文件,默认 kahaDB
修改之前:

<persistenceAdapter><kahaDB directory="${activemq.data}/kahadb"/>  </persistenceAdapter>

修改之后:

        <persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysql-xql" createTablesOnStartup="false"/></persistenceAdapter><bean id="mysql-xql" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://xxxxxxx:3307/activemq?relaxAutoCommit=true&amp;useSSL=false&amp;serverTimezone=GMT"/><property name="username" value="root"/><property name="password" value="xxxx"/><property name="poolPreparedStatements" value="true"/></bean>
④  让linux 上activemq 可以访问到 mysql ,之后产生消息。ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock   
activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker
activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中点对点会在数据库的数据表 ACTIVEMQ_MSGS 中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除  但是对于主题,订阅方式接受到的消息,会在 ACTIVEMQ_MSGS 存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息 。 并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。  

在这里插入图片描述
持久化消息是指:
MQ 所在的服务器down 了消息也不会丢失
持久化机制演化过程:
从最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事务)附件,并且同步推出了关系型数据库的存储方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默认持久化方案),后来ActiveMQ 从5.8开始支持LevelDB ,现在5.9 提供了 Zookeeper + LevelDB 的集群化方案。
ActiveMQ 消息持久化机制有:
AMQ 基于日志文件
KahaDB 基于日志文件,5.4 之后的默认持久化
JDBC 基于第三方数据库
LevelDB : 基于文件的本地数据库存储,从5.8 之后推出了LevelDB 性能高于 KahaDB
ReplicatedLevelDB Store 从5.8之后提供了基于LevelDB 和Zookeeper 的数据复制方式,用于Master-slave方式的首数据复制选方案

但是无论使用哪种持久化方式,消息的存储逻辑都一样

小结

  • 1> 引入消息队列后 如何保证高可用性
    持久化、事务、签收、 以及带复制的 Leavel DB + zookeeper 主从集群搭建
  • 2> 异步投递 Async send
    对于一个慢消费者,使用同步有可能造成堵塞,消息消费较慢时适合用异步发送消息
    activemq 支持同步异步 发送的消息,默认异步。当你设定同步发送的方式和 未使用事务的情况下发持久化消息,这时是同步的。
    如果没有使用事务,且发送的是持久化消息,每次发送都会阻塞一个生产者直到 broker 发回一个确认,这样做保证了消息的安全送达,但是会阻塞客户端,造成很大延时 。
    在高性能要求下,可以使用异步提高producer 的性能。但会消耗较多的client 端内存,也不能完全保证消息发送成功。在 useAsyncSend = true 情况下容忍消息丢失。

// 开启异步投递
activeMQConnectionFactory.setUseAsyncSend(true);

在这里插入图片描述

如何在投递快还可以保证消息不丢失 ?
异步发送消息丢失的情况场景是: UseAsyncSend 为 true 使用 producer(send)持续发送消息,消息不会阻塞,生产者会认为所有的 send 消息均会被发送到 MQ ,如果MQ 突然宕机,此时生产者端尚未同步到 MQ 的消息均会丢失 。
故 正确的异步发送方法需要接收回调
同步发送和异步发送的区别就在于 :
同步发送send 不阻塞就代表消息发送成功
异步发送需要接收回执并又客户端在判断一次是否发送

在代码中接收回调的函数 :

activeMQConnectionFactory.setUseAsyncSend(true);……  for (int i = 1; i < 4 ; i++) {textMessage = session.createTextMessage("msg--" + i);textMessage.setJMSMessageID(UUID.randomUUID().toString()+"--  orderr");String msgid = textMessage.getJMSMessageID();messageProducer.send(textMessage, new AsyncCallback() {@Overridepublic void onSuccess() {// 发送成功怎么样System.out.println(msgid+"has been successful send ");}@Overridepublic void onException(JMSException e) {// 发送失败怎么样System.out.println(msgid+" has been failure send ");}});
}    

在这里插入图片描述

long delay = 3 * 1000 ;
long perid = 4 * 1000 ;
int repeat = 7 ;
for (int i = 1; i < 4 ; i++) {TextMessage textMessage = session.createTextMessage("delay msg--" + i);// 消息每过 3 秒投递,每 4 秒重复投递一次 ,一共重复投递 7 次textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);messageProducer.send(textMessage);
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

http://www.lryc.cn/news/43590.html

相关文章:

  • 区块链多方计算 人工智能学习笔记
  • 基于opencv的边缘检测方法
  • 视频封装格式篇(TS)
  • 静态路由+DHCP实验(四路由器八PC)
  • 数据挖掘(作业汇总)
  • 基于微信小程序的图书馆选座系统源码
  • K8S 三种探针 readinessProbe、livenessProbe和startupProbe
  • Android 设置背景颜色透明度
  • 聚类算法层次聚类
  • js 数据类型
  • 多级评论单表结构设计
  • Mac M1通过VMWare Fusion安装Centos7记录(镜像和网络有大坑)
  • 女生适合当程序员吗?
  • 昇腾AI机器人发布,12家企业、5家高校签约,昇腾AI开发者创享日全国巡展沈阳首站成功举办
  • anaconda如何改变虚拟环境安装路径
  • 根据卫星运动矢量计算轨道六根数
  • 关于微信小程序安装npm的过程,从下载到小程序内部安装完成
  • IO-操作系统
  • Downie 4 4.6.12 MAC上最好的一款视频下载工具
  • unity 玩家移动时idle和run动画频繁切换
  • 小程序 table组件
  • 利用摄影测量进行地形建模的介绍
  • 中文代码138
  • JQuery用法
  • Python采集热门城市景点数据+简单制作数据可视化图
  • VUE-cli搭建项目
  • Feign返回值统一处理
  • 探究如何在Linux系统中修改进程资源限制:四种方法调整进程限制,让你的系统高效运行(包含应用层getrlimit和setrlimit API)
  • 9.5. 机器翻译与数据集
  • 跟着凯新生物2 Arm PEG Biotin,2-Branched PEG Biotin,生物素-聚乙二醇-二臂/支,学试剂知识