Monday, November 12, 2012

Getting Started with WSO2 Message Broker Using C#

During my internship at WSO2 I got a chance to contribute to WSO2 Message Broker (MB) by implementing a C# client for WSO2 MB.

WSO2 Message Broker didn’t have any samples showing how to connect to it from a C# client. Since WSO2 MB gets its core support from Apache Qpid, and a C# client can connect to Apache Qpid, it should be possible to connect to WSO2 MB from a C# or .Net client as well. Apache Qpid is a cross-platform enterprise messaging system which implements the Advanced Message Queuing Protocol (AMQP), providing message brokers written in C++ and Java, along with clients for C++, Java JMS, .Net, Python, and Ruby. Even though this was possible in principle, no one had tried it out with WSO2 MB.

The rest of this article describes how to send a message to a queue in WSO2 MB from a C# client and how to consume that message from another C# client.

WSO2 Message Broker supports JMS, WS-Eventing, Amazon SQS and AMQP. By using AMQP we can write a .Net program that sends a message to a queue in MB, and then retrieve that message from either a .Net client or a Java client. When you run the programs below, start the Consumer program first and then the Producer program.

Exchanges, Queues and Bindings in AMQP


Like any other messaging protocol, AMQP uses producers and consumers to send and receive messages. The producer creates a message and sends it to a destination. The consumer receives that message and processes it. The message broker is responsible for delivering each message from the producer to the consumer.

AMQP uses two components to complete this task: exchanges and queues. The producer produces messages and delivers them to an exchange. The consumer consumes messages from a queue. Bindings connect these two components to each other. The publisher and the consumer find each other through the exchange name. Usually either the publisher or the consumer creates the exchange and makes its name public. Once the queue has been declared, it should be attached to the exchange. The broker then has to match the messages received by the exchange to the queue, and it does that using bindings. Typically the queue binding is done on the client side. The following diagram explains what I described above.
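
The same flow can also be sketched in a few lines of C#. This is only a toy in-memory model for illustration, not the RabbitMQ client or the WSO2 MB implementation: the DirectExchange class and its Bind and Publish methods are hypothetical names made up for this sketch, and it assumes a direct exchange, where a message reaches a queue only if the routing key equals the binding key.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of a direct exchange (hypothetical, not a real client API).
public class DirectExchange
{
    // Maps each binding key to the queues bound with that key.
    private readonly Dictionary<string, List<Queue<string>>> bindings =
        new Dictionary<string, List<Queue<string>>>();

    // Attach a queue to this exchange under the given binding key.
    public void Bind(Queue<string> queue, string bindingKey)
    {
        List<Queue<string>> queues;
        if (!bindings.TryGetValue(bindingKey, out queues))
        {
            queues = new List<Queue<string>>();
            bindings[bindingKey] = queues;
        }
        queues.Add(queue);
    }

    // Copy the message to every queue whose binding key equals the routing key.
    // Returns the number of queues that received it.
    public int Publish(string routingKey, string message)
    {
        List<Queue<string>> queues;
        if (!bindings.TryGetValue(routingKey, out queues))
        {
            return 0; // no matching binding, so this model drops the message
        }
        foreach (Queue<string> queue in queues)
        {
            queue.Enqueue(message);
        }
        return queues.Count;
    }
}

public static class DirectExchangeDemo
{
    public static void Main()
    {
        DirectExchange exchange = new DirectExchange();
        Queue<string> testQueue = new Queue<string>();
        exchange.Bind(testQueue, "test-queue");

        exchange.Publish("test-queue", "This is a Test"); // routed to testQueue
        exchange.Publish("other-key", "Not routed");      // no binding matches

        Console.WriteLine(testQueue.Count);     // 1
        Console.WriteLine(testQueue.Dequeue()); // This is a Test
    }
}
```

In a real broker the exchange, the queue and the binding live on the server, and the client only asks the broker to declare them.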


To run this code sample, you need to download the RabbitMQ.Client.dll file and add it as a reference in your .Net project. You can download the dll file from http://www.rabbitmq.com/dotnet.html

.Net Publisher:

  
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client; // (1)

namespace RabbitMQ
{
    class Publisher
    {
        static void Main(string[] args)
        {
            Publisher p = new Publisher();
            string message = "This is a Test";
            p.PublishMessage(message);
            Console.WriteLine("Sent Message: " + message);
            Console.ReadLine();
        }

        public void PublishMessage(string message)
        {
            // Set up the connection with the message broker. (2)
            ConnectionFactory factory = new ConnectionFactory();
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;
            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare the exchange for the publisher. Here the exchange type is direct. (3)
                    ch.ExchangeDeclare("amq.direct", "direct");

                    // Publish the message with "test-queue" as the routing key. (4)
                    ch.BasicPublish("amq.direct", "test-queue", null, Encoding.UTF8.GetBytes(message));
                }
            }
        }
    }
}

1. Import the RabbitMQ.Client.dll file into the program.
2. Create a connection to the MB. Here I have specified the VirtualHost as “/carbon”. If it is not specified, the connection goes to the default VirtualHost, “test”.
3. Declare the exchange for the publisher. The queue is interested in messages from this exchange. Here the exchange type is direct, which means a message goes to a queue only if the binding key exactly matches the routing key of the message.
4. Publish the message. Here we need to specify the exchange name and the routing key. In this example the routing key is the same as the queue name, “test-queue”. The next parameter is IBasicProperties, which I kept as null for this scenario. The last parameter is the message body.

QueueConsumer:


This consumer program fetches messages from the queue and displays them in the console. To run this code sample, you need to download the RabbitMQ.Client.dll file and add it as a reference in your .Net project, as mentioned above.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client; // (1)

namespace QueueConsumer
{
    class QueueConsumer
    {
        static void Main(string[] args)
        {
            QueueConsumer qConsumer = new QueueConsumer();
            qConsumer.getMessage();
        }

        public void getMessage()
        {
            // Set up the connection with the message broker. (2)
            ConnectionFactory factory = new ConnectionFactory();
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;
            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare a queue to retrieve messages. (3)
                    ch.QueueDeclare("test-queue", true, false, false, null);

                    // Create the binding between the queue and the exchange. (4)
                    ch.QueueBind("test-queue", "amq.direct", "test-queue");

                    // Create the consumer and register it on the queue. (5), (6)
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
                    ch.BasicConsume("test-queue", false, consumer);

                    // Wait for messages, print each one and acknowledge it. (7)
                    while (true)
                    {
                        try
                        {
                            RabbitMQ.Client.Events.BasicDeliverEventArgs e =
                                (RabbitMQ.Client.Events.BasicDeliverEventArgs)
                                    consumer.Queue.Dequeue();
                            byte[] body = e.Body;
                            string message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(message);
                            ch.BasicAck(e.DeliveryTag, false);
                        }
                        catch (OperationCanceledException e)
                        {
                            Console.WriteLine(e);
                            break;
                        }
                    }
                }
            }
        }
    }
}

1. Import the dll file as mentioned above.

2. Create a connection with the message broker.

3. Declare a queue to receive messages. The first parameter is the queue name. The second parameter is durable, which I kept as true. The next two parameters are exclusive and auto-delete, both of which I kept as false. The last parameter is the IDictionary of arguments, which is null here.

4. Here I create the binding between the queue and the exchange. A binding is simply a relationship between an exchange and a queue. The first parameter is the queue that we want to bind. The next parameter is the exchange that we are binding to the queue, and the last parameter is the routing key. Here the routing key is the same as the queue name.

5. Create a consumer to consume the messages from the queue.

6. Start consuming from the queue with the consumer created in the previous step. The first parameter is the name of the queue, “test-queue”. The next parameter is noAck, which I kept as false, so each message has to be acknowledged explicitly. The last parameter is the consumer object created in the previous step.

7. In this infinite while loop, we listen for messages, decode each one, print its body in the console and acknowledge it.
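
The decoding in the last step only works because both sides agree on the encoding: the publisher turns the text into bytes with UTF-8 and the consumer turns the bytes back into text with UTF-8. Here is a minimal sketch of that round trip without any broker involved. The MessageBodyCodec class and its Encode and Decode methods are hypothetical helpers written for this illustration. They are not part of RabbitMQ.Client.

```csharp
using System;
using System.Text;

// Hypothetical helper that mirrors what the publisher and consumer do with the message body.
public static class MessageBodyCodec
{
    // Publisher side: the byte[] that would be passed to BasicPublish.
    public static byte[] Encode(string message)
    {
        return Encoding.UTF8.GetBytes(message);
    }

    // Consumer side: turn the received byte[] body back into text.
    public static string Decode(byte[] body)
    {
        return Encoding.UTF8.GetString(body);
    }

    public static void Main()
    {
        byte[] body = Encode("This is a Test");
        Console.WriteLine(body.Length);  // 14
        Console.WriteLine(Decode(body)); // This is a Test
    }
}
```

If a consumer written in another language reads the same queue, it would need to decode the body as UTF-8 too.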

The message published by the C# client can also be consumed by an ordinary Java client.

13 comments:

  1. This comment has been removed by the author.

  2. I am using the following
    Windows 7 Pro, VS 2012 Premium
    wso2esb-4.6.0 and the server started up successfully

  3. Hi sgadre,

    Here I used WSO2 Message Broker 1.0.2. Why did you start the ESB? Can you explain further, since your scenario is not clear to me?

    Thanks.

    1. new to wso2 - i thought the broker was wrapped up in the esb - my fault. i downloaded the MB and it worked fine. thanks for looking at this

    2. Do you have an example with Exchanges?

    3. Hi,

      Here the publisher class is an example for exchanges.

      Thanks.

  4. Found some strange behavior using this App. In a given Session I can only create one Exchange and associate one Queue with it. Creating another Exchange and binding another Queue causes an exception. Using the same Exchange and associating a new Queue works (no errors), but messages dropped on that Queue are not picked up. Let me know if you know of any docs (beyond Rabbit's docs, which are limited) that shed light on how the server and client actually behave.

    1. Hi,

      You can have multiple queues with a single channel, but if you use multiple exchanges then you should have one channel for each exchange, since a single channel can only send or receive one message at a time.

      Thanks.

  5. Thanks for the example - I worked it into some code I was doing as a message abstraction layer and it worked fine. The problem for me is I need a pub/sub situation and when I try to implement it that way it bombs out on me. Specifically, the consumer code when I try to do a channel.QueueBind(queueName, topic, "") I get this error:

    The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=504, text="org.wso2.andes.AMQSecurityException: Permission denied: binding [error code 403: access refused]", classId=50, methodId=20, cause=

    In my topic listener code I do the following:

    channel.ExchangeDeclare(topic, "fanout");
    string queueName = channel.QueueDeclare();
    channel.QueueBind(queueName, topic, "");

    I've searched the internet for solutions and I've seen where a couple of other people have come across the same error but there have not been any solutions. Disclaimer: While I've been programming for quite a while this stuff - WSO2 & AMQP - is new to me.

    Thanks in advance for any feedback.

  6. Hi,

    In WSO2 Message Broker, exchanges are not created dynamically for any given name. Therefore, to get the sample working for Topics, we need to use "amq.topic", the pre-defined default topic exchange of WSO2 MB. If a new topic exchange needs to be created, it has to be pre-declared in the qpid-virtualhosts.xml file.

    You can refer to the following post to get this working with Topics.
    http://isharapremadasa.blogspot.com/2013/08/using-wso2-message-broker-with-rabbitmq.html
