RabbitMQ を OSX + Homebrew + Java で使ってみる

RabbitMQ というメッセージ指向ミドルウェアが便利らしい、という話を聞いて使ってみました。メッセージ指向ミドルウェアそのものの説明は他におまかせして、公式の Tutorial を試して便利に思った機能について書いてみます。

準備

RabbitMQ は使う前にマシンへのインストールが必要です。今回は OSX で試しました。インストールには Homebrew を使いました。

$ brew install rabbitmq

RabbitMQ のサーバは rabbitmq-server コマンドで実行します。

$ rabbitmq-server

Hello World

まずは単純にキューにメッセージを入れて取り出すだけのプログラムです。

キューにメッセージを入れるプログラムです。RabbitMQ (AMQP?) の用語でプロデューサというようです。

package test.rabbitmq.helloworld;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {

	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare("hello", false, false, false, null);
		String message = "Hello World!";
		channel.basicPublish("", "hello", null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");

		channel.close();
		connection.close();
	}
}

やっていることは localhost で動作する RabbitMQ の "hello" という名前のキューに対して "Hello, World!!" という文字列のメッセージを詰めるだけです。

次にキューからメッセージを取り出して表示するプログラムです。こちらはコンシューマというようです。

package test.rabbitmq.helloworld;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

	public static void main(String[] argv) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare("hello", false, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume("hello", true, consumer);

		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
		}
	}
}

こちらも、やっていることは "hello" という名前のキューからメッセージを取り出して String に変換してコンソールに出力しているだけです。

プロジェクトは Maven で作ったのでプログラムの実行には exec-maven-plugin プラグインを使いました。pom.xml を貼っておきます。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>test.rabbitmq.helloworld</groupId>
	<artifactId>test-rabbitmq-helloworld</artifactId>
	<version>1.0-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>test-rabbitmq-helloworld</name>
	<url>http://maven.apache.org</url>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.codehaus.mojo</groupId>
				<artifactId>exec-maven-plugin</artifactId>
				<version>1.2.1</version>
			</plugin>
		</plugins>
	</build>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>2.7.1</version>
		</dependency>
	</dependencies>
</project>

あとは Recv (コンシューマ) を実行した状態で Send (プロデューサ) を実行するだけです。もちろん RabbitMQ を動作させておくのもお忘れなく。

まずは Recv を実行します。

$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Recv

別のコンソールで Send を実行します。

$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send

Recv を実行した側のコンソールに "Hello, World!!" が表示されれば成功です。

 ---
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

Round-robin dispatching

処理を複数のサーバに振り分けて負荷を分散させるのも、自分で実現するとなると大変です。なんと RabbitMQ では先ほどの Recv を 2 つ実行しておくだけでキューに入ったメッセージをラウンドロビンでディスパッチしてくれます。すごい!

Fair dispatch

Round-robin dispatching ではコンシューマへ常に均等にメッセージをディスパッチします。メッセージ毎の処理時間が均等なら問題ないかもしれませんが、なかなかそうはいかないものです。RabbitMQ では、処理の空いたコンシューマに優先的にメッセージをディスパッチして負荷を分散することもできます。

負荷に応じてディスパッチされていることを確認するため、先ほど作ったプロデューサに手を加えました。

package test.rabbitmq.helloworld;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {

	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare("hello", false, false, false, null);
		String message = getMessage(args);
		channel.basicPublish("", "hello", null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");

		channel.close();
		connection.close();
	}
	
	private static String getMessage(String[] args) {
		if (args.length < 1) {
			return "Hello, World!!";
		}
		return args[0];
	}
}

修正後は、実行時の引数でメッセージの内容を変更できるようにしてあります。

続いてコンシューマの Recv にも手を加えます。

package test.rabbitmq.helloworld;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

	public static void main(String[] argv) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare("hello", false, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		channel.basicQos(1);

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume("hello", false, consumer);

		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
			doWork(message);
			System.out.println(" [x] Done '" + message + "'");
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}

	private static void doWork(String task) throws InterruptedException {
		for (char ch : task.toCharArray()) {
			if (ch == '.') {
				Thread.sleep(1000);
			}
		}
	}
}

時間のかかる処理を表現するために、キューから取り出したメッセージに含まれる"." (ドット) 1 つ毎に 1 秒間のスリープをかけるようにしてあります。

先ほどと同様に 2 つ Recv を実行した上で Send を実行します。今回は、まず時間のかかる処理を入れてから時間のかからない処理を複数入れます。

$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send -Dexec.args="...................."
$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send -Dexec.args="1"
$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send -Dexec.args="2"
$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send -Dexec.args="3"
$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send -Dexec.args="4"
$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send -Dexec.args="5"
$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send -Dexec.args="6"

Recv 側の出力は以下のようになりました。

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '....................'
 [x] Done '....................'
 [x] Received '6'
 [x] Done '6'
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '1'
 [x] Done '1'
 [x] Received '2'
 [x] Done '2'
 [x] Received '3'
 [x] Done '3'
 [x] Received '4'
 [x] Done '4'
 [x] Received '5'
 [x] Done '5'

最初にキューに入った時間のかかる処理を片方の Recv が処理している間、他方の Recv が時間のかからない処理を複数こなしていることが分かります。上手く負荷が分散されているみたいです。

プログラムのポイントは Recv の以下のコードです。

		channel.basicQos(1);
...
		channel.basicConsume("hello", false, consumer);
…
		channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

Channel#basicQos() ではコンシューマが処理するメッセージの最大数を設定します。Channel#basicConsume() は第二引数を true から false に変更しています。これは自動でメッセージの確認応答 (ACK) を返すかを示す真偽値です。これを false にするときは、自分で確認応答を伝えないといけません。確認応答は Channel#basicAck() で伝えます。

確認応答を返すまではメッセージの処理中とみなされるようです。コンシューマが処理するメッセージの最大数を制限した上で、自分で処理の完了時に確認応答を返すようにすることでメッセージの処理中に新たなメッセージがディスパッチされないようになるようです。

Message acknowledgment

確認応答を自分で返すようにすると、更に良いことがあります。コンシューマが処理中に何らかの原因で落ちてしまった場合にも、確認応答を返していない限りはメッセージが失われることがありません。

先ほどのプログラムで確認してみます。先ほどと同様に Recv を 2 つ実行した上で Send で時間のかかる処理をキューに入れます。処理中の Recv を Ctrl+C で落としてみます。

Send を実行します。

$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send -Dexec.args=".........."

メッセージがディスパッチされた Recv を処理中に Ctrl+C で落とします。

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '..........'
^C%                    

もう一つの Recv を見るとメッセージがディスパッチされ直していることが分かります。

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '..........'
 [x] Done '..........'

処理に失敗してもメッセージが消失しないのは安心できますね。

Message durability

先ほどの例ではコンシューマが落ちたときもメッセージが消失しないことを確認しました。とはいえ RabbitMQ のサーバ自体が落ちるケースもあります。サーバが落ちている間に処理できないのは仕方ないとしても、既にキューに入ったメッセージが消失してしまうのは困ります

RabbitMQ のサーバが落ちたとき、キューに入ったメッセージが消失しないようにするには永続化の機構を使います。

メッセージを永続化するようにした Send です。

package test.rabbitmq.helloworld;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class Send {

	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare("durable", true, false, false, null);
		String message = getMessage(args);
		channel.basicPublish("", "durable", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");

		channel.close();
		connection.close();
	}
	
	private static String getMessage(String[] args) {
		if (args.length < 1) {
			return "Hello, World!!";
		}
		return args[0];
	}
}

プログラムのポイントは以下です。

	channel.queueDeclare("durable", true, false, false, null);
	...
	channel.basicPublish("", "durable", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Channel#queueDeclare() の第二引数を false から true に変更しています。この真偽値はキューがメッセージを永続化するか示しています。そして Channel#basicPublish() の第三引数で永続化の方式を指定しています。キューの名前が変わっているのは、既に定義済みのキューを途中から永続化するように変更できない制約があるためです。

Recv はキューの名前を変更した以外に変わっていません。

package test.rabbitmq.helloworld;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

	public static void main(String[] argv) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.queueDeclare("durable", true, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		channel.basicQos(1);

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume("durable", false, consumer);

		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
			doWork(message);
			System.out.println(" [x] Done '" + message + "'");
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}

	private static void doWork(String task) throws InterruptedException {
		for (char ch : task.toCharArray()) {
			if (ch == '.') {
				Thread.sleep(1000);
			}
		}
	}
}

今度は Send でメッセージをキューに入れた上で RabbitMQ のサーバを再起動します。メッセージが永続化されていれば、サーバを落としてもキューにメッセージが残り続けるため、サーバが再起動した後に Recv を実行すればメッセージがディスパッチされるはずです。

Send を実行します。

$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Send

RabbitMQ を再起動します。

broker running
^C
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
       (v)ersion (k)ill (D)b-tables (d)istribution
a
...
$ rabbitmq-server

Recv を実行します。

$ mvn clean compile exec:java -Dexec.mainClass=test.rabbitmq.helloworld.Recv

メッセージがディスパッチされました!

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello, World!!'
 [x] Done 'Hello, World!!'

まとめ

システムのコンポーネント間でいかにデータをやり取りするかはいつも悩みの種です。そうした時、可用性や完全性を保ちつつスケーラビリティの高いシステムを構築する上で RabbitMQ (AMQP) は、力強い手助けになるのではないでしょうか。