Rocketmq消息中间件中通过message key找消息的问题

小编:管理员 22阅读 2022.08.01

1. Rocketmq的安装布署:

参考:http://rocketmq.apache.org/docs/quick-start/

2. Rocketmq的简单应用

参考:https://github.com/apache/rocketmq/tree/master/example

3. MessageQueueSelector
public interface MessageQueueSelector {    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}
复制

RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上,RocketMQ默认提供了三种实现,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。MessageQueueSelector的select方法提供了三个入参,分别为消息队列集合、消息和扩展参数。本示例通过使用扩展参数来实现消息通道的定向发送和接收。

RocketMQ在设计的时候就支持tag了,因为他的索引文件就包含了tag的。 后来为了更去的过滤功能,更是扩展格式里,能进一步根据SQL92或者创建时间来过滤了。可以自定义MessageSelector来获取需要的消息。

ConsumeQueue扩展格式:支持sql92标准来过滤 ConsumeQueue标准格式只能通过tags搜索,不能使用用filters和commitTime搜索,于是扩展格式增加了: 参考:http://rocketmq.apache.org/docs/filter-by-sql92-example/

  1. 生产者示例:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();Message msg = new Message("TopicTest",    tag,    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties.msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);producer.shutdown();
复制
  1. 消费者示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");// only subsribe messages have property a, also a >=0 and a <= 3consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");consumer.registerMessageListener(new MessageListenerConcurrently() {    @Override    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }});consumer.start();
复制

IndexFile:支持查询消息,topic+key+最多条数+开始时间+结束时间 public QueryOffsetResult queryOffset(String topic,String key,int maxNum,long begin,long end){...}

4. 怎样设计IndexFile的物理存储内容才能满足上面的要求?

RocketMQ的物理存储总结:

  • 消息实际内容存储在CommitLog中(这点和Kafka大有不同,这也是RocketMQ没有kafka那么大的吞吐但是吞吐更稳定的原因);
  • 为了能有多个Consumer并行消费,设计了基于(topic,queued)区分的ConsumeQueue;
  • 为了在消费时在Broker上就过滤掉不感兴趣的内容,支持为Message打tag,订阅时只得到相关的tag的消息,将tagCode存储于其上。
  • 为了订阅时能做到除了tag外的更多过滤,设计ConsumeQueueExt格式,通过BloomFilter;
  • 为了满足根据key和时间段进行查询,设计了IndexFile
  • Kafka是不支持broker端过滤的,只能通过offset拿数据,拿到Consumer里,自己把Message解析出来,在Consumer里过滤。
关联标签: