Using Message Queues for Message-based Communication between Workers
Messages and Message Queues
When creating a OneCompute Job, the client may specify a set of message queues that will be dedicated to the job, enabling workers to communicate by sending and receiving messages. Any message queue used for communication must be declared on the job by listing it in Job.MessageQueues.
Note that a message queue is a one-to-one communication mechanism: a message sent on a message queue will never be read by more than one recipient.
Message queues declared on the job will be managed (created and deleted) by OneCompute. The queues will be created by the OneCompute Job Scheduler when the job is scheduled and deleted after the job has terminated. Any attempt to communicate on any other queue will throw an exception.
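The queue semantics described above (only declared queues are usable, each message is read by at most one recipient, and an empty queue yields null) can be illustrated with a small in-memory stand-in. This is a sketch for reasoning about or unit testing worker logic, not the OneCompute implementation: the class name InMemoryJobQueues, the use of System.Text.Json, and the InvalidOperationException type are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in; NOT part of OneCompute.
public sealed class InMemoryJobQueues
{
    private readonly ConcurrentDictionary<string, ConcurrentQueue<string>> queues = new();

    // Only queues declared up front can be used, mirroring Job.MessageQueues.
    public InMemoryJobQueues(IEnumerable<string> declaredQueueNames)
    {
        foreach (var name in declaredQueueNames)
        {
            this.queues[name] = new ConcurrentQueue<string>();
        }
    }

    // Serializes the message to JSON and appends it to the named queue.
    public Task SendMessageAsync<T>(string queueName, T message)
    {
        this.GetQueue(queueName).Enqueue(JsonSerializer.Serialize(message));
        return Task.CompletedTask;
    }

    // Returns default (null for reference types) when the queue is empty.
    public Task<T> GetMessageAsync<T>(string queueName)
    {
        // TryDequeue removes the message, so a second caller cannot read it.
        if (this.GetQueue(queueName).TryDequeue(out var json))
        {
            return Task.FromResult(JsonSerializer.Deserialize<T>(json));
        }

        return Task.FromResult(default(T));
    }

    private ConcurrentQueue<string> GetQueue(string queueName)
    {
        if (!this.queues.TryGetValue(queueName, out var queue))
        {
            // Assumption: the exception type thrown by OneCompute is not stated here.
            throw new InvalidOperationException(
                $"Queue '{queueName}' was not declared on the job.");
        }

        return queue;
    }
}
```

With this stand-in, a message sent once to "result-queue" is returned by the first GetMessageAsync call, the second call returns null, and using an undeclared queue name throws.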
What is a message?
Any instance of a type that is serializable to JSON can be used as a message. The only constraint is the serialized size of the instance. When running in Azure, Azure Queue Storage imposes a size limit of 64 KB per message; see Azure Queue Storage.
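Because the serialized size is the limiting factor, it can be useful to check a message before sending it. The following is a minimal sketch, assuming System.Text.Json as the serializer; the serializer OneCompute actually uses may produce a slightly different size. MyResultMessage is shown here with hypothetical properties, and MessageSizeGuard is an illustrative helper, not part of OneCompute.

```csharp
using System;
using System.Text.Json;

// Hypothetical message type, used only for illustration.
public record MyResultMessage(string WorkItemId, string Payload);

// Illustrative helper; not part of OneCompute.
public static class MessageSizeGuard
{
    // The 64 KB limit quoted above for Azure Queue Storage.
    public const int MaxMessageBytes = 64 * 1024;

    // Size in bytes of the UTF-8 encoded JSON for the message.
    // Assumption: System.Text.Json stands in for the serializer used by OneCompute.
    public static int SerializedSize<T>(T message) =>
        JsonSerializer.SerializeToUtf8Bytes(message).Length;

    // True if the serialized message does not exceed the 64 KB limit.
    public static bool FitsInQueueMessage<T>(T message) =>
        SerializedSize(message) <= MaxMessageBytes;
}
```

Any additional encoding applied by the transport would reduce the usable size further, so a check like this is best treated as an upper bound rather than a guarantee.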
How to send messages on a message queue
To send a message on a message queue, use the IMessageSender.SendMessageAsync method. A code example is shown in the Worker section below.
How to receive messages on a message queue
To receive a message from a message queue, use the IMessageReceiver.GetMessageAsync method. If there are no messages on the queue, this method returns default (null). A code example is shown in the Worker section below.
The Worker
The following code example shows the Worker constructor for a worker that needs to do message-based communication on message queues dedicated to the job.
[ImportingConstructor]
public GameWorker(IMessageSender messageSender, IMessageReceiver messageReceiver)
{
    this.messageSender = messageSender;
    this.messageReceiver = messageReceiver;
}
In this example both the sender and receiver interfaces are injected. The following code example shows how to send messages:
var resultMessage = new MyResultMessage(result);
await this.messageSender.SendMessageAsync("result-queue", resultMessage);
For the worker receiving this message, the code would look like this:
while (true)
{
    var resultMessage = await this.messageReceiver.GetMessageAsync<MyResultMessage>("result-queue");
    if (resultMessage == null)
    {
        // No message available yet; wait before polling again.
        await Task.Delay(TimeSpan.FromSeconds(1));
        continue;
    }

    // Process the result
    ...
}
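The polling loop above runs until a message arrives. In practice a worker may want to stop waiting when it is cancelled. The following sketch factors the loop into a reusable helper that takes the receive call as a delegate, so it makes no assumption about the OneCompute interfaces beyond a call that returns null when the queue is empty. QueuePolling and WaitForMessageAsync are hypothetical names introduced here for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative helper; not part of OneCompute.
public static class QueuePolling
{
    // Calls 'receive' repeatedly until it returns a non-null message,
    // waiting 'pollInterval' between attempts. Throws
    // OperationCanceledException if the token is cancelled while waiting.
    public static async Task<T> WaitForMessageAsync<T>(
        Func<Task<T>> receive,
        TimeSpan pollInterval,
        CancellationToken cancellationToken) where T : class
    {
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            T message = await receive();
            if (message != null)
            {
                return message;
            }

            // Queue was empty; wait before polling again.
            await Task.Delay(pollInterval, cancellationToken);
        }
    }
}

// Possible usage inside the worker, assuming GetMessageAsync<T> returns Task<T>:
//
// var resultMessage = await QueuePolling.WaitForMessageAsync(
//     () => this.messageReceiver.GetMessageAsync<MyResultMessage>("result-queue"),
//     TimeSpan.FromSeconds(1),
//     cancellationToken);
```

Passing the receive call as a delegate keeps the helper independent of the queue name and message type, and makes it straightforward to test with a fake receiver.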