当前位置: 首页 > news >正文

【RabbitMQ】golang客户端教程4——路由(使用direct交换器)

路由

在上一教程中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。

在本教程中,我们将向它添加一个特性-我们将使它能够只订阅消息的一个子集。例如,我们将只能将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定

在前面的示例中,我们已经在创建绑定。你可能会想起以下代码:

err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil)

绑定是交换器和队列之间的关系。这可以简单地理解为:队列对来自此交换器的消息感兴趣。

绑定可以采用额外的routing_key参数。为了避免与Channel.Publish参数混淆,我们将其称为binding key。这是我们如何使用键创建绑定的方法:

err = ch.QueueBind(q.Name,    // queue name"black",   // routing key"logs",    // exchangefalse,nil)

绑定密钥的含义取决于交换器的类型。我们以前使用的fanout交换器只是忽略了这个值。

直连交换器

我们上一个教程中的日志系统将所有消息广播给所有消费者。我们希望扩展这一点,允许根据消息的严重性过滤消息。例如,我们可能希望将日志消息写入磁盘的脚本只接收严重错误,而不会在warning或info日志消息上浪费磁盘空间。

我们使用fanout交换器,这并没有给我们很大的灵活性——它只能进行无脑广播。

我们将使用direct交换器。direct交换器背后的路由算法很简单——消息进入其binding key与消息的routing key完全匹配的队列。

为了说明这一点,请考虑以下设置:
在这里插入图片描述
在此设置中,我们可以看到绑定了两个队列的direct交换器X。第一个队列绑定键为orange,第二个队列绑定为两个,一个绑定键为black,另一个为green

在这种设置中,使用orange路由键发布到交换器的消息将被路由到队列Q1。路由键为blackgreen的消息将转到Q2。所有其他消息将被丢弃。

多重绑定

在这里插入图片描述
用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键blackXQ1之间添加绑定。在这种情况下,direct交换器的行为将类似fanout,并将消息广播到所有匹配的队列。带有black路由键的消息将同时传递给Q1Q2

发送日志

我们将在日志系统中使用这个模型。我们将发送消息到direct交换器,而不是fanout。我们将提供严重性(译注:通常我们使用日志级别划分日志信息的严重性)作为路由键。这样,接收脚本将能够选择其想要接收的日志级别。让我们首先关注发送日志。

与往常一样,我们需要首先创建一个交换器:

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)

我们已经准备好发送一条消息:

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)
failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)
err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),
})

为了简化问题,我们假设“严重性”可以是“info”“warning”“error”之一。

订阅

接收消息的工作方式与上一教程一样,但有一个例外——我们将为感兴趣的每种严重性(日志级别)创建一个新的绑定。

q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments
)
failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)
}
// 建立多个绑定关系
for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")
}

完整示例

在这里插入图片描述
emit_log_direct.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs_direct",         // exchangeseverityFrom(os.Args), // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 3) || os.Args[2] == "" {s = "hello"} else {s = strings.Join(args[2:], " ")}return s
}func severityFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "info"} else {s = os.Args[1]}return s
}

receive_logs_direct.go的代码:

package mainimport ("log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")if len(os.Args) < 2 {log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])os.Exit(0)}for _, s := range os.Args[1:] {log.Printf("Binding queue %s to exchange %s with routing key %s",q.Name, "logs_direct", s)err = ch.QueueBind(q.Name,        // queue names,             // routing key"logs_direct", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")}msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto ackfalse,  // exclusivefalse,  // no localfalse,  // no waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

如果你只想将“warning”和“err”(而不是“info”)级别的日志消息保存到文件中,只需打开控制台并输入:

go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想在屏幕上查看所有日志消息,请打开一个新终端并执行以下操作:

go run receive_logs_direct.go info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出error日志消息,只需输入:

go run emit_log_direct.go error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

源自:https://www.rabbitmq.com/getstarted.html

http://www.lryc.cn/news/111590.html

相关文章:

  • Shell脚本学习-for循环结构2
  • vue 老项目 npm install 报错Python,c++等相关错误
  • 【c语言初级】c++基础
  • idea打开传统eclipse项目
  • 全国各城市-财政收入-一般预算收入-各项税收-个人所得税(1999-2020年)
  • 【动态网页抓取】 :用Python抓取所有内容的指南
  • Spring Boot数据访问基础知识与JDBC简单实现
  • ubuntu添加万能头文件
  • 聊一聊关于前端语法 ?? 的那些事
  • 宝塔Linux面板升级“获取更新包失败”怎么解决?
  • 训练强化学习的经验回放策略:experience replay
  • uniapp学习
  • 机器学习深度学习——数值稳定性和模型化参数(详细数学推导)
  • layui 整合UEditor 百度编辑器
  • 1、sparkStreaming概述
  • 【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费
  • uniapp:图片验证码检验问题处理
  • 将Visio和Excel导出成没有白边的PDF文件
  • String类及其工具类
  • 踩坑(5)整合kafka 报错 java.net.UnknownHostException: 不知道这样的主机
  • rust持续学习 get_or_insert_with
  • 卡尔曼滤波 | Matlab实现无迹kalman滤波仿真
  • C++---list常用接口和模拟实现
  • [openCV]基于赛道追踪的智能车巡线方案V1
  • SpringIoc-个人学习笔记
  • 【一文搞懂泛型】
  • 概念解析 | 利用MIMO雷达技术实现高性能目标检测的关键技术解析
  • Grafana制作图表-自定义Flink监控图表
  • 【TypeScript】初识TypeScript和变量类型介绍
  • 阿里云瑶池 PolarDB 开源官网焕新升级上线