当前位置: 首页 > news >正文

渭南市网站建设_网站建设公司_内容更新_seo优化

购物网站建设教程,建筑网校排名前十的品牌,网站模板织梦免费,合肥网站制作哪儿好薇java JMS技术 .1. 什么是JMS JMS即Java消息服务#xff08;Java Message Service#xff09;应用程序接口是一个Java平台中关于面向消息中间件#xff08;MOM#xff09;的API#xff0c;用于在两个应用程序之间#xff0c;或分布式系统中发送消息#xff0c;进行异步…java JMS技术 .1.   什么是JMS          JMS即Java消息服务Java Message Service应用程序接口是一个Java平台中关于面向消息中间件MOM的API用于在两个应用程序之间或分布式系统中发送消息进行异步通信。Java消息服务是一个与具体平台无关的API绝大多数MOM提供商都对JMS提供支持。          JMS是一种与厂商无关的 API用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity)这里JDBC 是可以用来访问许多不同关系数据库的 API而 JMS 则提供同样与厂商无关的访问方法以访问消息收发服务。许多厂商都支持 JMS包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ这只是几个例子。 JMS 使您能够通过消息收发服务有时称为消息中介程序或路由器从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象由两部分组成报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分可以将消息分为几种类型它们分别携带简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage)还有无有效负载的消息 (Message)。 .2.   JMS规范 .2.1.    专业技术规范 JMSJava Messaging Service是Java平台上有关面向消息中间件(MOM)的技术规范它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发翻译为Java消息服务。 .2.2.    体系架构 JMS由以下元素组成。 JMS提供者provider连接面向消息中间件的JMS接口的一个实现。提供者可以是Java平台的JMS实现也可以是非Java平台的面向消息中间件的适配器。 JMS客户生产或消费基于消息的Java的应用程序或对象。 JMS生产者创建并发送消息的JMS客户。 JMS消费者接收消息的JMS客户。 JMS消息包括可以在JMS客户之间传递的数据的对象 JMS队列一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读该消息将被从队列中移走。 JMS主题一种支持发送消息给多个订阅者的机制。 .2.3.    Java消息服务应用程序结构支持两种模型 1、  点对点或队列模型 在点对点或队列模型下一个生产者向一个特定的队列发布消息一个消费者从该队列中读取消息。这里生产者知道消费者的队列并直接将消息发送到消费者的队列。 这种模式被概括为 只有一个消费者将获得消息 生产者不需要在接收者消费该消息期间处于运行状态接收者也同样不需要在消息发送时处于运行状态。 每一个成功处理的消息都由接收者签收 2、发布者/订阅者模型 发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。   这种模式被概括为 多个消费者可以获得消息 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅subscription以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息除非订阅者建立了持久的订阅。在那种情况下在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。   1.下载ActiveMQ 去官方网站下载http://activemq.apache.org/ 2.运行ActiveMQ 解压缩apache-activemq-5.5.1-bin.zip 修改配置文件activeMQ.xml将0.0.0.0修改为localhost 默认的activeMQ.xml文件如下: 修改后: transportConnectorstransportConnector nameopenwire uritcp://localhost:61616/transportConnector namessl urissl://localhost:61617/transportConnector namestomp uristomp://localhost:61613/transportConnector urihttp://localhost:8081/transportConnector uriudp://localhost:61618/ /transportConnectors 然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。 访问的时候如果需要用户名和密码 都是admin admin... 启动topic的相关的生产者和消费者: 生产者代码: ProducerTest.java import java.util.Random;import javax.jms.JMSException; public class ProducerTest { /** * param args */ public static void main(String[] args) throws JMSException, Exception { ProducerTool producer new ProducerTool(); Random random new Random();for(int i0;i20;i){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage(Hello, world!--i); producer.close();}} }  ProducerTool.java import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user ActiveMQConnection.DEFAULT_USER; private String password ActiveMQConnection.DEFAULT_PASSWORD; private String url ActiveMQConnection.DEFAULT_BROKER_URL; private String subject mytopic; private Destination destination null; private Connection connection null; private Session session null; private MessageProducer producer null;// 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory new ActiveMQConnectionFactory( user, password, url); connection connectionFactory.createConnection(); session connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination session.createTopic(subject); producer session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); }// 发送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg session.createTextMessage(message); connection.start(); System.out.println(Producer:-Sending message: message); producer.send(msg); System.out.println(Producer:-Message sent complete!); }// 关闭连接 public void close() throws JMSException { System.out.println(Producer:-Closing connection); if (producer ! null){producer.close(); } if (session ! null){session.close(); } if (connection ! null){connection.close(); } } } 消费者代码: ConsumerTest.java import javax.jms.JMSException;public class ConsumerTest implements Runnable {static Thread t1 null;/*** param args* throws InterruptedException* throws InterruptedException* throws JMSException* throws InterruptedException*/public static void main(String[] args) throws InterruptedException {t1 new Thread(new ConsumerTest());t1.setDaemon(false);t1.start();/*** 如果发生异常则重启consumer*//*while (true) {System.out.println(t1.isAlive());if (!t1.isAlive()) {t1 new Thread(new ConsumerTest());t1.start();System.out.println(重新启动);}Thread.sleep(5000);}*/// 延时500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close();}public void run() {try {ConsumerTool consumer new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) { }} catch (Exception e) {}} } ConsumerTool.java import javax.jms.Connection; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /*** 消费者的模板 * author ABC**/ public class ConsumerTool implements MessageListener,ExceptionListener { private String user ActiveMQConnection.DEFAULT_USER; private String password ActiveMQConnection.DEFAULT_PASSWORD; private String url ActiveMQConnection.DEFAULT_BROKER_URL; private String subject mytopic; private Destination destination null; private Connection connection null; private Session session null; private MessageConsumer consumer null; public static Boolean isconnectionfalse;// 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory new ActiveMQConnectionFactory( user, password, url); connection connectionFactory.createConnection(); session connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination session.createTopic(subject); consumer session.createConsumer(destination); } // 消费消息 public void consumeMessage() throws JMSException, Exception { initialize(); connection.start();consumer.setMessageListener(this); //注册一个消息监听器,有消息就执行onMessage()方法connection.setExceptionListener(this);//注册一个异常监听器,有异常就执行onException()方法isconnectiontrue;System.out.println(Consumer:-Begin listening...); // 开始监听 // Message message consumer.receive(); }// 关闭连接 public void close() throws JMSException { System.out.println(Consumer:-Closing connection); if (consumer ! null) consumer.close(); if (session ! null) session.close(); if (connection ! null) connection.close(); }// 消息处理函数 public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg (TextMessage) message; String msg txtMsg.getText(); System.out.println(Consumer:-Received: msg); } else { System.out.println(Consumer:-Received: message); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }public void onException(JMSException arg0) {isconnectionfalse;//出现异常把isconnection设置成false} } 只启动ProducerTest.java 如果这个时候把ActiveMq 关闭再开启....重新访问   之前的主题 mytopic产生的数据就没有了..... ActiveMq默认是没有做持久化的,如果是Kafka只要是发过去的消息,都会一直存在,也可以设置一个过期的时间.到了期限,那些消息也是可以清除掉.否则就会一直都在. ActiveMq一般是用在JavaEE中的....Kafka是用在大数据领域的. 再运行生产者的模板代码: ConsumerTest.java 生产者生产的数据: 再运行生产者的模板代码: ConsumerTest.java 生产者生产的数据: 消费者消费到数据:   看WEBUI   其他常用的JMS实现 要使用Java消息服务你必须要有一个JMS提供者管理会话和队列。既有开源的提供者也有专有的提供者。 开源的提供者包括 Apache ActiveMQ JBoss 社区所研发的 HornetQ Joram Coridan的MantaRay The OpenJMS Group的OpenJMS 专有的提供者包括 BEA的BEA WebLogic Server JMS TIBCO Software的EMS GigaSpaces Technologies的GigaSpaces Softwired 2006的iBus IONA Technologies的IONA JMS SeeBeyond的IQManager2005年8月被Sun Microsystems并购 webMethods的JMS - my-channels的Nirvana Sonic Software的SonicMQ SwiftMQ的SwiftMQ IBM的WebSphere MQ   附关于ActiveMq处理queue的模板代码: ProducerTest.java import java.util.Random;import javax.jms.JMSException; public class ProducerTest { /** * param args * throws Exception * throws JMSException */ public static void main(String[] args) throws JMSException, Exception{ ProducerTool producer new ProducerTool();Random random new Random();for(int i0;i20;i){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage(Hello, world!--i); producer.close();}} } ProducerTool.java import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user ActiveMQConnection.DEFAULT_USER; private String password ActiveMQConnection.DEFAULT_PASSWORD; private String url ActiveMQConnection.DEFAULT_BROKER_URL; private String subject myqueue; private Destination destination null; private Connection connection null; private Session session null; private MessageProducer producer null;// 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory new ActiveMQConnectionFactory( user, password, url); connection connectionFactory.createConnection(); session connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination session.createQueue(subject); producer session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); }// 发送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg session.createTextMessage(message); connection.start(); System.out.println(Producer:-Sending message: message); producer.send(msg); System.out.println(Producer:-Message sent complete!); }// 关闭连接 public void close() throws JMSException { System.out.println(Producer:-Closing connection); if (producer ! null){producer.close(); } if (session ! null){session.close(); } if (connection ! null){connection.close(); } } } CustomerTest.java public class ConsumerTest implements Runnable {static Thread t1 null;public static void main(String[] args) throws InterruptedException {t1 new Thread(new ConsumerTest());t1.start(); // while (true) { // System.out.println(t1.isAlive()); // if (!t1.isAlive()) { // t1 new Thread(new ConsumerTest()); // t1.start(); // System.out.println(重新启动); // } // Thread.sleep(5000); // }// 延时500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close();}public void run() {try {ConsumerTool consumer new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) { //System.out.println(123);}} catch (Exception e) {}} } CustomerTool.java import javax.jms.Connection; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class ConsumerTool implements MessageListener,ExceptionListener {private String user ActiveMQConnection.DEFAULT_USER;private String password ActiveMQConnection.DEFAULT_PASSWORD;private String url ActiveMQConnection.DEFAULT_BROKER_URL;private String subject myqueue;private Destination destination null;private Connection connection null;private Session session null;private MessageConsumer consumer null;private ActiveMQConnectionFactory connectionFactorynull;public static Boolean isconnectionfalse;// 初始化private void initialize() throws JMSException {connectionFactory new ActiveMQConnectionFactory(user, password, url);connection connectionFactory.createConnection();session connection.createSession(false, Session.AUTO_ACKNOWLEDGE);destination session.createQueue(subject);consumer session.createConsumer(destination);}// 消费消息public void consumeMessage() throws JMSException {initialize();connection.start();consumer.setMessageListener(this);connection.setExceptionListener(this);System.out.println(Consumer:-Begin listening...);isconnectiontrue;// 开始监听Message message consumer.receive();System.out.println(message.getJMSMessageID());}// 关闭连接public void close() throws JMSException {System.out.println(Consumer:-Closing connection);if (consumer ! null){consumer.close();}if (session ! null){session.close();}if (connection ! null){connection.close();}}// 消息处理函数public void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage txtMsg (TextMessage) message;String msg txtMsg.getText();System.out.println(Consumer:-Received: msg);} else {System.out.println(Consumer:-Received: message);}} catch (JMSException e) {e.printStackTrace();}}public void onException(JMSException arg0){isconnectionfalse;} }
http://www.ihoyoo.com/news/118711.html

相关文章:

  • 汕头网站网站建设智联人才招聘网
  • 外贸网站建设制作phpcms 调用网站名称
  • 网站合同书百度免费域名注册网站
  • 网站备案ps什么网站需要备案
  • 做企业网站 需要用服务器吗建设四川网站
  • 兰州网站公司杭州市建设工程造价管理协会网站
  • 重庆企业网站优化不会编程怎么做网站
  • 湘潭网站优化wordpress 插件 教程
  • 个人网站建设教学视频国内知名展示设计公司
  • 万网 网站广州市手机网站建设品牌
  • 备案网站负责人必须为法人吗百度收录网站名字
  • 怎么样建立网站方案免费搭建自助网站
  • wordpress主题漏洞莱芜网站优化加徽信xiala5
  • 未来中森网站建设公司贵州建设厅二建考试网站
  • 公司网站seo怎么做天元建设集团有限公司市值
  • 罗湖商城网站建设哪家公司便宜点手机软件怎么做出来的
  • 才做的网站怎么搜不到网站建设基本流程详细说明
  • 网站建设互联wordpress 重制密码
  • 全景网站开发待遇建设通网站电话
  • icp备案网站建设方案书苏州公司注册代理
  • 网站制作厂家wordpress自定义文章排列顺序
  • 深圳做网站佰达科技三十软件开发需要什么学历
  • 太原网站设计公司网站运维
  • 企业库长沙seo排名优化公司
  • 设计院门户网站建设方案资讯网站模板带会员投稿功能
  • 网站建设公司比较信息技术初二做网站
  • 请人做竞价网站的要求重点山东网站建设市场
  • 深圳知名网站设计公司排名2022互联网企业排名
  • 顺德企业手机网站建设开发网站建设设计公司
  • 本地做网站绑定域名北京计算机培训机构哪个最好