Rocketmq消息中间件中通过message key找消息的问题
小编:管理员 22阅读 2022.08.01
参考:http://rocketmq.apache.org/docs/quick-start/
2. Rocketmq的简单应用参考:https://github.com/apache/rocketmq/tree/master/example
3. MessageQueueSelectorpublic interface MessageQueueSelector { MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}复制
RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上,RocketMQ默认提供了三种实现,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。MessageQueueSelector的select方法提供了三个入参,分别为消息队列集合、消息和扩展参数。本示例通过使用扩展参数来实现消息通道的定向发送和接收。
RocketMQ在设计的时候就支持tag了,因为他的索引文件就包含了tag的。 后来为了更去的过滤功能,更是扩展格式里,能进一步根据SQL92或者创建时间来过滤了。可以自定义MessageSelector来获取需要的消息。
ConsumeQueue扩展格式:支持sql92标准来过滤 ConsumeQueue标准格式只能通过tags搜索,不能使用用filters和commitTime搜索,于是扩展格式增加了: 参考:http://rocketmq.apache.org/docs/filter-by-sql92-example/
- 生产者示例:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties.msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);producer.shutdown();复制
- 消费者示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");// only subsribe messages have property a, also a >=0 and a <= 3consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});consumer.start();复制

IndexFile:支持查询消息,topic+key+最多条数+开始时间+结束时间 public QueryOffsetResult queryOffset(String topic,String key,int maxNum,long begin,long end){...}
4. 怎样设计IndexFile的物理存储内容才能满足上面的要求?RocketMQ的物理存储总结:
- 消息实际内容存储在CommitLog中(这点和Kafka大有不同,这也是RocketMQ没有kafka那么大的吞吐但是吞吐更稳定的原因);
- 为了能有多个Consumer并行消费,设计了基于(topic,queued)区分的ConsumeQueue;
- 为了在消费时在Broker上就过滤掉不感兴趣的内容,支持为Message打tag,订阅时只得到相关的tag的消息,将tagCode存储于其上。
- 为了订阅时能做到除了tag外的更多过滤,设计ConsumeQueueExt格式,通过BloomFilter;
- 为了满足根据key和时间段进行查询,设计了IndexFile
- Kafka是不支持broker端过滤的,只能通过offset拿数据,拿到Consumer里,自己把Message解析出来,在Consumer里过滤。
相关推荐