南京做网站牛,全国企业信用信息公示系统黑龙江,百度免费网站申请,毕节建设厅网站一、前言前段时间公司预研了设备app端与服务端的交互方案#xff0c;出于多方面考量最终选用了阿里云的微服务队列MQTT方案#xff0c;基于此方案#xff0c;本人主要实践有#xff1a;1. 封装了RocketMQ实现MQTT订阅与发布的实现细节#xff1b;2. 实现了注解式分发处理出于多方面考量最终选用了阿里云的微服务队列MQTT方案基于此方案本人主要实践有 1. 封装了RocketMQ实现MQTT订阅与发布的实现细节 2. 实现了注解式分发处理可利用如MqttController, MqttTopicMapping等相关自定义注解的方式来统一订阅MQTT的Topic以及消息处理的分发 3. 使用了一套请求和响应的同步机制来达到PUB/SUB异步通信的伪同步调用。Github 地址点此链接二、RocketMQ的接入细节1. 为什么服务端要使用RocketMQ接入阿里云微消息队列MQTT是在以消息队列 RocketMQ 为核心存储的基础上实现更适合移动互联网和IoT领域的无状态网关两者之间具备天然的数据互通性。MQTT实例本身并不提供消息数据持久化功能消息数据持久化需要搭配后端的消息存储实例来使用。因此现阶段每一个阿里云MQTT实例都必须配套一个消息存储实例即RocketMQ实例来提供消息数据持久化功能因此他们之间可以说是消息互通的即可用RocketMQ订阅的方式来消费用MQTT协议发布的消息同理也可用 MQTT协议订阅的方式来消费RocketMQ发布的消息。帮助文档也给出了以下两种产品的区别说明微消息队列MQTT基于MQTT协议实现单个客户端的处理能力较弱。因此微消息队列MQTT适用于拥有大量在线客户端很多企业设备端过万甚至上百万但每个客户端消息较少的场景。 相比之下消息队列RocketMQ是面向服务端的消息引擎主要用于服务组件之间的解耦、异步通知、削峰填谷等服务器规模较小极少企业服务器规模过万但需要大量的消息处理吞吐量要求高。因此消息队列RocketMQ适用于服务端进行大批量的数据处理和分析的场景。基于以上区别官方也推荐在移动端设备上使用微消息队列MQTT而在服务端应用中则使用消息队列RocketMQ具体则可以通过 MQTT SDK 以公网访问方式来实现设备间的通信通过MQ SDK以内网方式来实现服务端通信。2. RocketMQ如何对接RocketMQ与MQTT在消息结构和一些属性字段上都有一定的映射关系具体内容(摘自帮助文档)如下。微消息队列MQTT使用MQTT协议接入而消息队列RocketMQ使用的是私有协议因此两者的关键概念存在如下映射关系。MQ与MQTT消息结构映射关系如上图所示MQTT协议中Topic是多级结构而消息队列RocketMQ的Topic 仅有一级因此MQTT中的一级Topic映射到消息队列RocketMQ的Topic而二级和三级Topic则映射到消息队列RocketMQ的消息属性Properties中。消息队列 RocketMQ 协议中的消息Message可以拥有自定义属性Properties而MQTT协议目前的版本不支持属性但为了方便对MQTT协议中的Header信息和设备信息进行溯源MQTT的部分信息将被映射到 RocketMQ的消息属性中方便使用消息队列RocketMQ的SDK接入的用户获取。目前微消息队列MQTT和消息队列RocketMQ支持的属性字段映射表如下图所示。使用消息队列RocketMQ的SDK的应用和使用消息队列MQTT的SDK的应用进行交互时可以通过读写这些属性字段来达到信息获取或者设置的目的。属性字段映射关系3. RocketMQ对MQTT消息订阅的实现Properties properties new Properties();
// 在控制台创建的Group ID
properties.put(PropertyKeyConst.GROUP_ID, xxx);
// 阿里云AccessKey
properties.put(PropertyKeyConst.AccessKey, xxx);
// 阿里云SecretKey
properties.put(PropertyKeyConst.SecretKey, xxx);
// 在RocketMQ控制台的实例基本信息中可查看到的TCP协议接入点
properties.put(PropertyKeyConst.NAMESRV_ADDR,xxx);
Consumer consumer ONSFactory.createConsumer(properties);
consumer.subscribe(topic, *, new MessageListener() { //订阅全部 Tagpublic Action consume(Message message, ConsumeContext context) {//获得mqtt消息中的第一级topicString mqttFirstTopic message.getTopic();//获得mqtt消息中除去1级后的所有topicString mqttSecondTopic message.getUserProperties(PropertyKeyConst.MqttSecondTopic);//获得mqtt消息中的messageIdString messageId message.getUserProperties(UNIQ_KEY);//获得mqtt消息中的消息体String messageBody new String(message.getBody());//...return Action.CommitMessage;}});
consumer.start();实现主要注意2点这边的 MQ 只需要订阅 MQTT 的一级 Topic 。如果 MQTT 会发布2个 Topic 的消息 robot/alarm 和 robot/task/test 则在此处只需要订阅 robot 这个第一级Topic即可。MQTT 的一些属性字段可以从 RocketMQ 消息 Message 的 userProperties 字段中获得比如上面代码中通过 message.getUserProperties(PropertyKeyConst.MqttSecondTopic); 可以获得 MQTT 中的 除去1级后的所有 Topic 字符串如上述举例的2个 Topic 可分别获得 /alarm 和 /task/test。 具体能够获得哪些字段可以参考上一节的属性字段映射表也可自行查看 PropertyKeyConst 类中定义的一些字符串常量来大概知晓。使用阿里云MQTT控制台发送一个MQTT消息如图所示MQTT控制台发送消在程序中加一个断点获得当前Message对象的字段如下Message消息体上图可看到userProperties中的一些值比如qoslevelmqttSecondTopic等这些字段都可以在PropertyKeyConst 类中找到对应的字符串常量但是UNIQ_KEYcleansessionflag等PropertyKeyConst 类中并没有对应的字符串常量这边暂时就message.getUserProperties(UNIQ_KEY)这样使用自定义字符量来获得。4. RocketMQ对MQTT消息发布的实现Properties properties new Properties();
// 在控制台创建的Group ID
properties.put(PropertyKeyConst.GROUP_ID, xxx);
// 阿里云AccessKey
properties.put(PropertyKeyConst.AccessKey, xxx);
// 阿里云SecretKey
properties.put(PropertyKeyConst.SecretKey, xxx);
// 在RocketMQ控制台的实例基本信息中可查看到的TCP协议接入点
properties.put(PropertyKeyConst.NAMESRV_ADDR,xxx);
//设置发送超时时间单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, 3000);
Producer producer ONSFactory.createProducer(properties);
// 在发送消息前必须调用 start 方法来启动 Producer只需调用一次即可
producer.start();//发送一个mqtt消息
String parentTopic topic.substring(0, topic.indexOf(/));
String subTopic topic.substring(topic.indexOf(/));
Message msg new Message(parentTopic, , message.getBytes());
msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);
msg.putUserProperties(cleansessionflag, cleanSessionFlag);
SendResult result producer.send(msg);该代码仅实现了普通消息的同步发送若需发送顺序消息、延时消息等可参考SDK帮助文档创建不同的Producer实现即可。上述代码将需要发送的MQTT全量Topic拆分成1级与2级1级Topic设置为MQ中的Topic参数2级Topic字符串则设为userProperties中PropertyKeyConst.MqttSecondTopic的其他属相如qoslevel和cleansessionflag等也是通过userProperties的相关字段来设置。三、注解式分发处理的实现1. 前置知识点1.1 BeanPostProcessorBeanPostProcessor是Spring IOC容器给我们提供的一个扩展接口。BeanPostProcessor接口定义了两个方法public interface BeanPostProcessor {// 前置处理Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;// 后置处理Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}Spring中Bean的整个生命周期如图所示Bean生命周期postProcessBeforeInitialization()方法与postProcessAfterInitialization()分别对应图中前置处理和后置处理两个步骤将执行的方法。这两个方法中都传入了bean对象实例的引用为扩展容器的对象实例化过程提供了很大便利在这儿几乎可以对传入的实例执行任何操作。可以看到Spring容器通过BeanPostProcessor给了我们一个机会对Spring管理的bean进行再加工注解、AOP等功能的实现均大量使用了BeanPostProcessor。通过实现BeanPostProcessor的接口在其中处理方法中判断bean对象上是否有自定义的一些注解如果有则可以对这个bean实例继续进行其他操作这也是本例中使用该接口要实现的主要目的。1.2 ApplicationListener在IOC的容器的启动过程当所有的bean都已经处理完成之后spring ioc容器会有一个发布事件的动作。从 AbstractApplicationContext 的源码中就可以看出protected void finishRefresh() {// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this);
}因此当所有的bean都初始化完成并被成功装载后会触发ContextRefreshedEvent事件。ApplicationListener是spring中用来监听事件ApplicationEvent的传递每个实现了ApplicationListener接口的bean都会收到ApplicationEvent对象的通知每个ApplicationListener可根据事件类型只接收处理自己感兴趣的事件因此利用实现ApplicationListener的接口可以收到监听ContextRefreshedEvent动作然后可以写自己的一些处理逻辑比如初始化环境准备测试数据、加载一些数据到内存等等。用法如下Component
public class TestApplicationListener implements ApplicationListenerContextRefreshedEvent{Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//todo:一些处理逻辑}}1.3 反射Java反射机制是在运行状态中对于任意一个类都能够知道这个类的所有属性和方法对于任意一个对象都能够调用它的任意一个方法和属性这种动态获取的信息以及动态调用对象的方法的功能称为java语言的反射机制。在Java中Class类与java.lang.reflect类库一起对反射技术进行了全力的支持。获取Class对象有三种方式:通过实例对象获得Class? class object.getClass();通过类名获得Class? class ClassName.class;通过类名全路径获得Class? class Class.forName(类名全路径);反射包中常用的类主要有 Constructor表示的类的构造方法信息利用它可以在运行时动态创建对象 Field表示类的成员变量信息通过它可以在运行时动态修改成员变量的属性值(包含private) Method表示类的成员方法信息通过它可以动态调用对象的方法(包含private)下面说明一下本例中用到的一些反射api://获得Class对象
Class clazz obj.getClass();//判断注解B是否在此A上
boolean isAnnotation A.isAnnotationPresent(B.class);//获得该clazz上的注解对象
B bclazz.getAnnotation(B.class));//获得本类以及父类或者父接口中所有的公共方法
Method[] methodsclazz.getMethods();//获取方法上的所有参数
Parameter[] parameters method.getParameters();//执行某对象的方法,owner为该对象,paramValues为入参数组,method为Method对象
method.invoke(owner, paramValues);2. 整体实现思路自定义注解MqttControllerMqttTopicMappingMqttMessageIdMqttMessageBody利用BeanPostProcessor获得所有注解了MqttController的bean及其注解值获得其所有注解了MqttTopicMapping的Method方法及其注解值利用两者的注解值作为其key分别将bean,Method为value放入不同的map中记录所有注解了MqttController的注解值作为下一步需要订阅的Topic利用ApplicationListener在所有bean加载完成后使用实例化的mqConsumer来订阅所有需要订阅的Topic在mq订阅的处理方法中根据消息的全Topic在上述步骤的map中获得其对应的bean和Method同时根据MqttMessageIdMqttMessageBody来设置相关参数使用method.invoke(owner, paramValues);实现方法的调用来达到消息的处理分发。3. 实现细节3.1 自定义注解MqttController在类上使用其中parentTopic值为需要监听的1级Topic其中使用Component可以使其注解的类实例化为为Bean对象放入到Spring容器中基于此才能在利用BeanPostProcessor中获得其对象。Target(ElementType.TYPE)
Retention(RetentionPolicy.RUNTIME)
Documented
Component
public interface MqttController {/*** 监听的父topic** return 监听的父topic*/String parentTopic();
}MqttTopicMapping在方法上使用其中subTopic的值为需要订阅的子Topic与1级Topic共同组成MQTT的TopicTarget(ElementType.METHOD)
Retention(RetentionPolicy.RUNTIME)
Documented
public interface MqttTopicMapping {/*** 订阅的子topic默认可以只订阅1级topic** return 订阅的子topic*/String subTopic() default ;
}MqttMessageBody在方法参数上使用使得参数自动获得messageBody对象Target(ElementType.PARAMETER)
Retention(RetentionPolicy.RUNTIME)
Documented
public interface MqttMessageBody {}MqttMessageId在方法参数上使用使得参数自动获得messageId的值Target(ElementType.PARAMETER)
Retention(RetentionPolicy.RUNTIME)
Documented
public interface MqttMessageId {}自定义注解的使用示例如下Slf4j
MqttController(parentTopic robot1)
public class MqttRobot1 {MqttTopicMappingpublic void dealFirstTopic() {log.info(MqttRobot1.dealAlarm 收到消息啦只处理了一级topic);}MqttTopicMapping(subTopic alarm)public void dealAlarm(MqttMessageId String messageId, MqttMessageBody AlarmVo alarmVo) {log.info(MqttRobot1.dealAlarm 收到消息啦);log.info(messageId:{}, messageId);log.info(alarmVo:{}, alarmVo);}MqttTopicMapping(subTopic task)public void dealTask() {log.info(MqttRobot1.dealTask 收到消息啦);}
}3.2 提取Method和Bean对象在MqttHandlerFactory类中定义以下几个容器分别存储mqtt处理类的beanmqtt处理方法以及parentTopic列表/*** 用于存储mqtt处理类的bean,key为parentTopic/subTopic*/
private static MapString, Object mqttControllers new HashMap();/*** 用于存储mqtt处理方法,key为parentTopic/subTopic*/
private static MapString, Method mqttHandlers new HashMap();/*** 存储parentTopic列表*/
private static SetString parentTopicSet new HashSet();利用BeanPostProcessor接口来处理实现了自定义注解的Bean对象。具体代码及注释如下Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class beanClazz bean.getClass();//使用的MqttController注解的bean对象if (beanClazz.isAnnotationPresent(MqttController.class)) {//获得MqttController的注解值//存储parentTopic列表String parentTopic ((MqttController) beanClazz.getAnnotation(MqttController.class)).parentTopic();MqttHandlerFactory.getParentTopicSet().add(parentTopic);for (Method method : beanClazz.getMethods()) {//获得MqttTopicMapping的Method对象if (method.isAnnotationPresent(MqttTopicMapping.class)) {//获得MqttTopicMapping的注解值String subTopic method.getAnnotation(MqttTopicMapping.class).subTopic();String realTopic;if (.equals(subTopic)) {realTopic parentTopic /;} else {realTopic (parentTopic / subTopic /).replaceAll(/, /);}if (null ! MqttHandlerFactory.getMqttHandler(realTopic)) {throw new MqttBeansException(bean.getClass().getSimpleName() topic 重复定义,值为 realTopic);}//存储mqtt处理类的beanMqttHandlerFactory.registerMqttHandler(realTopic, method);//存储mqtt处理方法MqttHandlerFactory.registerMqttController(realTopic, bean);log.info(MqttHandler Mapped {} onto {}, realTopic, method.toString());}}}return bean;
}3.3 mq的消息订阅与处理实现ApplicationListener接口在所有Bean对象加载完之后根据前面记录的parentTopicSet作为所有需要订阅的1级Topic开始订阅。在订阅消息处理中从message信息可以获得其对应的1级Topic与2级Topic将其处理成MQTT的全Toic并从前面记录的mqttHandlersmqttControllers中获得对应的Method对象及Bean对象从message信息中提取对应的messageId级messageBody并设置为使用了MqttMessageBody和MqttMessageId的注解的参数中利用反射method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);来实现方法的调用。具体代码及注释如下Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//...//设置mqConsumer实例化所需propertiesConsumer mqConsumer ONSFactory.createConsumer(properties);SetString parentTopicSet MqttHandlerFactory.getParentTopicSet();if (parentTopicSet.size() 0) {log.warn(当前应用并未有任何topic订阅);}//根据parentTopic和subTopic订阅parentTopicSet.forEach(parentTopic - {log.info(Add a new rocketMq subscription,topic:{}, parentTopic);mqConsumer.subscribe(parentTopic, *, (message, context) - {log.debug(MqReceive Message: message);//获得topicString mqttFirstTopic message.getTopic();String mqttSecondTopic message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null mqttSecondTopic) {//只有1级topic的情况mqttSecondTopic /;}if (!/.equals(mqttSecondTopic.substring(mqttSecondTopic.length() - 1))) {mqttSecondTopic /;}String mqttTopic mqttFirstTopic mqttSecondTopic;Method method MqttHandlerFactory.getMqttHandler(mqttTopic);if (null method) {log.warn(当前没有处理该topic的handler,topic:{}, mqttTopic);return Action.CommitMessage;} else {//获得mqtt的一些数据String messageId message.getUserProperties(UNIQ_KEY);String messageBody new String(message.getBody());//处理入参Parameter[] parameters method.getParameters();Object[] paramValues new Object[parameters.length];for (int i 0; i parameters.length; i) {if (parameters[i].isAnnotationPresent(MqttMessageId.class)) {//MqttMessageId注解的参数paramValues[i] messageId;} else if (parameters[i].isAnnotationPresent(MqttMessageBody.class)) {//MqttMessageBody注解的参数Class parameterClazz parameters[i].getType();try {paramValues[i] JSONObject.parseObject(messageBody, parameterClazz);} catch (Exception e) {log.error(mqttMessageBody 格式错误messageId:{},messageBody:{}, messageId, messageBody);// return Action.ReconsumeLater;return Action.CommitMessage;}} else {//自己定义的一些参数就给null把paramValues[i] null;}}try {method.invoke(MqttHandlerFactory.getMqttController(mqttTopic), paramValues);} catch (Exception e) {log.error(处理失败啦);}}return Action.CommitMessage;});});mqConsumer.start();log.info(MqConsumer Started);}四、MQTT同步调用的实现MQTT协议是基于PUB/SUB的异步通信模式不适用于服务端同步控制设备端返回结果的场景。通过制定一套请求和响应的同步机制可以无需改动MQTT协议来达到同步调用的目的。1. 整体实现思路MQTT的同步调用实际上是使用了两个异步调用完成的即生产者调用消费者的同时自己也作为消费者等待某一队列的返回消息消费者接受到生产者的消息同时也作为消息发送者发送一消息给生产者。具体同步调用机制示意如下:同步调用示意图首先服务端和设备端服务端都订阅了相关的Topic服务端发起同步调用即发布一个示意需同步返回的message到指定request Topic设备端接收到该message后处理完业务逻辑则会将调用结果发布一个返回message到request消息体中携带的response Topic中最后服务端接收到设备端返回的message可以从消息体中获得其调用结果。整个调用过程中客户端需要做的工作有订阅request Topic收到消息判断消息是否是同步消息是的话则处理完业务逻辑后异步发送特别的response Topic服务端需要做的工作有统一订阅特别的设备的response Topic发送消息到特别的request TopicFuture.get(timeout) 模式处理request和response的关系异步超时请求线程处理(处理超时请求30ms运行一次)消息监控和异常补救服务端处理异步为同步调用的逻辑借鉴了Dubbo底层将Netty的异步调用转化成同步的方式下面在实现细节中会具体阐述。2. 实现细节MQTT同步调用的代码如下public String publishSync(String topic, String qos, boolean cleanSessionFlag, Object data, int timeout)throws MqttRemoteException {String parentTopic topic.substring(0, topic.indexOf(/));String subTopic topic.substring(topic.indexOf(/));String mId UUID.randomUUID().toString().replaceAll(-, );MqttMessage mqttMessage new MqttMessage(mId, replyParentTopic / mId, data);Message msg new Message(parentTopic, , JSON.toJSONString(mqttMessage).getBytes());msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);msg.putUserProperties(PropertyKeyConst.MqttQOS, qos);msg.putUserProperties(cleansessionflag, cleanSessionFlag);MqttFuture mqttFuture new MqttFuture(mqttMessage, timeout);try {producer.send(msg);} catch (ONSClientException e) {mqttFuture.cancel();throw e;}return mqttFuture.get();
}前部分就是正常RocketMQ发布MQTT消息的代码但是发送的消息是自定义的MqttMessage同时这边在调用producer.send(msg);前先构建了一个MqttFuture对象然后发送完消息后使用mqttFuture.get();来获得同步调用的结果。MqttMessage定义如下其中syncFlag表示该消息是否需要同步返回,mId表示该消息的唯一id用于本地判断消息返回具体映射哪个同步调用的keyreplyTopic表示该消息需要返回消息的Topicdata则是具体业务数据。Data
public class MqttMessage implements Serializable {private static final long serialVersionUID 6648680154051903549L;/*** 是否需要同步返回默认为true*/private boolean syncFlag true;/*** 生成的id用uuid生成把*/private final String mId;/*** 客户端返回消息的Topic*/private final String replyTopic;/*** 发送数据*/private Object data;public MqttMessage(String mId, String replyTopic, Object data) {this.mId mId;this.replyTopic replyTopic;this.data data;}
}MqttFuture对象则是用来处理同步调用的逻辑每一个MqttFuture对象都有有一个mId作为唯一标识,发送的message消息体,调用返回结果MqttResponse包括正常mqtt返回或者超时等异常返回结果和同步调用超时时间还有一个锁及其创建的Condition用来处理线程的等待与通知后面会对其具体逻辑进行分析。每创建一个新的MqttFuture对象都会将其放入到存储MqttFuture的Map中key即为该消息的mId。Slf4j
Data
public class MqttFuture {public static final int DEFAULT_TIMEOUT 1000;public static final MapString, MqttFuture FUTURES new ConcurrentHashMap();private final Lock lock new ReentrantLock();private final Condition done lock.newCondition();/*** 唯一id*/private final String mId;/*** 发送的message消息体*/private final MqttMessage message;/*** 设置的同步调用超时时间*/private final int timeout;/*** 等待开始时间*/private final long start System.currentTimeMillis();/*** 返回结果*/private volatile MqttResponse response;public MqttFuture(MqttMessage message, int timeout) {this.message message;this.timeout timeout;this.mId message.getMId();FUTURES.put(mId, this);}//...调用返回结果MqttResponse定义如下如果是正常成功返回则返回状态是OK且会有对应的消息体messageResult如果超时或者其他异常情况则会返回对应的错误消息errorMessage。Data
public class MqttResponse {/*** ok状态正常返回result否则返回errorMessage*/public static final Integer OK 20000;/*** 客户端超时未处理*/public static final Integer TIMEOUT 40001;/*** 服务端主动取消*/public static final Integer CANCEL 40002;private Integer mStatus OK;/*** request生成的messageId*/private String mId;/*** 收到的消息体*/private String messageResult;/*** 状态不是成功返回的错误信息*/private String errorMessage;public MqttResponse(String mId) {this.mId mId;}
}在上述MQ同步调用代码中若调用producer.send(msg);同步发送mqtt消息失败的话则会调用mqttFuture.cancel();来取消该MqttFture对象代码如下这边主要是设置了异常的相应response避过将该MqttFuture对象从存储MqttFuture的Map中移除。Slf4j
Data
public class MqttFuture {//...public void cancel() {MqttResponse errorResult new MqttResponse(mId);errorResult.setMStatus(MqttResponse.CANCEL);errorResult.setErrorMessage(主动请求取消);response errorResult;FUTURES.remove(mId);}//...若调用producer.send(msg);同步发送mqtt消息成功的话则会调用mqttFuture.get();来获得其同步调用的结果具体代码如下首先判断是否调用已完成有响应结果mqttResponse包括正常获得返回结果或者超时等其他异常的情况,若完成的话则直接返回调用结果或者抛出相应的异常若没有调用完成则获得锁并循环判断是否调用完成没有的话则调用done.await(timeout, TimeUnit.MILLISECONDS);来实现该线程的超时等待直至其他线程调用该Condition的signal方法将其唤醒。Slf4j
Data
public class MqttFuture {//...public String get() throws MqttRemoteException {return this.get(timeout);}public String get(int timeout) throws MqttRemoteException {if (timeout 0) {timeout DEFAULT_TIMEOUT;}if (!isDone()) {long start System.currentTimeMillis();lock.lock();try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (!isDone()) {throw new MqttTimeoutException(Waiting client-side response timeout);}}if (response null) {throw new IllegalStateException(response cannot be null);}if (response.getMStatus().equals(MqttResponse.OK)) {return response.getMessageResult();}if (response.getMStatus().equals(MqttResponse.TIMEOUT)) {throw new MqttTimeoutException(Waiting client-side response timeout);}throw new MqttRemoteException(response.getErrorMessage());}/*** 判断是否有response结果** return 是否返回结果*/public boolean isDone() {return response ! null;}//...将上述等待唤醒的代码如下,只需调用下面的received方法并传入相应的MqttResponse结果即可该方法会在后面讲的判断调用超时和正常mqtt消息结果返回的情况中调用。Slf4j
Data
public class MqttFuture {//...private void doReceived(MqttResponse res) {lock.lock();try {response res;done.signal();} finally {lock.unlock();}}public static void received(MqttResponse response) {MqttFuture future FUTURES.remove(response.getMId());if (future ! null) {future.doReceived(response);} else {log.warn(The timeout response finally returned at (new SimpleDateFormat(yyyy-MM-dd HH:mm:ss.SSS).format(new Date())) , response response);}}//...后台会开启一个线程来扫描超时任务主要是遍历上述存储MqttFuture的Map如果对应的MqttFuture并没有完成获得对应的response且调用时间已超过设置的超时时间则设置一个超时异常的MqttResponse并调用MqttFuture.received(timeoutResponse);来唤醒上述代码中的done.await(timeout, TimeUnit.MILLISECONDS);的线程等待。该线程每30s会遍历一个存储MqttFuture的Map。Slf4j
Data
public class MqttFuture {//...private static class RemotingInvocationTimeoutScan implements Runnable {Overridepublic void run() {while (true) {try {for (MqttFuture future : FUTURES.values()) {if (future null || future.isDone()) {continue;}if (System.currentTimeMillis() - future.getStartTimestamp() future.getTimeout()) {//当前mqtt请求已超时MqttResponse timeoutResponse new MqttResponse(future.getMId());timeoutResponse.setMStatus(MqttResponse.TIMEOUT);MqttFuture.received(timeoutResponse);}}//每30ms扫一次Thread.sleep(30);} catch (Throwable e) {log.error(Exception when scan the timeout invocation of remoting., e);}}}}static {Thread th new Thread(new RemotingInvocationTimeoutScan(), MqttResponseTimeoutScanTimer);th.setDaemon(true);th.start();}//...为了处理正常的mqtt的消息返回除了使用上一节中讲到的自定义注解MqttController来订阅相关的Topic还需要订阅特殊的一个Topic来处理mqtt同步调用的返回消息订阅代码如下,根据收到的mId和messageBody设置对应的MqttResponse并调用MqttFuture.received(response);来唤醒上述代码中的done.await(timeout, TimeUnit.MILLISECONDS);的线程等待。//订阅伪同步请求回复
log.info(Add a new rocketMq subscription,topic:{}, replyParentTopic);
mqConsumer.subscribe(replyParentTopic, *, (message, context) - {String mqttSecondTopic message.getUserProperties(PropertyKeyConst.MqttSecondTopic);if (null mqttSecondTopic) {mqttSecondTopic ;}String mId mqttSecondTopic.replaceAll(/, );String messageBody new String(message.getBody());MqttResponse response new MqttResponse(mId);response.setMessageResult(messageBody);MqttFuture.received(response);return Action.CommitMessage;
});五、demo使用1. 项目结构由于使用了springboot框架来实现该demo所以项目结构如下demo项目结其中mqtt工具包目录如下mqtt工具包结构2. mqtt工具包的使用2.1 在yml配置中添加相关配置配置示例如下其中xxx改为自己使用的即可ali:mqtt:accessKey: xxxsecretKey: xxxgroupId: xxxnamesrvAddr: xxxsendMsgTimeoutMillis: 3000#消费者线程固定位50个consumeThreadNums: 50
# 用于同步调用返回发送的topicreplyParentTopic: xxx2.2 添加工具包中的MqttConfigImport({ MqttConfig.class})
Configuration
public class MqttConfigure {}2.3 自定义注解的使用Slf4j
MqttController(parentTopic robot1)
public class MqttRobot1 {MqttTopicMappingpublic void dealFirstTopic() {log.info(MqttRobot1.dealAlarm 收到消息啦只处理了一级topic);}MqttTopicMapping(subTopic alarm)public void dealAlarm(MqttMessageId String messageId, MqttMessageBody AlarmVo alarmVo) {log.info(MqttRobot1.dealAlarm 收到消息啦);log.info(messageId:{}, messageId);log.info(alarmVo:{}, alarmVo);}MqttTopicMapping(subTopic task)public void dealTask() {log.info(MqttRobot1.dealTask 收到消息啦);}
}2.4 测试同步调用模拟MQTT客户端消息返回代码mqtt客户端实现代码示例参考阿里云官方demo https://github.com/AliwareMQ/lmq-demo 其中xxx的地方都改成自己的即可下面代码中mqttClient2.publish(replyTopic, message);即将结果发送到replyTopic中。public class MqttClientTest {public static void main(String[] args) throws Exception {String instanceId xxx;String endPoint xxx;String accessKey xxx;String secretKey xxx;String clientId xxx;final String parentTopic xxx;//这边需自定义mqtt客户端topic,final String mq4IotTopic parentTopic / xxx /xxx;final int qosLevel 0;ConnectionOptionWrapper connectionOptionWrapper new ConnectionOptionWrapper(instanceId, accessKey, secretKey,clientId);final MemoryPersistence memoryPersistence new MemoryPersistence();final MqttClient mqttClient new MqttClient(tcp:// endPoint :1883, clientId, memoryPersistence);final MqttClient mqttClient2 new MqttClient(tcp:// endPoint :1883, clientId, memoryPersistence);/*** 客户端设置好发送超时时间防止无限阻塞*/mqttClient.setTimeToWait(5000);final ExecutorService executorService new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable());mqttClient.setCallback(new MqttCallbackExtended() {Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*** 客户端连接成功后就需要尽快订阅需要的 topic*/System.out.println(connect success);executorService.submit(new Runnable() {Overridepublic void run() {try {final String topicFilter[] { mq4IotTopic };final int[] qos { qosLevel };mqttClient.subscribe(topicFilter, qos);} catch (MqttException e) {e.printStackTrace();}}});}Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {System.out.println(receive msg from topic s , body is new String(mqttMessage.getPayload()));JSONObject jsonObject JSON.parseObject(new String(mqttMessage.getPayload()));String mId jsonObject.getString(mId);String replyTopic jsonObject.getString(replyTopic);String result mId 回复啦;MqttMessage message new MqttMessage(result.getBytes());message.setQos(qosLevel);//这边会将结果发送到replyTopic中mqttClient2.publish(replyTopic, message);System.out.println(发送啦);}Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println(send msg succeed topic is : iMqttDeliveryToken.getTopics()[0]);}});mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());mqttClient2.connect(connectionOptionWrapper.getMqttConnectOptions());Thread.sleep(Long.MAX_VALUE);}
}3. demo体验启动项目可以在控制台看到有如下日志MQTT处理Topic映射日志MQTT订阅Topic日志从日志中可以看出程序自动处理了自定义注解的Mqtt消息处理的映射并根据mqtt的1级Topic进行了RocketMQ的相关订阅。提供了一个测试MQTT消息简单发送接口如下GetMapping(/publish)
public String doPublish(RequestParam(topic) String topic, RequestParam(message) String message) {try {return mqttClient.publish(topic, message);} catch (ONSClientException e) {return e.getMessage();}
}使用 http://localhost:8080/mqtt/publish?topicrobot1/alarmmessage{id:1,code:heheh}来调用该接口。可以在控制台看到如下日志打印MQTT订阅的消息日志由于本demo中使用自定义注解订阅了该Topic所以调用该接口发送消息之后也会被本demo成功接收并分发到对应的处理函数中因此调用了该接口后可以在控制台看到如上日志打印可以看到MQTT消息成功发布订阅到该消息并实现了MQTT消息的处理分发。提供了一个测试MQTT同步调用的接口如下GetMapping(/publish-sync)public String publishSync(RequestParam(topic) String topic, RequestParam(message) String message) {try {return mqttClient.publishSync(topic, message, 5000);} catch (MqttRemoteException e) {return e.getMessage();}
}使用http://localhost:8080/mqtt/publish-sync?topicrobot1/alarm/GID_ROBOTDEVICEID_001messagehehehe来调用接口。可以看到下图的结果MQTT同步调用超时这是MQTT同步调用超时的情况因为此时还没有开启对应的MQTT客户端因此发送的MQTT消息并没有客户端进行回应所以出现了调用超时的情况如果运行上述模拟MQTT客户端消息返回的代码后再次调用该接口可以看到同步调用成功返回了对应的结果如图所示。MQTT同步调用成功返回源码地址如下仅供学习参考DavidDingXu/panda-mqttgithub.com六、参考阿里云微消息队列 MQTT帮助文档阿里云消息队列 RocketMQ帮助文档本文原创欢迎转载转载请注明出处如有不正确的地方恳请各位看官指正。