NOTA: Actualizado para usar la nueva versión del API de AMQP (evitando usar la clase
MQ
). Gracias a @michaelklishin por el feedback
Tengo el «capricho» de crear una estructura escalable para procesar tweets.
Básicamente hay dos cajas:
- el cliente de twitter: se conecta a la streaming API y su trabajo es leer tweets tan rápido como pueda.
- el procesador de tweets: hace la lógica pesada, quizá ayudado por una base de datos.
Para que el cliente de tweets pueda leer rápido, tengo que desacoplar los dos sistemas. Pero me gustaría que el procesador de tweets recibiese trabajo tan rápido como sea posible (en cuanto haya un tweet, nada de tareas periódicas). Vamos, lo que viene siendo un pub/sub de toda la vida.
Y aquí es dónde RabbitMQ (o, más concretamente, el protocolo AMQP) entra en juego, ya que es capaz de pasar los mensajes del publicador al consumidor al momento y el soporte para clientes Ruby es, según todas las referencias que he visto, eggscelente.
A mancharse las manos…
El servidor
Instalamos RabbitMQ usando port
sudo port install rabbitmq-server
Dejamos que se cocine un rato y cuando nos devuelva el control:
sudo rabbitmq-server
Activating RabbitMQ plugins ... 0 plugins activated:
+---+ +---+ | | | | | | | | | | | | | +---+ +-------+ | | | RabbitMQ +---+ | | | | | | v2.5.1 +---+ | | | +-------------------+ AMQP 0-9-1 / 0-9 / 0-8 ... broker running
¡Sí! Great Success!! :-)
El consumidor
Vamos a dejarlo muy muy simple.
Creamos una gema:
bundle gem countwer
Añadimos la dependencia en AMQP a countwer.gemspec
para poder comunicarnos con RabbitMQ
s.add_runtime_dependency('amqp', ">= 0.7.1")
Y creamos nuestro cliente en lib/countwer.rb
. Dejémoslo simple…
require "countwer/version" require "amqp"
AMQP.start(:host => 'localhost') do |connection| AMQP::Channel.new(connection).queue('tweets').subscribe do |h,m| puts "tweet: #{m}" end end
El código está conectando con el servidor en localhost
y nos suscribimos a la cola tweets
. Cada vez que se envíe un mensaje a la cola se ejecutará el bloque e imprimiremos el mensaje.
Antes de seguir, no olvidar el bundle install
El productor
El productor es nuestro lector de tweets. Es una gema creada con bundle gem
que depende de twitter-stream para leer los tweets.
Ojo!: No me meto en los detalles de twitter-stream, pero el ejemplo es más que suficiente.
La gema se prepara igual que el cliente, pero con este código:
EventMachine::run {
connection = AMQP.connect(:host => "localhost")
queue = AMQP::Channel.new(connection).queue("tweets")
...
stream.each_item do |item|
queue.publish(item)
end
...
}
El código es distinto porque twitter-stream ya se ejecuta dentro de EventMachine (algo que amqp necesita), así que no hay que crear el bloque externo como en el cliente.
Sin embargo, el código hace casi lo mismo… conecta con el servidor local, obtiene la referencia a la cola tweets
y publica en ella cada uno de los tweets que recibe.
Antes de seguir, no olvidar el bundle install
Probando, probando
Tenemos el servidor ya levantado, levantamos un productor.
bundle exec ruby lib/producer.rb
De momento no hay nadie leyendo los mensajes. Puedes ver como se acumulan con:
sudo rabbitmqctl list_queues
Listing queues ...
tweets 65
...done.
Ahora lanzamos un cliente:
bundle exec ruby lib/countwer.rb
Y empezará a escupir los tweets en formato JSON
Conclusión
Me ha encantado. No sólo resuelve el problema si no que lo hace de forma muy sencilla (tanto en instalación como en API).
Bola Extra: Para no perder el norte… el objetivo era separar el lector de tweets de su procesador porque este hacía lógica pesada y si estuviesen acoplados el ritmo de lectura bajaría.
Es decir… lo único que hemos conseguido es acumular tweets en la cola de RabbitMQ porque el consumidor es más lento que el productor.
¿O no?
¡Pues no! (¡¡ta-chán!!) Sin cambiar una línea de código podemos lanzar varios procesadores que recibirán los mensajes de la cola. La única pega es que parece que RabbitMQ reparte la carga mediante round-robin así que si el tiempo de procesado no es más o menos homogéneo, me temo que seguiremos teniendo problemas.