当前位置: 首页 > article >正文

详解一下RabbitMQ中的channel.Publish

函数定义(来自 github.com/streadway/amqp)

func (ch *Channel) Publish(exchange string,key string,mandatory bool,immediate bool,msg Publishing,
) error

这个方法的作用是:向指定的交换机 exchange 发送一条消息 msg,带上路由键 key。

参数名类型含义
exchangestring指定要将消息发布到哪个 交换机(exchange)。可以是 “” 表示默认交换机。
keystring路由键(routing key),根据交换机类型决定消息怎么路由。
mandatorybool是否强制投递。若为 true 且无法路由到队列,则会触发 Basic.Return(需要监听返回)。
immediatebool是否立即投递。很少使用,RabbitMQ 通常不支持,建议设为 false
msgPublishing消息体及其元数据(headers、content-type、body等)

📦 msg(Publishing)结构

type Publishing struct {ContentType     stringContentEncoding stringDeliveryMode    uint8 // 1=非持久化 2=持久化Priority        uint8CorrelationId   stringReplyTo         stringExpiration      stringMessageId       stringTimestamp       time.TimeType            stringUserId          stringAppId           stringBody            []byteHeaders         Table
}

常用字段:
Body: 消息内容([]byte)
ContentType: 比如 “application/json”,“text/plain”
DeliveryMode: 2 表示持久化消息,1 表示不持久化(内存中)
Headers: 自定义属性(可以设置 key-value)

✅ 使用示例:基本用法

body := "Hello RabbitMQ!"
err := channel.Publish("my-exchange", // exchange 这表示你要将消息发布到一个叫 "my-exchange" 的交换机。"my-key",      // routing key 会被用于匹配绑定在交换机上的队列。false,         // mandatory 如果消息无法路由到队列,不返回任何信息。false,         // immediate 不要求立即投递(几乎总是 false)。amqp.Publishing{ ContentType: "text/plain",Body:        []byte(body), //消息正文,以字节形式传递。DeliveryMode: amqp.Persistent, // 2 = 持久化},
)
if err != nil {log.Fatalf("Publish failed: %s", err)
}

通俗的讲一下mandatory和immediate两个参数及其应用场景

参数通俗解释
mandatory“找不到接收方要告诉我”(确保消息不被悄悄丢掉)
immediate“没有消费者就别投了”(对方不在线就别发)
err := ch.Publish("logs", "debug.key",true,  // mandatoryfalse,msg,
)

结果:
如果没有任何队列绑定了 “logs” 交换机并匹配 “debug.key”,这条消息就会被退回来,你可以通过监听 channel.NotifyReturn() 获取退回消息。

⚠️ 注意:RabbitMQ 早就不支持 immediate = true了!这个参数基本是“历史遗留”。几乎都设置为false
RabbitMQ 默认就不支持 immediate。 设置 immediate = true 会直接报错:“immediate=true” not supported

带 mandatory 回退处理机制的 RabbitMQ 生产者完整示例代码

✅ 功能概览
启动 RabbitMQ 连接与通道
使用 mandatory = true 发布消息
使用 NotifyReturn() 接收“退回的消息”
输出退回原因和消息内容

package mainimport ("log""time""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接 RabbitMQconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "连接失败")defer conn.Close()ch, err := conn.Channel()failOnError(err, "打开通道失败")defer ch.Close()// 声明交换机(topic 类型)err = ch.ExchangeDeclare("my-exchange", // name"topic",       // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "声明交换机失败")// 设置 return 回退监听(必须在 Publish 之前设置)returns := ch.NotifyReturn(make(chan amqp.Return))// 模拟发送消息,但没有任何队列绑定这个 key → 消息将被退回err = ch.Publish("my-exchange", // exchange"unmatched.key", // routing keytrue,          // mandatory:要求通知投递失败false,         // immediate:RabbitMQ 不支持amqp.Publishing{ContentType:  "text/plain",Body:         []byte("This message will be returned"),DeliveryMode: amqp.Persistent,},)if err != nil {log.Printf("Publish error: %s", err)}// 检查是否被回退(注意:这是异步的)select {case ret := <-returns:log.Println("❗ 消息被退回!")log.Printf("原因:%s", ret.ReplyText)log.Printf("交换机:%s", ret.Exchange)log.Printf("路由键:%s", ret.RoutingKey)log.Printf("内容:%s", string(ret.Body))case <-time.After(2 * time.Second):log.Println("✅ 消息已成功路由(没有被退回)")}
}

🧪 测试说明
你不绑定任何队列到 my-exchange + unmatched.key,运行这段代码会看到:

❗ 消息被退回!
原因:NO_ROUTE
交换机:my-exchange
路由键:unmatched.key
内容:This message will be returned
http://www.lryc.cn/news/2398830.html

相关文章:

  • 硬件学习笔记--62 MCU的ECC功能简介
  • Uiverse.io:免费UI组件库
  • 普中STM32F103ZET6开发攻略(四)
  • ck-editor5的研究 (5):优化-页面离开时提醒保存,顺便了解一下 Editor的生命周期 和 6大编辑器类型
  • [3D GISMesh]三角网格模型中的孔洞修补算法
  • 11.2 java语言执行浅析3美团面试追魂七连问
  • MySQL 全量、增量备份与恢复
  • 【25.06】FISCOBCOS使用caliper自定义测试 通过webase 单机四节点 helloworld等进行测试
  • MonoPCC:用于内窥镜图像单目深度估计的光度不变循环约束|文献速递-深度学习医疗AI最新文献
  • 如何计算H5页面加载时的白屏时间
  • SpringAI系列 - MCP篇(三) - MCP Client Boot Starter
  • 【深度学习新浪潮】以Dify为例的大模型平台的对比分析
  • Asp.net core 使用EntityFrame Work
  • isp中的 ISO代表什么意思
  • AI Coding 资讯 2025-06-03
  • 2024年12月 C/C++(三级)真题解析#中国电子学会#全国青少年软件编程等级考试
  • 3d GIS数据来源与编辑工具
  • NeRF PyTorch 源码解读 - 体渲染
  • SpringBoot 数据库批量导入导出 Xlsx文件的导入与导出 全量导出 数据库导出表格 数据处理 外部数据
  • 解决:install via Git URL失败的问题
  • OpenCV CUDA模块特征检测------创建Harris角点检测器的GPU实现接口cv::cuda::createHarrisCorner
  • 【氮化镓】钝化层对p-GaN HEMT阈值电压的影响
  • C++:优先级队列
  • 睡眠分期 html
  • Java求职者面试:Spring、Spring Boot、Spring MVC与MyBatis技术深度解析
  • Github 2025-05-29 Go开源项目日报Top9
  • 前端项目种对某个文件夹进行大小写更改,git识别不到差异导致无变化
  • AWS VPC 网络详解:理解云上专属内网的关键要素
  • Ubuntu24.04.2 + kubectl1.33.1 + containerdv1.7.27 + calicov3.30.0
  • 循环神经网络(RNN)全面教程:从原理到实践