博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rocketmq
阅读量:6895 次
发布时间:2019-06-27

本文共 3849 字,大约阅读时间需要 12 分钟。

hot3.png

 

 

4.6 Message Reliablity

影响消息可靠性的几种情况:
(1). Broker 正常关闭
(2). Broker 异常 Crash
(3). OS Crash
(4). 机器掉电,但是能立即恢复供电的情况。
(5). 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
(6). 磁盘设备损坏。
(1)(2)(3)(4)四种情况都属于硬件资源可立即恢复的情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
(5)(6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。 RocketMQ 在这两种情况下,通
异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。

4.7 Low Latency Messaging

在消息不堆积情况下,消息到达 Broker 后,能立刻到达 Consumer
RocketMQ 使用长轮询 Pull 方式,可保证消息非常实时,消息实时性不低于 Push

4.8 At least Once

是指每个消息必须投递一次;
RocketMQ Consumer pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会返回 ack 消息,所以 RocketMQ 可以很好的支持此特性。

4.9 Exactly Only Once

(1). 发送消息阶段,不允许发送重复的消息。
(2). 消费消息阶段,不允许消费重复的消息。
只有以上两个条件都满足情况下,才能认为消息是“ Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以 RocketMQ 为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到
幂等性RocketMQ 虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消费的情况,只有网络异常,Consumer 启停等异常情况下会出现消息重复。
此问题的本质原因是网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才产生了消息重复性问题;

4.10 Broker Buffer 满了怎么办?

Broker Buffer 通常指的是 Broker 中一个队列的内存 Buffer 大小,这类 Buffer 通常大小有限,如果 Buffer 了以后怎举办?

下面是 CORBA Notification 规范中处理方式:
(1). Reject New Events
    拒绝新来的消息,向 Producer 返回 Reject New Events 错误码。
(2). 按照特定策略丢弃已有消息
a) AnyOrder - Any event may be discarded on overflow. This is the default setting for this
property.
b) FifoOrder - The first event received will be the first discarded.
c) LifoOrder - The last event received will be the first discarded.
d) PriorityOrder - Events should be discarded in priority order, such that lower priority
events will be discarded before higher priority events.
e) DeadlineOrder - Events should be discarded in the order of shortest expiry deadline first.

RocketMQ 没有内存 Buffer 概念,RocketMQ 的队列都是持久化磁盘,数据定期清除。

对于此问题的解决思路,RocketMQ 同其他 MQ 有非常显著的区别,RocketMQ 的内存 Buffer 抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker 会定期删除过期的数据,例如 Broker 只保存 3 天的消息, 3 天前的数据会被从队尾删除。

4.11 回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。 RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

 

4.12 消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:
(1). 消息堆积在内存 Buffer,一旦超过内存 Buffer,可以根据一定的丢弃策略来丢弃消息,如 CORBA Notification 规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存 Buffer 大小,
而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限;

(2). 消息堆积到持久化存储系统中,例如 DBKV 存储,文件记录形式。

当消息不能在内存 Cache 命中时,要不可避免的访问磁盘,会产生大量读 IO,读 IO 的吞吐量直接决定了
消息堆积后的访问能力。
评估消息堆积能力主要有以下四点:

(1). 消息能堆积多少条,多少字节?即消息的堆积容量。

(2). 消息堆积后,发消息的吞吐量大小,是否会受堆积影响?

(3). 消息堆积后,正常消费的 Consumer 是否会受影响?

(4). 消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?
 

 

7.2 刷盘策略

    RocketMQ 的所有消息都是持久化的,先写入系统 PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,直接从内存读取。

7.2.1 异步刷盘

    在有 RAID 卡,SAS 15000 转磁盘测试顺序写文件,速度可以达到 300M 每秒左右,而线上的网卡一般都为千兆网卡,写磁盘速度明显快于数据网络入口速度,那么是否可以做到写完内存就向用户返回,由后台线程刷盘呢?
(1). 由于磁盘速度大于网卡速度,那么刷盘的速度肯定可以跟上消息的写入速度。
(2). 万一由于此时系统压力过大,可能堆积消息,除了写入 IO,还有读取 IO,万一出现磁盘读取落后情况,会不会导致系统内存溢出,答案是否定的,原因如下:
   a) 写入消息到 PAGECACHE 时,如果内存不足,则尝试丢弃干净的 PAGE,腾出内存供新消息使用,策略是 LRU 方式。
   b) 如果干净页不足,此时写入 PAGECACHE 会被阻塞,系统尝试刷盘部分数据,大约每次尝试 32 个 PAGE,来找出更多干净 PAGE。
综上,内存溢出的情况不会出现。

7.2.2 同步刷盘

    同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PAGECACHE 直接返回,而同步刷盘需要等待刷盘完成才返回,

同步刷盘流程如下:
   (1). 写入 PAGECACHE 后,线程等待,通知刷盘线程刷盘。
   (2). 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。
   (3). 前端等待线程向用户返回成功。

 

7.11 发送定时消息

7.12 消息消费失败,定时重试
7.13 HA,同步双写/异步复制
    异步复制的实现思路非常简单,Slave 启动一个线程,不断从 Master 拉取 Commit Log 中的数据,然后在异步 build 出 Consume Queue 数据结构。整个实现过程基本同 Mysql 主从同步类似。
 

10 RocketMQ 服务发现(Name Server

    Name Server 是专为 RocketMQ 设计的轻量级名称服务,代码小于 1000 行,具有简单、可集群横向扩展、无状态等特点。将要支持的主备自动切换功能会强依赖 Name Server。
 

13.2 消息发送失败如何处理

Producer 的 send 方法本身支持内部重试,重试罗辑如下:
    1. 至多重试 3 次。
    2. 如果发送失败,则轮转到下一个 Broker。
    3. 返个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。
所以,如果本身向 broker 发送消息产生超时异常,就不会再做重试。
以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议应用这样做:如果调用 send 同步方法发送失败,则尝试将消息存储到 db,由后台线程定时重试,保证消息一定到达 Broker。

 

 

 

 

转载于:https://my.oschina.net/zfscofield/blog/1602242

你可能感兴趣的文章
System 这四个单元多用用(近期)
查看>>
Html5使用history对象history.pushState()和history.replaceState()方法添加和修改浏览历史记录...
查看>>
SVN 钩子 允许用户修改Subversion日志的钩子脚本(转)
查看>>
算法学习一
查看>>
像素PX厘米转换
查看>>
webpack使用babel
查看>>
201314
查看>>
python list 的查找, 搜索, 定位, 统计
查看>>
[Usaco2009 Feb]Revamping Trails 道路升级 BZOJ1579
查看>>
在Altium Designer9.0中建造自己的库
查看>>
转:c++ 11 新特性
查看>>
CodeVS 1058 合唱队形(DP--最长子序列问题)
查看>>
陶哲轩实分析引理 11.1.4
查看>>
秩-零化度定理
查看>>
mvc与三层架构终极区别
查看>>
python基础知识~ 函数详解
查看>>
简单几何(线段与直线的位置) POJ 3304 Segments
查看>>
DFS/BFS+思维 HDOJ 5325 Crazy Bobo
查看>>
P2312 解方程
查看>>
P2447 [SDOI2010]外星千足虫
查看>>