ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
String message = "hello world! ";
channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes());
System.out.println("Sent msg finish");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routkey2", null);
System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) { message = new String(body, "UTF-8"); catch (UnsupportedEncodingException e) System.out.println("Received msg='" + message + "'"); channel.basicConsume(QUEUE_NAME, true, consumer);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
String message = "hello world! ";
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message+i).getBytes());
System.out.println("Sent msg is '" + message + "'");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);
System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
message = new String(body, "UTF-8");
catch (UnsupportedEncodingException e)
System.out.println("1 Received msg='" + message + "'");
channel.basicConsume(QUEUE_NAME, true, consumer);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
String message = "hello world! ";
for (int i = 0; i < 100; i++)
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message + i).getBytes());
System.out.println("Sent msg is '" + message + "'");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列// 把队列绑定到路由上并指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);
System.out.println("1 Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) message = new String(body, "UTF-8");
catch (UnsupportedEncodingException e)
System.out.println("1 Received msg='" + message + "'");
channel.basicConsume(QUEUE_NAME, true, consumer);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "headers", true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Map<String, Object> headers = new Hashtable<String, Object>();
// all:表示所有的键值对都匹配才能接受到消息
headers.put("x-match", "all");
headers.put("name", "jack");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", headers);
System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException System.out.println("Received start --------------");
for (Entry<String, Object> entry : properties.getHeaders().entrySet())
System.out.println(entry.getKey() + "=" + entry.getValue());
String message = new String(body, "UTF-8");
System.out.println("msg='" + message + "'");
System.out.println("Received end --------------");
channel.basicConsume(QUEUE_NAME, true, consumer);