ども、cloudpack の かっぱ(@inokara)です。
はじめに
RabbitMQ について改めて勉強したくなったので各種ドキュメントを斜め読みして纏めてみました。間違い等あれば修正リクエスト頂ければ幸いです。
RabbitMQ と AMQP
RabbitMQ がサポートしている AMQP のバージョン
RabbitMQ がサポートしている AMQP のバージョンは
- AMQP 0-9-1
- AMQP 0-9
- AMQP 0-9
となります。AMQP 0-9-1 の利用が推奨されています。
RabbitMQ がサポートしているメッセージングプロトコル
以下のようなメッセージングプロトコルをサポートしているようです。
- STOMP(Simple Text Oriented Messaging Protocol)
- MQTT
- AMQP 1.0
- HTTP
尚、各メッセージングプロトコルはプラグインという形で提供されています。HTTP 自体はメッセージングプロトコルではありませんが、RabbitMQ では以下の方法で HTTP を介してメッセージを送信することが出来ます。
- management plugin
- Web-STOMP plugin
- JSON-RPC channel plugin
AMQP エンティティと Exchange
参考
- AMQP 0-9-1 Model Explained
- AMQPによるメッセージング
- Working with RabbitMQ exchanges and publishing messages from Ruby with Bunny
上記を参考に纏めていきたいと思います。
はじめに
AMQP によるメッセージ配信は以下の三つの機能で提供されています。
機能 | 詳細 |
---|---|
Exchange | 受け取ったメッセージを幾つかのパターンに基いて Binding する機能 |
Binding | Exchange から受け取ったメッセージを Queue に定義付けする機能 |
Queue | Binding されたメッセージをキューインしておいてコンシューマ(Subscriber)にメッセージを渡す機能 |
Exchange のパターンについて以下で少し詳しく見ていきたいと思います。図に関してはこちらを参考に理解を含める意味で書いてみました。
Exchange のパターン
Exchange には以下のようなパターンがあります。
Name | Default pre-declared names |
---|---|
Direct exchange | (Empty string) and amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
Direct exchange
- デフォルトの Exchange
- binding key と message に付与されている routing key が一致する message queue に配送する
- Direct exchange はラウンドロビン方式で複数の同一アプリケーションインスタンス間でタスク分散を行う際に利用される
Fanout exchange
- routing key は無視されて bind されている message queue 全てに配送する
- メッセージのブロードキャストルーティングに適している
Topic exchange
- Topic exchange を通ったメッセージは
.
で区切られた Routing Key を持つ - Binding key も同様に
.
で区切られた形式である必要がある - Direct exchange 同様に Routing Key と Binding Key を評価して一致した message queue に配送する
- 但し、
.
で区切られた文字を*
や#
の文字列で置き換えることで部分一致で配送先を指定することが可能
*
と #
は以下のような意味がある。
記号 | 意味 |
---|---|
* | 正確に一つの単語を置き換える |
# | ゼロ個以上の単語を置き換える |
Topic についてはこちらも合わせて読むと良いと思います。
そういえば、昨日くらいから自分の中でホットキーワードになっている MQTT でも Topic がありますよね。MQTT の Topic では 機は .
区切りではなく /
で区切っています。また、*
ではなく #
や +
を利用するようです。詳しくはこちら。
Headers exchange
- メッセージの配送に Routing Key を利用せず、ヘッダのアトリビュートを利用する
- ヘッダに部分一致させるか、完全一致させるかは
x-match
というヘッダの値に依存する(any
又はall
) - 特定のキューにおける Subscriber への配送ルールは Direct exchange と同じ
- Headers exchange に関してはこちらのソースが解りやすい
Bunny ガールの出番です
はじめに
Ruby から RabbitMQ を扱う Bunny という gem を触ってみたいと思います。
インストール
簡単です。
sudo gem install bunny --no-ri --no-rdoc -V
Topic Exchange を使ってみる
こちらを参考にサクッと触ってみます。
Subscriber
pry を起動して以下を貼り付けます。
require "bunny" conn = Bunny.new("amqp://guest:guest@xxx.xxx.xxx.xxx:5672") conn.start ch = conn.create_channel x = ch.topic("topic_logs") q = ch.queue("", :exclusive => true) q.bind(x, :routing_key => "anonymous.*") q.subscribe(:block => true) do |delivery_info, properties, body| puts " [x] #{delivery_info.routing_key}:#{body}" end
但し、Bunny.new("amqp://guest:guest@xxx.xxx.xxx.xxx:5672")
は適宜環境に応じて変更して下さい。デフォルトでは localhost
の 5672
ポートでユーザー名とパスワードは guest
と guest
となります。
貼り付けると RabbitMQ に接続されてメッセージの購読が開始されます。ポイントは q.bind(x, :routing_key => "anonymous.")
で :routing_key
をハッシュで指定しており anonymous.
で指定されているので、以下で指定している anonymous.info
のメッセージはマッチするのでキューに配信されます。
publisher
別の端末で pry を起動して以下を貼り付けます。
require "bunny" conn = Bunny.new("amqp://guest:guest@xxx.xxx.xxx.xxx:5672") conn.start ch = conn.create_channel x = ch.topic("topic_logs") severity = "anonymous.info" msg = "Hello World!" x.publish(msg, :routing_key => severity) conn.close
x.publish(msg, :routing_key => severity)
でメッセージを publish します。ポイントはハッシュ指定の :routing_key
で anonymous.info
を指定してメッセージを publish しています。
デモ
動画とかで見せられると良いのですが…静止画で…
別の端末側では…
おお、ちゃんと Topic Exchange になってますなあ。
最後に
RabbitMQ に関しては実は色々なところで使われているにも関わらず日本語の情報が少ないのがちょっと辛いですが、改めてドキュメントを読んだり、ソースコード(Ruby から RabbitMQ を扱う Bunny)を眺めてみてちょっと理解が深まった気がします。
元記事はこちらです。
「RabbitMQ のドキュメントを拾い読みしたのでメモと Ruby から RabbitMQ を使う Bunny を試してみた」