(转)spring JMS、activemq中消费者收不到生产者发送的消息的原因解析
标签:
杂谈 |
我们使用JMS时,一般是将spring-jms和ActiveMQ相结合,通过Spring的JmsTemplate发送消息到指定的Destination。首先定义ActiveMQ的连接池(connectionFactory):
<!-- Pooled ActiveMQ connection factory; the pool is limited to a single connection. -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- '&' must be written as '&amp;' inside an XML attribute value.
                 NOTE(review): ActiveMQ documents failover options in the form
                 failover:(uri1,...)?option=value; the '&' after ')' is kept from the
                 original listing - confirm that maxReconnectDelay is actually applied. -->
            <property name="brokerURL"
                      value="failover:(tcp://192.168.20.23:61616?wireFormat.maxInactivityDuration=0)&amp;maxReconnectDelay=1000"/>
        </bean>
    </property>
    <property name="maxConnections" value="1"></property>
</bean>
定义JmsTemplate的实例:
<!-- Default destination: an ActiveMQ topic named "oamTmpTopic". -->
<bean id="oamTmpTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="oamTmpTopic" />
</bean>
-
<!-- JmsTemplate bound to the pooled "connectionFactory", defaulting to oamTmpTopic. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="oamTmpTopic" />
    <!-- explicitQosEnabled must be true for the deliveryMode below to be used when sending. -->
    <property name="explicitQosEnabled" value="true" />
    <!-- 1 = javax.jms.DeliveryMode.NON_PERSISTENT -->
    <property name="deliveryMode" value="1" />
</bean>
- import
javax.jms.JMSException; - import
javax.jms.Message; - import
javax.jms.Session; - import
javax.jms.TextMessage; - import
javax.jms.Topic; -
- import
org.springframework.jms.core.JmsTemplate; - import
org.springframework.jms.core.MessageCreator; -
- public
class SendMessage { -
-
privateJmsTemplate jmsTemplate; -
-
privateString topicName; -
-
privateTopic topic; -
-
publicvoid setJmsTemplate(JmsTemplate jmsTemplate) { -
this.jmsTemplate= jmsTemplate; -
} -
-
publicvoid setTopicName(String topicName) { -
this.topicName= topicName; -
} -
-
publicvoid sendMessage( finalString message) { -
-
try{ -
if(topic null)== { -
topic = jmsTemplate.getConnectionFactory().createConnection() -
false,.createSession( Session.AUTO_ACKNOWLEDGE) -
.createTopic(topicName); -
} -
newjmsTemplate.send(topic, MessageCreator() { -
-
@Override -
publicMessage createMessage(Session session) -
throwsJMSException { -
-
TextMessage textMessage = session -
.createTextMessage(message); -
returntextMessage; -
} -
}); -
catch} (JMSException e) { -
e.printStackTrace(); -
} -
} - }
定义消费者TestListener.java:
- import
javax.jms.JMSException; - import
javax.jms.Message; - import
javax.jms.MessageListener; - import
javax.jms.Session; - import
javax.jms.Topic; -
- import
org.springframework.jms.core.JmsTemplate; - import
org.springframework.jms.listener.DefaultMessageListenerCo ntainer; -
- public
class TestListener implementsMessageListener{ -
-
privateJmsTemplate jmsTemplate; -
-
privateString topicName; -
-
publicTestListener(JmsTemplate jmsTemplate,String topicName){ -
-
this.jmsTemplate= jmsTemplate; -
-
this.topicName= topicName; -
-
Topic topic; -
try{ -
this.jmsTemplate.getConnectionFactory().createConnection().createSession(false,topic = -
this.topicName);Session.AUTO_ACKNOWLEDGE).createTopic( -
-
newDefaultMessageListenerCo ntainer dmc = DefaultMessageListenerCo ntainer(); -
true);dmc.setPubSubDomain( -
dmc.setDestination(topic); -
this.jmsTemplate.getConnectionFactory());dmc.setConnectionFactory( -
true);dmc.setPubSubNoLocal( -
this);dmc.setMessageListener( -
dmc.setSessionAcknowledgeMod e(Session.AUTO_ACKNOWLEDGE); -
dmc.initialize(); -
dmc.start(); -
catch} (JMSException e) { -
e.printStackTrace(); -
} -
} -
-
@Override -
publicvoid onMessage(Message message) { -
-
System.out.println(message); -
} -
- }
<!-- Consumer: constructor arguments are (jmsTemplate, topicName). -->
<bean id="testListener" class="net.kentop.test.jms.TestListener">
    <constructor-arg ref="jmsTemplate"></constructor-arg>
    <constructor-arg value="testTopic"></constructor-arg>
</bean>
-
<!-- Producer: publishes to "testTopic" through jmsTemplate. -->
<bean id="sendMessage" class="net.kentop.test.jms.SendMessage">
    <property name="jmsTemplate" ref="jmsTemplate"></property>
    <property name="topicName" value="testTopic"></property>
</bean>
- import
org.springframework.context.ApplicationContext; - import
org.springframework.context.support.ClassPathXmlApplicationC ontext; -
- public
class BeanTest { -
-
publicstatic ApplicationContext newcontext = ClassPathXmlApplicationC "infrastructure-config.xml");ontext( -
-
publicstatic void main(String args[]){ -
-
"sendMessage");SendMessage sendMessage = (SendMessage) context.getBean( -
-
"hahahha,我来测试了");sendMessage.sendMessage( -
"dfsdfsfsdfsdfsdf");sendMessage.sendMessage( -
"comesendMessage.sendMessage( on );baby!" -
"hahahha,我来测试了2");sendMessage.sendMessage( -
"dfsdfsfsdfsdfsdf2");sendMessage.sendMessage( -
"comesendMessage.sendMessage( on );baby!2" -
"hahahha,我来测试了3");sendMessage.sendMessage( -
"dfsdfsfsdfsdfsdf3");sendMessage.sendMessage( -
"comesendMessage.sendMessage( on );baby!3" -
"hahahha,我来测试了4");sendMessage.sendMessage( -
"dfsdfsfsdfsdfsdf4");sendMessage.sendMessage( -
"comesendMessage.sendMessage( on );baby!4" -
} - }
- DefaultMessageListenerCo
ntainer newdmc = DefaultMessageListenerCo ntainer(); - dmc.setPubSubDomain(true);
- dmc.setDestination(topic);
- dmc.setConnectionFactory(this.jmsTemplate2.getConnectionFactory());
-
true);dmc.setPubSubNoLocal( -
this);dmc.setMessageListener( - dmc.setSessionAcknowledgeMod
e(Session.AUTO_ACKNOWLEDGE); - dmc.initialize();
- dmc.start();
- dmc.setPubSubNoLocal(true);
- dmc.setPubSubDomain(true);
-
- dmc.setPubSubDomain(true);
- dmc.setPubSubNoLocal(true);
<!-- Second pooled connection factory, so the consumer gets a connection of its own. -->
<bean id="connectionFactory2" class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- '&' must be written as '&amp;' inside an XML attribute value.
                 NOTE(review): ActiveMQ documents failover options in the form
                 failover:(uri1,...)?option=value; the '&' after ')' is kept from the
                 original listing - confirm that maxReconnectDelay is actually applied. -->
            <property name="brokerURL"
                      value="failover:(tcp://192.168.20.23:61616?wireFormat.maxInactivityDuration=0)&amp;maxReconnectDelay=1000"/>
        </bean>
    </property>
    <property name="maxConnections" value="1"></property>
</bean>
<!-- Second JmsTemplate, bound to connectionFactory2; otherwise identical to jmsTemplate. -->
<bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory2" />
    <property name="defaultDestination" ref="oamTmpTopic" />
    <!-- explicitQosEnabled must be true for the deliveryMode below to be used when sending. -->
    <property name="explicitQosEnabled" value="true" />
    <!-- 1 = javax.jms.DeliveryMode.NON_PERSISTENT -->
    <property name="deliveryMode" value="1" />
</bean>
- import
javax.jms.JMSException; - import
javax.jms.Message; - import
javax.jms.MessageListener; - import
javax.jms.Session; - import
javax.jms.Topic; -
- import
org.springframework.jms.core.JmsTemplate; - import
org.springframework.jms.listener.DefaultMessageListenerCo ntainer; -
- public
class TestListener implementsMessageListener{ -
-
privateJmsTemplate jmsTemplate; -
-
privateJmsTemplate jmsTemplate2; -
-
privateString topicName; -
-
publicTestListener(JmsTemplate jmsTemplate,String topicName,JmsTemplate jmsTemplate2){ -
-
this.jmsTemplate= jmsTemplate; -
-
this.topicName= topicName; -
-
this.jmsTemplate2= jmsTemplate2; -
-
Topic topic; -
try{ -
this.jmsTemplate.getConnectionFactory().createConnection().createSession(false,topic = -
this.topicName);Session.AUTO_ACKNOWLEDGE).createTopic( -
-
newDefaultMessageListenerCo ntainer dmc = DefaultMessageListenerCo ntainer(); -
true);dmc.setPubSubDomain( -
dmc.setDestination(topic); -
this.jmsTemplate2.getConnectionFactory());dmc.setConnectionFactory( -
true);dmc.setPubSubNoLocal( -
this);dmc.setMessageListener( -
dmc.setSessionAcknowledgeMod e(Session.AUTO_ACKNOWLEDGE); -
dmc.initialize(); -
dmc.start(); -
catch} (JMSException e) { -
e.printStackTrace(); -
} -
} -
-
@Override -
publicvoid onMessage(Message message) { -
-
System.out.println(message); -
} -
- }
<!-- Consumer: constructor arguments are (jmsTemplate, topicName, jmsTemplate2). -->
<bean id="testListener" class="net.kentop.test.jms.TestListener">
    <constructor-arg ref="jmsTemplate"></constructor-arg>
    <constructor-arg value="testTopic"></constructor-arg>
    <constructor-arg ref="jmsTemplate2"></constructor-arg>
</bean>

加载中…