// OneComputeWpfClient\ViewModels\MainViewModel.cs
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Data;
using DNV.One.Compute.Core.EnvironmentVariables;
using DNV.One.Compute.Core.FlowModel;
using DNV.One.Compute.Core.Scheduling;
using DNV.One.Compute.Scheduling.AzureBatch.Credentials;
using DNV.One.Compute.Scheduling.Scheduler;
using DNV.One.Compute.Scheduling.WorkScheduling;
using OneComputeCommon;
using OneComputeWpfClient.Azure;
using OneComputeWpfClient.Services;
using OneComputeWpfClient.Support;
namespace OneComputeWpfClient.ViewModels
{
/// <summary>
/// The view model for the main view.
/// </summary>
/// <seealso cref="NotifyPropertyChangedBase" />
public class MainViewModel : NotifyPropertyChangedBase
{
/// <summary>
/// Gets the caption text of the application.
/// </summary>
public string Title
{
    get { return "OneCompute WPF Client Application"; }
}

/// <summary>
/// Gets the OneCompute service container. A single instance is created together with this view model.
/// </summary>
private OneComputeServices OneComputeServices { get; } = new OneComputeServices();

/// <summary>
/// Gets or sets the cancellation token source of the current run.
/// It is created by <see cref="InitializeRun"/> and disposed and cleared by <see cref="FinalizeRun"/>.
/// </summary>
private CancellationTokenSource CancellationTokenSource { get; set; }

/// <summary>
/// Gets or sets the job of the current run.
/// It is created by <see cref="CreateJob"/> and cleared by <see cref="FinalizeRun"/>.
/// </summary>
private Job Job { get; set; }

/// <summary>
/// Gets or sets the scheduler the current job is submitted through.
/// It is assigned by <see cref="SubmitJobAsync"/> and cleared by <see cref="FinalizeRun"/>.
/// </summary>
private IJobScheduler JobScheduler { get; set; }

/// <summary>
/// Gets or sets the monitor returned when the current job was submitted.
/// It is assigned by <see cref="SubmitJobAsync"/> and cleared by <see cref="FinalizeRun"/>.
/// </summary>
private IJobMonitor JobMonitor { get; set; }
/// <summary>
/// Backing field for <see cref="IsExecuting"/>.
/// </summary>
private bool _isExecuting;

/// <summary>
/// Gets or sets a value indicating whether a run is currently in progress.
/// </summary>
/// <remarks>
/// When the setter reports a change, both <see cref="RunCommand"/> and <see cref="CancelCommand"/>
/// are asked to re-evaluate whether they can execute.
/// </remarks>
public bool IsExecuting
{
    get
    {
        return _isExecuting;
    }

    set
    {
        // Set comes from the base class; presumably it returns false when the value is unchanged - TODO confirm.
        if (!Set(ref _isExecuting, value))
        {
            return;
        }

        RunCommand.RaiseCanExecuteChanged();
        CancelCommand.RaiseCanExecuteChanged();
    }
}
/// <summary>
/// Backing field for <see cref="RunCommand"/>; stays <c>null</c> until the property is first read.
/// </summary>
private ParameterlessRelayCommand _runCommand;

/// <summary>
/// Gets the command that starts a run. The command is created on first access and bound to
/// <see cref="RunAsync"/> (execute) and <see cref="CanRun"/> (can-execute).
/// </summary>
public ParameterlessRelayCommand RunCommand
{
    get
    {
        if (_runCommand == null)
        {
            _runCommand = new ParameterlessRelayCommand(RunAsync, CanRun);
        }

        return _runCommand;
    }
}

/// <summary>
/// Backing field for <see cref="CancelCommand"/>; stays <c>null</c> until the property is first read.
/// </summary>
private ParameterlessRelayCommand _cancelCommand;

/// <summary>
/// Gets the command that cancels a run. The command is created on first access and bound to
/// <see cref="Cancel"/> (execute) and <see cref="CanCancel"/> (can-execute).
/// </summary>
public ParameterlessRelayCommand CancelCommand
{
    get
    {
        if (_cancelCommand == null)
        {
            _cancelCommand = new ParameterlessRelayCommand(Cancel, CanCancel);
        }

        return _cancelCommand;
    }
}
/// <summary>
/// Backing field for <see cref="LogEntries"/>; stays <c>null</c> until the property is first read.
/// </summary>
private ObservableCollection<LogEntryViewModel> _logEntries;

/// <summary>
/// The synchronization lock registered for the <see cref="LogEntries"/> observable collection.
/// </summary>
private readonly object _logEntriesLock = new object();

/// <summary>
/// Gets the log entries shown to the user.
/// </summary>
/// <remarks>
/// The collection is created on first access and registered for collection synchronization
/// with <see cref="_logEntriesLock"/> as its lock object.
/// </remarks>
public ObservableCollection<LogEntryViewModel> LogEntries
{
    get
    {
        if (_logEntries != null)
        {
            return _logEntries;
        }

        // Same order as before: store the collection first, then register it for synchronization.
        _logEntries = new ObservableCollection<LogEntryViewModel>();
        BindingOperations.EnableCollectionSynchronization(_logEntries, _logEntriesLock);
        return _logEntries;
    }
}
/// <summary>
/// Executes a complete run: prepares the view model, builds and submits the job on a worker
/// thread, waits for the job to terminate and finally releases the per-run state.
/// </summary>
/// <remarks>
/// This is an <c>async void</c> command handler, so no caller can observe an exception thrown here.
/// Every failure is therefore caught and logged, per-run state is released in a <c>finally</c>
/// block, and <see cref="IsExecuting"/> is always reset so the commands do not stay disabled
/// after a failed run.
/// </remarks>
private async void RunAsync()
{
    IsExecuting = true;
    try
    {
        // Initialize on the calling thread so the token source exists before Cancel can be invoked.
        InitializeRun();
        var cancellationToken = CancellationTokenSource.Token;

        await Task.Run(async () =>
        {
            try
            {
                CreateJob();
                DefineWorkForJob();
                DefineApplicationPackagesForJob();

                // Honour a cancellation that was requested before anything was submitted.
                cancellationToken.ThrowIfCancellationRequested();
                await SubmitJobAsync();
            }
            finally
            {
                // Release handlers and per-run state even when submission or monitoring failed.
                FinalizeRun();
            }
        }).ConfigureAwait(true);
    }
    catch (OperationCanceledException)
    {
        // Cancellation is reported by Cancel(); nothing more to log here.
    }
    catch (Exception ex)
    {
        LogMessage($"Run failed: {ex.Message}");
    }
    finally
    {
        IsExecuting = false;
    }
}

/// <summary>
/// Determines whether the job can be run.
/// </summary>
/// <returns>
/// <c>true</c> if no run is in progress; otherwise, <c>false</c>.
/// </returns>
private bool CanRun() =>
    !IsExecuting;

/// <summary>
/// Cancels the current run: signals the run's cancellation token and, if a job has already been
/// handed to a scheduler, asks that scheduler to cancel it.
/// </summary>
/// <remarks>
/// The per-run properties are cleared by <see cref="FinalizeRun"/> on a worker thread, so this method
/// works on a snapshot of them and tolerates a run that has not yet submitted or has already finished.
/// A failure of the scheduler's cancel request is logged instead of being left unobserved.
/// </remarks>
private async void Cancel()
{
    var tokenSource = CancellationTokenSource;
    var scheduler = JobScheduler;
    var job = Job;

    if (tokenSource == null)
    {
        // No run is active (any more); there is nothing to cancel.
        return;
    }

    try
    {
        tokenSource.Cancel();
    }
    catch (ObjectDisposedException)
    {
        // The run finished and released its token source while this command was being handled.
        return;
    }

    // CanCancel depends on the token state, so let the command re-evaluate itself.
    CancelCommand.RaiseCanExecuteChanged();

    if (scheduler != null && job != null)
    {
        try
        {
            await scheduler.CancelJobAsync(job.JobId);
        }
        catch (Exception ex)
        {
            LogMessage($"Failed to cancel job '{job.JobId}': {ex.Message}");
            return;
        }
    }

    LogMessage("Job cancelled.");
}

/// <summary>
/// Determines whether the job can be cancelled.
/// </summary>
/// <returns>
/// <c>true</c> if a run is in progress and cancellation has not been requested yet; otherwise, <c>false</c>.
/// </returns>
private bool CanCancel() =>
    IsExecuting && !(CancellationTokenSource?.IsCancellationRequested ?? false);

/// <summary>
/// Initializes the run: clears the log, logs the start message and creates a fresh cancellation token source.
/// </summary>
private void InitializeRun()
{
    // LogEntries is registered for collection synchronization with this lock, so mutations must hold it.
    lock (_logEntriesLock)
    {
        LogEntries.Clear();
    }

    LogMessage("Running...");
    CancellationTokenSource = new CancellationTokenSource();
}

/// <summary>
/// Finalizes the run: detaches the job monitor event handlers (when a monitor exists), disposes the
/// cancellation token source and clears all per-run state.
/// </summary>
private void FinalizeRun()
{
    // The monitor is only assigned once the job has been submitted, so it is null after an early failure.
    var monitor = JobMonitor;
    if (monitor != null)
    {
        monitor.JobStatusChanged -= OnJobStatusChanged;
        monitor.JobProgressChanged -= OnJobProgressChanged;
        monitor.WorkItemStatusChanged -= OnWorkItemStatusChanged;
        monitor.WorkItemProgressChanged -= OnWorkItemProgressChanged;
    }

    CancellationTokenSource?.Dispose();
    CancellationTokenSource = null;
    JobScheduler = null;
    JobMonitor = null;
    Job = null;
}
/// <summary>
/// Creates a new job for a fixed user id and stores it in <see cref="Job"/>.
/// </summary>
private void CreateJob()
{
    // TODO: Define your user Id, which must be an email address.
    const string jobOwnerId = "first.last@dnvgl.com";

    var job = new Job(jobOwnerId);
    Job = job;
}
/// <summary>
/// Builds the scheduler through which the job is submitted.
/// </summary>
/// <returns>
/// A new <see cref="IJobScheduler"/> constructed from a trace logger, a newly created work scheduler
/// and the status and storage services exposed by <see cref="OneComputeServices"/>.
/// </returns>
private IJobScheduler CreateJobScheduler()
{
    // TODO: Define your preferred task monitoring update frequency.
    const int monitoringIntervalSeconds = 5;

    var traceLogger = new TraceLogger();
    var scheduler = CreateWorkScheduler();
    var services = OneComputeServices;

    // The job scheduler co-ordinates the actioning of the job.
    return new JobScheduler(
        traceLogger,
        scheduler,
        services.WorkItemStatusService,
        services.JobStatusService,
        services.WorkItemStorageService)
    {
        TaskMonitoringInterval = monitoringIntervalSeconds,
    };
}
/// <summary>
/// Builds the Azure Batch work scheduler from the values in <see cref="AzureBatchConstants"/>.
/// </summary>
/// <returns>
/// A new <see cref="IWorkScheduler"/> instance whose worker storage connection string has been
/// overridden with <see cref="AzureStorageConstants.StorageConnectionString"/>.
/// </returns>
private IWorkScheduler CreateWorkScheduler()
{
    // Access-key credentials for the Batch account.
    var credentials = BatchCredentials.ConfigureBatchAccountAccessKeyCredentials(
        AzureBatchConstants.BatchAccountKey,
        AzureBatchConstants.BatchAccountName,
        AzureBatchConstants.BatchAccountUrl);

    var scheduler = new AzureBatchWorkItemScheduler(
        AzureBatchConstants.PoolId,
        credentials,
        OneComputeServices);

    scheduler.OverrideWorkerStorageConnectionString(AzureStorageConstants.StorageConnectionString);
    return scheduler;
}
/// <summary>
/// Attaches a deployment model containing a single Azure Batch application package to the current job.
/// </summary>
private void DefineApplicationPackagesForJob()
{
    // TODO: Define your Azure Batch application package Id's.
    const string packageId = "xxxxxx";

    var deployment = new DeploymentModel();
    deployment.AddApplicationPackage(packageId);

    Job.DeploymentModel = deployment;
}
/// <summary>
/// Defines the work of the current job: one work unit per input number, all grouped into a single
/// <see cref="ParallelWork"/> that is assigned to the job.
/// </summary>
private void DefineWorkForJob()
{
    // In this simple example, the inputs are the integers 0 through 4, each of which is to be squared.
    var inputs = Enumerable.Range(0, 5).ToList().AsReadOnly();
    LogJobInputData(inputs);

    // TODO: Define your job's work to be done.
    var items = inputs
        .Select(number => CreateWorkUnit(number))
        .Cast<WorkItem>()
        .ToList();

    // TODO: Define your job in terms of parallel and/or sequential work.
    // In this simple example, all tasks can be executed in parallel.
    Job.Work = new ParallelWork
    {
        WorkItems = items,
    };
}
/// <summary>
/// Submits the current job through a newly created scheduler, subscribes to the returned monitor's
/// events and waits until the monitor reports that the job has terminated.
/// </summary>
/// <returns>
/// A <see cref="Task"/> that completes when awaiting the job's termination has completed.
/// </returns>
private async Task SubmitJobAsync()
{
    // Create and initialize a scheduler to deploy the job on to the worker host in Azure Batch.
    IJobScheduler scheduler = CreateJobScheduler();
    JobScheduler = scheduler;

    // Remember the id up front; it is used for logging and for awaiting termination.
    var submittedJobId = Job?.JobId;

    // Submit the job for scheduling.
    IJobMonitor monitor = await scheduler.SubmitJobAsync(Job);
    JobMonitor = monitor;
    LogMessage($"Job '{submittedJobId}' submitted.");

    // Status and progress notifications are routed to this view model's handlers.
    monitor.JobStatusChanged += OnJobStatusChanged;
    monitor.JobProgressChanged += OnJobProgressChanged;
    monitor.WorkItemStatusChanged += OnWorkItemStatusChanged;
    monitor.WorkItemProgressChanged += OnWorkItemProgressChanged;

    await monitor.AwaitTerminationAsync(submittedJobId);
}
/// <summary>
/// Logs the job input data as a comma-separated list of quoted numbers.
/// </summary>
/// <param name="numbersToSquare">The numbers to square. May be empty, in which case an empty list is logged.</param>
private void LogJobInputData(IEnumerable<int> numbersToSquare)
{
    // string.Join places separators only between items, so an empty sequence produces an empty
    // string instead of failing while trimming a trailing separator that was never written.
    var messageData = string.Join(", ", numbersToSquare.Select(number => $"'{number}'"));
    LogMessage($"Numbers to be squared: {messageData}");
}
/// <summary>
/// Creates the work unit that carries one number to be squared.
/// </summary>
/// <param name="numberToSquare">The number to be squared.</param>
/// <returns>
/// A work unit wrapping a <see cref="MyInput"/> for <paramref name="numberToSquare"/>, with its status
/// update frequency set and the storage connection string added to its properties.
/// </returns>
private WorkUnit CreateWorkUnit(int numberToSquare)
{
    // TODO: Define your preferred work unit status update frequency.
    const int statusUpdateSeconds = 5;

    var input = new MyInput { NumberToSquare = numberToSquare };

    var unit = new WorkUnit(input);
    unit.StatusUpdateFrequency = statusUpdateSeconds;
    unit.Properties.Add(
        EnvironmentVariablesNames.OneComputeStorageConnectionString,
        AzureStorageConstants.StorageConnectionString);

    return unit;
}
/// <summary>
/// Handles a job status change: logs faulted, aborted and completed jobs, and logs the results
/// when the job has completed. Other statuses are ignored.
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="e">The <see cref="JobEventArgs"/> instance containing the event data.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="sender"/> or <paramref name="e"/> is <c>null</c>.
/// </exception>
private void OnJobStatusChanged(object sender, JobEventArgs e)
{
    if (sender == null)
    {
        throw new ArgumentNullException(nameof(sender));
    }

    if (e == null)
    {
        throw new ArgumentNullException(nameof(e));
    }

    var jobId = e.JobId;
    var status = e.WorkStatus;

    if (status == WorkStatus.Faulted)
    {
        LogMessage($"OnJobStatusChanged: Job '{jobId}' faulted. Details: '{e.Message}'.");
    }
    else if (status == WorkStatus.Aborted)
    {
        LogMessage($"OnJobStatusChanged: Job '{jobId}' aborted.");
    }
    else if (status == WorkStatus.Completed)
    {
        LogMessage($"OnJobStatusChanged: Job '{jobId}' completed.");
        LogResults();
    }
}
/// <summary>
/// Handles a job progress change by logging the reported progress and message.
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="e">The <see cref="JobEventArgs"/> instance containing the event data.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="sender"/> or <paramref name="e"/> is <c>null</c>.
/// </exception>
private void OnJobProgressChanged(object sender, JobEventArgs e)
{
    if (sender == null)
    {
        throw new ArgumentNullException(nameof(sender));
    }

    if (e == null)
    {
        throw new ArgumentNullException(nameof(e));
    }

    var logText = $"OnJobProgressChanged: Progress = {e.Progress:P0}, Message = {e.Message}";
    LogMessage(logText);
}
/// <summary>
/// Handles a work item status change by logging the event's progress and message.
/// </summary>
/// <remarks>
/// NOTE(review): despite the handler's name, only the progress and message are logged; the status
/// itself is not part of the log text.
/// </remarks>
/// <param name="sender">The sender.</param>
/// <param name="e">The <see cref="WorkItemEventArgs"/> instance containing the event data.</param>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="sender"/> or <paramref name="e"/> is <c>null</c>.
/// </exception>
private void OnWorkItemStatusChanged(object sender, WorkItemEventArgs e)
{
    if (sender == null)
    {
        throw new ArgumentNullException(nameof(sender));
    }

    if (e == null)
    {
        throw new ArgumentNullException(nameof(e));
    }

    var logText = $"OnWorkItemStatusChanged: Progress = {e.Progress:P0}, Message = {e.Message}";
    LogMessage(logText);
}
/// <summary>
/// Handles a work item progress change by logging the reported progress and message.
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="e">The <see cref="WorkItemEventArgs"/> instance containing the event data.</param>
/// <exception cref="System.ArgumentNullException">
/// <paramref name="sender"/> or <paramref name="e"/> is <c>null</c>.
/// </exception>
private void OnWorkItemProgressChanged(object sender, WorkItemEventArgs e)
{
    if (sender == null)
    {
        throw new ArgumentNullException(nameof(sender));
    }

    if (e == null)
    {
        throw new ArgumentNullException(nameof(e));
    }

    var logText = $"OnWorkItemProgressChanged: Progress = {e.Progress:P0}, Message = {e.Message}";
    LogMessage(logText);
}
/// <summary>
/// Logs a message to be displayed to the user by appending it to <see cref="LogEntries"/>.
/// </summary>
/// <remarks>
/// This method is called from event handlers and worker threads as well as from command handlers.
/// <see cref="LogEntries"/> is registered for collection synchronization with
/// <see cref="_logEntriesLock"/>, so every mutation has to hold that same lock.
/// </remarks>
/// <param name="message">The message to be logged.</param>
private void LogMessage(string message)
{
    var entry = new LogEntryViewModel(message);

    lock (_logEntriesLock)
    {
        LogEntries.Add(entry);
    }
}
/// <summary>
/// Logs the results of the current job: one line per result, ordered by the input number.
/// </summary>
/// <remarks>
/// <see cref="Job"/> is cleared by <see cref="FinalizeRun"/>, which can run on another thread while
/// this method is executing from a monitor event. The job is therefore read once into a local
/// instead of being re-read for every result.
/// </remarks>
/// <exception cref="InvalidOperationException">
/// The input or the result of a work item could not be retrieved.
/// </exception>
private void LogResults()
{
    var job = Job;
    if (job == null)
    {
        LogMessage("Results are unavailable because the job has already been released.");
        return;
    }

    var jobId = job.JobId;
    var results = OneComputeServices.ResultStorageService.GetItems(jobId).ToList();

    var tuples = results.Select(result =>
        {
            var workUnit = OneComputeServices.WorkItemStorageService.GetItem(jobId, result.WorkItemId) as WorkUnit;
            var myInput = workUnit?.GetInput<MyInput>();
            var myResult = result.GetResult<MyResult>();
            if (myInput == null || myResult == null)
            {
                throw new InvalidOperationException("Error retrieving input and result data.");
            }

            return new
            {
                myInput.NumberToSquare,
                Result = myResult.Value,
            };
        })
        .OrderBy(tuple => tuple.NumberToSquare);

    foreach (var tuple in tuples)
    {
        LogMessage($"The square of {tuple.NumberToSquare} is {tuple.Result}.");
    }
}
}
}