博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ
阅读量:4839 次
发布时间:2019-06-11

本文共 7419 字,大约阅读时间需要 24 分钟。

 

官网:http://activemq.apache.org/

下载windows版本

进入解压后的主文件下的bin下的win64目录,启动activemq.bat文件

访问:http://localhost:8161/ 用户名和密码 都是 admin

 

 

ActiveMQ 使用的是标准生产者和消费者模型

  有两种数据结构 Queue、Topic

1、 Queue 队列 ,生产者生产了一个消息,只能由一个消费者进行消费

2、 Topic 话题,生产者生产了一个消息,可以由多个消费者进行消费

 

使用Java程序操作activeMQ

引入maven坐标

org.apache.activemq
activemq-all
${activemq.version}

编写生产者代码

public class ActiveMqProducer {    @Test    public void testActive() throws Exception{        //连接工厂,使用默认的用户名,密码                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();        //获取一个连接        Connection connection = connectionFactory.createConnection();                //建立会话        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE );                //创建队列或者话题对象        Queue queue = session.createQueue("HelloWorld");                //创建生产者或者消费者对象        MessageProducer producer = session.createProducer(queue);                for (int i = 0; i < 10; i++) {            //发送消息            producer.send(session.createTextMessage("你好,MQ"+i));        }        //提交事物        session.commit();    }}

 

使用默认tcp连接activeMQ端口61616可以看到

编写MQ消费者代码

  

public class ActiveMqConsumer {    @Test    public void testActive() throws Exception{        //连接工厂,使用默认的用户名,密码                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();        //获取一个连接        Connection connection = connectionFactory.createConnection();                //开启连接        connection.start();                //建立会话,是否使用事物,如果使用则要提交        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE );                //创建队列或者话题对象        Queue queue = session.createQueue("HelloWorld");                //创建生产者或者消费者对象        MessageConsumer consumer = session.createConsumer(queue);                while(true){            TextMessage message = (TextMessage) consumer.receive(1000);            if(message != null) {                System.out.println(message.getText());            }else {                break;            }        }    }}

再次查看发现已经被消费

 

使用监听器,监听消息的内容,进行消费

public class ActiveMqConsumerListener {    @Test    public void testActive() throws Exception{        //连接工厂,使用默认的用户名,密码                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();        //获取一个连接        Connection connection = connectionFactory.createConnection();        connection.start();        //建立会话        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE );                //创建队列或者话题对象        Queue queue = session.createQueue("HelloWorld");                //创建生产者或者消费者对象        MessageConsumer consumer = session.createConsumer(queue);                //设置监听器        consumer.setMessageListener(new MessageListener() {                        public void onMessage(Message message) {                TextMessage textMessage = (TextMessage) message;                try {                    System.out.println(textMessage.getText());                } catch (Exception e) {                    e.printStackTrace();                }            }        });        while(true){}    }}

 

 

spring整合ActiveMQ

引入maven坐标

    
org.apache.activemq
activemq-all
${activemq.version}
org.springframework
spring-jms
${spring.version}

 

引入spring整合activeMQ文件

编写spring整合active核心配置

  

    <!-- 扫描包 -->

    <context:component-scan base-package="com.learn.activemq" />

 query生产者

@Servicepublic class QueueSender {    // 注入jmsTemplate    @Autowired    @Qualifier("jmsQueueTemplate")    private JmsTemplate jmsTemplate;    public void send(String queueName, final String message) {        jmsTemplate.send(queueName, new MessageCreator() {            public Message createMessage(Session session) throws JMSException {                return session.createTextMessage(message);            }        });    }}

topic生产者

 

@Servicepublic class TopicSender {    // 注入jmsTemplate    @Autowired    @Qualifier("jmsTopicTemplate")    private JmsTemplate jmsTemplate;    public void send(String topicName, final String message) {        jmsTemplate.send(topicName, new MessageCreator() {            public Message createMessage(Session session) throws JMSException {                return session.createTextMessage(message);            }        });    }}

query消费者1和消费者2

@Servicepublic class QueueConsumer1 implements MessageListener {    public void onMessage(Message message) {        TextMessage textMessage = (TextMessage) message;        try {            System.out                    .println("消费者QueueConsumer1获取消息:" + textMessage.getText());        } catch (JMSException e) {            e.printStackTrace();        }    }}
@Servicepublic class QueueConsumer2 implements MessageListener {    public void onMessage(Message message) {        TextMessage textMessage = (TextMessage) message;        try {            System.out                    .println("消费者QueueConsumer2获取消息:" + textMessage.getText());        } catch (JMSException e) {            e.printStackTrace();        }    }}

topic消费者1和消费者2

@Servicepublic class TopicConsumer1 implements MessageListener {    public void onMessage(Message message) {        TextMessage textMessage = (TextMessage) message;        try {            System.out                    .println("消费者TopicConsumer1获取消息:" + textMessage.getText());        } catch (JMSException e) {            e.printStackTrace();        }    }}
@Servicepublic class TopicConsumer2 implements MessageListener {    public void onMessage(Message message) {        TextMessage textMessage = (TextMessage) message;        try {            System.out                    .println("消费者TopicConsumer2获取消息:" + textMessage.getText());        } catch (JMSException e) {            e.printStackTrace();        }    }}

 

配置 listener 监听器,在 applicationContext-mq-consumer.xml

生产者测试代码

@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations = "classpath:applicationContext-mq.xml")public class ProducerTest {    @Autowired    private QueueSender queueSender;    @Autowired    private TopicSender topicSender;    @Test    public void testSendMessage() {        queueSender.send("spring_queue", "queue");        topicSender.send("spring_topic", "topic");    }}

消费者测试代码

@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml")public class ConsumerTest {    @Test    public void testConsumerMessage() {        while (true) {            // junit退出,防止进程死掉        }    }}

 

转载于:https://www.cnblogs.com/learnjfm/p/7422480.html

你可能感兴趣的文章
javascript逗号添加函数
查看>>
Codeforces Round #307 (Div. 2) E. GukiZ and GukiZiana 分块
查看>>
hdu 5452 Minimum Cut 树形dp
查看>>
perf4j @Profiled常用写法
查看>>
配置的热更新
查看>>
ios view的frame和bounds之区别(位置和大小)
查看>>
USB小白学习之路(11) Cy7c68013A驱动电路设计注意事项(转)
查看>>
Luogu 2530 化工厂装箱员
查看>>
自定义webUI实例
查看>>
用NSAttributedString实现简单的图文混排
查看>>
多语境的操作
查看>>
SNS营销——网商成功之道
查看>>
jqgrid 加载时第一页面只显示多少条数据
查看>>
magic模块 :Exception Value:failed to find libmagic. Check your installation
查看>>
C#小游戏(文字对战游戏)
查看>>
COGS2314. [HZOI 2015] Persistable Editor
查看>>
my college goal
查看>>
java switch case 枚举类型的反编译结果
查看>>
关于dubbo+shiro导致dubbo无法注入到Realm的问题解决方案
查看>>
entity framework使用技巧
查看>>