JMS是Java消息服务(Java Message Service)的缩写,它提供了一种以松散耦合、灵活的方式集成应用程序的机制。JMS基于存储转发机制,在应用程序之间异步传递数据。应用程序之间并不直接通信,而是通过充当中介的面向消息的中间件(MOM)进行通信。
JMS体系结构
JMS的主要组成部分是:
有多个JMS提供者可供选择,比如Apache ActiveMQ和OpenMQ。在这里,我使用的是Apache ActiveMQ。
在windows上安装和启动Apache ActiveMQ
activemq
启动ActiveMQ后,您可以使用http://localhost:8161/admin/访问管理控制台并执行管理任务
JMS消息传递模型
JMS有两种消息传递模型,点对点消息传递模型和发布者订阅者消息传递模型。
点对点消息传递模型
生产者将消息发送到JMS提供者中的指定队列,并且只有一个监听该队列的消费者接收到该消息。
点对点模型示例
示例1和示例2几乎相同,唯一的区别是示例1在程序中直接创建队列,而示例2使用jndi.properties文件进行命名查找并创建队列。
例1
package com.eviac.blog.jms;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.log4j.BasicConfigurator;
/**
 * Point-to-point producer: sends a single "Hello World!" text message to the
 * destination bound to the JNDI name {@code MyQueue}.
 *
 * <p>All work happens in the constructor; {@link #main(String[])} simply
 * configures log4j and instantiates the class.
 */
public class Producer {

    /**
     * Looks up the connection factory and the queue through JNDI, sends one
     * {@link TextMessage} and releases the JMS connection and the JNDI context.
     *
     * @throws JMSException    if connecting, creating the session/producer or
     *                         sending the message fails
     * @throws NamingException if the JNDI context cannot be created or closed,
     *                         or one of the names cannot be resolved
     */
    public Producer() throws JMSException, NamingException {
        // Obtain a JNDI connection
        InitialContext jndi = new InitialContext();
        try {
            // Look up a JMS connection factory
            ConnectionFactory conFactory = (ConnectionFactory) jndi
                    .lookup("connectionFactory");
            // Getting JMS connection from the server
            Connection connection = conFactory.createConnection();
            try {
                connection.start();
                // JMS messages are sent and received using a Session. We create
                // a non-transactional session here; pass 'true' as the first
                // argument to use transactions instead.
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                Destination destination = (Destination) jndi.lookup("MyQueue");
                // MessageProducer is used for sending messages (as opposed
                // to MessageConsumer which is used for receiving them)
                MessageProducer producer = session.createProducer(destination);
                // We will send a small text message saying 'Hello World!'
                TextMessage message = session.createTextMessage("Hello World!");
                // Here we are sending the message!
                producer.send(message);
                System.out.println("Sent message '" + message.getText() + "'");
            } finally {
                // Always release the connection, even when sending failed.
                connection.close();
            }
        } finally {
            // The naming context holds resources of its own; the original
            // code never released it.
            jndi.close();
        }
    }

    /**
     * Configures log4j with its basic console setup and runs the producer.
     * A {@link NamingException} is reported on stderr; a {@link JMSException}
     * propagates to the caller.
     *
     * @param args ignored
     * @throws JMSException if the message could not be sent
     */
    public static void main(String[] args) throws JMSException {
        try {
            BasicConfigurator.configure();
            new Producer();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }
}
package com.eviac.blog.jms;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.BasicConfigurator;
/**
 * Point-to-point consumer: connects to the broker at
 * {@code ActiveMQConnection.DEFAULT_BROKER_URL}, waits for one message on the
 * queue {@code MYQUEUE} and prints its text if it is a {@link TextMessage}.
 *
 * <p>NOTE(review): the Producer shown alongside this class resolves its queue
 * through the JNDI name {@code MyQueue}; confirm that both sides actually
 * address the same physical queue and broker.
 */
public class Consumer {
    // URL of the JMS server
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    // Name of the queue we will receive messages from
    private static final String QUEUE_NAME = "MYQUEUE";

    /**
     * Receives a single message and exits.
     *
     * @param args ignored
     * @throws JMSException if connecting, creating the session/consumer or
     *                      receiving the message fails
     */
    public static void main(String[] args) throws JMSException {
        BasicConfigurator.configure();
        // Getting JMS connection from the server
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Creating a non-transactional session for receiving messages
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            // Getting the queue
            Destination destination = session.createQueue(QUEUE_NAME);
            // MessageConsumer is used for receiving (consuming) messages
            MessageConsumer consumer = session.createConsumer(destination);
            // receive() without a timeout blocks until a message arrives
            // on the queue.
            Message message = consumer.receive();
            // There are many types of Message and TextMessage is just one of
            // them. Only a TextMessage exposes getText(), so anything else is
            // skipped.
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received message '" + textMessage.getText()
                        + "'");
            }
        } finally {
            // Release the connection even when receiving failed; the original
            // only closed it on the success path.
            connection.close();
        }
    }
}
例2
jndi.properties
# START SNIPPET: jndi
# Selects ActiveMQ's JNDI context factory for the InitialContext used by the examples.
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
# use the following property to configure the default connector
# NOTE(review): vm:// presumably selects an in-VM broker transport, while the
# Consumer example connects through ActiveMQConnection.DEFAULT_BROKER_URL;
# confirm that producer and consumer really reach the same broker.
java.naming.provider.url = vm://localhost
# use the following property to specify the JNDI name the connection factory
# should appear as.
# NOTE: the spelling "topicConnectionFactry" below is the exact name that
# DemoPublisherSubscriberModel looks up; keep the two in sync if either changes.
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
# (the Producer example looks up the JNDI name "MyQueue")
queue.MyQueue = example.MyQueue
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.MyTopic = example.MyTopic
# END SNIPPET: jndi
package com.eviac.blog.jms;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.log4j.BasicConfigurator;
/**
 * JNDI-driven point-to-point producer: resolves the connection factory
 * ({@code connectionFactory}) and the queue ({@code MyQueue}) by name, then
 * sends a single "Hello World!" text message.
 *
 * <p>All work happens in the constructor; {@link #main(String[])} simply
 * configures log4j and instantiates the class.
 */
public class Producer {

    /**
     * Sends one {@link TextMessage} and then releases both the JMS connection
     * and the JNDI context.
     *
     * @throws JMSException    if connecting, creating the session/producer or
     *                         sending the message fails
     * @throws NamingException if the JNDI context cannot be created or closed,
     *                         or one of the names cannot be resolved
     */
    public Producer() throws JMSException, NamingException {
        // Obtain a JNDI connection
        InitialContext jndi = new InitialContext();
        try {
            // Look up a JMS connection factory
            ConnectionFactory conFactory = (ConnectionFactory) jndi
                    .lookup("connectionFactory");
            // Getting JMS connection from the server
            Connection connection = conFactory.createConnection();
            try {
                connection.start();
                // JMS messages are sent and received using a Session. We create
                // a non-transactional session here; pass 'true' as the first
                // argument to use transactions instead.
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                Destination destination = (Destination) jndi.lookup("MyQueue");
                // MessageProducer is used for sending messages (as opposed
                // to MessageConsumer which is used for receiving them)
                MessageProducer producer = session.createProducer(destination);
                // We will send a small text message saying 'Hello World!'
                TextMessage message = session.createTextMessage("Hello World!");
                // Here we are sending the message!
                producer.send(message);
                System.out.println("Sent message '" + message.getText() + "'");
            } finally {
                // Always release the connection, even when sending failed.
                connection.close();
            }
        } finally {
            // The naming context holds resources of its own; the original
            // code never released it.
            jndi.close();
        }
    }

    /**
     * Configures log4j with its basic console setup and runs the producer.
     * A {@link NamingException} is reported on stderr; a {@link JMSException}
     * propagates to the caller.
     *
     * @param args ignored
     * @throws JMSException if the message could not be sent
     */
    public static void main(String[] args) throws JMSException {
        try {
            BasicConfigurator.configure();
            new Producer();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }
}
发布者订阅者模型
发布者将消息发布到JMS提供者中的指定主题,订阅该主题的所有订阅者都会收到消息。请注意,只有处于活动状态的订阅者才会收到消息。
发布者订阅者模型示例
package com.eviac.blog.jms;
import javax.jms.*;
import javax.naming.*;
import org.apache.log4j.BasicConfigurator;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class DemoPublisherSubscriberModel implements javax.jms.MessageListener {
private TopicSession pubSession;
private TopicPublisher publisher;
private TopicConnection connection;
/* Establish JMS publisher and subscriber */
public DemoPublisherSubscriberModel(String topicName, String username,
String password) throws Exception {
// Obtain a JNDI connection
InitialContext jndi = new InitialContext();
// Look up a JMS connection factory
TopicConnectionFactory conFactory = (TopicConnectionFactory) jndi
.lookup("topicConnectionFactry");
// Create a JMS connection
connection = conFactory.createTopicConnection(username, password);
// Create JMS session objects for publisher and subscriber
pubSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicSession subSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
// Look up a JMS topic
Topic chatTopic = (Topic) jndi.lookup(topicName);
// Create a JMS publisher and subscriber
publisher = pubSession.createPublisher(chatTopic);
TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
// Set a JMS message listener
subscriber.setMessageListener(this);
// Start the JMS connection; allows messages to be delivered
connection.start();
// Create and send message using topic publisher
TextMessage message = pubSession.createTextMessage();
message.setText(username + ": Howdy Friends!");
publisher.publish(message);
}
/*
* A client can register a message listener with a consumer. A message
* listener is similar to an event listener. Whenever a message arrives at
* the destination, the JMS provider delivers the message by calling the
* listener's onMessage method, which acts on the contents of the message.
*/
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException jmse) {
jmse.printStackTrace();
}
}
public static void main(String[] args) {
BasicConfigurator.configure();
try {
if (args.length != 3)
System.out
.println("Please Provide the topic name,username,password!");
DemoPublisherSubscriberModel demo = new DemoPublisherSubscriberModel(
args[0], args[1], args[2]);
BufferedReader commandLine = new java.io.BufferedReader(
new InputStreamReader(System.in));
// closes the connection and exit the system when 'exit' enters in
// the command line
while (true) {
String s = commandLine.readLine();
if (s.equalsIgnoreCase("exit")) {
demo.connection.close();
System.exit(0);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}