RabbitMQ Tutorials – Java版-01-“Hello World!”实现

RabbitMQ Tutorials - Java版-01-"Hello World!"实现

英文原文网址:http://www.rabbitmq.com/getstarted.html
翻译 by 刘岳峰

前提 本教程要求已安装RabbitMQ,并且运行在localhost的标准端口(5672)。如果您使用了不同的host,端口或者凭证,需要调整连接设置。

简介

RabbitMQ是一个消息代理。本质上, 它从生产者接受消息,并传送给消费者。在这两者之间, 它可以根据你给它的规则来路由,缓冲和持久化消息。

RabbitMQ和消息这里使用了一些术语。

  • 所谓生产,即发送。 一个发送消息的程序就是一个生产者。我们用一个大写的P把它画成这样:

    这里写图片描述

  • 一个队列(queue)就是一个邮箱的名字。它存活在RabbitMQ内部。尽管消息流经RabbitMQ和你的应用,它们只能被存储在一个队列中。一个queue没有被绑定任何限制,你可以在里面存储任意多的消息 – 其本质上是一个无限缓冲区。许多生产者可以发送消息到一个队列 – 许多消费者可以从一个队列里接收数据。队列可以被画成这样,上面即为它的名字:

    这里写图片描述

  • 消费与接收具有相似的含义。一个消费者就是一个多数时间用来等待接收消息的程序。我们用一个大写的C来描述它:

    这里写图片描述

要注意的是生产者,消费者和代理无需驻留在同一台机器上;事实上在大部分应用上,它们都没有这样。

“Hello World” (使用Java Client)

在这个部分,我们会用Java写两段程序;一个发送单个消息的生产者,和一个接收并打印单个消息的消费者。请不要在意这些Java API的细节,重点是从简单的代码里入门。这是个消息机制的”Hello World”。

下面的图里面,”P”是我们的生产者而”C”是我们的消费者。中间的方框是一个队列 – 即一个消息缓冲区,RabbitMQ代表消费者持有它。
这里写图片描述

Java客户端库

RabbitMQ支持多种协议。本教程使用AMQP 0-9-1,这是一个针对消息的开放,通用协议。RabbitMQ有很多用不同语言写的客户端。我们将使用RabbitMQ提供的Java客户端。

下载客户端库包,并检查它的签名。解压到你的工作目录并从里面获取JAR文件:
$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar./

(Maven的库里也有RabbitMQ的Java客户端,groupId是com.rabbitmq,artifactId是
amqp-client。)

现在我们已经获得Java客户端和它的依赖,是时候写一些代码了。

发送

这里写图片描述

我们将称消息发送者命名为Send,而将消息接收者命名为Recv。发送者会连接到RabbitMQ,发送单个消息,然后退出。

在Send.java里,我们需要导入一些类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

设置类并命名队列:

public class Send {
   
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

然后我们建立到server的连接:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

connection是socket连接的抽象,它为我们负责协议版本协商和认证等。在这里我们连接到一个本地亦即localhost的代理。如果我们想连接到其它机器的代理商,我们只需在这里简单的指定它的名字或者IP地址即可。

下一步我们建立一个channel。使任务完成得大部分的API都由该channel提供。

为了发送,我们必须声明一个队列作为目的地;然后我们会发布一个消息到这个队列:

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

声明一个队列是幂等的 – 它只在不存在时被创建。消息内容是一个byte数组,因此你可以编码任意内容。

最后,我们关闭channel和连接;

    channel.close();
    connection.close();

这里是完整的Send.java类。

发送无法工作!

如果这是你第一次使用RabbitMQ,当你看不到”Sent”时,你可能抓破头也搞不清哪里出错了。这可能是由于代理启动时没有足够的磁盘空间(默认它需要至少1Gb空闲),因此会拒绝接受消息。检查代理的logfile以确认并在必要时降低这一限制。配置文件的文档可以帮你设置disk_free_limit。

接收
发送端代码就是这样。RabbitMQ会将消息推送到我们的接收端,因此不同于发布单个消息的发送端,接收端需要一直保持运行来监听消息并打印它们。
这里写图片描述
Recv.java的代码拥有和Send类似的导入:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

额外的DefaultConsumer 类用来实现Consumer接口,我们用该类缓存server端推送过来的消息。

设置部分和sender一样;我们打开一个连接和channel,然后声明一个队列,在这里我们可以得到要消费的消息。注意这里的队列需要和发布端的队列匹配。

public class Recv {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

值得注意的是我们在这里也声明了队列。这是因为我们可能会在启动发送端之前启动接收端,我们需要在试图消费队列里的消息前确保队列已经存在。

我们要告知server从队列里将消息发送给我们。因为消息会被server异步的传输给我们,所以我们提供一个callback对象,用来在准备好使用这些消息之前先缓冲它们。这就是DefaultConsumer子类所负责的。

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);

这里是完整的Recv.java类。

整合到一起

保证classpath中有RabbitMQ的java客户端库,然后你就可以编译这些类:

$ javac -cp rabbitmq-client.jar Send.java Recv.java

要运行它们,你需要在classpath中添加rabbitmq-client.jar 及其依赖。然后在一个终端上,运行sender:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

接着,运行Recv:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

在Windows上,使用分号而非引号来间隔classpath。

接收端将打印它通过RabbitMQ从发送端收到的消息。接收端会保持运行,等待消息(使用Ctrl-C来停止它),因此最好在其它的终端运行sender。

如果你想确认队列,可以使用rabbitmqctl list_queues。

hello

是时候前进到part2,建立一个简单的工作队列(work queue)了。

小提示

为了节省输入,可以如下例为classpath设置环境变量。

 $ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
 $ java -cp $CP Send

或者在Windows上:

> set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
> java -cp %CP% Send
「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论