How to create an Application Worker
Create Worker Project
First, create a new C# project. In Visual Studio, use the .NET Core Class Library template, then edit the .csproj file as follows:
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net462</TargetFramework>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="DNV.One.Compute.WorkerHost.AzureBatch" Version="6.4.0" />
  </ItemGroup>
</Project>
A note on how the worker is loaded at runtime: Azure Batch starts the WorkerHost executable, an .exe that ships in the DNV.One.Compute.WorkerHost.AzureBatch package referenced by the Worker project. The OneCompute WorkerHost then uses System.Composition to find classes that implement and export the IWorker interface, retrieves the WorkUnit from storage, and invokes the IWorker.ExecuteAsync method to process the WorkUnit.
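The discovery step described above can be illustrated with a minimal sketch. It assumes the System.Composition NuGet package is referenced, and it uses a hypothetical IDemoWorker interface and SquareDemoWorker class as stand-ins; it is not the actual WorkerHost implementation, only an illustration of how exported types can be located by a host.

```csharp
using System;
using System.Collections.Generic;
using System.Composition;
using System.Composition.Hosting;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for OneCompute's IWorker; the real interface has a different shape.
public interface IDemoWorker
{
    string Name { get; }
}

// Exported in the same way an application worker is exported.
[Export(typeof(IDemoWorker))]
public class SquareDemoWorker : IDemoWorker
{
    public string Name => "square";
}

public static class DiscoveryDemo
{
    // Scans the given assembly and returns every exported IDemoWorker implementation.
    public static IReadOnlyList<IDemoWorker> FindWorkers(Assembly assembly)
    {
        var configuration = new ContainerConfiguration().WithAssembly(assembly);
        using (var container = configuration.CreateContainer())
        {
            // Materialize the exports before the container is disposed.
            return container.GetExports<IDemoWorker>().ToList();
        }
    }

    public static void Main()
    {
        foreach (var worker in FindWorkers(typeof(SquareDemoWorker).Assembly))
        {
            Console.WriteLine(worker.Name);
        }
    }
}
```

A class that lacks the [Export] attribute is not returned by GetExports, which is why the attribute on the application worker matters.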
To target modern .NET instead of .NET Framework 4.6.2, set the target framework to either net6.0 (.NET 6.0) or net7.0 (.NET 7.0).
In the following code examples, we will use the example scenario and the data types defined in the Create a job section.
Data Flow
See the Understanding the Flow of Application Data between Client and Worker section for background.
Before showing how to build the Application Worker and the client application, let us review the data types that will carry input from the client application to the Application Worker and results back to the client application.
First, let us review the input to the worker. This input must be created by the client application and assigned to a WorkUnit:
/// <summary>
/// Example type containing input to the application worker.
/// </summary>
public class CalcInput
{
    /// <summary>
    /// Gets or sets the worker input.
    /// </summary>
    /// <value>
    /// The worker input.
    /// </value>
    public double Input { get; set; }
}
Next, the worker will square the Input and return its square in the following type:
/// <summary>
/// Example type containing output from the application worker.
/// </summary>
public class CalcOutput
{
    /// <summary>
    /// Gets or sets the output from the application worker.
    /// </summary>
    /// <value>
    /// The output.
    /// </value>
    public double Output { get; set; }
}
Finally, in the reduction step, the worker will calculate and return the sum of squares:
/// <summary>
/// Example type used to carry the result from a reduction step.
/// </summary>
public class CalcAggregatedResult
{
    /// <summary>
    /// Gets or sets the aggregated result value.
    /// </summary>
    /// <value>
    /// The aggregated result value.
    /// </value>
    public double ResultValue { get; set; }
}
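To make the flow through these three types concrete, here is a small sketch in plain C# that runs entirely in-process, without OneCompute. The DataFlowDemo class and its Square and Reduce methods are hypothetical helpers introduced only for illustration, and the types are re-declared in compact form so the example is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Compact re-declarations of the example types so the sketch compiles on its own.
public class CalcInput { public double Input { get; set; } }
public class CalcOutput { public double Output { get; set; } }
public class CalcAggregatedResult { public double ResultValue { get; set; } }

public static class DataFlowDemo
{
    // Worker step: square a single input value.
    public static CalcOutput Square(CalcInput input) =>
        new CalcOutput { Output = input.Input * input.Input };

    // Reduction step: sum the squares produced by the worker step.
    public static CalcAggregatedResult Reduce(IEnumerable<CalcOutput> outputs) =>
        new CalcAggregatedResult { ResultValue = outputs.Sum(o => o.Output) };

    public static void Main()
    {
        var inputs = new[] { 1.0, 2.0, 3.0 }.Select(v => new CalcInput { Input = v });
        var outputs = inputs.Select(Square).ToList();   // 1, 4, 9
        var total = Reduce(outputs);
        Console.WriteLine(total.ResultValue);           // prints 14
    }
}
```

In a real job, each CalcInput would travel to the worker in its own WorkUnit, and the reduction would run as a separate step over the collected results.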
Add a Worker Class
Add a class for the application worker that inherits from DNV.One.Compute.Core.Worker.WorkerBase<TIn,TOut>. Make sure to add a System.Composition.AttributedModel.Export(typeof(IWorker)) class attribute. The OneCompute WorkerHost uses this attribute to locate and load the application worker.
using System.Collections.Generic;
using System.Composition;
using System.Linq;
using System.Threading.Tasks;
using DNV.One.Compute.Core.FlowModel;
using DNV.One.Compute.Core.Worker;

[Export(typeof(IWorker))]
public class CalcWorker : WorkerBase<CalcInput,CalcOutput>, ISupportReduction
{
    /// <summary>
    /// Initializes a new instance of the <see cref="CalcWorker"/> class.
    /// </summary>
    /// <remarks>
    /// Use the [ImportingConstructor] if you want to apply constructor injection of types.
    /// This can be used to import application types that are exported using the [Export] attribute.
    /// </remarks>
    [ImportingConstructor]
    public CalcWorker()
    {
    }

    // The ProcessAsync and ReduceAsync methods shown in the following sections go here.
}
The ISupportReduction interface is only needed if the application worker needs to support reduction.
The above code example uses the generic WorkerBase<TIn,TOut> base class, which does not expose the WorkUnit directly. If there is a need to access the WorkUnit, e.g. retrieving properties from it, you may either inherit from the non-generic WorkerBase base class or implement IWorker directly.
Process the WorkUnit
The processing of a WorkUnit takes place in the ExecuteAsync method of the worker. When the worker inherits from WorkerBase<CalcInput,CalcOutput>, you provide that processing by overriding the ProcessAsync method. Here is an example implementation:
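protected override Task<CalcOutput> ProcessAsync(
    IWorkerExecutionStatusNotificationService workerExecutionStatusNotificationService,
    CalcInput calcInput,
    IEnumerable<Result> dependencyResults)
{
    // Square the input value carried by the WorkUnit.
    var input = calcInput.Input;

    // Nothing is awaited here, so return a completed task instead of marking the method async.
    return Task.FromResult(new CalcOutput { Output = input * input });
}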
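The relationship between an untyped ExecuteAsync entry point and a typed ProcessAsync override can be sketched as a template method. The DemoWorkerBase<TIn,TOut> class below is a hypothetical stand-in written for this illustration; it is not the OneCompute WorkerBase<TIn,TOut>, whose ExecuteAsync takes different parameters, and the example types are re-declared in compact form so the sketch is self-contained.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a generic worker base class; NOT the OneCompute type.
public abstract class DemoWorkerBase<TIn, TOut>
{
    // Untyped entry point: checks the payload type, then delegates to the typed override.
    public async Task<object> ExecuteAsync(object payload)
    {
        if (!(payload is TIn typedInput))
        {
            throw new ArgumentException(
                $"Expected input of type {typeof(TIn).Name}.", nameof(payload));
        }

        TOut output = await ProcessAsync(typedInput).ConfigureAwait(false);
        return output;
    }

    // Typed processing step supplied by the concrete worker.
    protected abstract Task<TOut> ProcessAsync(TIn input);
}

public class CalcInput { public double Input { get; set; } }
public class CalcOutput { public double Output { get; set; } }

public class DemoSquareWorker : DemoWorkerBase<CalcInput, CalcOutput>
{
    protected override Task<CalcOutput> ProcessAsync(CalcInput input)
    {
        return Task.FromResult(new CalcOutput { Output = input.Input * input.Input });
    }
}

public static class TypedWorkerDemo
{
    public static async Task Main()
    {
        var worker = new DemoSquareWorker();
        var result = (CalcOutput)await worker.ExecuteAsync(new CalcInput { Input = 3.0 });
        Console.WriteLine(result.Output); // prints 9
    }
}
```

The concrete worker only ever sees strongly typed input, which is the convenience the generic base class is meant to provide.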
Optional: Support Reduction
If the workflow contains a reduction step, the worker must implement ISupportReduction. Here is an example implementation:
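public Task<object> ReduceAsync(
    IWorkerExecutionStatusNotificationService workerExecutionStatusNotificationService,
    IWorkUnit workUnit,
    IEnumerable<IResult> dependencyResults)
{
    // Sum the squares produced by the preceding work units (Select and Sum require System.Linq).
    var sumOfSquares = dependencyResults
        .Select(r => r.GetResult<CalcOutput>().Output)
        .Sum();

    // Nothing is awaited here, so return a completed task instead of marking the method async.
    return Task.FromResult<object>(new CalcAggregatedResult { ResultValue = sumOfSquares });
}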