加载中…
正文 字体大小:

JAVA消息服务(JMS):示范工程----需求、API分析、解说

(2012-02-17 23:03:39)
标签:

jms

java

消息服务

生产者

消费者

消息

会话

连接

activemq

rmi

socket

分类: JAVA高级软件工程师

JAVA消息服务(JMS):示范工程----需求、API分析、解说
图:集成商订单处理系统的示例图

导言:

一、业务需求--Transactions Demo:
    The example simulates a simplified eCommerce application with four parts:
the retailer who places the orders, the vendor who assemples the computers,
and two suppliers--one for hard drives and another for monitors.

The retailer sends a message to the vendor's queue and awaits a reply.
The vendor receives the message and sends a message to each of the
supplier's queues. It does this in a single transaction, and will randomly
throw an exception simulating a database error, triggering a rollback.
Each supplier receives the order, checks inventory and replies to the
message stating how many items were sent.
The vendor collects both responses and responds to the retailer, notifying
wheather it cna fulfill the complete order or not.
The retailer receives the message from the vendor.

    事务示范工程

   本例是一个包含四个部分的简化的电子商务应用:零售商下订单,集成商组装电脑,和两个供应商(提供组件),一个提供硬盘设备,另一个提供显示器。1)零售商发送消息到集成商的队列并等待回复;2)集成商接收订单消息,3)并分别发送消息到两个供应商的队列。它是一个单一的事务,会模仿数据库错误随机的抛出一个异常,并且触发事务的回滚操作。4)每一个供应商接收订单消息,5)检查库存,6)并且分别回复集成商他们的库存信息;7)集成商收集两个供应商的响应信息,8)响应并告知零售商是否能完成订单;9)零售商接收来自集成商的消息。

二、解说:

     1.在一个由四个对象组成的消息系统中,业务逻辑比较明确。零售商、供应商(两个)都只与另一个对象的发送消息的交互,即零售商只与集成商交互,发送订单到集成商并接受集成商的反馈消息;供应商只与集成商交互,接受集成商的子订单消息并以库存数据响应集成商。所有,零售商和供应商处理的业务还比较简单。

    2.但集成商要坐的事情就很多,要与三个不同对象发送消息的交互(零售商、供应商(A和B)),即六方向的数据交互。除简单收发信息外,还需要收集并处理两个供应商的组件的库存信息,做出是否接收订单的判断。因此,整个系统的难点在于集成商。

    3.本文的【集成商订单处理系统的示例图】,只是一次性的情况,即零售商只下一次订单后的消息流向。但真实的系统,是零售商有许多订单,订单在不同时间被下达,而零售商、集成商、供应商也是交替着处理不同的订单。所以,本例子中使用了多线程,并考虑了安全问题。

    4.该系统是一个分布式的系统。零售商、集成商和供应商之间可以独立运行在不同的服务器上;它们之间的运行没有先后循序。比如,通常是消息生产者生成消息,然后消费者消费消息。但在JAVA消息服务中,消费者在消息生成之前,可以先建立监听;后者消费者没有建立监听,生产者的消息会发送到JMS的服务器上,当建立了监听后,消息仍然能到达。

三、代码(详细注释):

本文中,将首先解决单次订单下达后的运行问题,再扩展到多个订单的交互处理。

运行结果:

1329667034718:Retailer2:sent order message!messageName: orderMessage ,quantity:99 1
1329667034765:Vendor2: send sub order to Supplier1:99 2.1
1329667034781:Vendor2: send sub order to Supplier2:99 2.2
1329667034781:Vendor2:received message from Retailer2:99
1329667036031 :Supplier2: monitor received sub order. quantity: 99 3.1
1329667036125 :Supplier2: hard received sub order. quantity: 99  3.2
1329667036125 :Vendor2: received Supplier M's stocks message: 98 5.1
1329667036125 :Vendor2: received Supplier H's stocks message: 98 5.2
==========Vendor2: 1329667036125: ordered quantity: 99 ;supplierMonitorStock: 98 ;supplierHardStock: 98 ;accept order?: false ok 6.
1329667036140:Supplier2: monitor :send stocks message back to Vendor2. stockNumber:98  4.1
============1329667036140 : Retailer2: final message about order from Vendor2: cancel 7.
1329667036171:Supplier2: hard :send stocks message back to Vendor2. stockNumber:98  4.2

以下是本人实现该系统需求的方式:

零售商Retailer2--->集成商Vendor2---->供应商Supplier2--->运行VendorOrderSystemDemo

1.零售商Retailer2

import java.util.Date;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
//零售商:发送订单消息到指定队列;接收集成商的回复消息

 public class Retailer2  {
 private String url;
 private String user;
 private String password;
    public Retailer2(String url, String user, String password) {
  this.url = url;
  this.user = user;
  this.password = password;
  publish();
 }
   
    //发布消息
 public void publish(){
  int quantity=0;
   ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory(user,password,url);//1创建连接工厂
   Session session=null;
   Connection connection=null;
   try {
   connection=connectionFactory.createConnection();//2创建连接
   session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//3创建(无事务)会话
  Destination retailerVendorQueue= session.createQueue("RetailerVendorQueue222");//4.1创建特定队列
  MessageProducer producer=session.createProducer(retailerVendorQueue);//5创建生产者 
   producer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久模式
   connection.start();
    //1.发布订单
       TextMessage message=session.createTextMessage();
        quantity = new Random().nextInt(100);
     message.setText(quantity+"");
      producer.send(message);//7发送消息
    System.out.println(new Date().getTime()+":Retailer2:sent order message!messageName: orderMessage"+" ,quantity:"+quantity);
    Destination RetailerVendorQueueFinal= session.createQueue("RetailerVendorQueueFinal");//4.1创建与集成商联系的特定队列
    //设置消息监听,并接收从集成商那里的消息

    MessageConsumer consumer=session.createConsumer(RetailerVendorQueueFinal);
     consumer.setMessageListener(new MessageListener() {
     public void onMessage(Message message) {
      if (message != null && message instanceof TextMessage) {
        try {
        String fMesage=((TextMessage)(message)).getText();
        System.out.println("============Retailer2:"+new Date().getTime()+" : final message about order from Vendor2: "+fMesage);
       } catch (JMSException e1) {
        e1.printStackTrace();
       }
       try {
       } catch (Exception e) {
        e.printStackTrace();
       }
      }
     }
    });
    //connection.start();
   
   } catch (JMSException e) {
    e.printStackTrace();
   }
    }
 
 public static void main(String [] args){
  Retailer2 retailer=new Retailer2("tcp://localhost:61616","brightmart","12345");
 }
}

2.集成商

import java.util.Date;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

//集成商收取零售商订单,发送分别发送通知到两个供应商,处理信息并响应零售商
public class Vendor2 implements Runnable {
 private String url;
 private String user;
 private String password;
 private Order order;
 private String supplierMonitorStock=null;//供应商M的库存
 private String supplierHardStock=null;//供应商H的库存

 public Vendor2(String url, String user, String password) {
  this.url = url;
  this.user = user;
  this.password = password;
  init();
 }
 
 //处理两个供应商发送过来的库存信息,并发送订单确认信息到零售商
    public boolean processSupplierStocksMessage(){
     boolean flag=true;
     if(supplierMonitorStock!=null&&supplierHardStock!=null&&order!=null&&order.getQuantity()!=0){
      boolean receiveOrder=true;
      receiveOrder=(Integer.valueOf(supplierMonitorStock)>=Integer.valueOf(order.getQuantity()))
      &&(Integer.valueOf(supplierHardStock)>=Integer.valueOf(order.getQuantity()));
      System.out.println("==========Vendor2: "+new Date().getTime()+

     ": ordered quantity: "+order.getQuantity()+
        " ;supplierMonitorStock: "+supplierMonitorStock+
                                        " ;supplierHardStock: "+supplierHardStock+
                                        " ;accept order?: "+receiveOrder );
      
       flag=false;
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
     user, password, url);// 1创建连接工厂 try {
   Connection connection;
   try {
    connection = connectionFactory.createConnection();
    Session session = connection.createSession(false,
      Session.AUTO_ACKNOWLEDGE);// 3创建(无事务)会话 Queue
    Destination retailerVendorFinal = session
      .createQueue("RetailerVendorQueueFinal");// 与供应商联系的队列 
    MessageProducer producer=session.createProducer(retailerVendorFinal);
    TextMessage finalMessage=session.createTextMessage();
    finalMessage.setText(receiveOrder+"");
    producer.send(finalMessage);
    connection.start();
    producer.close();
   } catch (JMSException e) {
    e.printStackTrace();
   }
     }
      return flag;
    }
   
 // 建立监听,集成商收取零售商订单、收取供应商M、供应商H的消息
 public void init() {
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    user, password, url);// 1创建连接工厂
  MessageConsumer consumer = null;
  MessageConsumer consumerSM = null;//接收供应商M发过来的消息
  MessageConsumer consumerSH = null;//接收供应商H发过来的消息
  try {
   final Connection connection =  connectionFactory.createConnection();;
   final Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);// 3创建(无事务)会话
   //=========================以下接收零售商的消息
   Destination retailerVendorQueue = session
     .createQueue("RetailerVendorQueue222");// 4创建特定队列)
   consumer = session.createConsumer(retailerVendorQueue);
   consumer.setMessageListener(new MessageListener() {//接收零售商消息
    public void onMessage(Message message) {
     if (message != null && message instanceof TextMessage) {
      String messageReceived = null;
      try {
       messageReceived = ((TextMessage) message).getText();
       order = new Order("computers", Integer
         .valueOf(messageReceived));
       //publish();
       //=======================集成商接收到,零售商的订单后,立即发送消息到两个供应商
       Destination VendorSupplierQueue1 = session.createQueue("VendorSupplierQueue1");

      //发送消息给供应商1的队列 
       Destination VendorSupplierQueue2 = session.createQueue("VendorSupplierQueue2");

      //发送消息给供应商2的队列   
       MessageProducer producerS1 = session.createProducer(VendorSupplierQueue1);// 5创建生产者
       MessageProducer producerS2 = session.createProducer(VendorSupplierQueue2);// 5创建生产者
       TextMessage messageS1 = session.createTextMessage();// 6创建消息
       TextMessage messageS2 = session.createTextMessage();// 6创建消息
       messageS1.setText(order.getQuantity()+"");
       messageS2.setText(order.getQuantity()+"");
       producerS1.send(messageS1);
       System.out.println(new Date().getTime()+

     ":Vendor2: send sub order to Supplier1:"+order.getQuantity()+"");
       producerS2.send(messageS2);
       System.out.println(new Date().getTime()+

     ":Vendor2: send sub order to Supplier2:"+order.getQuantity()+"");
       connection.start();
       producerS1.close();//关闭消息生产者
       producerS2.close();//关闭消息生产者
      } catch (JMSException e) {
       e.printStackTrace();
      }

      System.out
        .println(new Date().getTime()+":Vendor2:received message from Retailer2:"
          + messageReceived);
     }
    }
   });
   
   //=========================以下接收供应商M的消息
   Destination vendorSupplier2M = session
   .createQueue("VendorSupplierQueue22");// 4创建特定队列)
   consumerSM =session.createConsumer(vendorSupplier2M);
   consumerSM.setMessageListener(new MessageListener() {//接收供应商M的消息
    public void onMessage(Message message) {
     if (message != null && message instanceof TextMessage) {
      try {
       supplierMonitorStock = ((TextMessage) message).getText();
                   System.out.println(new Date().getTime()+

    " :Vendor2: received Supplier M's stocks message: "+supplierMonitorStock);
      } catch (JMSException e) {
       e.printStackTrace();
      }
     }
    }
   });
   
   //=========================以下接收供应商M的消息
   Destination vendorSupplier2H = session
   .createQueue("VendorSupplierQueue12");// 4创建特定队列) VendorSupplierQueue12
   consumerSH =session.createConsumer(vendorSupplier2H);
   consumerSH.setMessageListener(new MessageListener() {//接收供应商H的消息
    public void onMessage(Message message) {
     if (message != null && message instanceof TextMessage) {
      try {
       supplierHardStock = ((TextMessage) message).getText();
                   System.out.println(new Date().getTime()+

     " :Vendor2: received Supplier H's stocks message: "+supplierHardStock);

      } catch (JMSException e) {
       e.printStackTrace();
      }
     }
    }
   });
   connection.start();
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

 public void run() {
  do{
  }while(processSupplierStocksMessage());
  
 }
}

3.供应商

import java.util.Date;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
//供应商:1获取集成商发送的订单信息,2查询库存信息,3并发送信息到集成商

 public class Supplier2 {
 private String url;
 private String user;
 private String password;
    private String supplierType;//供应商类型
    private int messageReceivedValue;//从集成商处收到的消息:子订单数量
 public Supplier2(String url, String user, String password,String supplierType) {
  this.url = url;
  this.user = user;
  this.password = password;
  this.supplierType=supplierType;
  init();
 }

    //开启监听,并接收集成商消息(订单数量) public void init() {
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    user, password, url);// 1创建连接工厂
  MessageConsumer consumer = null;
  try {
   final Connection  connection = connectionFactory.createConnection();
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);// 3创建(无事务)会话
   String queueName=null;
   if("monitor".equals(supplierType)){
    queueName="VendorSupplierQueue2";
   }else{
    queueName="VendorSupplierQueue1";
   }
   Destination vendorSupplierQueue = session
     .createQueue(queueName);// 4创建特定队列)

   consumer = session.createConsumer(vendorSupplierQueue);
   consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
     if (message != null && message instanceof TextMessage) {
      String messageReceived = null;
      try {
       messageReceived = ((TextMessage) message).getText();
       messageReceivedValue=Integer.valueOf(messageReceived);
       System.out.println(new Date().getTime()+" :Supplier2: "+supplierType+

      " received sub order. quantity: "+messageReceivedValue);  
       //=======================接收到之后,就发送出去========================
       Session session = connection.createSession(false,
         Session.AUTO_ACKNOWLEDGE);// 3创建(无事务)会话
       String queueName=null;
       if("monitor".equals(supplierType)){
        queueName="VendorSupplierQueue22";
       }else{
        queueName="VendorSupplierQueue12";
       }
       Destination vendorSupplierQueue = session
       .createQueue(queueName);// 4创建特定队列)
        MessageProducer producer=session.createProducer(vendorSupplierQueue);
        TextMessage stocksMessage=session.createTextMessage();
       //检查库存,并返回库存数据给集成商
        int rNumber=new Random().nextInt(2);
       String stocksNumber=(messageReceivedValue+(rNumber==1?1:-1))+"";
           stocksMessage.setText(stocksNumber+"");
        producer.send(stocksMessage);
        System.out.println(new Date().getTime()+":Supplier2: "+supplierType+

       " :send stocks message back to Vendor2. stockNumber:"+stocksNumber);
        connection.start();
        producer.close();
         //===================接收到之后,就发送出去=============================
      } catch (JMSException e) {
       e.printStackTrace();
      }
     }
    }
   });
   connection.start();
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}

4.运行

//运行集成商订单处理系统
public class VendorOrderSystemDemo {
 public static void main(String[] args) {
  Retailer2 retailer = new Retailer2("tcp://localhost:61616","brightmart", "12345");//1零售商
  Vendor2 vendor = new Vendor2("tcp://localhost:61616", "brightmart","12345");//2集成商
  new Thread(vendor).start();
  Supplier2 supplierMonitor=new Supplier2("tcp://localhost:61616","brightmart", "12345","monitor");//3.1供应商M
  Supplier2 supplierHard=new Supplier2("tcp://localhost:61616","brightmart", "12345","hard");//3.2供应商H
 }
}

阅读 评论 收藏 转载 喜欢 打印举报
已投稿到:
  • 评论加载中,请稍候...
发评论

       

    验证码: 请点击后输入验证码 收听验证码

    发评论

    以上网友发言只代表其个人观点,不代表新浪网的观点或立场。

      

    新浪BLOG意见反馈留言板 不良信息反馈 电话:4006900000 提示音后按1键(按当地市话标准计费) 欢迎批评指正

    新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 会员注册 | 产品答疑

    新浪公司 版权所有