Abel Muiño

home

Jugando con RabbitMQ en Mac OSX

28 Jun 2011

NOTA: Actualizado para usar la nueva versión del API de AMQP (evitando usar la clase MQ). Gracias a @michaelklishin por el feedback

Tengo el «capricho» de crear una estructura escalable para procesar tweets.

Básicamente hay dos cajas:

Para que el cliente de tweets pueda leer rápido, tengo que desacoplar los dos sistemas. Pero me gustarí­a que el procesador de tweets recibiese trabajo tan rápido como sea posible (en cuanto haya un tweet, nada de tareas periódicas). Vamos, lo que viene siendo un pub/sub de toda la vida.

Y aquí­ es dónde RabbitMQ (o, más concretamente, el protocolo AMQP) entra en juego, ya que es capaz de pasar los mensajes del publicador al consumidor al momento y el soporte para clientes Ruby es, según todas las referencias que he visto, eggscelente.

A mancharse las manos…

El servidor

Instalamos RabbitMQ usando port

sudo port install rabbitmq-server

Dejamos que se cocine un rato y cuando nos devuelva el control:

sudo rabbitmq-server

Activating RabbitMQ plugins ...
0 plugins activated:


+---+   +---+
|   |   |   |
|   |   |   |
|   |   |   |
|   +---+   +-------+
|                   |
| RabbitMQ  +---+   |
|           |   |   |
|   v2.5.1  +---+   |
|                   |
+-------------------+
AMQP 0-9-1 / 0-9 / 0-8
...
broker running

¡Sí­! Great Success!! :-)

El consumidor

Vamos a dejarlo muy muy simple.

Creamos una gema:

bundle gem countwer

Añadimos la dependencia en AMQP a countwer.gemspec para poder comunicarnos con RabbitMQ

s.add_runtime_dependency('amqp', ">= 0.7.1")

Y creamos nuestro cliente en lib/countwer.rb. Dejémoslo simple…

require "countwer/version"
require "amqp"

AMQP.start(:host => 'localhost') do |connection|
  AMQP::Channel.new(connection).queue('tweets').subscribe do |h,m|
    puts "tweet: #{m}"
  end
end

El código está conectando con el servidor en localhost y nos suscribimos a la cola tweets. Cada vez que se enví­e un mensaje a la cola se ejecutará el bloque e imprimiremos el mensaje.

Antes de seguir, no olvidar el bundle install

El productor

El productor es nuestro lector de tweets. Es una gema creada con bundle gem que depende de twitter-stream para leer los tweets.

Ojo!: No me meto en los detalles de twitter-stream, pero el ejemplo es más que suficiente.

La gema se prepara igual que el cliente, pero con este código:

EventMachine::run {
  connection = AMQP.connect(:host => "localhost")
  queue = AMQP::Channel.new(connection).queue("tweets")
  ...
  stream.each_item do |item|
    queue.publish(item)
  end
  ...
}

El código es distinto porque twitter-stream ya se ejecuta dentro de EventMachine (algo que amqp necesita), así­ que no hay que crear el bloque externo como en el cliente.

Sin embargo, el código hace casi lo mismo… conecta con el servidor local, obtiene la referencia a la cola tweets y publica en ella cada uno de los tweets que recibe.

Antes de seguir, no olvidar el bundle install

Probando, probando

Tenemos el servidor ya levantado, levantamos un productor.

bundle exec ruby lib/producer.rb

De momento no hay nadie leyendo los mensajes. Puedes ver como se acumulan con:

sudo rabbitmqctl list_queues
Listing queues ...
tweets	65
...done.

Ahora lanzamos un cliente:

bundle exec ruby lib/countwer.rb

Y empezará a escupir los tweets en formato JSON

Conclusión

Me ha encantado. No sólo resuelve el problema si no que lo hace de forma muy sencilla (tanto en instalación como en API).

Bola Extra: Para no perder el norte… el objetivo era separar el lector de tweets de su procesador porque este hací­a lógica pesada y si estuviesen acoplados el ritmo de lectura bajarí­a.

Es decir… lo único que hemos conseguido es acumular tweets en la cola de RabbitMQ porque el consumidor es más lento que el productor.

¿O no?

¡Pues no! (¡¡ta-chán!!) Sin cambiar una lí­nea de código podemos lanzar varios procesadores que recibirán los mensajes de la cola. La única pega es que parece que RabbitMQ reparte la carga mediante round-robin así­ que si el tiempo de procesado no es más o menos homogéneo, me temo que seguiremos teniendo problemas.