spring整合各种中间件(RocketMQ、kafka、RabbitMQ、TubeMQ、NSQ)
小编:管理员 37阅读 2022.08.01
本文源码下载:https://gitee.com/hong99/spring/issues/I1N1DF
TubeMQ是什么?
简介
TubeMQ是2019年腾讯在ApacheCon开源的一个消息中间件系统,性能优越。经过近7年、万亿规模的海量数据沉淀,TubeMQ目前日均接入量超过25万亿条消息。较之于其他的开源MQ组件,TubeMQ长期应用于真实生产环境中,在稳定性、性能和成本方面都有着核心优势。
https://inlong.apache.org/zh-cn/docs/quick_start.html
功能介绍
纯 Java 实现语言
引入 Master 协调节点:相比 Kafka 依赖于 Zookeeper 完成元数据的管理和实现 HA 保障不同,TubeMQ 系统采用的是自管理的元数据仲裁机制方式进行,Master 节点通过采用内嵌数据库 BDB 完成集群内元数据的存储、更新以及 HA 热切功能,负责 TubeMQ 集群的运行管控和配置管理操作,对外提供接口等;通过 Master 节点,TubeMQ 集群里的 Broker 配置设置、变更及查询实现了完整的自动化闭环管理,减轻了系统维护的复杂度
服务器侧消费负载均衡:TubeMQ 采用的是服务侧负载均衡的方案,而不是客户端侧操作,提升系统的管控能力同时简化客户端实现,更便于均衡算法升级
系统行级锁操作:对于 Broker 消息读写中存在中间状态的并发操作采用行级锁,避免重复问题
Offset 管理调整:Offset 由各个 Broker 独自管理,ZK 只作数据持久化存储用(最初考虑完全去掉ZK依赖,考虑到后续的功能扩展就暂时保留)
消息读取机制的改进:TubeMQ 采用的是消息随机读取模式,同时为了降低消息时延又增加了内存缓存读写,对于带 SSD 设备的机器,增加消息滞后转 SSD 消费的处理,解决消费严重滞后时吞吐量下降以及 SSD 磁盘容量小、刷盘次数有限的问题,使其满足业务快速生产消费的需求
消费者行为管控:支持通过策略实时动态地控制系统接入的消费者行为,包括系统负载高时对特定业务的限流、暂停消费,动态调整数据拉取的频率等;
服务分级管控:针对系统运维、业务特点、机器负载状态的不同需求,系统支持运维通过策略来动态控制不同消费者的消费行为,比如是否有权限消费、消费时延分级保证、消费限流控制,以及数据拉取频率控制等
系统安全管控:根据业务不同的数据服务需要,以及系统运维安全的考虑,TubeMQ 系统增加了 TLS 传输层加密管道,生产和消费服务的认证、授权,以及针对分布式访问控制的访问令牌管理,满足业务和系统运维在系统安全方面的需求
资源利用率提升改进:相比于 Kafka,TubeMQ 采用连接复用模式,减少连接资源消耗;通过逻辑分区构造,减少系统对文件句柄数的占用,通过服务器端过滤模式,减少网络带宽资源使用率;通过剥离对 Zookeeper 的使用,减少 Zookeeper 的强依赖及瓶颈限制
客户端改进:基于业务使用上的便利性以,我们简化了客户端逻辑,使其做到最小的功能集合,我们采用基于响应消息的接收质量统计算法来自动剔出坏4的 Broker 节点,基于首次使用时作连接尝试来避免大数据量发送时发送受阻。

Portal:负责对外交互和运维操作的Portal部分,包括API和Web两块,API对接集群之外的管理系统,Web是在API基础上对日常运维功能做的页面封装;
Master: 负责集群控制的Control部分,该部分由1个或多个Master节点组成,Master HA通过Master节点间心跳保活、实时热备切换完成(这是大家使用TubeMQ的Lib时需要填写对应集群所有Master节点地址的原因),主Master负责管理整个集群的状态、资源调度、权限检查、元数据查询等;
Broker: 负责实际数据存储的Store部分,该部分由相互之间独立的Broker节点组成,每个Broker节点对本节点内的Topic集合进行管理,包括Topic的增、删、改、查,Topic内的消息存储、消费、老化、分区扩容、数据消费的offset记录等,集群对外能力,包括Topic数目、吞吐量、容量等,通过水平扩展Broker节点来完成;
Client: 负责数据生产和消费的Client部分,该部分我们以Lib形式对外提供,大家用得最多的是消费端,相比之前,消费端现支持Push、Pull两种数据拉取模式,数据消费行为支持顺序和过滤消费两种。对于Pull消费模式,支持业务通过客户端重置精确offset以支持业务exactly-once消费,同时,消费端新推出跨集群切换免重启的BidConsumer客户端;
Zookeeper: 负责offset存储的zk部分,该部分功能已弱化到仅做offset的持久化存储,考虑到接下来的多节点副本功能该模块暂时保留。
详细请对考这里:https://inlong.apache.org/zh-cn/docs/architecture.html
源码实现
直接上docker (强列建议用docker哈,之前的tars自己部署搞了好几个星期....)
docker run -p 8080:8080 -p 8000:8000 -p 8123:8123 --name tubemq -d apachetubemq/tubemq-all:latest复制
然后运行:localhost:8080 如下

集群部署参考:https://inlong.apache.org/zh-cn/docs/quick_start.html
新增topic

默认这里的授权字段是:abc


纯java实现
引入jar包
<properties> <tubemq-client-version>0.8.0-incubating</tubemq-client-version> </properties> <dependencies> <dependency> <groupId>org.apache.tubemq</groupId> <artifactId>tubemq-client</artifactId> <version>${tubemq-client-version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version> </dependency> </dependencies>复制
生产端
生产-异步消息
import com.alibaba.fastjson.JSONObject; import org.apache.commons.codec.binary.StringUtils; import org.apache.tubemq.client.config.TubeClientConfig; import org.apache.tubemq.client.factory.MessageSessionFactory; import org.apache.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.tubemq.client.producer.MessageProducer; import org.apache.tubemq.client.producer.MessageSentCallback; import org.apache.tubemq.client.producer.MessageSentResult; import org.apache.tubemq.corebase.Message; /** * * 功能描述: 异步消息 * * @param: * @return: * @auther: csh * @date: 2021/5/16 18:04 */ public final class AsyncProducerExample { public static void main(String[] args) throws Throwable { final String masterHostAndPort = "localhost:8000"; final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort); final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig); final MessageProducer messageProducer = messageSessionFactory.createProducer(); final String topic = "java_tubemq"; final String body = "发送异步消息!"; byte[] bodyData = StringUtils.getBytesUtf8(body); messageProducer.publish(topic); final Message message = new Message(topic, bodyData); messageProducer.sendMessage(message, new MessageSentCallback(){ public void onMessageSent(MessageSentResult result) { if (result.isSuccess()) { System.out.println("async send message : " + JSONObject.toJSONString(message)); } else { System.out.println("async send message failed : " + result.getErrMsg()); } } public void onException(Throwable e) { System.out.println("async send message error : " + e); } }); messageProducer.shutdown(); } }复制
发送结果
async send message : {"data":"5Y+R6YCB5byC5q2l5raI5oGv77yB","flag":0,"indexId":0,"topic":"java_tubemq"}复制
同步消息-生产
import com.alibaba.fastjson.JSON; import org.apache.commons.codec.binary.StringUtils; import org.apache.tubemq.client.config.TubeClientConfig; import org.apache.tubemq.client.factory.MessageSessionFactory; import org.apache.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.tubemq.client.producer.MessageProducer; import org.apache.tubemq.client.producer.MessageSentResult; import org.apache.tubemq.corebase.Message; /** * * 功能描述:同步消息 * * @param: * @return: * @auther: csh * @date: 2021/5/16 17:45 */ public final class SyncProducerExample { public static void main(String[] args) throws Throwable { //主节点地址 final String masterHostAndPort = "localhost:8000"; final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort); final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig); final MessageProducer messageProducer = messageSessionFactory.createProducer(); //topic 刚刚创建那个 final String topic = "java_tubemq"; final String body = "hello,i'm hong!How are you?"; byte[] bodyData = StringUtils.getBytesUtf8(body); messageProducer.publish(topic); Message message = new Message(topic, bodyData); MessageSentResult result = messageProducer.sendMessage(message); //判断结果 如果成功打印出 sync send message 加上消息内容 if (result.isSuccess()) { System.out.println("同步发送出去的消息: " + JSON.toJSONString(message)); } //关闭服务 messageProducer.shutdown(); } }复制
发送结果
同步发送出去的消息: {"data":"aGVsbG8saSdtIGhvbmchSG93IGFyZSB5b3U/","flag":0,"indexId":0,"topic":"java_tubemq"}复制
消费端
import com.alibaba.fastjson.JSONObject; import org.apache.tubemq.client.config.ConsumerConfig; import org.apache.tubemq.client.consumer.ConsumePosition; import org.apache.tubemq.client.consumer.ConsumerResult; import org.apache.tubemq.client.consumer.PullMessageConsumer; import org.apache.tubemq.client.factory.MessageSessionFactory; import org.apache.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.tubemq.corebase.Message; import org.apache.tubemq.corebase.utils.ThreadUtils; import java.util.List; /** * * 功能描述: 通过pull拉取消息 * * @param: * @return: * @auther: csh * @date: 2021/5/16 17:51 */ public class PullConsumerExample { public static void main(String[] args) throws Throwable { //服务地址 final String masterHostAndPort = "localhost:8000"; //topic final String topic = "java_tubemq"; //消费组 final String group = "hong-group1"; final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group); consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); final PullMessageConsumer messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig); messagePullConsumer.subscribe(topic, null); messagePullConsumer.completeSubscribe(); // wait for client to join the exact consumer queue that consumer group allocated while (!messagePullConsumer.isPartitionsReady(1000)) { ThreadUtils.sleep(1000); } while (true) { ConsumerResult result = messagePullConsumer.getMessage(); if (result.isSuccess()) { List<Message> messageList = result.getMessageList(); for (Message message : messageList) { System.out.println("接收到的消息: " + JSONObject.toJSONString(message)+"内容是:"+new String(message.getData())); } messagePullConsumer.confirmConsume(result.getConfirmContext(), true); } else { if (result.getErrCode() == 400) { ThreadUtils.sleep(100); } else { if (result.getErrCode() != 404) { System.out.println(String.format("Receive messages errorCode is %d, Error message is %s", result.getErrCode(), result.getErrMsg())); } } } } } }复制
结果
接收到的消息: {"data":"5Y+R6YCB5byC5q2l5raI5oGv77yB","flag":0,"indexId":1200513646816395264,"topic":"java_tubemq"}内容是:发送异步消息! 接收到的消息: {"data":"aGVsbG8saSdtIGhvbmchSG93IGFyZSB5b3U/","flag":0,"indexId":2057144306094833664,"topic":"java_tubemq"}内容是:hello,i'm hong!How are you?复制
import org.apache.tubemq.client.config.ConsumerConfig; import org.apache.tubemq.client.consumer.ConsumePosition; import org.apache.tubemq.client.consumer.MessageListener; import org.apache.tubemq.client.consumer.PushMessageConsumer; import org.apache.tubemq.client.factory.MessageSessionFactory; import org.apache.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.tubemq.corebase.Message; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** * * 功能描述: push消息模式 * * @param: * @return: * @auther: csh * @date: 2021/5/16 17:53 */ public class PushConsumerExample { public static void main(String[] args) throws Throwable { final String masterHostAndPort = "localhost:8000"; final String topic = "java_tubemq"; final String group = "hong-group"; final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group); consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); final PushMessageConsumer pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig); pushConsumer.subscribe(topic, null, new MessageListener() { public void receiveMessages(List<Message> messages) throws InterruptedException { for (Message message : messages) { System.out.println("PUSH接收到的消息 : " + new String(message.getData())); } } public Executor getExecutor() { return null; } public void stop() { // } }); pushConsumer.completeSubscribe(); CountDownLatch latch = new CountDownLatch(1); latch.await(10, TimeUnit.MINUTES); } }复制
结果
PUSH接收到的消息 : 发送异步消息! PUSH接收到的消息 : hello,i'm hong!How are you?复制
关于数据拉取模式支持Push、Pull的区别:
Push客户端:TubeMQ最初消费端版本只提供Push模式的消费,这种模式能比较快速地消费数据,减轻服务端压力,但同时也带来一个问题,业务使用的时候因为无法控制拉取频率,从而容易形成数据积压数据处理不过来;
带消费中止/继续的Push客户端: 在收到业务反馈能否控制Push拉取动作的需求后,我们增加了resumeConsume()/pauseConsume()函数对,让业务可以模拟水位线控制机制,状态比较繁忙时调用pauseConsume()函数来中止Lib后台的数据拉取,在状态恢复后,再调用resumeConsume()通知Lib后台继续拉取数据;
Pull客户端: 我们后来版本里增加了Pull客户端,该客户端有别于Push客户端,是由业务而非Lib主动的拉取消息并对数据处理的结果进行成功与否的确认,将数据处理的主动权留给业务。这样处理后,虽然服务端压力有所提升,但业务消费时积压情况可大大缓解。
客户端与服务器端RPC交互过程:

客户端要维持已发请求消息的本地保存,直到RPC超时,或者收到响应消息,响应消息通过请求发送时生成的SerialNo关联;从服务器端收到的Broker信息,以及Topic信息,SDK要保存在本地,并根据最新的返回信息进行更新,以及定期的上报给服务器端;SDK要维持到Master或者Broker的心跳,如果发现Master反馈注册超时错误时,要进行重注册操作;SDK要基于Broker进行连接建立,同一个进程不同对象之间,要允许业务进行选择,是支持按对象建立连接,还是按照进程建立连接。
spring整合TubeMQ
网上基本没有找着相关spring整合tubemq以及相关的学习资料,除了官网那些很久也没更新的...所以在java基础之上做一个简单的整合...如下
spring生产
│ pom.xml │ spring_tubemq_producer.iml │ ├─src │ ├─main │ │ ├─java │ │ │ └─com │ │ │ └─hong │ │ │ └─spring │ │ │ ├─config │ │ │ │ CommonTopic.java │ │ │ │ TubeMqProducer.java │ │ │ │ │ │ │ └─controller │ │ │ │ UserController.java │ │ │ │ │ │ │ └─ao │ │ │ UserSaveAO.java │ │ │ │ │ └─resources │ │ application.properties │ │ applicationContext.xml │ │ log4j2.xml │ │ logging.properties │ │ tubemq.properties │ │ tubemq.xml │ │ │ └─test │ └─java └─web └─WEB-INF web.xml复制

com.hong.spring.config.CommonTopic
package com.hong.spring.config; /** * @author: csh * @Date: 2021/5/18 10:56 * @Description:公共的topic */ public class CommonTopic { //用户mq public static final String TUBETOPIC="tubemq_spring_user"; }复制
com.hong.spring.config.TubeMqProducer
package com.hong.spring.config; import com.alibaba.fastjson.JSONObject; import lombok.extern.log4j.Log4j2; import org.apache.commons.codec.binary.StringUtils; import org.apache.tubemq.client.config.TubeClientConfig; import org.apache.tubemq.client.exception.TubeClientException; import org.apache.tubemq.client.factory.MessageSessionFactory; import org.apache.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.tubemq.client.producer.MessageProducer; import org.apache.tubemq.client.producer.MessageSentCallback; import org.apache.tubemq.client.producer.MessageSentResult; import org.apache.tubemq.corebase.Message; /** * @author: csh * @Date: 2021/4/27 18:06 * @Description:tubemq配置 */ @Log4j2 public class TubeMqProducer { /**服务端 */ private static MessageProducer messageProducer; /**服务地址 */ private String url; public TubeMqProducer(String url) { this.url = url; } public void init () throws TubeClientException { TubeClientConfig clientConfig =new TubeClientConfig(url); MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig); messageProducer = messageSessionFactory.createProducer(); } /** * * 功能描述: 发送消息 * * @param: * @return: * @auther: csh * @date: 2021/4/28 17:05 */ private static Boolean resultFlag; public Boolean send(String msg,String topic) { try { byte[] bodyData = StringUtils.getBytesUtf8(msg); messageProducer.publish(topic); final Message message = new Message(topic, bodyData); messageProducer.sendMessage(message, new MessageSentCallback(){ public void onMessageSent(MessageSentResult result) { if (result.isSuccess()) { resultFlag=true; log.info("同步发送消息成功 : " + JSONObject.toJSONString(message)); } else { resultFlag =false; log.info("发送消息出错 : " + result.getErrMsg()); } } public void onException(Throwable e) { log.error("同步消息出错 : " + e); } }); return resultFlag; }catch (Exception e){ log.error("发送失败{}",e); } return false; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } /** * * 功能描述: 关闭 * * @param: * @return: * @auther: csh * @date: 2021/5/18 14:50 */ public void close(){ try { if(null!=messageProducer){ messageProducer.shutdown(); } }catch (Exception e){ log.error("关闭失败{}",e); } catch (Throwable throwable) { log.error("关闭失败{}",throwable); throwable.printStackTrace(); } } }复制
com.hong.spring.controller.ao.UserSaveAO
package com.hong.spring.controller.ao; import lombok.Data; import java.io.Serializable; /** * @author: csh * @Date: 2021/3/16 11:21 * @Description:用户入参 */ @Data public class UserSaveAO implements Serializable { private Integer id; private String username; private Integer age; }复制
com.hong.spring.controller.UserController
package com.hong.spring.controller; import com.alibaba.fastjson.JSONObject; import com.hong.spring.config.CommonTopic; import com.hong.spring.config.TubeMqProducer; import com.hong.spring.controller.ao.UserSaveAO; import com.hong.spring.utils.DataResponse; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Auther: csh * @Date: 2020/8/18 16:11 * @Description: */ @RestController @RequestMapping("/user/") @Log4j2 public class UserController { @Autowired private TubeMqProducer push; @RequestMapping("save") public DataResponse<Boolean> save(UserSaveAO ao){ log.info("添加用户入参{}",JSONObject.toJSONString(ao)); if(null==ao){ return DataResponse.BuildFailResponse("参数不能为空!"); } try { Boolean send = push.send(JSONObject.toJSONString(ao),CommonTopic.TUBETOPIC); if(null==send || !send){ return DataResponse.BuildFailResponse("添加用户失败!"); } return DataResponse.BuildFailResponse("添加用户成功!"); }catch (Exception e){ log.error("添加出错{}",e); return DataResponse.BuildFailResponse("添加出错请重试!"); } } }复制
相关配置文件
application.properties
logging.level.root=WARN logging.level.org.springframework.web=DEBUG logging.level.org.hibernate=ERROR复制
applicationContext.xml
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd"> <!-- 配置组件扫描 --> <context:component-scan base-package="com.hong.spring"></context:component-scan> <!--加载配置文件--> <context:property-placeholder location="classpath:tubemq.properties"/> <!-- 开启注解 --> <context:annotation-config /> <mvc:default-servlet-handler /> <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver" id="internalResourceViewResolver"> <!-- 前缀 --> <property name="prefix" value="/WEB-INF/pages/" /> <!-- 后缀 --> <property name="suffix" value=".html" /> <property name="contentType" value="text/html"/> </bean> <!--开启mvc注解事务--> <!-- 定义注解驱动 --> <mvc:annotation-driven> <mvc:message-converters> <!-- 设置支持中文 --> <bean class="org.springframework.http.converter.StringHttpMessageConverter"> <property name="supportedMediaTypes"> <list> <value>text/plain;charset=UTF-8</value> <value>text/html;charset=UTF-8</value> </list> </property> </bean> <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/> </mvc:message-converters> </mvc:annotation-driven> </beans>复制
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration status="INFO"> <appenders> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> </Console> <RollingFile name="RollingFile" fileName="logs/app.log" filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz"> <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/> <SizeBasedTriggeringPolicy size="5 MB"/> </RollingFile> </appenders> <loggers> <root level="DEBUG"> <appender-ref ref="Console"/> <appender-ref ref="RollingFile"/> </root> </loggers> </configuration>复制
logging.properties
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler ############################################################ # Handler specific properties. # Describes specific configuration info for Handlers. ############################################################ org.apache.juli.FileHandler.level = FINE org.apache.juli.FileHandler.directory = ../logs org.apache.juli.FileHandler.prefix = error-debug. java.util.logging.ConsoleHandler.level = FINE java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter复制
tubemq.properties
tubemq.url=localhost:8000复制
tubemq.xml
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"> <!--zero配置--> <bean id="push" class="com.hong.spring.config.TubeMqProducer" init-method="init" destroy-method="close"> <constructor-arg name="url" value="${tubemq.url}" /> </bean> </beans>复制
WEB-INF/web.xml
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" version="3.1"> <servlet> <servlet-name>spring_tubemq_producer</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:applicationContext.xml, classpath:tubemq.xml </param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <filter> <filter-name>encodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> <init-param> <param-name>forceEncoding</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>encodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <servlet-mapping> <servlet-name>spring_tubemq_producer</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> </web-app>复制
spring_mq/spring_tubemq_producer/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>spring_mq</artifactId> <groupId>com.hong</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <groupId>com.hong.tubemq</groupId> <artifactId>spring_tubemq_producer</artifactId> <properties> <tubemq-client-version>0.8.0-incubating</tubemq-client-version> </properties> <dependencies> <dependency> <artifactId>spring_mq_common_api</artifactId> <version>1.0-SNAPSHOT</version> <groupId>com.hong</groupId> </dependency> <dependency> <groupId>org.apache.tubemq</groupId> <artifactId>tubemq-client</artifactId> <version>${tubemq-client-version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version> </dependency> </dependencies> <build> <resources> <resource> <directory>src/main/java</directory> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <filtering>false</filtering> </resource> <resource> <directory>src/main/resources</directory> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <filtering>false</filtering> </resource> </resources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> </plugin> </plugins> </build> </project>复制
tomcat启动配置

postman发送配置
username:spring_tubemq age:1复制

结果

因为我的topic没有建,所以可以直接确认这个默认的情况下tubemq的topic需要手动来建。
重新添加topic

11:23:22.108 [http-nio-8683-exec-3] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1,"username":"spring_tubemq"} 11:23:22.108 [http-nio-8683-exec-3] INFO org.apache.tubemq.client.producer.ProducerManager - [Publish begin 1] publish topic tubemq_spring_user, address = org.apache.tubemq.client.producer.ProducerManager@23c7763a 11:23:22.197 [pool-6-thread-2] INFO com.hong.spring.config.TubeMqProducer - 同步发送消息成功 : {"data":"eyJhZ2UiOjEsInVzZXJuYW1lIjoic3ByaW5nX3R1YmVtcSJ9","flag":0,"indexId":0,"topic":"tubemq_spring_user"} 11:24:54.517 [http-nio-8683-exec-6] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1,"username":"spring_tubemq"} 11:24:54.517 [http-nio-8683-exec-6] INFO org.apache.tubemq.client.producer.ProducerManager - [Publish begin 1] publish topic tubemq_spring_user, address = org.apache.tubemq.client.producer.ProducerManager@23c7763a 11:24:54.519 [pool-6-thread-1] INFO com.hong.spring.config.TubeMqProducer - 同步发送消息成功 : {"data":"eyJhZ2UiOjEsInVzZXJuYW1lIjoic3ByaW5nX3R1YmVtcSJ9","flag":0,"indexId":0,"topic":"tubemq_spring_user"}复制

消费端
│ pom.xml │ spring_tubemq_consumer.iml │ ├─src │ ├─main │ │ ├─java │ │ │ └─com │ │ │ └─hong │ │ │ └─spring │ │ │ ├─config │ │ │ │ TubeMqConsumer.java │ │ │ │ │ │ │ ├─dao │ │ │ │ UserMapper.java │ │ │ │ │ │ │ ├─listener │ │ │ │ UserListener.java │ │ │ │ │ │ │ ├─mapper │ │ │ │ UserMapper.xml │ │ │ │ │ │ │ └─provider │ │ │ UserServiceImpl.java │ │ │ │ │ └─resources │ │ application.properties │ │ applicationContext.xml │ │ jdbc.properties │ │ log4j2.xml │ │ logging.properties │ │ mybatis.xml │ │ tubemq.properties │ │ tubemq.xml │ │ │ └─test │ └─java └─web └─WEB-INF web.xml复制

源码实现
com.hong.spring.config.TubeMqConsumer
import com.alibaba.fastjson.JSONObject; import com.hong.spring.listener.UserListener; import lombok.extern.log4j.Log4j2; import org.apache.tubemq.client.config.ConsumerConfig; import org.apache.tubemq.client.consumer.ConsumePosition; import org.apache.tubemq.client.consumer.ConsumerResult; import org.apache.tubemq.client.consumer.PullMessageConsumer; import org.apache.tubemq.client.factory.MessageSessionFactory; import org.apache.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.tubemq.corebase.Message; import org.apache.tubemq.corebase.utils.ThreadUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.util.List; /** * @author: csh * @Date: 2021/4/27 18:06 * @Description:zero配置 */ @Log4j2 public class TubeMqConsumer { //服务地址 private String addrHost; //topic private String topic; //消费组 private String group; // private MessageSessionFactory messageSessionFactory; // PullMessageConsumer messagePullConsumer; @Autowired private UserListener userListener; public TubeMqConsumer(String addrHost, String topic, String group) { this.addrHost = addrHost; this.topic = topic; this.group = group; } public void init(){ try { final ConsumerConfig consumerConfig = new ConsumerConfig(addrHost, group); consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig); messagePullConsumer.subscribe(topic, null); messagePullConsumer.completeSubscribe(); recvStr(); }catch (Exception e){ log.error("tubemq初始化失败",e); } } @Async public void recvStr(){ try { log.info("开始自动拉取消息!"); while (true){ ConsumerResult result = messagePullConsumer.getMessage(); if (result.isSuccess()) { List<Message> messageList = result.getMessageList(); for (Message message : messageList) { log.info("接收到的消息: " + JSONObject.toJSONString(message)+"内容是:"+new String(message.getData())); userListener.Listener(new String(message.getData())); } messagePullConsumer.confirmConsume(result.getConfirmContext(), true); } else { if (result.getErrCode() == 400) { ThreadUtils.sleep(100); } else { if (result.getErrCode() != 404) { log.error("Receive messages errorCode is %d, Error message is %s", result.getErrCode(), result.getErrMsg()); } } } } }catch (Exception e){ log.error("接收消失败请重试{}",e); } } /** * * 功能描述: 关闭接口 * * @param: * @return: * @auther: csh * @date: 2021/5/18 14:25 */ public void close(){ try { if(messageSessionFactory!=null){ messageSessionFactory.shutdown(); } }catch (Exception e){ log.error("关闭tubemq失败{}",e); } } }复制
com.hong.spring.dao.UserMapper
import com.hong.spring.entity.User; import com.hong.spring.entity.ao.UserAO; import org.apache.ibatis.annotations.Param; import java.util.List; /** * @Auther: csh * @Date: 2020/8/18 15:04 * @Description:用户dao层 */ public interface UserMapper { /** * * 功能描述:查询总条数 * * @param: * @return: * @auther: csh * @date: 2020/8/18 15:31 */ List<User> findAllUserList(); /** * * 功能描述:获取总数 * * @param: * @return: * @auther: csh * @date: 2020/8/18 15:30 */ int findAllTotal(); /** * * 功能描述:更新 * * @param: * @return: * @auther: csh * @date: 2020/8/18 15:30 */ int update(User user); /** * * 功能描述:添加 * * @param: * @return: * @auther: csh * @date: 2020/8/19 18:39 */ int save(User user); /** * * 功能描述:批量添加 * * @param: * @return: * @auther: csh * @date: 2020/8/21 15:46 */ int insertBatch(@Param("list") List <User> list); /** * * 功能描述:通过id查询 * * @param: * @return: * @auther: csh * @date: 2020/8/19 18:39 */ User findById(int id); /** * * 功能描述:通过分页查询 * * @param: * @return: * @auther: csh * @date: 2020/8/21 16:05 */ List<User> findByPage(UserAO ao); }复制
com.hong.spring.listener.UserListener
import com.alibaba.fastjson.JSONObject; import com.hong.spring.api.IUserService; import com.hong.spring.entity.User; import com.hong.spring.utils.DataResponse; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author: csh * @Date: 2021/3/16 11:14 * @Description:用户监听 */ @Log4j2 @Component public class UserListener { @Autowired private IUserService userService; public void Listener(String str){ log.info("获取的用户信息{}", str); User user = JSONObject.parseObject(str, User.class); DataResponse <Boolean> save = userService.save(user); if(save==null || save.getData()==null || !save.getData()){ log.info("添加失败,原因{}",JSONObject.toJSONString(save)); } } }复制
com/hong/spring/mapper/UserMapper.xml
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.hong.spring.dao.UserMapper"> <resultMap type="com.hong.spring.entity.User" id="user"> <id column="id" property="id" /> <result column="user_name" property="username" /> <result column="age" property="age" /> </resultMap> <select id="findById" resultType="com.hong.spring.entity.User"> SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER} </select> <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO"> select * from user where 1=1 limit #{page},#{pageSize} </select> <select id="findAllUserList" resultMap="user"> SELECT * FROM user </select> <select id="findAllTotal" resultType="int"> SELECT count(*) FROM user </select> <insert id="save" > INSERT INTO user ( user_name, age) VALUES (#{username,jdbcType=VARCHAR}, #{age,jdbcType=INTEGER}) </insert> <insert id="insertBatch"> insert into user ( user_name, age) values <foreach collection="list" item="user" index="index" separator=","> (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER}) </foreach> </insert> <update id="update" > update user <set> <if test="username !=null"> user_name=#{username,jdbcType=VARCHAR}, </if> <if test="age !=null"> age =#{age,jdbcType=INTEGER} </if> </set> where id = #{id,jdbcType=INTEGER} </update> </mapper>复制
com.hong.spring.provider.UserServiceImpl
import com.hong.spring.api.IUserService; import com.hong.spring.dao.UserMapper; import com.hong.spring.entity.User; import com.hong.spring.entity.ao.UserAO; import com.hong.spring.utils.DataResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.List; /** * @Auther: csh * @Date: 2020/8/18 15:16 * @Description:用户实现 */ @Service("userService") public class UserServiceImpl implements IUserService { @Autowired private UserMapper userDao; @Override public DataResponse<List<User>> findByAll() { List <User> allUserList = userDao.findAllUserList(); int allTotal = userDao.findAllTotal(); return DataResponse.BuildSuccessResponse(allUserList,allTotal); } @Override @Transactional public DataResponse <Boolean> save(User user) { if(null==user){ return DataResponse.BuildFailResponse("必传参数不能为空!"); } int save = userDao.save(user); return DataResponse.BuildSuccessResponse(save>0?true:false); } @Override public DataResponse <Boolean> insertBatch(List <User> list) { if(null==list){ return DataResponse.BuildFailResponse("参数不能为空!"); } int batchSave = userDao.insertBatch(list); return DataResponse.BuildSuccessResponse(batchSave>0?true:false); } @Override @Transactional public DataResponse <Boolean> update(User user) { if(null==user || user.getId()==null){ return DataResponse.BuildFailResponse("必传参数不能为空!"); } int update = userDao.update(user); return DataResponse.BuildSuccessResponse(update>0?true:false); } @Override public DataResponse <User> findById(int i) { User byId = userDao.findById(i); return DataResponse.BuildSuccessResponse(byId); } @Override public DataResponse <List <User>> findByPage(UserAO ao) { if(ao==null){ ao.setPage(0); ao.setPageSize(10); }else{ ao.setPage(ao.getPageSize() * ao.getPage()); } int allTotal = userDao.findAllTotal(); List <User> byPage = userDao.findByPage(ao); return DataResponse.BuildSuccessResponse(byPage,allTotal); } }复制
相关配置文件
application.properties
logging.level.root=WARN logging.level.org.springframework.web=DEBUG logging.level.org.hibernate=ERROR复制
applicationContext.xml
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd"> <!-- 配置组件扫描 --> <context:component-scan base-package="com.hong.spring"></context:component-scan> <!--加载配置文件--> <context:property-placeholder location="classpath:jdbc.properties,classpath:tubemq.properties"/> <!-- 开启注解 --> <context:annotation-config /> <!--开启注解事务--> <tx:annotation-driven transaction-manager="transactionManager" /> <!--放行静态资源--> <mvc:default-servlet-handler /> <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver" id="internalResourceViewResolver"> <!-- 前缀 --> <property name="prefix" value="/WEB-INF/pages/" /> <!-- 后缀 --> <property name="suffix" value=".html" /> <property name="contentType" value="text/html"/> </bean> <!--开启mvc注解事务--> <!-- 定义注解驱动 --> <mvc:annotation-driven> <mvc:message-converters> <!-- 设置支持中文 --> <bean class="org.springframework.http.converter.StringHttpMessageConverter"> <property name="supportedMediaTypes"> <list> <value>text/plain;charset=UTF-8</value> <value>text/html;charset=UTF-8</value> </list> </property> </bean> <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/> </mvc:message-converters> </mvc:annotation-driven> <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"> <!-- 基础配置 --> <property name="url" value="${jdbc.url}"></property> <property name="driverClassName" value="${jdbc.driver}"></property> <property name="username" value="${jdbc.user}"></property> <property name="password" value="${jdbc.password}"></property> <!-- 关键配置 --> <!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 --> <property name="initialSize" value="3" /> <!-- 最小连接池数量 --> <property name="minIdle" value="2" /> <!-- 最大连接池数量 --> <property name="maxActive" value="15" /> <!-- 配置获取连接等待超时的时间 --> <property name="maxWait" value="10000" /> <!-- 性能配置 --> <!-- 打开PSCache,并且指定每个连接上PSCache的大小 --> <property name="poolPreparedStatements" value="true" /> <property name="maxPoolPreparedStatementPerConnectionSize" value="20" /> <!-- 其他配置 --> <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 --> <property name="timeBetweenEvictionRunsMillis" value="60000" /> <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 --> <property name="minEvictableIdleTimeMillis" value="300000" /> <!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis, 执行validationQuery检测连接是否有效。--> <property name="testWhileIdle" value="true" /> <!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。--> <property name="testOnBorrow" value="true" /> <!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 --> <property name="testOnReturn" value="false" /> </bean> <!--事务管理器--> <!-- sqlSessionFactory --> <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <!-- 加载 MyBatis 的配置文件 --> <property name="configLocation" value="classpath:mybatis.xml"/> <!-- 数据源 --> <property name="dataSource" ref="dataSource"/> <!-- 所有配置的mapper文件 --> <property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" /> </bean> <!-- Mapper 扫描器 --> <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <!-- 扫描 包下的组件 --> <property name="basePackage" value="com.hong.spring.dao" /> <!-- 关联mapper扫描器 与 sqlsession管理器 --> <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" /> </bean> <!--事务配置--> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource" /> </bean> </beans>复制
jdbc.properties
config.properties: #数据库驱动 jdbc.driver=com.mysql.jdbc.Driver #数据库连接url jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8 #数据库用户名 jdbc.user=root #数据库密码 jdbc.password=123456复制
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration status="INFO"> <appenders> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> </Console> <RollingFile name="RollingFile" fileName="logs/app.log" filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz"> <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/> <SizeBasedTriggeringPolicy size="5 MB"/> </RollingFile> </appenders> <loggers> <root level="DEBUG"> <appender-ref ref="Console"/> <appender-ref ref="RollingFile"/> </root> </loggers> </configuration>复制
logging.properties
org.apache.catalina.core.ContainerBase.[Catalina].level=INFO org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler org.apache.jasper.servlet.TldScanner.level = FINE handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler ############################################################ # Handler specific properties. # Describes specific configuration info for Handlers. ############################################################ org.apache.juli.FileHandler.level = FINE org.apache.juli.FileHandler.directory = ../logs org.apache.juli.FileHandler.prefix = error-debug. java.util.logging.ConsoleHandler.level = FINE java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter复制
mybatis.xml
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd"> <configuration> <!-- settings --> <settings> <!-- 打开延迟加载的开关 --> <setting name="lazyLoadingEnabled" value="true"/> <!-- 将积极加载改为消极加载(即按需加载) --> <setting name="aggressiveLazyLoading" value="false"/> <!-- 打开全局缓存开关(二级缓存)默认值就是 true --> <setting name="cacheEnabled" value="true"/> <!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) --> <setting name="mapUnderscoreToCamelCase" value="true"/> <!-- 使用列别名代替列名 默认:true seslect name as title from table --> <setting name="useColumnLabel" value="true"/> <!--使用jdbc的getGeneratedKeys获取数据库自增主键值--> <setting name="useGeneratedKeys" value="true"/> </settings> <!-- 别名定义 --> <typeAliases> <package name="com.hong.spring.entity"/> </typeAliases> </configuration>复制
tubemq.properties
tubemq.url=localhost:8000 tubemq.topic_user=tubemq_spring_user tubemq.group=tubemq_hong_consumer复制
tubemq.xml
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"> <!--tubemq配置--> <bean id="pull" class="com.hong.spring.config.TubeMqConsumer" init-method="init" destroy-method="close"> <constructor-arg name="addrHost" value="${tubemq.url}" /> <constructor-arg name="topic" value="${tubemq.topic_user}" /> <constructor-arg name="group" value="${tubemq.group}" /> </bean> </beans>复制
WEB-INF/web.xml
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" version="3.1"> <servlet> <servlet-name>spring_tubemq_consumer</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:applicationContext.xml, classpath:tubemq.xml </param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <filter> <filter-name>encodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> <init-param> <param-name>forceEncoding</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>encodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <servlet-mapping> <servlet-name>spring_tubemq_consumer</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> </web-app>复制
spring_mq/spring_tubemq_consumer/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>spring_mq</artifactId> <groupId>com.hong</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <groupId>com.hong.tubemq</groupId> <artifactId>spring_tubemq_consumer</artifactId> <properties> <tubemq-client-version>0.8.0-incubating</tubemq-client-version> </properties> <dependencies> <dependency> <artifactId>spring_mq_common_api</artifactId> <version>1.0-SNAPSHOT</version> <groupId>com.hong</groupId> </dependency> <dependency> <groupId>org.apache.tubemq</groupId> <artifactId>tubemq-client</artifactId> <version>${tubemq-client-version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version> </dependency> </dependencies> <build> <resources> <resource> <directory>src/main/java</directory> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <filtering>false</filtering> </resource> <resource> <directory>src/main/resources</directory> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <filtering>false</filtering> </resource> </resources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>6</source> <target>6</target> </configuration> </plugin> </plugins> </build> </project>复制
结果

数据库结果


简单做了下整合,用是可以用,可以在这个基础上再继续完善相关的功能。
springboot 整合TubeMq
因为官网也没有相关的支持,做了也没有啥太大的意义,等后续有支持再统一整合....(感觉没跟上时代...)
最后
相对来说这个TubeMq没有之前tars那么恶心,相对简洁功能过得去,也文档比较简单,但是想了解下为啥,鹅产开源的项目总感觉要么很难整、要么感觉好久没维护,感觉遗弃的娃一样....,看着19年的相关宣传又很霸气,怎么总感觉有点虎头蛇尾,大家觉得呢?
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 在 Outlook for Windows 中搜索电子邮件 在 Outlook 中, 使用"即时搜索"在拥挤收件箱或许多文件夹之一中快速查找电子邮件。使用搜索节省时间 选择搜索框。键入要查找的内容,例如主题的一部分或联系人的姓名。如果要缩小搜索范围,请在出现的"搜索"选项卡 中选择 一个选项:发件…