martes, 24 de diciembre de 2013

Extracción de mensajes de las colas del Azure Service Bus

En post anteriores hemos mostrado como sacar mensajes de un “queue” tanto desde Java como desde .NET. Sin embargo, el proceso de extracción de estos mensajes ha sido mediante un ciclo que constantemente trata de obtener los nuevos mensajes que residen en la cola seleccionada. Esto podría no ser ideal en ciertos escenarios en donde el flujo de mensajes no es constante o por ejemplo donde tenemos muchos mensajes pero queremos tener paralelismo a la hora de consumir los mensajes entrantes. Con el SDK 2.0 y superior Windows de Azure ahora tenemos la posibilidad de tener una forma alterna de consumir estos mensajes mediante notificación del evento de recibo por parte del Bus de Azure ( o Windows Service Bus ).

Extracción de mensajes con el método OnMessage

Este método nos permite asociar un método “callback” a la instancia de la cola creada el cual se va a ejecutar cada vez que un mensaje llega a la cola del bus.

Ejemplo

Para ilustrar el modelo de extracción de mensajes, vamos a trabajar en un ejemplo de envío y recibo de mensajes a través de una cola de mensajes en el bus. Como se puede ver en la siguiente figura, tenemos 6783 mensajes en el bus esperando a ser procesados. Para extraerlos no vamos a usar un ciclo de extracción, sino mas bien el método OnMessage.

image

Como se ilustra en el siguiente código, primero vamos a crear un método para configurar el callback al evento OnMessage. Primero creamos una instancia de la clase MessageOptions donde establecemos si queremos que el mensaje se marque como completo cuando se procesa y cuantas llamadas a la cola concurrentes pueden ser creadas. Luego establecemos el mecanismo para manejar las excepciones y por ultimo configuramos el método onMessage con el callback que viene de parámetro – que va a procesar cada mensaje que llega – y las opciones de recibo.

  public void GetMessagePump(Action<BrokeredMessage> pCallback)
        {
            var _messageOptions = new OnMessageOptions {AutoComplete = true, MaxConcurrentCalls = 5};
            _messageOptions.ExceptionReceived += _messageOptions_ExceptionReceived;
            GetQueueClient().OnMessage( pCallback, _messageOptions);
        }
        void _messageOptions_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
        {
            if (e != null && e.Exception != null)
            {
                Trace.WriteLine(">> Se presento la siguiente excepcion: {0}",
                    e.Exception.ToString());
            }
        }

Seguidamente procedemos a crear el método que va a manejar el mensaje y que viene de parámetro en el Callback del metodo GetMessagePump. Como se e ve en el código, el método recibe un mensaje del bus como parámetro y luego lo procesa. En este caso hacemos lo que necesitamos hacer con el mensaje, pero específicamente para este ejemplo vamos a loguear el recibo del mensaje en un TextBox de una aplicación Windows Forms – simplemente logueamos una propiedad de la clase BaseRecord la cual es un Guid.

 private void OnMessageArrived(BrokeredMessage pMessage)
        {
            if (pMessage != null)
            {
                txtEstadoProceso.Invoke((MethodInvoker) delegate
                {
BaseRecord _baseRecord = pMessage.GetBody<BaseRecord>();
                    txtEstadoProceso.Text +=
                        string.Format(
                            "Procesando registro {0} a las {1} \n" + Environment.NewLine,
                            _baseRecord.TransactionId.ToString(),
                            DateTime.Now.ToString());
                });

pMessage.Complete();

            }
        }

Por ultimo en el evento del botón obtener mensajes agregamos el siguiente código que activa la recepción de mensajes. En este caso en particular, los métodos anteriores – excepto el de OnMessageArrived – fueron escritos en una clase que encapsula la interacción con el bus de Azure llamada ServiceBusManager.

        private void btnRecibirPorNotificacion_Click(object sender, EventArgs e)
        {
            var _serviceBus = new ServiceBusManager();
            _serviceBus.GetMessagePump(OnMessageArrived);
        }

Si ejecutamos el código anterior podemos ver como se van extrayendo los mensajes vía el método OnMessage a través del Callback OnMessageArrived.


image


Como podemos en la imagen anterior, los registros se procesan de 5 en 5 ( así se configuro en el MessageOptions ) y todos tienen el Id de la transacción igual porque son mensajes correlacionados. Esta misma operación puede ser llevada a cabo en el Windows Service Bus.


Etiquetas de Technorati:

No hay comentarios:

Publicar un comentario