rabbitmq之Federation配置

摘要:当我们有多个rabbitmq集群的时候,如果想要单向的同步集群的消息,也就是说把新集群当作老集群的镜像集群,实时的同步老集群的消息,在老集群消息被消费的时候不会影响同步到新集群的消息。在外部看上去就像每次写入消息的时候,同时向新老两个集群写入一样,不论mq的跨版本,不论mq的用户。一般我们会将这种情况应用于存在两个不同的系统,但是老数据来源只能向一个队列写入数据,此时为了在新系统上也可以实时同步到老系统队列中的数据的时候。

Federation介绍

federation 插件的最终目标是,在不同 broker 之间进行消息传递而无需建立集群;该功能在很多场景下非常有用:

注意:当你在一个cluster中使用federation插件,所有在集群中 的nodes都需要安装federation插件

特点

松耦合性(Loose coupling)

  • federation 插件能够在分属不同管理域的 broker 或 cluster 之间传递消息:
  • 他们可能设置了不同的 user 和 vhost ;
  • 他们可能运行在不同版本的 RabbitMQ 和 Erlang 上;

WAN 友好性(WAN-friendly)

  • federation 插件基于 AMQP 0-9-1 协议在不同 broker 之间进行通信,并设计成能够容忍不稳定的网络连通情况;

扩展性(Scalability)

  • federation 不需要在 n 个 broker 之间建立 O(n^2) 个连接(尽管这是最简单的使用模式),这也就意味着 federation 在使用时更容易扩展

federation能做什么?

federation 插件允许你将多个 exchange 或多个 queue 进行 federate ;federated exchange 或 federated queue 能够从一个或多个 upstream 接收到消息;

也就是说,你的队列可以和其他集群的队列建立一种关系,他们之间可以相互的同步数据,可以是我同步给你,也可以是你同步给我,不过这种关系有两个角色一个是上游一个是下游,数据流向是上游流向下流。
这里有三个名词,federation 插件允许你将多个 exchange 或多个 queue 进行 federate:

  • upstream: 上游,是指位于其他 broker 上的、远端 exchange 和 queue ;
  • federated exchange: 到exchange的关系,能够将发给 upstream 的消息路由到本地的某个 queue 中;
  • federated queue: 到queue的关系,则允许一个本地消费者接收到来自 upstream queue 的消息;

配置的种类

关于 federation upstream 的信息全都保存在 RabbitMQ 的数据库中,其中包括了 user 信息、permission 信息、queue 信息等等;
在 federation 中存在 3 种界别的配置:

  • Upstreams - 每一个 upstream 用于定义如何与另外的 broker 建立连接;
  • Upstream sets - 每一个 upstream set 用于针对一系列使用 federation 功能 upstream 进行了分组;
  • Policies - 每一种 policy 会限定(过滤)出一组 exchange ,或者一组 queue ,或者同时针对两者进行限定;policy 最终将作用于一个单独的 upstream 上,或者一个 upstream set 上,并对其他对象发挥作用;

实际上,在最简单的使用情况下,你可以忽略已经存在的upstream设置,因为有一个隐含的默认upstream叫做“all”,他会添加所有的upstream。

身份验证

我们讨论的是免身份验证的方式,如果有身份难的需求请参考官网:http://www.rabbitmq.com/authentication.html

操作步骤说明

parameter 和 policy 可以通过 3 种方式进行设置:
通过 rabbitmqctl 脚本;
通过 management 插件提供的 HTTP API ;
通过 rabbitmq_federation_management 插件提供的 Web UI(更通用的方式,我们也是通过页面来配置就可以了);注意:基于 Web UI 的方式不能提供全部功能,尤其无法针对 upstream set 进行管理;

1. 在集群的每一个node开启federation插件(同步和被同步集群都需要)

参考命令:

1
2
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

2. 登录到同步集群的管理界面::http://x.x.x.:15672/#/

3. 创建upstream

tips:在下游,也就是新队列(被同步队列)上操作

upstrem

1
2
UI操作:Admin->Federation Upstreams->Add a new upstream Name:随意填写 URI:填被同步集群(例如:amqp://user1:xxx@x.x.x.x,xxx为连接密码) Expires:默认填写3600000 单位ms
其余字段可不用填写

Expires:是代表缓存时间,如果说网络连通性不好的时候,消息会在上游的队列中缓存的时间,超时丢弃,设置为空则表示,永远缓存不会丢弃数据(但是如果长时候不恢复内存会占用越来越大,建议设置上)
Acknowledgement Mode: 代表消息确认方式,用来防止消息在传输过程中丢失,有三个值,on-confirm、on-publish、no-ack,对传输速度的影响是从慢速到快速,对安全性是不会丢失到可能会丢失。通常使用on-publish,不然on-confirm太慢了。

4. 创建policy

tips:在下游,也就是新队列(被同步队列)上操作
policy

1
2
3
4
UI操作:Admin->Policies->Add / update a policy
Name:随意填写(sync_data)
Pattern:匹配表达式(例如:^(?!amq.).* 剔除系统队列后的所有队列)
Apply to: 默认选择Exchange and queues Definition:federation-upstream-set = all (选定federation规则)

5. 查看状态图

现在,所有内置的 exchange 都应该建立了 federation ,因为他们都能匹配上面的 policy,可以通过页面查看状态

1
2
UI操作:Admin > Federation Status > Running Links 查看针对每个 exchange 的 federation 连接。
配置成功可以看到匹配的Exchange / Queue, state:running

也可以通过下面的命令查看状态图:

1
rabbitmqctl eval 'rabbit_federation_status:status().'

也可以通过 management 插件中的 exchange 列表,或者下面的命令输出,确认上述 policy 已经作用到了 exchange 上;

1
rabbitmqctl list_exchanges name policy | grep federate-me

通常情况下,针对每个 upstream 都会有一条 federation 连接,该 federation 连接对应到一个 exchange 上;例如 3 个 exchange 与 2 个 upstream 分别建立 federation 的情况下,会有 6 条连接。

6. 查看连接

登录到被同步集群(上游)的管理界面::http://x.x.x.:15672/#/ 前往 Connections选项 配置成功可以看到来自同步集群的连接

高级

更复杂的配置:https://www.rabbitmq.com/federation-reference.html

参考

官网

坚持原创技术分享,您的支持将鼓励我继续创作!