当前位置: 首页 > news >正文

尚硅谷rabbitmq 2024 流式队列2024指定偏移量 第55节答疑

rabbitmq的stream:
4、对比
autoTrackingstrategy方式:始终监听Stream中的新消息(狗狗看家,忠于职守)指定偏移量方式:针对指定偏移量的消息消费之后就停止(狗狗叼飞盘,回来就完)
这两种分别怎么写?java


在 RabbitMQ 中,Stream 是一种新的消息传递机制,允许高吞吐量和低延迟的消息传递。你提到的两种消费方式分别是自动跟踪策略(Auto Tracking Strategy)和指定偏移量(Offset)的消费方式。下面我们将分别演示这两种方式的 Java 实现。

### 1. 自动跟踪策略(Auto Tracking Strategy)

这种方式下,消费者会一直监听 Stream 中的新消息,并不断处理这些消息。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 2. 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message


============
为什么没写.autoTrackingStrategy()?

你提到的 `.autoTrackingStrategy()` 是 RabbitMQ Stream API 中的一个重要方法,它用于指定消费者的自动跟踪策略。让我们详细解释一下如何在代码中使用这个方法。

### 自动跟踪策略(Auto Tracking Strategy)

自动跟踪策略允许消费者始终监听 Stream 中的新消息,并自动跟踪消息的偏移量。这种方式下,消费者会持续消费新消息,而不会停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .autoTrackingStrategy() // 启用自动跟踪策略
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message) -> {


============
            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .autoTrackingStrategy() /
-----这里指定了 .autoTrackingStrategy() /就不用指定.offset(OffsetSpecification.first())了吧?


确实,如果你使用 `.autoTrackingStrategy()`,那么你不需要再指定 `.offset(OffsetSpecification.first())`,因为自动跟踪策略会自动从最新的消息开始消费并持续监听新的消息。让我们分别看看两种方式的完整代码示例。

### 自动跟踪策略(Auto Tracking Strategy)

这种方式下,消费者会自动从最新的消息开始消费,并持续监听新的消息。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .autoTrackingStrategy() // 启用自动跟踪策略
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
           


============
那里之前为什么要写:            .offset(OffsetSpecification.first()) // 从第一个消息开始
            .autoTrackingStrategy() /

你说得对,如果你使用 `.autoTrackingStrategy()`,就不需要再指定 `.offset(OffsetSpecification.first())`。这是一个错误的示例。使用 `.autoTrackingStrategy()` 时,消费者会自动从最新的消息开始消费,并持续监听新的消息。

让我们修正并简化这两种方式的示例代码:

### 自动跟踪策略(Auto Tracking Strategy)

这种方式下,消费者会自动从最新的消息开始消费,并持续监听新的消息。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ConsumerBuilder;

public class AutoTrackingConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .autoTrackingStrategy() // 启用自动跟踪策略
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAsBinary());
                System.out.println("Received message: " + body);
            })
            .build();

        // 消费者会一直运行,监听新的消息
        System.out.println("Consumer is listening for new messages...");
    }
}
```

### 指定偏移量(Offset)的消费方式

这种方式下,消费者会从指定的偏移量开始消费消息,并在消费完这些消息后停止。

#### 示例代码

```java
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ConsumerBuilder;

public class OffsetConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder().host("localhost").build();

        String streamName = "test_stream";
        long offset = 10; // 指定偏移量

        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
            .stream(streamName)
            .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始
            .messageHandler((context, message) -> {
                String body = new String(message.getBodyAs

http://www.lryc.cn/news/459186.html

相关文章:

  • NSSCTF-WEB-pklovecloud
  • 深入Postman- 自动化篇
  • react-JSX
  • 深度对比:IPguard与Ping32在企业网络管理中的应用
  • AI测试之 TestGPT
  • JavaEE-进程与线程
  • JAVA软开-面试经典问题(6)-equals与hashcode方法
  • 计算机网络(以Linux讲解)
  • 计算机网络基本架构知识点
  • GES DISC 的 ATMOS L2 潜在温度网格上的痕量气体,固定场格式 V3 (ATMOSL2TF)
  • MLCC贴片电容不同材质区别:【及电容工作原理】
  • Word粘贴时出现“文件未找到:MathPage.WLL”的解决方案
  • 前端开发笔记--html 黑马程序员1
  • ARM/Linux嵌入式面经(四四):华星光电
  • 帮助,有奖提问
  • Java编辑工具IDEA
  • 闲谈Promise
  • 【C++堆(优先队列)】1882. 使用服务器处理任务|1979
  • VBA高级应用30例应用3Excel中的ListObject对象:选择表的一部分
  • C语言-变量
  • linux下位机出现使用TCP socket为0的问题
  • 论文笔记:Prototypical Verbalizer for Prompt-based Few-shot Tuning
  • nn.functional.softmax(X, dim=-1)
  • 【动态规划】子数组系列(上)
  • 字节青训营入门算法题:飞行棋分组
  • # 执行 rpm -qa | grep qq 查询软件安装情况时报错 数据库损坏 db3 error(-30974)
  • 离线服务器上复现G3SR论文实验
  • Android 未来可能支持 Linux 应用,Linux 终端可能登陆 Android 平台
  • PostgreSQL学习笔记十四:PL/Python自定义函数
  • 计算机毕业设计 | springboot商城售后管理系统 购物平台(附源码)