spring整合各种中间件(RocketMQ、kafka、RabbitMQ、TubeMQ、NSQ)

小编:管理员 37阅读 2022.08.01

本文源码下载:https://gitee.com/hong99/spring/issues/I1N1DF

TubeMQ是什么?

简介

TubeMQ是2019年腾讯在ApacheCon开源的一个消息中间件系统,性能优越。经过近7年、万亿规模的海量数据沉淀,TubeMQ目前日均接入量超过25万亿条消息。较之于其他的开源MQ组件,TubeMQ长期应用于真实生产环境中,在稳定性、性能和成本方面都有着核心优势。

https://inlong.apache.org/zh-cn/docs/quick_start.html

功能介绍

纯 Java 实现语言

引入 Master 协调节点:相比 Kafka 依赖于 Zookeeper 完成元数据的管理和实现 HA 保障不同,TubeMQ 系统采用的是自管理的元数据仲裁机制方式进行,Master 节点通过采用内嵌数据库 BDB 完成集群内元数据的存储、更新以及 HA 热切功能,负责 TubeMQ 集群的运行管控和配置管理操作,对外提供接口等;通过 Master 节点,TubeMQ 集群里的 Broker 配置设置、变更及查询实现了完整的自动化闭环管理,减轻了系统维护的复杂度

服务器侧消费负载均衡:TubeMQ 采用的是服务侧负载均衡的方案,而不是客户端侧操作,提升系统的管控能力同时简化客户端实现,更便于均衡算法升级

系统行级锁操作:对于 Broker 消息读写中存在中间状态的并发操作采用行级锁,避免重复问题

Offset 管理调整:Offset 由各个 Broker 独自管理,ZK 只作数据持久化存储用(最初考虑完全去掉ZK依赖,考虑到后续的功能扩展就暂时保留)

消息读取机制的改进:TubeMQ 采用的是消息随机读取模式,同时为了降低消息时延又增加了内存缓存读写,对于带 SSD 设备的机器,增加消息滞后转 SSD 消费的处理,解决消费严重滞后时吞吐量下降以及 SSD 磁盘容量小、刷盘次数有限的问题,使其满足业务快速生产消费的需求

消费者行为管控:支持通过策略实时动态地控制系统接入的消费者行为,包括系统负载高时对特定业务的限流、暂停消费,动态调整数据拉取的频率等;

服务分级管控:针对系统运维、业务特点、机器负载状态的不同需求,系统支持运维通过策略来动态控制不同消费者的消费行为,比如是否有权限消费、消费时延分级保证、消费限流控制,以及数据拉取频率控制等

系统安全管控:根据业务不同的数据服务需要,以及系统运维安全的考虑,TubeMQ 系统增加了 TLS 传输层加密管道,生产和消费服务的认证、授权,以及针对分布式访问控制的访问令牌管理,满足业务和系统运维在系统安全方面的需求

资源利用率提升改进:相比于 Kafka,TubeMQ 采用连接复用模式,减少连接资源消耗;通过逻辑分区构造,减少系统对文件句柄数的占用,通过服务器端过滤模式,减少网络带宽资源使用率;通过剥离对 Zookeeper 的使用,减少 Zookeeper 的强依赖及瓶颈限制

客户端改进:基于业务使用上的便利性以,我们简化了客户端逻辑,使其做到最小的功能集合,我们采用基于响应消息的接收质量统计算法来自动剔出坏4的 Broker 节点,基于首次使用时作连接尝试来避免大数据量发送时发送受阻。

Portal:负责对外交互和运维操作的Portal部分,包括API和Web两块,API对接集群之外的管理系统,Web是在API基础上对日常运维功能做的页面封装;

Master: 负责集群控制的Control部分,该部分由1个或多个Master节点组成,Master HA通过Master节点间心跳保活、实时热备切换完成(这是大家使用TubeMQ的Lib时需要填写对应集群所有Master节点地址的原因),主Master负责管理整个集群的状态、资源调度、权限检查、元数据查询等;

Broker: 负责实际数据存储的Store部分,该部分由相互之间独立的Broker节点组成,每个Broker节点对本节点内的Topic集合进行管理,包括Topic的增、删、改、查,Topic内的消息存储、消费、老化、分区扩容、数据消费的offset记录等,集群对外能力,包括Topic数目、吞吐量、容量等,通过水平扩展Broker节点来完成;

Client: 负责数据生产和消费的Client部分,该部分我们以Lib形式对外提供,大家用得最多的是消费端,相比之前,消费端现支持Push、Pull两种数据拉取模式,数据消费行为支持顺序和过滤消费两种。对于Pull消费模式,支持业务通过客户端重置精确offset以支持业务exactly-once消费,同时,消费端新推出跨集群切换免重启的BidConsumer客户端;

Zookeeper: 负责offset存储的zk部分,该部分功能已弱化到仅做offset的持久化存储,考虑到接下来的多节点副本功能该模块暂时保留。

详细请对考这里:https://inlong.apache.org/zh-cn/docs/architecture.html

源码实现

直接上docker (强列建议用docker哈,之前的tars自己部署搞了好几个星期....)

docker run -p 8080:8080 -p 8000:8000 -p 8123:8123 --name tubemq -d apachetubemq/tubemq-all:latest
复制

然后运行:localhost:8080 如下

集群部署参考:https://inlong.apache.org/zh-cn/docs/quick_start.html

新增topic

默认这里的授权字段是:abc

纯java实现

引入jar包

<properties>
    <tubemq-client-version>0.8.0-incubating</tubemq-client-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.tubemq</groupId>
        <artifactId>tubemq-client</artifactId>
        <version>${tubemq-client-version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.71</version>
    </dependency>
</dependencies>
复制

生产端

生产-异步消息

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentCallback;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
/**
 *
 * 功能描述: 异步消息
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/5/16 18:04
 */
public final class AsyncProducerExample {

   public static void main(String[] args) throws Throwable {
       final String masterHostAndPort = "localhost:8000";
       final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
       final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
       final MessageProducer messageProducer = messageSessionFactory.createProducer();
       final String topic = "java_tubemq";
       final String body = "发送异步消息!";
       byte[] bodyData = StringUtils.getBytesUtf8(body);
       messageProducer.publish(topic);
       final Message message = new Message(topic, bodyData);
       messageProducer.sendMessage(message, new MessageSentCallback(){
           public void onMessageSent(MessageSentResult result) {
               if (result.isSuccess()) {
                   System.out.println("async send message : " + JSONObject.toJSONString(message));
               } else {
                   System.out.println("async send message failed : " + result.getErrMsg());
               }
           }
           public void onException(Throwable e) {
               System.out.println("async send message error : " + e);
           }
       });
       messageProducer.shutdown();
   }

}
复制

发送结果

async send message : {"data":"5Y+R6YCB5byC5q2l5raI5oGv77yB","flag":0,"indexId":0,"topic":"java_tubemq"}
复制

同步消息-生产

import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
/**
 *
 * 功能描述:同步消息
 *
 * @param: 
 * @return: 
 * @auther: csh
 * @date: 2021/5/16 17:45
 */
public final class SyncProducerExample {

   public static void main(String[] args) throws Throwable {
       //主节点地址
       final String masterHostAndPort = "localhost:8000";
       final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
       final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
       final MessageProducer messageProducer = messageSessionFactory.createProducer();
       //topic 刚刚创建那个
       final String topic = "java_tubemq";
       final String body = "hello,i'm hong!How are you?";
       byte[] bodyData = StringUtils.getBytesUtf8(body);
       messageProducer.publish(topic);
       Message message = new Message(topic, bodyData);
       MessageSentResult result = messageProducer.sendMessage(message);
       //判断结果 如果成功打印出 sync send message 加上消息内容
       if (result.isSuccess()) {
           System.out.println("同步发送出去的消息: " + JSON.toJSONString(message));
       }
       //关闭服务
       messageProducer.shutdown();
   }
}
复制

发送结果

同步发送出去的消息: {"data":"aGVsbG8saSdtIGhvbmchSG93IGFyZSB5b3U/","flag":0,"indexId":0,"topic":"java_tubemq"}
复制

消费端

import com.alibaba.fastjson.JSONObject;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.utils.ThreadUtils;

import java.util.List;
/**
 *
 * 功能描述: 通过pull拉取消息
 *
 * @param: 
 * @return: 
 * @auther: csh
 * @date: 2021/5/16 17:51
 */
public class PullConsumerExample {

     public static void main(String[] args) throws Throwable {
         //服务地址
         final String masterHostAndPort = "localhost:8000";
         //topic
         final String topic = "java_tubemq";
         //消费组
         final String group = "hong-group1";
         final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
         consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         final PullMessageConsumer messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
         messagePullConsumer.subscribe(topic, null);
         messagePullConsumer.completeSubscribe();
         // wait for client to join the exact consumer queue that consumer group allocated
         while (!messagePullConsumer.isPartitionsReady(1000)) {
             ThreadUtils.sleep(1000);
         }
         while (true) {
             ConsumerResult result = messagePullConsumer.getMessage();
             if (result.isSuccess()) {
                 List<Message> messageList = result.getMessageList();
                 for (Message message : messageList) {
                     System.out.println("接收到的消息: " + JSONObject.toJSONString(message)+"内容是:"+new String(message.getData()));
                 }
                 messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
             } else {
                 if (result.getErrCode() == 400) {
                     ThreadUtils.sleep(100);
                 } else {
                     if (result.getErrCode() != 404) {
                         System.out.println(String.format("Receive messages errorCode is %d, Error message is %s", result.getErrCode(), result.getErrMsg()));
                     }
                 }
             }
         }
     } 

 }
复制

结果

接收到的消息: {"data":"5Y+R6YCB5byC5q2l5raI5oGv77yB","flag":0,"indexId":1200513646816395264,"topic":"java_tubemq"}内容是:发送异步消息!
接收到的消息: {"data":"aGVsbG8saSdtIGhvbmchSG93IGFyZSB5b3U/","flag":0,"indexId":2057144306094833664,"topic":"java_tubemq"}内容是:hello,i'm hong!How are you?
复制
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.MessageListener;
import org.apache.tubemq.client.consumer.PushMessageConsumer;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
 *
 * 功能描述: push消息模式
 *
 * @param:
 * @return:
 * @auther: csh
 * @date: 2021/5/16 17:53
 */
public class PushConsumerExample {
     public static void main(String[] args) throws Throwable {
         final String masterHostAndPort = "localhost:8000";
         final String topic = "java_tubemq";
         final String group = "hong-group";
         final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
         consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         final PushMessageConsumer pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
         pushConsumer.subscribe(topic, null, new MessageListener() {

             public void receiveMessages(List<Message> messages) throws InterruptedException {
                 for (Message message : messages) {
                     System.out.println("PUSH接收到的消息 : " + new String(message.getData()));
                 }
             }

             public Executor getExecutor() {
                 return null;
             }

             public void stop() {
                 //
             }
         });
         pushConsumer.completeSubscribe();
         CountDownLatch latch = new CountDownLatch(1);
         latch.await(10, TimeUnit.MINUTES);
     }
 }
复制

结果

PUSH接收到的消息 : 发送异步消息!
PUSH接收到的消息 : hello,i'm hong!How are you?
复制

关于数据拉取模式支持Push、Pull的区别:

Push客户端:TubeMQ最初消费端版本只提供Push模式的消费,这种模式能比较快速地消费数据,减轻服务端压力,但同时也带来一个问题,业务使用的时候因为无法控制拉取频率,从而容易形成数据积压数据处理不过来;

带消费中止/继续的Push客户端: 在收到业务反馈能否控制Push拉取动作的需求后,我们增加了resumeConsume()/pauseConsume()函数对,让业务可以模拟水位线控制机制,状态比较繁忙时调用pauseConsume()函数来中止Lib后台的数据拉取,在状态恢复后,再调用resumeConsume()通知Lib后台继续拉取数据;

Pull客户端: 我们后来版本里增加了Pull客户端,该客户端有别于Push客户端,是由业务而非Lib主动的拉取消息并对数据处理的结果进行成功与否的确认,将数据处理的主动权留给业务。这样处理后,虽然服务端压力有所提升,但业务消费时积压情况可大大缓解。

客户端与服务器端RPC交互过程:

客户端要维持已发请求消息的本地保存,直到RPC超时,或者收到响应消息,响应消息通过请求发送时生成的SerialNo关联;从服务器端收到的Broker信息,以及Topic信息,SDK要保存在本地,并根据最新的返回信息进行更新,以及定期的上报给服务器端;SDK要维持到Master或者Broker的心跳,如果发现Master反馈注册超时错误时,要进行重注册操作;SDK要基于Broker进行连接建立,同一个进程不同对象之间,要允许业务进行选择,是支持按对象建立连接,还是按照进程建立连接。

spring整合TubeMQ

网上基本没有找着相关spring整合tubemq以及相关的学习资料,除了官网那些很久也没更新的...所以在java基础之上做一个简单的整合...如下

spring生产

│ pom.xml
│ spring_tubemq_producer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ CommonTopic.java
│ │ │ │ TubeMqProducer.java
│ │ │ │
│ │ │ └─controller
│ │ │ │ UserController.java
│ │ │ │
│ │ │ └─ao
│ │ │ UserSaveAO.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ log4j2.xml
│ │ logging.properties
│ │ tubemq.properties
│ │ tubemq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml
复制

com.hong.spring.config.CommonTopic

package com.hong.spring.config;

/**
 * @author: csh
 * @Date: 2021/5/18 10:56
 * @Description:公共的topic
 */
public class CommonTopic  {
    //用户mq
    public static final String TUBETOPIC="tubemq_spring_user";
}
复制

com.hong.spring.config.TubeMqProducer

package com.hong.spring.config;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentCallback;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;

/**
 * @author: csh
 * @Date: 2021/4/27 18:06
 * @Description:tubemq配置
 */
@Log4j2
public class TubeMqProducer {


    /**服务端 */
    private static MessageProducer messageProducer;
    /**服务地址 */
    private String url;


    public TubeMqProducer(String url) {
        this.url = url;
    }

    public void init () throws TubeClientException {

        TubeClientConfig clientConfig =new TubeClientConfig(url);
        MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
        messageProducer = messageSessionFactory.createProducer();
    }

    /**
     *
     * 功能描述: 发送消息
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/4/28 17:05
     */
    private static Boolean resultFlag;
    public Boolean send(String msg,String topic)  {
        try {
            byte[] bodyData = StringUtils.getBytesUtf8(msg);
            messageProducer.publish(topic);
            final Message message = new Message(topic, bodyData);
            messageProducer.sendMessage(message, new MessageSentCallback(){
                public void onMessageSent(MessageSentResult result) {
                    if (result.isSuccess()) {
                        resultFlag=true;
                        log.info("同步发送消息成功 : " + JSONObject.toJSONString(message));
                    } else {
                        resultFlag =false;
                        log.info("发送消息出错 : " + result.getErrMsg());
                    }
                }
                public void onException(Throwable e) {
                    log.error("同步消息出错 : " + e);
                }
            });
            return resultFlag;
        }catch (Exception e){
            log.error("发送失败{}",e);
        }
        return false;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    /**
     *
     * 功能描述: 关闭
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/5/18 14:50
     */
    public void close(){
        try {
            if(null!=messageProducer){
                messageProducer.shutdown();
            }
        }catch (Exception e){
            log.error("关闭失败{}",e);
        } catch (Throwable throwable) {
            log.error("关闭失败{}",throwable);
            throwable.printStackTrace();
        }

    }
}
复制

com.hong.spring.controller.ao.UserSaveAO

package com.hong.spring.controller.ao;

import lombok.Data;

import java.io.Serializable;

/**
 * @author: csh
 * @Date: 2021/3/16 11:21
 * @Description:用户入参
 */
@Data
public class UserSaveAO implements Serializable {
    private Integer id;
    private String username;
    private Integer age;
}
复制

com.hong.spring.controller.UserController

package com.hong.spring.controller;

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.config.CommonTopic;
import com.hong.spring.config.TubeMqProducer;
import com.hong.spring.controller.ao.UserSaveAO;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Auther: csh
 * @Date: 2020/8/18 16:11
 * @Description:
 */
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {

    @Autowired
    private TubeMqProducer push;


    @RequestMapping("save")
    public DataResponse<Boolean> save(UserSaveAO ao){
        log.info("添加用户入参{}",JSONObject.toJSONString(ao));
        if(null==ao){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
       try {
           Boolean send = push.send(JSONObject.toJSONString(ao),CommonTopic.TUBETOPIC);
           if(null==send || !send){
               return DataResponse.BuildFailResponse("添加用户失败!");
           }
           return DataResponse.BuildFailResponse("添加用户成功!");
       }catch (Exception e){
           log.error("添加出错{}",e);
           return DataResponse.BuildFailResponse("添加出错请重试!");
       }
    }
}
复制

相关配置文件

application.properties

logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
复制

applicationContext.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- 配置组件扫描 -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!--加载配置文件-->
   <context:property-placeholder location="classpath:tubemq.properties"/>

   <!-- 开启注解 -->
   <context:annotation-config />

   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- 前缀 -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- 后缀 -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>
   </bean>

   <!--开启mvc注解事务-->
   <!-- 定义注解驱动 -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- 设置支持中文 -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>
</beans>
复制

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>
复制

logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
复制

tubemq.properties

tubemq.url=localhost:8000
复制

tubemq.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">




   <!--zero配置-->
   <bean id="push" class="com.hong.spring.config.TubeMqProducer" init-method="init" destroy-method="close">
      <constructor-arg name="url" value="${tubemq.url}" />
   </bean>
</beans>
复制

WEB-INF/web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_tubemq_producer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:tubemq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_tubemq_producer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>
复制

spring_mq/spring_tubemq_producer/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.tubemq</groupId>
    <artifactId>spring_tubemq_producer</artifactId>

    <properties>
        <tubemq-client-version>0.8.0-incubating</tubemq-client-version>
    </properties>

    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>


        <dependency>
            <groupId>org.apache.tubemq</groupId>
            <artifactId>tubemq-client</artifactId>
            <version>${tubemq-client-version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>

    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
复制

tomcat启动配置

postman发送配置

username:spring_tubemq
age:1
复制

结果

因为我的topic没有建,所以可以直接确认这个默认的情况下tubemq的topic需要手动来建。

重新添加topic

11:23:22.108 [http-nio-8683-exec-3] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1,"username":"spring_tubemq"}
11:23:22.108 [http-nio-8683-exec-3] INFO org.apache.tubemq.client.producer.ProducerManager - [Publish begin 1] publish topic tubemq_spring_user, address = org.apache.tubemq.client.producer.ProducerManager@23c7763a
11:23:22.197 [pool-6-thread-2] INFO com.hong.spring.config.TubeMqProducer - 同步发送消息成功 : {"data":"eyJhZ2UiOjEsInVzZXJuYW1lIjoic3ByaW5nX3R1YmVtcSJ9","flag":0,"indexId":0,"topic":"tubemq_spring_user"}
11:24:54.517 [http-nio-8683-exec-6] INFO com.hong.spring.controller.UserController - 添加用户入参{"age":1,"username":"spring_tubemq"}
11:24:54.517 [http-nio-8683-exec-6] INFO org.apache.tubemq.client.producer.ProducerManager - [Publish begin 1] publish topic tubemq_spring_user, address = org.apache.tubemq.client.producer.ProducerManager@23c7763a
11:24:54.519 [pool-6-thread-1] INFO com.hong.spring.config.TubeMqProducer - 同步发送消息成功 : {"data":"eyJhZ2UiOjEsInVzZXJuYW1lIjoic3ByaW5nX3R1YmVtcSJ9","flag":0,"indexId":0,"topic":"tubemq_spring_user"}
复制

消费端

│ pom.xml
│ spring_tubemq_consumer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─config
│ │ │ │ TubeMqConsumer.java
│ │ │ │
│ │ │ ├─dao
│ │ │ │ UserMapper.java
│ │ │ │
│ │ │ ├─listener
│ │ │ │ UserListener.java
│ │ │ │
│ │ │ ├─mapper
│ │ │ │ UserMapper.xml
│ │ │ │
│ │ │ └─provider
│ │ │ UserServiceImpl.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ jdbc.properties
│ │ log4j2.xml
│ │ logging.properties
│ │ mybatis.xml
│ │ tubemq.properties
│ │ tubemq.xml
│ │
│ └─test
│ └─java
└─web
    └─WEB-INF
            web.xml
复制

源码实现

com.hong.spring.config.TubeMqConsumer

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.listener.UserListener;
import lombok.extern.log4j.Log4j2;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author: csh
 * @Date: 2021/4/27 18:06
 * @Description:zero配置
 */
@Log4j2
public class TubeMqConsumer {

    //服务地址
    private String addrHost;
    //topic
    private String topic;
    //消费组
    private String group;
    //
    private MessageSessionFactory messageSessionFactory;
    //
    PullMessageConsumer messagePullConsumer;

    @Autowired
    private UserListener userListener;


    public TubeMqConsumer(String addrHost, String topic, String group) {
        this.addrHost = addrHost;
        this.topic = topic;
        this.group = group;
    }

    public void init(){
       try {
           final ConsumerConfig consumerConfig = new ConsumerConfig(addrHost, group);
           consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
           messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
           messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
           messagePullConsumer.subscribe(topic, null);
           messagePullConsumer.completeSubscribe();
           recvStr();
       }catch (Exception e){
            log.error("tubemq初始化失败",e);
       }
    }

    @Async
    public void recvStr(){
        try {
            log.info("开始自动拉取消息!");

            while (true){
                ConsumerResult result = messagePullConsumer.getMessage();
                if (result.isSuccess()) {
                    List<Message> messageList = result.getMessageList();
                    for (Message message : messageList) {
                        log.info("接收到的消息: " + JSONObject.toJSONString(message)+"内容是:"+new String(message.getData()));
                        userListener.Listener(new String(message.getData()));
                    }
                    messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
                } else {
                    if (result.getErrCode() == 400) {
                        ThreadUtils.sleep(100);
                    } else {
                        if (result.getErrCode() != 404) {
                            log.error("Receive messages errorCode is %d, Error message is %s", result.getErrCode(), result.getErrMsg());
                        }
                    }
                }
            }
        }catch (Exception e){
            log.error("接收消失败请重试{}",e);
        }

    }
    /**
     *
     * 功能描述: 关闭接口
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2021/5/18 14:25
     */
    public void close(){
        try {
            if(messageSessionFactory!=null){
                messageSessionFactory.shutdown();
            }
        }catch (Exception e){
            log.error("关闭tubemq失败{}",e);
        }
    }

}
复制

com.hong.spring.dao.UserMapper

import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * @Auther: csh
 * @Date: 2020/8/18 15:04
 * @Description:用户dao层
 */

public interface UserMapper {

    /**
     *
     * 功能描述:查询总条数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:31
     */
    List<User> findAllUserList();
    /**
     *
     * 功能描述:获取总数
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int findAllTotal();
    /**
     *
     * 功能描述:更新
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/18 15:30
     */
    int update(User user);
    /**
     *
     * 功能描述:添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    int save(User user);
    /**
     *
     * 功能描述:批量添加
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 15:46
     */
    int insertBatch(@Param("list") List <User> list);
    /**
     *
     * 功能描述:通过id查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/19 18:39
     */
    User findById(int id);
    /**
     *
     * 功能描述:通过分页查询
     *
     * @param:
     * @return:
     * @auther: csh
     * @date: 2020/8/21 16:05
     */
    List<User> findByPage(UserAO ao);
}
复制

com.hong.spring.listener.UserListener

import com.alibaba.fastjson.JSONObject;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: csh
 * @Date: 2021/3/16 11:14
 * @Description:用户监听
 */
@Log4j2
@Component
public class UserListener {

    @Autowired
    private IUserService userService;

    public void Listener(String str){
        log.info("获取的用户信息{}", str);
        User user = JSONObject.parseObject(str, User.class);
        DataResponse <Boolean> save = userService.save(user);
        if(save==null || save.getData()==null || !save.getData()){
            log.info("添加失败,原因{}",JSONObject.toJSONString(save));
        }
    }

}
复制

com/hong/spring/mapper/UserMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hong.spring.dao.UserMapper">
    <resultMap type="com.hong.spring.entity.User" id="user">
        <id column="id" property="id" />
        <result column="user_name" property="username" />
        <result column="age" property="age" />
    </resultMap>

    <select id="findById" resultType="com.hong.spring.entity.User">
      SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
    </select>

    <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
        select * from user where 1=1 limit #{page},#{pageSize}
    </select>

    <select id="findAllUserList" resultMap="user">
      SELECT * FROM user
    </select>

    <select id="findAllTotal" resultType="int">
      SELECT count(*) FROM user
    </select>

    <insert id="save" >
         INSERT INTO user ( user_name, age)
        VALUES (#{username,jdbcType=VARCHAR},
        #{age,jdbcType=INTEGER})
    </insert>

    <insert id="insertBatch">
        insert into user
        ( user_name, age)
        values
        <foreach collection="list" item="user" index="index"
                 separator=",">
            (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
        </foreach>
    </insert>

    <update id="update" >
        update user
        <set>
            <if test="username !=null">
                user_name=#{username,jdbcType=VARCHAR},
            </if>
            <if test="age !=null">
                age =#{age,jdbcType=INTEGER}
            </if>
        </set>
        where id = #{id,jdbcType=INTEGER}
    </update>
</mapper>
复制

com.hong.spring.provider.UserServiceImpl

import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;


/**
 * @Auther: csh
 * @Date: 2020/8/18 15:16
 * @Description:用户实现
 */
@Service("userService")
public class UserServiceImpl implements IUserService {
    @Autowired
    private UserMapper userDao;

    @Override
    public DataResponse<List<User>> findByAll() {
        List <User> allUserList = userDao.findAllUserList();
        int allTotal = userDao.findAllTotal();
        return DataResponse.BuildSuccessResponse(allUserList,allTotal);
    }
    @Override
    @Transactional
    public DataResponse <Boolean> save(User user) {
        if(null==user){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int save = userDao.save(user);
        return DataResponse.BuildSuccessResponse(save>0?true:false);
    }

    @Override
    public DataResponse <Boolean> insertBatch(List <User> list) {
        if(null==list){
            return DataResponse.BuildFailResponse("参数不能为空!");
        }
        int batchSave = userDao.insertBatch(list);
        return DataResponse.BuildSuccessResponse(batchSave>0?true:false);
    }

    @Override
    @Transactional
    public DataResponse <Boolean> update(User user) {
        if(null==user || user.getId()==null){
            return DataResponse.BuildFailResponse("必传参数不能为空!");
        }
        int update = userDao.update(user);
        return DataResponse.BuildSuccessResponse(update>0?true:false);
    }
    @Override
    public DataResponse <User> findById(int i) {
        User byId = userDao.findById(i);
        return DataResponse.BuildSuccessResponse(byId);
    }

    @Override
    public DataResponse <List <User>> findByPage(UserAO ao) {
        if(ao==null){
            ao.setPage(0);
            ao.setPageSize(10);
        }else{
            ao.setPage(ao.getPageSize() * ao.getPage());
        }
        int allTotal = userDao.findAllTotal();
        List <User> byPage = userDao.findByPage(ao);
        return DataResponse.BuildSuccessResponse(byPage,allTotal);
    }
}
复制

相关配置文件

application.properties

logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
复制

applicationContext.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:mvc="http://www.springframework.org/schema/mvc"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">


   <!-- 配置组件扫描 -->
   <context:component-scan base-package="com.hong.spring"></context:component-scan>
   <!--加载配置文件-->
   <context:property-placeholder location="classpath:jdbc.properties,classpath:tubemq.properties"/>

   <!-- 开启注解 -->
   <context:annotation-config />
   <!--开启注解事务-->
   <tx:annotation-driven transaction-manager="transactionManager" />
   <!--放行静态资源-->
   <mvc:default-servlet-handler />


   <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
        id="internalResourceViewResolver">
      <!-- 前缀 -->
      <property name="prefix" value="/WEB-INF/pages/" />
      <!-- 后缀 -->
      <property name="suffix" value=".html" />
      <property name="contentType" value="text/html"/>

   </bean>

   <!--开启mvc注解事务-->
   <!-- 定义注解驱动 -->
   <mvc:annotation-driven>
      <mvc:message-converters>
         <!-- 设置支持中文 -->
         <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
               <list>
                  <value>text/plain;charset=UTF-8</value>
                  <value>text/html;charset=UTF-8</value>
               </list>
            </property>
         </bean>
         <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
      </mvc:message-converters>
   </mvc:annotation-driven>


   <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
      <!-- 基础配置 -->
      <property name="url" value="${jdbc.url}"></property>
      <property name="driverClassName" value="${jdbc.driver}"></property>
      <property name="username" value="${jdbc.user}"></property>
      <property name="password" value="${jdbc.password}"></property>

      <!-- 关键配置 -->
      <!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
      <property name="initialSize" value="3" />
      <!-- 最小连接池数量 -->
      <property name="minIdle" value="2" />
      <!-- 最大连接池数量 -->
      <property name="maxActive" value="15" />
      <!-- 配置获取连接等待超时的时间 -->
      <property name="maxWait" value="10000" />

      <!-- 性能配置 -->
      <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
      <property name="poolPreparedStatements" value="true" />
      <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />

      <!-- 其他配置 -->
      <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
      <property name="timeBetweenEvictionRunsMillis" value="60000" />
      <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
      <property name="minEvictableIdleTimeMillis" value="300000" />
      <!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
                  执行validationQuery检测连接是否有效。-->
      <property name="testWhileIdle" value="true" />
      <!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
      <property name="testOnBorrow" value="true" />
      <!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
      <property name="testOnReturn" value="false" />
   </bean>

   <!--事务管理器-->
   <!-- sqlSessionFactory -->
   <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
      <!-- 加载 MyBatis 的配置文件 -->
      <property name="configLocation" value="classpath:mybatis.xml"/>
      <!-- 数据源 -->
      <property name="dataSource" ref="dataSource"/>
      <!-- 所有配置的mapper文件 -->
      <property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
   </bean>

   <!-- Mapper 扫描器 -->
   <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
      <!-- 扫描 包下的组件 -->
      <property name="basePackage" value="com.hong.spring.dao" />
      <!-- 关联mapper扫描器 与 sqlsession管理器 -->
      <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
   </bean>
   <!--事务配置-->
   <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
      <property name="dataSource" ref="dataSource" />
   </bean>
</beans>
复制

jdbc.properties

config.properties:
#数据库驱动
jdbc.driver=com.mysql.jdbc.Driver
#数据库连接url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
#数据库用户名
jdbc.user=root
#数据库密码
jdbc.password=123456
复制

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/app.log"
                     filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
            <SizeBasedTriggeringPolicy size="5 MB"/>
        </RollingFile>
    </appenders>
    <loggers>
        <root level="DEBUG">
            <appender-ref ref="Console"/>
            <appender-ref ref="RollingFile"/>
        </root>
    </loggers>
</configuration>
复制

logging.properties

org.apache.catalina.core.ContainerBase.[Catalina].level=INFO 
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE

handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler

############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################

org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.

java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
复制

mybatis.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>

    <!-- settings -->
    <settings>
        <!-- 打开延迟加载的开关 -->
        <setting name="lazyLoadingEnabled" value="true"/>
        <!-- 将积极加载改为消极加载(即按需加载) -->
        <setting name="aggressiveLazyLoading" value="false"/>
        <!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
        <setting name="cacheEnabled" value="true"/>
        <!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) -->
        <setting name="mapUnderscoreToCamelCase" value="true"/>
        <!-- 使用列别名代替列名 默认:true seslect name as title from table -->
        <setting name="useColumnLabel" value="true"/>
        <!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
        <setting name="useGeneratedKeys" value="true"/>
    </settings>

    <!-- 别名定义 -->
    <typeAliases>
        <package name="com.hong.spring.entity"/>
    </typeAliases>

</configuration>
复制

tubemq.properties

tubemq.url=localhost:8000
tubemq.topic_user=tubemq_spring_user
tubemq.group=tubemq_hong_consumer
复制

tubemq.xml

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">


   <!--tubemq配置-->
   <bean id="pull" class="com.hong.spring.config.TubeMqConsumer" init-method="init" destroy-method="close">
      <constructor-arg name="addrHost" value="${tubemq.url}" />
      <constructor-arg name="topic" value="${tubemq.topic_user}" />
      <constructor-arg name="group" value="${tubemq.group}" />
   </bean>
</beans>
复制

WEB-INF/web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <servlet>
        <servlet-name>spring_tubemq_consumer</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml,
                classpath:tubemq.xml
            </param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>spring_tubemq_consumer</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>
复制

spring_mq/spring_tubemq_consumer/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_mq</artifactId>
        <groupId>com.hong</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hong.tubemq</groupId>
    <artifactId>spring_tubemq_consumer</artifactId>

    <properties>
        <tubemq-client-version>0.8.0-incubating</tubemq-client-version>
    </properties>

    <dependencies>
        <dependency>
            <artifactId>spring_mq_common_api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <groupId>com.hong</groupId>
        </dependency>


        <dependency>
            <groupId>org.apache.tubemq</groupId>
            <artifactId>tubemq-client</artifactId>
            <version>${tubemq-client-version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>
    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
复制

结果

数据库结果

简单做了下整合,用是可以用,可以在这个基础上再继续完善相关的功能。

springboot 整合TubeMq

因为官网也没有相关的支持,做了也没有啥太大的意义,等后续有支持再统一整合....(感觉没跟上时代...)

最后

相对来说这个TubeMq没有之前tars那么恶心,相对简洁功能过得去,也文档比较简单,但是想了解下为啥,鹅产开源的项目总感觉要么很难整、要么感觉好久没维护,感觉遗弃的娃一样....,看着19年的相关宣传又很霸气,怎么总感觉有点虎头蛇尾,大家觉得呢?

关联标签: