ども、cloudpackかっぱ@inokara)です。

はじめに

RabbitMQ について改めて勉強したくなったので各種ドキュメントを斜め読みして纏めてみました。間違い等あれば修正リクエスト頂ければ幸いです。


RabbitMQ と AMQP

RabbitMQ がサポートしている AMQP のバージョン

RabbitMQ がサポートしている AMQP のバージョンは

  • AMQP 0-9-1
  • AMQP 0-9
  • AMQP 0-9

となります。AMQP 0-9-1 の利用が推奨されています。

RabbitMQ がサポートしているメッセージングプロトコル

以下のようなメッセージングプロトコルをサポートしているようです。

尚、各メッセージングプロトコルはプラグインという形で提供されています。HTTP 自体はメッセージングプロトコルではありませんが、RabbitMQ では以下の方法で HTTP を介してメッセージを送信することが出来ます。

  • management plugin
  • Web-STOMP plugin
  • JSON-RPC channel plugin

AMQP エンティティと Exchange

参考

上記を参考に纏めていきたいと思います。

はじめに

AMQP によるメッセージ配信は以下の三つの機能で提供されています。

機能 詳細
Exchange 受け取ったメッセージを幾つかのパターンに基いて Binding する機能
Binding Exchange から受け取ったメッセージを Queue に定義付けする機能
Queue Binding されたメッセージをキューインしておいてコンシューマ(Subscriber)にメッセージを渡す機能

Exchange のパターンについて以下で少し詳しく見ていきたいと思います。図に関してはこちらを参考に理解を含める意味で書いてみました。


Exchange のパターン

Exchange には以下のようなパターンがあります。

Name Default pre-declared names
Direct exchange (Empty string) and amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.match (and amq.headers in RabbitMQ)
Direct exchange

RabbitMQ AMQP エンティティと Exchange: Direct exchange

  • デフォルトの Exchange
  • binding key と message に付与されている routing key が一致する message queue に配送する
  • Direct exchange はラウンドロビン方式で複数の同一アプリケーションインスタンス間でタスク分散を行う際に利用される
Fanout exchange

RabbitMQ AMQP エンティティと Exchange: Fanout exchange

  • routing key は無視されて bind されている message queue 全てに配送する
  • メッセージのブロードキャストルーティングに適している
Topic exchange

RabbitMQ AMQP エンティティと Exchange: Topic exchange

  • Topic exchange を通ったメッセージは . で区切られた Routing Key を持つ
  • Binding key も同様に . で区切られた形式である必要がある
  • Direct exchange 同様に Routing Key と Binding Key を評価して一致した message queue に配送する
  • 但し、. で区切られた文字を *# の文字列で置き換えることで部分一致で配送先を指定することが可能

*# は以下のような意味がある。

記号 意味
* 正確に一つの単語を置き換える
ゼロ個以上の単語を置き換える

Topic についてはこちらも合わせて読むと良いと思います。

そういえば、昨日くらいから自分の中でホットキーワードになっている MQTT でも Topic がありますよね。MQTT の Topic では 機は . 区切りではなく / で区切っています。また、* ではなく #+ を利用するようです。詳しくはこちら

Headers exchange

RabbitMQ AMQP エンティティと Exchange: Headers exchange

  • メッセージの配送に Routing Key を利用せず、ヘッダのアトリビュートを利用する
  • ヘッダに部分一致させるか、完全一致させるかは x-match というヘッダの値に依存する(any 又は all
  • 特定のキューにおける Subscriber への配送ルールは Direct exchange と同じ
  • Headers exchange に関してはこちらのソースが解りやすい

Bunny ガールの出番です

はじめに

Ruby から RabbitMQ を扱う Bunny という gem を触ってみたいと思います。

インストール

簡単です。

sudo gem install bunny --no-ri --no-rdoc -V

Topic Exchange を使ってみる

こちらを参考にサクッと触ってみます。

Subscriber

pry を起動して以下を貼り付けます。

require "bunny"
conn = Bunny.new("amqp://guest:guest@xxx.xxx.xxx.xxx:5672")
conn.start
ch = conn.create_channel
x = ch.topic("topic_logs")
q = ch.queue("", :exclusive => true)
q.bind(x, :routing_key => "anonymous.*")
q.subscribe(:block => true) do |delivery_info, properties, body|
  puts " [x] #{delivery_info.routing_key}:#{body}"
end

但し、Bunny.new("amqp://guest:guest@xxx.xxx.xxx.xxx:5672") は適宜環境に応じて変更して下さい。デフォルトでは localhost5672 ポートでユーザー名とパスワードは guestguest となります。

貼り付けると RabbitMQ に接続されてメッセージの購読が開始されます。ポイントは q.bind(x, :routing_key => "anonymous."):routing_key をハッシュで指定しており anonymous. で指定されているので、以下で指定している anonymous.info のメッセージはマッチするのでキューに配信されます。

publisher

別の端末で pry を起動して以下を貼り付けます。

require "bunny"
conn = Bunny.new("amqp://guest:guest@xxx.xxx.xxx.xxx:5672")
conn.start
ch = conn.create_channel
x = ch.topic("topic_logs")
severity = "anonymous.info"
msg = "Hello World!"
x.publish(msg, :routing_key => severity)
conn.close

x.publish(msg, :routing_key => severity) でメッセージを publish します。ポイントはハッシュ指定の :routing_keyanonymous.info を指定してメッセージを publish しています。

デモ

動画とかで見せられると良いのですが…静止画で…

Bunny のデモ (1)

別の端末側では…

Bunny のデモ (2)

おお、ちゃんと Topic Exchange になってますなあ。


最後に

RabbitMQ に関しては実は色々なところで使われているにも関わらず日本語の情報が少ないのがちょっと辛いですが、改めてドキュメントを読んだり、ソースコード(Ruby から RabbitMQ を扱う Bunny)を眺めてみてちょっと理解が深まった気がします。

元記事はこちらです。
RabbitMQ のドキュメントを拾い読みしたのでメモと Ruby から RabbitMQ を使う Bunny を試してみた