During my internship at WSO2 I got a chance to contribute to WSO2 Message Broker (MB) by implementing a C# client for WSO2 MB.
WSO2 Message Broker didn’t have any samples in order to connect to WSO2 MB from a C# client. Since WSO2 MB gets the core support from Apache Qpid, it should be possible to connect to WSO2 MB from a C# or .Net client because C# client can connect to Apache Qpid. Apache Qpid is a cross-platform Enterprise Messaging system which implements the Advanced Message Queuing Protocol (AMQP), providing message brokers written in C++ and Java, along with clients for C++, Java JMS, .Net, Python, and Ruby. Even though there is a possibility to get the thing done, no one has tried out it with WSO2 MB.
The rest of this article will describe how to send a message to a queue in WSO2 MB using a C# client and consume that message using a C# client.
WSO2 Message Broker supports JMS, WS-Eventing, Amazon SQS and AMQP. By using AMQP we can create a .Net program to send a message to a queue in MB and retrieve that message from a .Net client or a Java client. In the rest of this tutorial I'm going to explain how to implement a .Net client to send messages to a queue and retrieve those messages using a .Net client and a Java client. When you run these programs, first run the Consumer program and then the Producer program.
Exchanges Queues and bindings in AMQP
Like any other messaging protocol AMQP also uses producers and consumers when sending and receiving messages. The responsibility of the producer is to produce the message and send it to the destination. The consumer consumes that message and processes it. It is the responsibility of the message broker to deliver the message which is produced by the producer, to the consumer.
AMQP uses two components in order to complete this task. They are exchanges and queues. Producer produces messages and delivers to the exchange. Consumer consumes messages from the queue. We use bindings to connect these two components each other. Publisher and the consumer are recognized each other via the exchange name. Usually either the publisher or the consumer create the exchange and make the name of the exchange public. The Queue should be attached to the exchange after declaring the queue. Then broker has to match the messages received by the exchange to the queue. That is done using bindings. Typically queue binding is happening at the client's side. The following diagram explains what I described above.
In order to run this code sample, you need to download and add RabbitMQ.Client.dll file as a reference in your .net project. You can download that dll file from this website
http://www.rabbitmq.com/dotnet.html
.Net Publisher:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;------------------------------------------------------------------------1
namespace RabbitMQ
{
class Publisher
{
static void Main(string[] args)
{
Publisher p=new Publisher();
p.PublishMessage("This is a Test "+i);
Console.WriteLine("Sent Message "+i);
Console.ReadLine();
}
public void PublishMessage(string message)
{
//Setup the connection with the message broker
ConnectionFactory factory = new ConnectionFactory();-------------------------------2
IProtocol protocol = Protocols.AMQP_0_8_QPID;
factory.VirtualHost = "/carbon";
factory.UserName = "admin";
factory.Password = "admin";
factory.HostName = "localhost";
factory.Port = 5672;
factory.Protocol = protocol;
using (IConnection conn = factory.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
//Declare the exchange for the publisher.Here the exchange type is direct.
ch.ExchangeDeclare("amq.direct", "direct");--------------------------------------3
//Publish the message
ch.BasicPublish("amq.direct", "test-queue", null, Encoding.UTF8.GetBytes(message));--------4
}
}
}
}
import the RabbitMQ.Client.dll file to the program.
Create a connection to the MB. Here I have specified the VirtualHost as “carbon”. If it is not specified it will go the default VirtualHost, “test”.
Declare the exchange type for the publisher. The queue is interested in messages from this exchange. Here the exchange type is direct. That means a message will go to the queues only if the binding key exactly matches the routing key of the message.
Publish the message. Here we need to specify the exchange type and the routing key. In this example the routing key is similar to the queue name which is “test-queue”. The next parameter is IbasicProperties. I kept it as null for this scenario. Last parameter is the message body.
QueueConsumer:
This consumer program fetches messages from the queue and display them in the console. In order to run this code sample, you need to download and add RabbitMQ.Client.dll file as a reference in your .net project as mentioned above.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;------------------------------------------5
namespace QueueConsumer
{
class QueueConsumer
{
static void Main(string[] args)
{
QueueConsumer qConsumer = new QueueConsumer();
qConsumer.getMessage();
}
public void getMessage()
{
//Setup the connection with the message broker
ConnectionFactory factory = new ConnectionFactory();-----------6
IProtocol protocol = Protocols.AMQP_0_8_QPID;
factory.VirtualHost = "/carbon";
factory.UserName = "admin";
factory.Password = "admin";
factory.HostName = "localhost";
factory.Port = 5672;
factory.Protocol = protocol;
using (IConnection conn = factory.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
//Declare a queue to retrieve messages.
ch.QueueDeclare("test-queue", true, false, false, null);----7
//Create the binding between queue and the exchance
ch.QueueBind("test-queue", "amq.direct", "test-queue");-----8
QueueingBasicConsumer consumer = new
QueueingBasicConsumer(ch);-----9
ch.BasicConsume("test-queue",false,consumer);-------10
while (true)-------------------------------------11
{
try
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e =
(RabbitMQ.Client.Events.BasicDeliverEventArgs)
consumer.Queue.Dequeue();
byte[] body = e.Body;
string message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
ch.BasicAck(e.DeliveryTag,false);
}
catch (OperationCanceledException e)
{
Console.WriteLine(e);
break;
}
}
}
}
}
}
}
1. Import the dll file as mentioned above.
2. Create a connection with the message broker.
3. Declare a queue to receive messages. The first parameter is the queue name. The second parameter is durable. I kept it as true. The newt two parameters are exclusive and auto-delete. I kept it as false. The last parameter is IDictionary arguments. It is null here.
4. HereI create the binding for the queue and the exchange. Simply a binding is a relationship between an exchange and a queue. The first parameter is the queue that we want to bind. The next parameter is the exchange that we are binding with the queue and the last parameter is the routing key. Here the routing key is similar to the queue name.
5. Create a consumer to consume the messages from the queue.
6. Gets the message from the queue using the consumer created in the previous step. The first parameter is the name of the queue. I gave the name as “test-queue”. The next parameter is noAck. I kept it as false. For the next parameter I’m passing the consumer object which I created in the previous step.
7. In this infinite while loop, we listen for messages, decoding each one, and printing its body in the console.
The message which I published by the C# client can be consumed by an ordinary Java client also.