资讯专栏INFORMATION COLUMN

Node.js连接RabbitMQ,断线重连,动态绑定routing key

frank_fun / 2902人阅读

摘要:官方提供的教程,是基于回调的。下面将给出基于式的写法。并且实现动态的队列绑定初始化配置地址交换机名称读取在跑多实例时,例如在中,可以获取当前的名称多实例时,写日志,或者建立连接时,最好带上名称,如果出现问题,也比较好定位哪个出现的问题。

RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基于回调的。

下面将给出基于Promise式的写法。并且实现动态的队列绑定

初始化配置
const amqp = require("amqplib")
// rabbitMQ地址
const {amqpAddrHost} = require("../config/index.js")

// 交换机名称
const ex = "amq.topic"

const amqpAddr = `amqp://${amqpAddrHost}`

// 读取HOSTNAME, 在跑多实例时,例如在k8s中,HOSTNAME可以获取当前pod的名称
// 多实例时,写日志,或者建立连接时,最好带上pod名称,如果出现问题,也比较好定位哪个pod出现的问题。
const hostName = process.env.HOSTNAME

// 队列的属性设置
// 一般来说,最好设置队列自动删除autoDelete,当链接断开时,队列也会删除,这样不会产生非常多的无用队列
// durable是用来的持久化的,最好也可以设置成不持久化

const queueAttr = {autoDelete: true, durable: false}

// 定义channel的引用,当链接建立时,所有方法都可以通过引用CH来获取channel方法
let CH = null
向队列发送消息的函数
// 向队列发送消息的函数
function publishMessage (msg) {
  if (!CH) {
    return ""
  }

  msg = JSON.stringify(msg)
  // 指定交换机ex, routing key, 以及消息的内容
  CH.publish(ex, eventBusTopic, Buffer.from(msg))
}
当链接rabbitMQ断开时,要主动去重连
function reconnectRabbitMq () {
  log.info("reconnect_rabbit_mq")
  connectRabbitMq()
}
连接rabbitMQ的主要函数
function connectRabbitMq () {
  amqp.connect(amqpAddr, {
    // 设置connection_name的属性,可以在rabbitMQ的控制台的UI上,看到连接是来自哪个实例
    clientProperties: {
      connection_name: hostName
    }
  })
  .then((conn) => {
    log.info("rabbitmq_connect_successd")
    // 一定要加上链接的报错事件处理,否则一旦报error错,如果不处理这个错误,程序就会崩溃
    // error是个特别的事件,务必要处理的
    // 报错就直接去重连
    conn.on("error", (err) => {
      log.error("connect_error " + err.message, err)
      reconnectRabbitMq()
    })
    // 创建channel
    return conn.createChannel()
  })
  .then((ch) => {
    CH = ch
    // 初始化交换机
    ch.assertExchange(ex, "topic", {durable: true})
    // 初始化一个队列,队列名就用hostName, 比较容易从对列名上知道是哪个实例创建的队列
    return ch.assertQueue(hostName, queueAttr)
  })
  .then((q) => {
    // 可以在队列初始化完毕就立即绑定routing key, 也可以暂时不绑定,后续动态的绑定
    // CH.bindQueue(q.queue, ex, "some.topic.aaa")
    // 消费者,获取消息
    CH.consume(q.queue, (msg) => {
      var _msg = msg.content.toString()
      var MSG = JSON.parse(_msg)
      log.info(_msg, MSG)
    }, {noAck: true})
  })
  .catch((err) => {
    console.log(err)
  })
}
动态给队列绑定或者解绑routing key
function toggleBindQueue (routingKey, bind) {
  return new Promise((resolve, reject) => {
    if (!CH) {
      log.error("channel not established")
      reject(new Error("channel not established"))
      return ""
    }
    // 初始化队列,如果队列已经存在,就会直接使用
    CH.assertQueue(`${hostName}`, queueAttr)
    .then((q) => {
      // 如果bind是true,就绑定。否则就解绑
      if (bind) {
        log.info(`bindQueue ${hostName} ${topic}`)
        return CH.bindQueue(q.queue, ex, topic)
      } else {
        return CH.unbindQueue(q.queue, ex, topic)
      }
    })
    .then((res) => {
      resolve()
    })
    .catch((err) => {
      reject(err)
      log.error(err)
    })
  })
}

module.exports = {
  connectRabbitMq,
  toggleBindQueue,
  publishMessage
}
使用方法

加入你的服务端用的是Express, 那么在app.js中可以

...
const {connectRabbitMq} = require("./connect-mq.js")
connectRabbitMq()
...
完整代码
// onnect-mq.js
const amqp = require("amqplib")
// rabbitMQ地址
const {amqpAddrHost} = require("../config/index.js")

// 交换机名称
const ex = "amq.topic"

const amqpAddr = `amqp://${amqpAddrHost}`

// 读取HOSTNAME, 在跑多实例时,例如在k8s中,HOSTNAME可以获取当前pod的名称
// 多实例时,写日志,或者建立连接时,最好带上pod名称,如果出现问题,也比较好定位哪个pod出现的问题。
const hostName = process.env.HOSTNAME

// 队列的属性设置
// 一般来说,最好设置队列自动删除autoDelete,当链接断开时,队列也会删除,这样不会产生非常多的无用队列
// durable是用来的持久化的,最好也可以设置成不持久化

const queueAttr = {autoDelete: true, durable: false}

// 定义channel的引用,当链接建立时,所有方法都可以通过引用CH来获取channel方法
let CH = null


// 向队列发送消息的函数
function publishMessage (msg) {
  if (!CH) {
    return ""
  }

  msg = JSON.stringify(msg)
  // 指定交换机ex, routing key, 以及消息的内容
  CH.publish(ex, eventBusTopic, Buffer.from(msg))
}

// 当链接rabbitMQ断开时,要主动去重连
function reconnectRabbitMq () {
  log.info("reconnect_rabbit_mq")
  connectRabbitMq()
}

// 链接rabbitMQ的主要函数
function connectRabbitMq () {
  amqp.connect(amqpAddr, {
    // 设置connection_name的属性,可以在rabbitMQ的控制台的UI上,看到链接是来自哪个实例
    clientProperties: {
      connection_name: hostName
    }
  })
  .then((conn) => {
    log.info("rabbitmq_connect_successd")
    // 一定要加上链接的报错事件处理,否则一旦报error错,如果不处理这个错误,程序就会崩溃
    // error是个特别的事件,务必要处理的
    // 报错就直接去重连
    conn.on("error", (err) => {
      log.error("connect_error " + err.message, err)
      reconnectRabbitMq()
    })
    // 创建channel
    return conn.createChannel()
  })
  .then((ch) => {
    CH = ch
    // 初始化交换机
    ch.assertExchange(ex, "topic", {durable: true})
    // 初始化一个队列,队列名就用hostName, 比较容易从对列名上知道是哪个实例创建的队列
    return ch.assertQueue(hostName, queueAttr)
  })
  .then((q) => {
    // 可以在队列初始化完毕就立即绑定routing key, 也可以暂时不绑定,后续动态的绑定
    // CH.bindQueue(q.queue, ex, "some.topic.aaa")
    // 消费者,获取消息
    CH.consume(q.queue, (msg) => {
      var _msg = msg.content.toString()
      var MSG = JSON.parse(_msg)
      log.info(_msg, MSG)
    }, {noAck: true})
  })
  .catch((err) => {
    console.log(err)
  })
}


// 动态给队列绑定或者解绑routing key
function toggleBindQueue (routingKey, bind) {
  return new Promise((resolve, reject) => {
    if (!CH) {
      log.error("channel not established")
      reject(new Error("channel not established"))
      return ""
    }
    // 初始化队列,如果队列已经存在,就会直接使用
    CH.assertQueue(`${hostName}`, queueAttr)
    .then((q) => {
      // 如果bind是true,就绑定。否则就解绑
      if (bind) {
        log.info(`bindQueue ${hostName} ${topic}`)
        return CH.bindQueue(q.queue, ex, topic)
      } else {
        return CH.unbindQueue(q.queue, ex, topic)
      }
    })
    .then((res) => {
      resolve()
    })
    .catch((err) => {
      reject(err)
      log.error(err)
    })
  })
}

module.exports = {
  connectRabbitMq,
  toggleBindQueue,
  publishMessage
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/32755.html

相关文章

  • Node.js连接RabbitMQ断线重连动态绑定routing key

    摘要:官方提供的教程,是基于回调的。下面将给出基于式的写法。并且实现动态的队列绑定初始化配置地址交换机名称读取在跑多实例时,例如在中,可以获取当前的名称多实例时,写日志,或者建立连接时,最好带上名称,如果出现问题,也比较好定位哪个出现的问题。 RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基于回调的。 下面将给出基于Promise式的写法。...

    Lsnsh 评论0 收藏0
  • Node.js连接RabbitMQ断线重连动态绑定routing key

    摘要:官方提供的教程,是基于回调的。下面将给出基于式的写法。并且实现动态的队列绑定初始化配置地址交换机名称读取在跑多实例时,例如在中,可以获取当前的名称多实例时,写日志,或者建立连接时,最好带上名称,如果出现问题,也比较好定位哪个出现的问题。 RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基于回调的。 下面将给出基于Promise式的写法。...

    Cheriselalala 评论0 收藏0
  • vue项目前端知识点整理

    摘要:在如下几个属性,表示当前的真实时间,用于和服务器时间同步,表示创建时间,主要用于分页,以及重连时的判断,表示是否断线重连。初始化连接时,将赋值为当前本地时间,连接成功后,将赋值为服务器返回的当前时间,再设置一个定时器,保持时间与服务器一致。 vue项目前端知识点整理 微信授权后还能通过浏览器返回键回到授权页 在导航守卫中可以在next({})中设置replace: true来重定向到改...

    bang590 评论0 收藏0
  • 20170917 前端开发周报:JavaScript函数式编程、作用域和闭包

    摘要:用函数式编程对进行断舍离当从业的老司机学会函数式编程时,他扔掉了的特性,也不用面向对象了,最后发现了真爱啊作用域和闭包作用域和闭包在里非常重要。旨在帮助非函数式编程的同学,能快速切入到函数式编程的理念。 1、用函数式编程对JavaScript进行断舍离 当从业20的JavaScript老司机学会函数式编程时,他扔掉了90%的特性,也不用面向对象了,最后发现了真爱啊!!! https:/...

    tomener 评论0 收藏0
  • 20170917 前端开发周报:JavaScript函数式编程、作用域和闭包

    摘要:用函数式编程对进行断舍离当从业的老司机学会函数式编程时,他扔掉了的特性,也不用面向对象了,最后发现了真爱啊作用域和闭包作用域和闭包在里非常重要。旨在帮助非函数式编程的同学,能快速切入到函数式编程的理念。 1、用函数式编程对JavaScript进行断舍离 当从业20的JavaScript老司机学会函数式编程时,他扔掉了90%的特性,也不用面向对象了,最后发现了真爱啊!!! https:/...

    cyixlq 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<