当前位置: 首页 > news >正文

go消息队列RabbitMQ - 订阅模式-direct

1.发布订阅

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

2.绑定

绑定可以采用额外的routing_key参数。为了避免与Channel.Publish参数混淆,我们将其称为binding key。这是我们如何使用键创建绑定的方法:

err = ch.QueueBind(q.Name,    // queue name"black",   // routing key"logs",    // exchangefalse,nil)

 3.直连交换器

direct交换器背后的路由算法很简单——消息进入其binding key与消息的routing key完全匹配的队列。

direct-exchang

绑定了两个队列的direct交换器X。第一个队列绑定键为orange,第二个队列绑定为两个,一个绑定键为black,另一个为green

在这种设置中,使用orange路由键发布到交换器的消息将被路由到队列Q1。路由键为blackgreen的消息将转到Q2。所有其他消息将被丢弃。

4.多重绑定

direct-exchange-multiple

用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键blackXQ1之间添加绑定。在这种情况下,direct交换器的行为将类似fanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给Q1Q2

5.发送日志

在日志系统中使用这个模型,发送消息到direct交换器。这样,接收脚本将能够选择其想要接收的日志级别。

先创建一个direct交换器:

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)

指定routing key发送一条消息:

body := bodyFrom(os.Args)
err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),
})

为了简化问题,我们假设“严重性”可以是“info”、“warning”、“error”之一。

6.订阅

为感兴趣的每种严重性(日志级别)创建一个新的绑定。绑定的routing key通过os.Args获取

q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments
)
failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)
}
// 建立多个绑定关系
for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
}

7.完整示例

img

emit_log_direct.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 3) || os.Args[2] == "" {s = "hello"} else {s = strings.Join(args[2:], " ")}return s
}func severityFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "info"} else {s = os.Args[1]}return s
}

receive_logs_direct.go的代码:

package mainimport ("log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)}for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")}msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto ackfalse,  // exclusivefalse,  // no localfalse,  // no waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

如果你只想将“warning”和“err”(而不是“info”)级别的日志消息保存到文件中,只需打开控制台并输入:

go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想在屏幕上查看所有日志消息,请打开一个新终端并执行以下操作:

go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出error日志消息,只需输入:

go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(这里是(emit_log_direct.go)和(receive_logs_direct.go)的完整源码)

参考文章:

go rabbitmq Routing模式 - 范斯猫 (fansimao.com)

RabbitMQ Go语言客户端教程4——路由 - 范斯猫 (fansimao.com)

 

http://www.lryc.cn/news/294022.html

相关文章:

  • PyTorch 2.2 中文官方教程(十八)
  • jenkins部署vue项目
  • 十一、C++核心编程(2)引用
  • numpy学习总结二
  • 3 编辑器(Vim)
  • C/C++ (stdio.h)标准库详解
  • 深度学习介绍
  • ywtool dhcp命令
  • ChatGPT高效提问—基础知识(LM、PLM以及LLM)
  • MongoDB复制集实战及原理分析
  • Java并发之synchronized详解
  • Flask 项目自动生成 API 文档的高效实践
  • WebChat——一个开源的聊天应用
  • 【Linux系统 01】Vim工具
  • Oracle 面试题 | 09.精选Oracle高频面试题
  • 基于Springboot的校园失物招领网站(有报告)。Javaee项目,springboot项目。
  • WPF布局面板
  • 灵活应对:策略模式在软件设计中的应用
  • eosio.token 智能合约介绍
  • 3D 转换
  • AI智能语音机器人安装方法
  • Python 潮流周刊#38:Django + Next.js 构建全栈项目
  • Jenkins升级后,构建任务配置界面重复错位
  • Python基础学习 -07 运算符
  • Nim游戏
  • Pytorch: torch.linspace等间隔数值函数
  • 【C++】案例:数列求和 与 条件筛选
  • 问题:下列哪些属于历史文化资源的特征( ). #学习方法#学习方法
  • 大数据 - Spark系列《四》- Spark分布式运行原理
  • Java使用规范