一、添加用戶和虛擬主機
點擊虛擬主機名稱,可以給用戶授權當前主機
二、Hello World
1、引入相關依賴
<!-- RabbitMQ Java client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.5.0</version>
</dependency>
<!-- SLF4J binding for log4j 1.2 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
<!-- Apache Commons Lang helpers -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.3.2</version>
</dependency>
<!-- JUnit 4 -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
2、創建連接的工具類
package com.epoint.cn.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Utility for opening connections to the RabbitMQ broker used by the examples.
 *
 * <p>This class only exposes a static factory method and is not meant to be instantiated.
 */
public final class ConnectionUtil {

    private static final String HOST = "127.0.0.1";
    private static final int PORT = 5672;
    private static final String USERNAME = "lly";
    private static final String PASSWORD = "lly";
    private static final String VIRTUAL_HOST = "llyhost";

    private ConnectionUtil() {
        // Static utility class: no instances.
    }

    /**
     * Opens a new connection using the host, port, credentials and virtual host configured above.
     *
     * @return a newly opened connection; the caller is responsible for closing it
     * @throws Exception if the connection cannot be established
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);
        connectionFactory.setVirtualHost(VIRTUAL_HOST);
        return connectionFactory.newConnection();
    }
}
3、生產者
package com.epoint.cn.service;import com.epoint.cn.util.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Sender {private static final String QUEUE="hellowTest";public static void main(String[] args) throws Exception{//獲取連接Connection connection=ConnectionUtil.getConnection();//創建通道Channel channel=connection.createChannel();//聲明隊列,如果隊列存在什麼都不做,否則創建隊列 //參數1:隊列的名稱 //參數2:是否持久化隊列,我們的隊列模式是在內存中的,如果rabbitmq重啟會消失,如果為true會自動保存到erlang自帶的資料庫 //參數3:是否排外,倆個作用,一個是當我們的連接關閉後是否自動刪除隊列,第二個作用是是否私有當前隊列,如果私有,其他通道不可以訪問 //參數4:是否自動刪除 //參數5:一些其他參數channel.queueDeclare(QUEUE,false,false,false,null);//發送內容channel.basicPublish("",QUEUE,null,"淚流雲".getBytes());//關閉連接channel.close(); } }
4、消費者
package com.epoint.cn.service;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * "Hello World" consumer: prints every message delivered from the {@code hellowTest} queue.
 *
 * <p>Uses a {@link DefaultConsumer} callback instead of {@code QueueingConsumer}, which is
 * deprecated in amqp-client 4.x and removed in 5.x.
 */
public class Receiver {

    private static final String QUEUE = "hellowTest";

    /**
     * Declares the queue and registers an auto-acknowledging consumer on it.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(review): decodes with the platform default charset; it must match the producer.
                String s = new String(body);
                System.out.print(s);
            }
        };

        // Second argument true = automatic acknowledgement.
        channel.basicConsume(QUEUE, true, consumer);
    }
}
三、Work模式
一個生產者,一個消息隊列,2個消費者
1、 生產者
package com.epoint.cn.service;import com.epoint.cn.util.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Sender {private static final String QUEUE="work";public static void main(String[] args) throws Exception{//獲取連接Connection connection=ConnectionUtil.getConnection();//創建通道Channel channel=connection.createChannel();//聲明隊列,如果隊列存在什麼都不做,否則創建隊列 //參數1:隊列的名稱 //參數2:是否持久化隊列,我們的隊列模式是在內存中的,如果rabbitmq重啟會消失,如果為true會自動保存到erlang自帶的資料庫 //參數3:是否排外,倆個作用,一個是當我們的連接關閉後是否自動刪除隊列,第二個作用是是否私有當前隊列,如果私有,其他通道不可以訪問 //參數4:是否自動刪除 //參數5:一些其他參數channel.queueDeclare(QUEUE,false,false,false,null);//發送內容for(int i=0;i<100;i++) { channel.basicPublish("",QUEUE,null,("淚流雲"+i).getBytes()); }//關閉連接channel.close(); connection.close(); } }
2、 消費者1
package com.epoint.cn.service;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Work-queue consumer 1: takes messages from the {@code work} queue one at a time,
 * simulates 10 ms of work per message and acknowledges each message manually.
 */
public class Receiver1 {

    private static final String QUEUE = "work";

    /**
     * Declares the queue and registers a manually acknowledging consumer on it.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        // Prefetch 1: the broker sends no further message until the current one is acknowledged.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Invoked for every delivered message.
                // NOTE(review): decodes with the platform default charset; it must match the producer.
                System.out.println("消費者1的消息內容是:" + new String(body));
                // Simulate time-consuming work.
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Do not swallow the interrupt: restore the flag for the calling thread.
                    Thread.currentThread().interrupt();
                }
                // Second argument is "multiple": false acknowledges only this delivery tag,
                // true would acknowledge every unacknowledged message up to and including it.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument false = manual acknowledgement: the handler must ack each message itself.
        channel.basicConsume(QUEUE, false, consumer);
    }
}
3、 消費者2
package com.epoint.cn.service;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Work-queue consumer 2: takes messages from the {@code work} queue one at a time,
 * simulates 200 ms of work per message and acknowledges each message manually.
 */
public class Receiver2 {

    private static final String QUEUE = "work";

    /**
     * Declares the queue and registers a manually acknowledging consumer on it.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        // Prefetch 1: the broker sends no further message until the current one is acknowledged.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Invoked for every delivered message.
                // NOTE(review): decodes with the platform default charset; it must match the producer.
                System.out.println("消費者2的消息內容是:" + new String(body));
                // Simulate time-consuming work.
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    // Do not swallow the interrupt: restore the flag for the calling thread.
                    Thread.currentThread().interrupt();
                }
                // Second argument is "multiple": false acknowledges only this delivery tag,
                // true would acknowledge every unacknowledged message up to and including it.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument false = manual acknowledgement: the handler must ack each message itself.
        channel.basicConsume(QUEUE, false, consumer);
    }
}
四、發布訂閱模式
生產者將消息發送到交換機,兩個隊列綁定到同一個交換機,每個隊列都會從交換機收到相同的消息
1、 生產者
package com.epoint.cn.publish;import com.epoint.cn.util.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Sender {private static final String EXCHANGE_NAME = "testexchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//定義一個交換機,類型是fanout,即發布訂閱模式channel.basicPublish(EXCHANGE_NAME, "", null, "發布訂閱模式-".getBytes());//關閉連接channel.close(); connection.close(); } }
2、 消費者1
package com.epoint.cn.publish;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Publish/subscribe consumer 1: binds {@code testqueue1} to the fanout exchange
 * {@code testexchange} and prints every message it receives.
 */
public class Recever1 {

    private static final String EXCHANGE_NAME = "testexchange";

    /**
     * Declares the exchange and queue, binds them and registers a manually acknowledging consumer.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here as well (same type as the producer) so that the bind below
        // does not fail when this consumer is started before the producer has created it.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        channel.queueDeclare("testqueue1", false, false, false, null);
        // Bind the queue to the exchange.
        channel.queueBind("testqueue1", EXCHANGE_NAME, "");
        // Prefetch 1: at most one unacknowledged message at a time.
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(review): decodes with the platform default charset; it must match the producer.
                System.out.println("消費者111" + new String(body));
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument false = manual acknowledgement.
        channel.basicConsume("testqueue1", false, defaultConsumer);
    }
}
3、 消費者2
package com.epoint.cn.publish;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Publish/subscribe consumer 2: binds {@code testqueue2} to the fanout exchange
 * {@code testexchange} and prints every message it receives.
 */
public class Recever2 {

    private static final String EXCHANGE_NAME = "testexchange";

    /**
     * Declares the exchange and queue, binds them and registers a manually acknowledging consumer.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here as well (same type as the producer) so that the bind below
        // does not fail when this consumer is started before the producer has created it.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        channel.queueDeclare("testqueue2", false, false, false, null);
        // Bind the queue to the exchange.
        channel.queueBind("testqueue2", EXCHANGE_NAME, "");
        // Prefetch 1: at most one unacknowledged message at a time.
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(review): decodes with the platform default charset; it must match the producer.
                System.out.println("消費者222" + new String(body));
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument false = manual acknowledgement.
        channel.basicConsume("testqueue2", false, defaultConsumer);
    }
}
五、route路由模式
發送者發送消息到交換機,可以有多個隊列關聯到交換機;但與發布訂閱模式不同,並不是每個消費者都能獲取到消息,而是要根據路由鍵(routing key)來判斷:只有綁定的 key 與消息的 key 完全一致的隊列才會收到消息。
1、 生產者
package com.epoint.cn.route;import com.epoint.cn.util.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Sender {private static final String EXCHANGE_NAME = "routeexchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(EXCHANGE_NAME, "direct");//定義路由格式的交換機channel.basicPublish(EXCHANGE_NAME, "key3", null, "路由模式-".getBytes());//關閉連接channel.close(); connection.close(); } }
2、 消費者1
package com.epoint.cn.route;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Routing-mode consumer 1: binds {@code routequeue1} to the direct exchange
 * {@code routeexchange} with routing keys {@code key1} and {@code key2}.
 */
public class Recever1 {

    private static final String EXCHANGE_NAME = "routeexchange";

    /**
     * Declares the exchange and queue, binds them and registers a manually acknowledging consumer.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here as well (same type as the producer) so that the binds below
        // do not fail when this consumer is started before the producer has created it.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare("routequeue1", false, false, false, null);
        // Bind the queue to the exchange once per routing key it should receive.
        channel.queueBind("routequeue1", EXCHANGE_NAME, "key1");
        channel.queueBind("routequeue1", EXCHANGE_NAME, "key2");
        // Prefetch 1: at most one unacknowledged message at a time.
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(review): decodes with the platform default charset; it must match the producer.
                System.out.println("消費者111" + new String(body));
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument false = manual acknowledgement.
        channel.basicConsume("routequeue1", false, defaultConsumer);
    }
}
3、 消費者2
package com.epoint.cn.route;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Routing-mode consumer 2: binds {@code routequeue2} to the direct exchange
 * {@code routeexchange} with routing keys {@code key1} and {@code key3}.
 */
public class Recever2 {

    private static final String EXCHANGE_NAME = "routeexchange";

    /**
     * Declares the exchange and queue, binds them and registers a manually acknowledging consumer.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here as well (same type as the producer) so that the binds below
        // do not fail when this consumer is started before the producer has created it.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare("routequeue2", false, false, false, null);
        // Bind the queue to the exchange once per routing key it should receive.
        channel.queueBind("routequeue2", EXCHANGE_NAME, "key1");
        channel.queueBind("routequeue2", EXCHANGE_NAME, "key3");
        // Prefetch 1: at most one unacknowledged message at a time.
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(review): decodes with the platform default charset; it must match the producer.
                System.out.println("消費者2222" + new String(body));
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument false = manual acknowledgement.
        channel.basicConsume("routequeue2", false, defaultConsumer);
    }
}
六、topic模式
Topic模式跟路由模式的區別是key的對應引入了通配符:路由模式需要key值完全一致,而Topic模式用「*」和「#」進行匹配,key以「.」分隔成多個詞,「*」只能匹配一個詞,而「#」可以匹配零個或多個詞。
1、生產者
package com.epoint.cn.topic;import com.epoint.cn.util.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Sender {private static final String EXCHANGE_NAME = "topicexchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.basicPublish(EXCHANGE_NAME, "key.3.1", null, "topic模式11-".getBytes());//關閉連接channel.close(); connection.close(); } }
2、消費者1
package com.epoint.cn.topic;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Topic-mode consumer 1: binds {@code topicqueue1} to the topic exchange
 * {@code topicexchange} with binding keys {@code key.#} and {@code asa}.
 */
public class Recever1 {

    private static final String EXCHANGE_NAME = "topicexchange";

    /**
     * Declares the exchange and queue, binds them and registers a manually acknowledging consumer.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here as well (same type as the producer) so that the binds below
        // do not fail when this consumer is started before the producer has created it.
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueDeclare("topicqueue1", false, false, false, null);
        // "#" matches zero or more dot-separated words, so "key.#" matches e.g. "key.3.1".
        channel.queueBind("topicqueue1", EXCHANGE_NAME, "key.#");
        channel.queueBind("topicqueue1", EXCHANGE_NAME, "asa");
        // Prefetch 1: at most one unacknowledged message at a time.
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(review): decodes with the platform default charset; it must match the producer.
                System.out.println("消費者111" + new String(body));
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument false = manual acknowledgement.
        channel.basicConsume("topicqueue1", false, defaultConsumer);
    }
}
3、消費者2
package com.epoint.cn.topic;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Topic-mode consumer 2: binds {@code topicqueue2} to the topic exchange
 * {@code topicexchange} with binding keys {@code key.*} and {@code asa.1}.
 */
public class Recever2 {

    private static final String EXCHANGE_NAME = "topicexchange";

    /**
     * Declares the exchange and queue, binds them and registers a manually acknowledging consumer.
     * The connection is intentionally left open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Declare the exchange here as well (same type as the producer) so that the binds below
        // do not fail when this consumer is started before the producer has created it.
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueDeclare("topicqueue2", false, false, false, null);
        // "*" matches exactly one dot-separated word, so "key.*" matches "key.3" but not "key.3.1".
        channel.queueBind("topicqueue2", EXCHANGE_NAME, "key.*");
        channel.queueBind("topicqueue2", EXCHANGE_NAME, "asa.1");
        // Prefetch 1: at most one unacknowledged message at a time.
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(review): decodes with the platform default charset; it must match the producer.
                System.out.println("消費者222" + new String(body));
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument false = manual acknowledgement.
        channel.basicConsume("topicqueue2", false, defaultConsumer);
    }
}
七、Spring的整合
1、pom依賴
<!-- Spring AMQP RabbitMQ integration -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.7.6.RELEASE</version>
</dependency>
<!-- Spring test support -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>4.3.7.RELEASE</version>
</dependency>
2、applicationContext
<!-- 1. Connection factory for the RabbitMQ broker.
     Attributes must be separated by whitespace; the original ran password and virtual-host
     together, which is not well-formed XML. -->
<rabbit:connection-factory id="connectionFactory"
                           host="127.0.0.1"
                           port="5672"
                           username="lly"
                           password="lly"
                           virtual-host="llyhost"></rabbit:connection-factory>

<!-- RabbitTemplate: decide whether messages go to an exchange or to a queue.
     Set the exchange attribute to publish to an exchange, or the queue attribute to send to a queue. -->
<rabbit:template id="template" exchange="fanoutexchange" connection-factory="connectionFactory"></rabbit:template>

<rabbit:admin connection-factory="connectionFactory"></rabbit:admin>

<!-- Queue definition -->
<rabbit:queue name="myqueue" auto-declare="true"></rabbit:queue>

<!-- Exchange definition -->
<rabbit:fanout-exchange name="fanoutexchange" auto-delete="true">
    <!-- Bind the queue to the exchange -->
    <rabbit:bindings>
        <rabbit:binding queue="myqueue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Listener container: its listeners are invoked when a message arrives -->
<rabbit:listener-container connection-factory="connectionFactory">
    <!-- Which bean and which method handle messages from which queue -->
    <rabbit:listener ref="consumer" method="test" queue-names="myqueue"></rabbit:listener>
</rabbit:listener-container>

<!-- Consumer bean -->
<bean id="consumer" class="com.epoint.cn.spring.MyConsumer"></bean>
3、消費者類
package com.epoint.cn.spring;

/**
 * Simple message handler that echoes each received message to standard output.
 */
public class MyConsumer {

    /**
     * Prints the given message on its own line.
     *
     * @param message the message text to print
     */
    public void test(String message) {
        final java.io.PrintStream stdout = System.out;
        stdout.println(message);
    }
}
4、 測試類
package com.epoint.cn.spring;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Manual test: loads {@code applicationContext.xml}, sends one message through the
 * {@link RabbitTemplate} bean and shuts the context down.
 */
public class MyTest {

    /**
     * Starts the Spring context, sends the message and always closes the context afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Hold the concrete type so the context can be closed without a downcast.
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        try {
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            rabbitTemplate.convertAndSend("hellow!");
        } finally {
            // Close even when sending fails, so the context is never left open.
            applicationContext.close();
        }
    }
}
八、rabbitmq持久化
消息發出去之後,如果消費者還沒有獲取到消息,RabbitMQ伺服器就被重啟,會導致數據丟失,這時候就需要持久化(交換機、隊列和消息都要設置為持久化)
1、生產者
package com.epoint.cn.route;import com.epoint.cn.util.ConnectionUtil;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.MessageProperties;public class Sender {private static final String EXCHANGE_NAME = "exchangesf";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();//聲明持久化交換機channel.exchangeDeclare(EXCHANGE_NAME, "direct",true,false,null);//定義路由格式的交換機channel.basicPublish(EXCHANGE_NAME, "key1", MessageProperties.PERSISTENT_TEXT_PLAIN, "持久化消息1".getBytes());//關閉連接channel.close(); connection.close(); } }
2、消費者
package com.epoint.cn.route;

import com.epoint.cn.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Persistence consumer: declares the durable exchange {@code exchangesf} and the durable
 * queue {@code routequeue}, binds them with {@code key1} and prints every received message.
 */
public class Recever1 {

    private static final String EXCHANGE_NAME = "exchangesf";

    /**
     * Sets up the durable exchange, queue and binding, then subscribes with automatic
     * acknowledgement. The connection stays open so that deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        final String queueName = "routequeue";
        final String bindingKey = "key1";

        Channel amqpChannel = ConnectionUtil.getConnection().createChannel();

        // Durable exchange and durable queue (third / second argument true respectively).
        amqpChannel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        amqpChannel.queueDeclare(queueName, true, false, false, null);
        // Bind the queue to the exchange.
        amqpChannel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

        // Second argument true = automatic acknowledgement.
        amqpChannel.basicConsume(queueName, true, new DefaultConsumer(amqpChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body);
                System.out.println("收到消息" + text);
            }
        });
    }
}