// OneComputeWpfClient\ViewModels\MainViewModel.cs

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Data;

using DNV.One.Compute.Core.EnvironmentVariables;
using DNV.One.Compute.Core.FlowModel;
using DNV.One.Compute.Core.Scheduling;
using DNV.One.Compute.Scheduling.AzureBatch.Credentials;
using DNV.One.Compute.Scheduling.Scheduler;
using DNV.One.Compute.Scheduling.WorkScheduling;

using OneComputeCommon;
using OneComputeWpfClient.Azure;
using OneComputeWpfClient.Services;
using OneComputeWpfClient.Support;

namespace OneComputeWpfClient.ViewModels
{
    /// <summary>
    /// The view model for the main view.
    /// </summary>
    /// <seealso cref="NotifyPropertyChangedBase" />
    public class MainViewModel : NotifyPropertyChangedBase
    {
        /// <summary>
        /// Gets the title.
        /// </summary>
        /// <remarks>Read-only; always returns the same fixed application title string.</remarks>
        public string Title => "OneCompute WPF Client Application";

        /// <summary>
        /// Gets the OneCompute services.
        /// </summary>
        /// <remarks>
        /// One instance is created together with the view model. Its status, work item storage and
        /// result storage services are used when building the schedulers and when reading results.
        /// </remarks>
        private OneComputeServices OneComputeServices { get; } = new OneComputeServices();

        /// <summary>
        /// Gets or sets the cancellation token source.
        /// </summary>
        /// <remarks>
        /// Created by <see cref="InitializeRun"/> and disposed and reset to <c>null</c> by
        /// <see cref="FinalizeRun"/>, so it is <c>null</c> outside of a run. Within this class its
        /// cancellation flag is read by <see cref="CanCancel"/>.
        /// </remarks>
        private CancellationTokenSource CancellationTokenSource { get; set; }

        /// <summary>
        /// Gets or sets the job.
        /// </summary>
        /// <remarks>
        /// Created by <see cref="CreateJob"/> and reset to <c>null</c> by <see cref="FinalizeRun"/>.
        /// </remarks>
        private Job Job { get; set; }

        /// <summary>
        /// Gets or sets the job scheduler.
        /// </summary>
        /// <remarks>
        /// Assigned by <see cref="SubmitJobAsync"/> and reset to <c>null</c> by <see cref="FinalizeRun"/>.
        /// </remarks>
        private IJobScheduler JobScheduler { get; set; }

        /// <summary>
        /// Gets or sets the job monitor.
        /// </summary>
        /// <remarks>
        /// Assigned by <see cref="SubmitJobAsync"/> from the result of submitting the job; it is the
        /// source of the job and work item status/progress events handled by this view model.
        /// </remarks>
        private IJobMonitor JobMonitor { get; set; }

        /// <summary>
        /// Backing field for <see cref="IsExecuting"/>.
        /// </summary>
        private bool _isExecuting;

        /// <summary>
        /// Gets or sets a value indicating whether the calculation is executing.
        /// </summary>
        /// <remarks>
        /// When <c>Set</c> returns <c>true</c>, both the run and the cancel command are asked to
        /// re-evaluate whether they can execute.
        /// </remarks>
        public bool IsExecuting
        {
            get
            {
                return _isExecuting;
            }

            set
            {
                var changed = Set(ref _isExecuting, value);
                if (!changed)
                {
                    return;
                }

                RunCommand.RaiseCanExecuteChanged();
                CancelCommand.RaiseCanExecuteChanged();
            }
        }

        /// <summary>
        /// Backing field for <see cref="RunCommand"/>; populated on first access.
        /// </summary>
        private ParameterlessRelayCommand _runCommand;

        /// <summary>
        /// Gets the run command, creating it on first access.
        /// </summary>
        public ParameterlessRelayCommand RunCommand
        {
            get
            {
                if (_runCommand == null)
                {
                    _runCommand = new ParameterlessRelayCommand(RunAsync, CanRun);
                }

                return _runCommand;
            }
        }

        /// <summary>
        /// Backing field for <see cref="CancelCommand"/>; populated on first access.
        /// </summary>
        private ParameterlessRelayCommand _cancelCommand;

        /// <summary>
        /// Gets the cancel command, creating it on first access.
        /// </summary>
        public ParameterlessRelayCommand CancelCommand
        {
            get
            {
                if (_cancelCommand == null)
                {
                    _cancelCommand = new ParameterlessRelayCommand(Cancel, CanCancel);
                }

                return _cancelCommand;
            }
        }

        /// <summary>
        /// Backing field for <see cref="LogEntries"/>; populated on first access.
        /// </summary>
        private ObservableCollection<LogEntryViewModel> _logEntries;

        /// <summary>
        /// The synchronization lock for the <see cref="LogEntries"/> observable collection.
        /// </summary>
        private readonly object _logEntriesLock = new object();

        /// <summary>
        /// Gets the log entries.
        /// </summary>
        /// <remarks>
        /// On first access the collection is created and registered for collection synchronization
        /// together with the lock object declared alongside it.
        /// </remarks>
        public ObservableCollection<LogEntryViewModel> LogEntries
        {
            get
            {
                if (_logEntries != null)
                {
                    return _logEntries;
                }

                _logEntries = new ObservableCollection<LogEntryViewModel>();
                BindingOperations.EnableCollectionSynchronization(_logEntries, _logEntriesLock);

                return _logEntries;
            }
        }

        /// <summary>
        /// Execute the work.
        /// </summary>
        /// <remarks>
        /// This method is <c>async void</c> because it is used as the callback of the run command, so an
        /// exception escaping it could not be observed by any caller. Failures are therefore caught and
        /// logged here. <see cref="FinalizeRun"/> and the reset of <see cref="IsExecuting"/> are placed in
        /// <c>finally</c> blocks so the view model is usable again after a failed run.
        /// </remarks>
        private async void RunAsync()
        {
            IsExecuting = true;

            try
            {
                await Task.Run(async () =>
                {
                    InitializeRun();

                    try
                    {
                        CreateJob();
                        DefineWorkForJob();
                        DefineApplicationPackagesForJob();
                        await SubmitJobAsync();
                    }
                    finally
                    {
                        // Always release the per-run state, also when job definition or submission failed.
                        FinalizeRun();
                    }
                }).ConfigureAwait(true);
            }
            catch (Exception exception)
            {
                // Nothing upstream can observe an exception from an async void method; report it to the user instead.
                LogMessage($"Run failed: {exception.Message}");
            }
            finally
            {
                // Runs on the captured context (ConfigureAwait(true)), so the commands are refreshed from the calling thread.
                IsExecuting = false;
            }
        }

        /// <summary>
        /// Determines whether the job can be run.
        /// </summary>
        /// <returns>
        ///   <c>true</c> when no calculation is currently executing; otherwise, <c>false</c>.
        /// </returns>
        private bool CanRun()
        {
            return !IsExecuting;
        }

        /// <summary>
        /// Cancels the job.
        /// </summary>
        /// <remarks>
        /// The cancel command becomes available as soon as <see cref="IsExecuting"/> is set, which is before
        /// the job and its scheduler exist. If either is still missing, nothing is cancelled and the command
        /// stays available. Otherwise cancellation is requested from the scheduler, the cancellation token
        /// source is cancelled, and the cancel command is asked to re-evaluate its state.
        /// </remarks>
        private void Cancel()
        {
            // Snapshot the per-run state once: the run executes on a worker thread and
            // FinalizeRun may reset these properties while this method is running.
            var scheduler = JobScheduler;
            var job = Job;
            var tokenSource = CancellationTokenSource;

            if (scheduler == null || job == null)
            {
                LogMessage("The job has not been submitted yet; there is nothing to cancel.");
                return;
            }

            // As before, the cancellation request is started but not awaited.
            scheduler.CancelJobAsync(job.JobId);

            try
            {
                tokenSource?.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The run finished and disposed the source after the snapshot was taken; nothing is left to cancel.
            }

            LogMessage("Job cancelled.");

            // CanCancel depends on the cancellation flag, which has just changed.
            CancelCommand.RaiseCanExecuteChanged();
        }

        /// <summary>
        /// Determines whether the job can be cancelled.
        /// </summary>
        /// <returns>
        ///   <c>true</c> if a calculation is executing and cancellation has not already been requested;
        ///   otherwise, <c>false</c>.
        /// </returns>
        private bool CanCancel()
        {
            if (!IsExecuting)
            {
                return false;
            }

            // No token source yet counts as "cancellation not requested".
            var tokenSource = CancellationTokenSource;
            return tokenSource == null || !tokenSource.IsCancellationRequested;
        }

        /// <summary>
        /// Initializes the run.
        /// </summary>
        /// <remarks>
        /// Clears the log, writes the start message and creates a fresh cancellation token source.
        /// </remarks>
        private void InitializeRun()
        {
            // The collection is registered for collection synchronization with _logEntriesLock, so every
            // change made by this class must hold that same lock.
            lock (_logEntriesLock)
            {
                LogEntries.Clear();
            }

            LogMessage("Running...");

            CancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// Finalizes the run.
        /// </summary>
        /// <remarks>
        /// Detaches the event handlers from the job monitor (if one was obtained), disposes the
        /// cancellation token source and resets the per-run properties to <c>null</c>. Safe to call when
        /// the run stopped before a job monitor was assigned.
        /// </remarks>
        private void FinalizeRun()
        {
            // The monitor is only assigned once the job has been submitted, so it may still be null here.
            var monitor = JobMonitor;
            if (monitor != null)
            {
                monitor.JobStatusChanged -= OnJobStatusChanged;
                monitor.JobProgressChanged -= OnJobProgressChanged;
                monitor.WorkItemStatusChanged -= OnWorkItemStatusChanged;
                monitor.WorkItemProgressChanged -= OnWorkItemProgressChanged;
            }

            CancellationTokenSource?.Dispose();
            CancellationTokenSource = null;

            JobScheduler = null;
            JobMonitor = null;
            Job = null;
        }

        /// <summary>
        /// Creates a new job for the configured user and stores it in <see cref="Job"/>.
        /// </summary>
        private void CreateJob()
        {
            // TODO: Define your user Id, which must be an email address.
            const string jobOwnerId = "first.last@dnvgl.com";

            var newJob = new Job(jobOwnerId);
            Job = newJob;
        }

        /// <summary>
        /// Creates the job scheduler.
        /// </summary>
        /// <returns>
        /// A new <see cref="IJobScheduler"/> instance, built from a trace logger, a new work scheduler
        /// and the status and storage services of <see cref="OneComputeServices"/>.
        /// </returns>
        private IJobScheduler CreateJobScheduler()
        {
            // TODO: Define your preferred task monitoring update frequency.
            const int monitoringIntervalInSeconds = 5;

            // The job scheduler co-ordinates the actioning of the job.
            return new JobScheduler(
                new TraceLogger(),
                CreateWorkScheduler(),
                OneComputeServices.WorkItemStatusService,
                OneComputeServices.JobStatusService,
                OneComputeServices.WorkItemStorageService)
            {
                TaskMonitoringInterval = monitoringIntervalInSeconds,
            };
        }

        /// <summary>
        /// Creates the work scheduler.
        /// </summary>
        /// <returns>
        /// A new <see cref="IWorkScheduler"/> instance.
        /// </returns>
        private IWorkScheduler CreateWorkScheduler()
        {
            // Access-key credentials for the Batch account named in AzureBatchConstants.
            var credentials = BatchCredentials.ConfigureBatchAccountAccessKeyCredentials(
                AzureBatchConstants.BatchAccountKey,
                AzureBatchConstants.BatchAccountName,
                AzureBatchConstants.BatchAccountUrl);

            var scheduler = new AzureBatchWorkItemScheduler(
                AzureBatchConstants.PoolId,
                credentials,
                OneComputeServices);

            scheduler.OverrideWorkerStorageConnectionString(AzureStorageConstants.StorageConnectionString);

            return scheduler;
        }

        /// <summary>
        /// Defines Azure Batch application packages to be used for the job.
        /// </summary>
        /// <remarks>
        /// Builds a deployment model holding a single application package and assigns it to the job.
        /// </remarks>
        private void DefineApplicationPackagesForJob()
        {
            // TODO: Define your Azure Batch application package Id's.
            const string packageId = "xxxxxx";

            var deployment = new DeploymentModel();
            deployment.AddApplicationPackage(packageId);

            Job.DeploymentModel = deployment;
        }

        /// <summary>
        /// Defines the work for the job.
        /// </summary>
        /// <remarks>
        /// Logs the input numbers, creates one work unit per number and assigns them to the job as
        /// parallel work.
        /// </remarks>
        private void DefineWorkForJob()
        {
            // In this simple example, we specify a set of numbers to be squared.
            var inputs = Enumerable.Range(0, 5).ToList().AsReadOnly();
            LogJobInputData(inputs);

            // TODO: Define your job's work to be done.
            var workItems = new List<WorkItem>();
            foreach (var number in inputs)
            {
                workItems.Add(CreateWorkUnit(number));
            }

            // TODO: Define your job in terms of parallel and/or sequential work.
            // In this simple example, all tasks can be executed in parallel.
            Job.Work = new ParallelWork
            {
                WorkItems = workItems,
            };
        }

        /// <summary>
        /// Submits the job and waits for it to terminate.
        /// </summary>
        /// <returns>
        /// A <see cref="Task"/> that completes once awaiting the job's termination has completed.
        /// </returns>
        private async Task SubmitJobAsync()
        {
            // The scheduler deploys the job on to the worker host in Azure Batch.
            JobScheduler = CreateJobScheduler();

            // Read the id before submitting, then hand the job over for scheduling.
            var jobId = Job?.JobId;
            IJobMonitor monitor = await JobScheduler.SubmitJobAsync(Job);
            JobMonitor = monitor;
            LogMessage($"Job '{jobId}' submitted.");

            // Follow status and progress of the job and of its work items.
            monitor.JobStatusChanged += OnJobStatusChanged;
            monitor.JobProgressChanged += OnJobProgressChanged;
            monitor.WorkItemStatusChanged += OnWorkItemStatusChanged;
            monitor.WorkItemProgressChanged += OnWorkItemProgressChanged;

            await monitor.AwaitTerminationAsync(jobId);
        }

        /// <summary>
        /// Logs the job input data.
        /// </summary>
        /// <param name="numbersToSquare">The numbers to square. May be empty.</param>
        /// <remarks>
        /// Each number is quoted and the values are separated by a comma and a space. An empty sequence
        /// produces an empty list in the message instead of failing.
        /// </remarks>
        private void LogJobInputData(IEnumerable<int> numbersToSquare)
        {
            // string.Join adds separators only between items, so no trailing separator has to be trimmed.
            var messageData = string.Join(", ", numbersToSquare.Select(i => $"'{i}'"));

            LogMessage($"Numbers to be squared: {messageData}");
        }

        /// <summary>
        /// Creates the work unit for running on an Azure Batch host.
        /// </summary>
        /// <param name="numberToSquare">The number to be squared.</param>
        /// <returns>
        /// The work unit, carrying the number as its input and the storage connection string as a property.
        /// </returns>
        private WorkUnit CreateWorkUnit(int numberToSquare)
        {
            // TODO: Define your preferred work unit status update frequency.
            const int statusUpdateFrequencyInSeconds = 5;

            return new WorkUnit(new MyInput { NumberToSquare = numberToSquare })
            {
                StatusUpdateFrequency = statusUpdateFrequencyInSeconds,
                Properties =
                {
                    { EnvironmentVariablesNames.OneComputeStorageConnectionString, AzureStorageConstants.StorageConnectionString },
                },
            };
        }

        /// <summary>
        /// Called when the job status changes.
        /// </summary>
        /// <remarks>
        /// Logs faulted, aborted and completed states; on completion the results are logged as well.
        /// Any other status is ignored.
        /// </remarks>
        /// <param name="sender">The sender.</param>
        /// <param name="e">The <see cref="JobEventArgs"/> instance containing the event data.</param>
        /// <exception cref="ArgumentNullException">
        /// sender
        /// or
        /// e
        /// </exception>
        private void OnJobStatusChanged(object sender, JobEventArgs e)
        {
            if (sender == null)
            {
                throw new ArgumentNullException(nameof(sender));
            }

            if (e == null)
            {
                throw new ArgumentNullException(nameof(e));
            }

            var jobId = e.JobId;
            var status = e.WorkStatus;

            if (status == WorkStatus.Faulted)
            {
                LogMessage($"OnJobStatusChanged: Job '{jobId}' faulted. Details: '{e.Message}'.");
            }
            else if (status == WorkStatus.Aborted)
            {
                LogMessage($"OnJobStatusChanged: Job '{jobId}' aborted.");
            }
            else if (status == WorkStatus.Completed)
            {
                LogMessage($"OnJobStatusChanged: Job '{jobId}' completed.");
                LogResults();
            }
        }

        /// <summary>
        /// Called when the job progress changes; logs the reported progress and message.
        /// </summary>
        /// <param name="sender">The sender.</param>
        /// <param name="e">The <see cref="JobEventArgs"/> instance containing the event data.</param>
        /// <exception cref="ArgumentNullException">
        /// sender
        /// or
        /// e
        /// </exception>
        private void OnJobProgressChanged(object sender, JobEventArgs e)
        {
            if (sender == null)
            {
                throw new ArgumentNullException(nameof(sender));
            }

            if (e == null)
            {
                throw new ArgumentNullException(nameof(e));
            }

            var entry = $"OnJobProgressChanged: Progress = {e.Progress:P0}, Message = {e.Message}";
            LogMessage(entry);
        }

        /// <summary>
        /// Called when a work item's status changes; logs the reported progress and message.
        /// </summary>
        /// <param name="sender">The sender.</param>
        /// <param name="e">The <see cref="WorkItemEventArgs"/> instance containing the event data.</param>
        /// <exception cref="System.ArgumentNullException">
        /// sender
        /// or
        /// e
        /// </exception>
        private void OnWorkItemStatusChanged(object sender, WorkItemEventArgs e)
        {
            if (sender == null)
            {
                throw new ArgumentNullException(nameof(sender));
            }

            if (e == null)
            {
                throw new ArgumentNullException(nameof(e));
            }

            var entry = $"OnWorkItemStatusChanged: Progress = {e.Progress:P0}, Message = {e.Message}";
            LogMessage(entry);
        }

        /// <summary>
        /// Called when a work item's progress changes; logs the reported progress and message.
        /// </summary>
        /// <param name="sender">The sender.</param>
        /// <param name="e">The <see cref="WorkItemEventArgs"/> instance containing the event data.</param>
        /// <exception cref="System.ArgumentNullException">
        /// sender
        /// or
        /// e
        /// </exception>
        private void OnWorkItemProgressChanged(object sender, WorkItemEventArgs e)
        {
            if (sender == null)
            {
                throw new ArgumentNullException(nameof(sender));
            }

            if (e == null)
            {
                throw new ArgumentNullException(nameof(e));
            }

            var entry = $"OnWorkItemProgressChanged: Progress = {e.Progress:P0}, Message = {e.Message}";
            LogMessage(entry);
        }

        /// <summary>
        /// Logs a message to be displayed to the user.
        /// </summary>
        /// <param name="message">The message to be logged.</param>
        /// <remarks>
        /// Called from the run's worker task, from the job monitor's event handlers and from command
        /// callbacks. The entry is added while holding the lock that the collection was registered
        /// with for collection synchronization, so concurrent callers cannot corrupt the collection.
        /// </remarks>
        private void LogMessage(string message)
        {
            var entry = new LogEntryViewModel(message);

            // Collection synchronization requires the application to take the same lock for its own changes.
            lock (_logEntriesLock)
            {
                LogEntries.Add(entry);
            }
        }

        /// <summary>
        /// Logs the results.
        /// </summary>
        /// <remarks>
        /// Reads all results stored for the current job, pairs each one with the input of its work unit
        /// and logs one line per result, ordered by input number. The job is captured once up front
        /// because this method is invoked from a job monitor event while <see cref="FinalizeRun"/> may
        /// reset <see cref="Job"/>; if the job is already gone a message is logged instead.
        /// </remarks>
        /// <exception cref="InvalidOperationException">
        /// The input or the result of a work item could not be retrieved.
        /// </exception>
        private void LogResults()
        {
            var job = Job;
            if (job == null)
            {
                LogMessage("Unable to log results: the job is no longer available.");
                return;
            }

            var jobId = job.JobId;
            var results = OneComputeServices.ResultStorageService.GetItems(jobId).ToList();

            var tuples = results.Select(result =>
            {
                // The stored work item is expected to be the WorkUnit created for this job.
                var workUnit = OneComputeServices.WorkItemStorageService.GetItem(jobId, result.WorkItemId) as WorkUnit;
                var myInput = workUnit?.GetInput<MyInput>();
                var myResult = result.GetResult<MyResult>();

                if (myInput == null || myResult == null)
                {
                    throw new InvalidOperationException("Error retrieving input and result data.");
                }

                return new
                {
                    myInput.NumberToSquare,
                    Result = myResult.Value,
                };
            })
            .OrderBy(tuple => tuple.NumberToSquare);

            foreach (var tuple in tuples)
            {
                LogMessage($"The square of {tuple.NumberToSquare} is {tuple.Result}.");
            }
        }
    }
}
// Copyright © DNV AS. All rights reserved.