// OneComputeWorker\Worker.cs
using System;
using System.Collections.Generic;
using System.Composition;
using System.Threading;
using System.Threading.Tasks;
using DNV.One.Compute.Core.FlowModel;
using DNV.One.Compute.Core.Scheduling;
using DNV.One.Compute.Core.Worker;
using OneComputeCommon;
namespace OneComputeWorker
{
/// <summary>
/// A custom worker that squares the number carried by a <see cref="MyInput"/> work unit.
/// The calculation is artificially split into steps so that progress can be reported
/// and a cancellation request can be observed between steps.
/// </summary>
/// <seealso cref="DNV.One.Compute.Core.Worker.IWorker" />
/// <seealso cref="DNV.One.Compute.Core.Worker.ISupportCancellation" />
/// <seealso cref="DNV.One.Compute.Core.Worker.ISupportProgress" />
[Export(typeof(IWorker))]
public class Worker : IWorker, ISupportCancellation, ISupportProgress
{
    /// <summary>
    /// Gets or sets the cancellation token supplied through <see cref="SetCancellationToken"/>.
    /// Until a token is supplied this is the default token, which never signals cancellation.
    /// </summary>
    private CancellationToken CancellationToken { get; set; }

    /// <summary>
    /// Gets the calculator that performs the actual squaring.
    /// </summary>
    private Calculator Calculator { get; } = new Calculator();

    /// <summary>
    /// Gets or sets the status notification service of the execution in progress.
    /// It is assigned at the start of <see cref="ExecuteAsync"/>.
    /// </summary>
    private IWorkerExecutionStatusNotificationService StatusNotificationService { get; set; }

    /// <summary>
    /// Executes the task work unit.
    /// </summary>
    /// <param name="statusNotificationService">The status notification service.</param>
    /// <param name="workUnit">The work unit. Its input must be a <see cref="MyInput"/>.</param>
    /// <param name="dependencyResults">The dependency results. Not used by this worker.</param>
    /// <returns>
    /// A <see cref="MyResult"/> instance, or <c>null</c> when the calculation was cancelled
    /// before the result was produced.
    /// </returns>
    /// <exception cref="System.ArgumentNullException">
    /// <paramref name="statusNotificationService"/> or <paramref name="workUnit"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="System.InvalidOperationException">Work unit data not found.</exception>
    public async Task<object> ExecuteAsync(IWorkerExecutionStatusNotificationService statusNotificationService,
        IWorkUnit workUnit, IEnumerable<Result> dependencyResults)
    {
        StatusNotificationService = statusNotificationService ??
                                    throw new ArgumentNullException(nameof(statusNotificationService));

        // Fail with a descriptive argument error instead of a NullReferenceException below.
        if (workUnit == null)
        {
            throw new ArgumentNullException(nameof(workUnit));
        }

        var myInput = workUnit.GetInput<MyInput>();
        if (myInput == null)
        {
            // A specific exception type rather than the base Exception; handlers that
            // catch Exception still see it, and the message is unchanged.
            throw new InvalidOperationException("Work unit data not found.");
        }

        var numberToSquare = myInput.NumberToSquare;
        UpdateStatus(0.0, $"Calculation of square of '{numberToSquare}' is started.");

        var result = await SquareAsync(numberToSquare);
        if (result.Cancelled)
        {
            UpdateStatus(0.0, $"Calculation of square of '{numberToSquare}' was cancelled.", WorkStatus.Aborted);
        }
        else
        {
            UpdateStatus(1.0, $"Calculation of square of '{numberToSquare}' was completed. Result is '{result.Result}'.", WorkStatus.Completed);
        }

        return result.Result;
    }

    /// <summary>
    /// Calculate the square of the specified number.
    /// </summary>
    /// <param name="numberToSquare">The number to square.</param>
    /// <returns>
    /// A tuple of the calculated <see cref="MyResult"/> and a flag telling whether the calculation
    /// was cancelled. When <c>Cancelled</c> is <c>true</c> the <c>Result</c> is <c>null</c>;
    /// otherwise <c>Result</c> holds the square.
    /// </returns>
    private async Task<(MyResult Result, bool Cancelled)> SquareAsync(int numberToSquare)
    {
        const int numberOfIterations = 10;
        MyResult myResult = null;
        var cancelled = false;

        // Simulate a cancellable operation by artificially breaking it in to steps.
        // Obviously this is not necessary just to calculate the square of a number.
        for (var i = 1; !cancelled && i <= numberOfIterations; i++)
        {
            // Simulate slow-running operation.
            await Task.Delay(TimeSpan.FromMilliseconds(200));

            // Assume result can only be calculated on last iteration.
            if (i == numberOfIterations)
            {
                var result = Calculator.Square(numberToSquare);
                myResult = new MyResult(result);
            }

            var progress = (double)i / numberOfIterations;
            UpdateStatus(progress, $"Calculation of square of '{numberToSquare}' is '{progress:P0}' complete.");

            // Only honour cancellation while work remains. A request that arrives after the
            // last step has already produced the result must not turn a finished calculation
            // into an "aborted" one that nevertheless returns a result.
            cancelled = i < numberOfIterations && CancellationToken.IsCancellationRequested;
        }

        return (myResult, cancelled);
    }

    /// <summary>
    /// Provides status updates to the <see cref="IWorkerExecutionStatusNotificationService"/>.
    /// </summary>
    /// <param name="progress">The progress, as passed on to the notification service.</param>
    /// <param name="message">The message.</param>
    /// <param name="workStatus">The work status. Defaults to <see cref="WorkStatus.Executing"/>.</param>
    private void UpdateStatus(double progress, string message, WorkStatus workStatus = WorkStatus.Executing)
    {
        StatusNotificationService.AddWorkItemStatus(workStatus, progress, message);
    }

    /// <summary>
    /// Implement ISupportCancellation
    /// Accepts the cancellation token from the caller and stores it so the calculation
    /// loop can poll it between steps.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    public void SetCancellationToken(CancellationToken cancellationToken)
    {
        CancellationToken = cancellationToken;
    }
}
}