Skip to content

Commit

Permalink
Implemented get error details for Synapse #21
Browse files Browse the repository at this point in the history
  • Loading branch information
mrpaulandrew committed Dec 11, 2020
1 parent b671122 commit 43effbe
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 34 deletions.
54 changes: 28 additions & 26 deletions Functions/Services/AzureDataFactoryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ public class AzureDataFactoryService : PipelineService
public AzureDataFactoryService(PipelineRequest request, ILogger logger)
{
_logger = logger;
_logger.LogInformation("Creating ADF connectivity client.");
_logger.LogInformation("Creating ADF connectivity clients.");

//Auth details
var context = new AuthenticationContext("https://login.windows.net/" + request.TenantId);
var cc = new ClientCredential(request.ApplicationId, request.AuthenticationKey);
var result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
var cred = new TokenCredentials(result.AccessToken);

//Management Client
_adfManagementClient = new DataFactoryManagementClient(cred)
{
SubscriptionId = request.SubscriptionId
Expand Down Expand Up @@ -56,9 +58,8 @@ public override PipelineDescription ValidatePipeline(PipelineRequest request)
ActivityCount = pipelineResource.Activities.Count
};
}
catch (Exception ex)
catch (Microsoft.Rest.Azure.CloudException) //expected exception when pipeline doesnt exist
{
_logger.LogInformation(ex.Message);
return new PipelineDescription()
{
PipelineExists = "False",
Expand All @@ -68,6 +69,12 @@ public override PipelineDescription ValidatePipeline(PipelineRequest request)
ActivityCount = 0
};
}
catch (Exception ex) //other unknown issue
{
_logger.LogInformation(ex.Message);
_logger.LogInformation(ex.GetType().ToString());
throw new InvalidRequestException("Failed to validate pipeline. ", ex);
}
}

public override PipelineRunStatus ExecutePipeline(PipelineRequest request)
Expand Down Expand Up @@ -205,7 +212,7 @@ public override PipelineRunStatus GetPipelineRunStatus(PipelineRunRequest reques

public override PipelineFailStatus GetPipelineRunActivityErrors(PipelineRunRequest request)
{
var pipelineRun = _adfManagementClient.PipelineRuns.Get
PipelineRun pipelineRun = _adfManagementClient.PipelineRuns.Get
(
request.ResourceGroupName,
request.OrchestratorName,
Expand All @@ -232,7 +239,7 @@ public override PipelineFailStatus GetPipelineRunActivityErrors(PipelineRunReque
);

//Create initial output content
var output = new PipelineFailStatus()
PipelineFailStatus output = new PipelineFailStatus()
{
PipelineName = request.PipelineName,
ActualStatus = pipelineRun.Status,
Expand All @@ -244,11 +251,11 @@ public override PipelineFailStatus GetPipelineRunActivityErrors(PipelineRunReque
_logger.LogInformation("Activities found in pipeline response: " + queryResponse.Value.Count.ToString());

//Loop over activities in pipeline run
foreach (var activity in queryResponse.Value)
foreach (ActivityRun activity in queryResponse.Value)
{
if (string.IsNullOrEmpty(activity.Error.ToString()))
if (activity.Error == null)
{
continue; //just incase
continue; //only want errors
}

//Parse error output to customise output
Expand All @@ -257,25 +264,20 @@ public override PipelineFailStatus GetPipelineRunActivityErrors(PipelineRunReque
string errorType = outputBlockInner?.failureType;
string errorMessage = outputBlockInner?.message;

//Get output details
if (!string.IsNullOrEmpty(errorCode))
_logger.LogInformation("Activity run id: " + activity.ActivityRunId);
_logger.LogInformation("Activity name: " + activity.ActivityName);
_logger.LogInformation("Activity type: " + activity.ActivityType);
_logger.LogInformation("Error message: " + errorMessage);

output.Errors.Add(new FailedActivity()
{
_logger.LogInformation("Activity run id: " + activity.ActivityRunId);
_logger.LogInformation("Activity name: " + activity.ActivityName);
_logger.LogInformation("Activity type: " + activity.ActivityType);
_logger.LogInformation("Error message: " + errorMessage);

output.Errors.Add(new FailedActivity()
{
ActivityRunId = activity.ActivityRunId,
ActivityName = activity.ActivityName,
ActivityType = activity.ActivityType,
ErrorCode = errorCode,
ErrorType = errorType,
ErrorMessage = errorMessage
}
);
}
ActivityRunId = activity.ActivityRunId,
ActivityName = activity.ActivityName,
ActivityType = activity.ActivityType,
ErrorCode = errorCode,
ErrorType = errorType,
ErrorMessage = errorMessage
});
}
return output;
}
Expand Down
76 changes: 68 additions & 8 deletions Functions/Services/AzureSynapseService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Threading;
using System.Collections.Generic;
using Newtonsoft.Json;
using Microsoft.Rest;
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
Expand All @@ -9,7 +11,6 @@
using Azure.Analytics.Synapse.Artifacts;
using Azure.Analytics.Synapse.Artifacts.Models;
using mrpaulandrew.azure.procfwk.Helpers;
using Azure;

namespace mrpaulandrew.azure.procfwk.Services
{
Expand All @@ -25,12 +26,13 @@ public AzureSynapseService(PipelineRequest request, ILogger logger)
_logger = logger;
_logger.LogInformation("Creating SYN connectivity clients.");

//Management Client
//Auth details
var context = new AuthenticationContext("https://login.windows.net/" + request.TenantId);
var cc = new ClientCredential(request.ApplicationId, request.AuthenticationKey);
var result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
var cred = new TokenCredentials(result.AccessToken);

//Management Client
_synManagementClient = new SynapseManagementClient(cred)
{
SubscriptionId = request.SubscriptionId
Expand Down Expand Up @@ -96,14 +98,10 @@ public override PipelineDescription ValidatePipeline(PipelineRequest request)
ActivityCount = 0
};
}
catch (Exception ex) //unknown issue
catch (Exception ex) //other unknown issue
{
_logger.LogInformation(ex.Message);
_logger.LogInformation(ex.GetType().ToString());
_logger.LogInformation(ex.StackTrace);
_logger.LogInformation(ex.Source);
_logger.LogInformation(ex.HelpLink);

throw new InvalidRequestException("Failed to validate pipeline. ", ex);
}
}
Expand Down Expand Up @@ -229,7 +227,69 @@ public override PipelineRunStatus GetPipelineRunStatus(PipelineRunRequest reques

public override PipelineFailStatus GetPipelineRunActivityErrors(PipelineRunRequest request)
{
throw new NotImplementedException();
PipelineRun pipelineRun = _pipelineRunClient.GetPipelineRun
(
request.RunId
);

//Defensive check
PipelineNameCheck(request.PipelineName, pipelineRun.PipelineName);

_logger.LogInformation("Create pipeline Activity Runs query filters.");
RunFilterParameters filterParams = new RunFilterParameters
(
request.ActivityQueryStart,
request.ActivityQueryEnd
);

_logger.LogInformation("Querying SYN pipeline for Activity Runs.");
ActivityRunsQueryResponse queryResponse = _pipelineRunClient.QueryActivityRuns
(
request.PipelineName,
request.RunId,
filterParams
);

//Create initial output content
PipelineFailStatus output = new PipelineFailStatus()
{
PipelineName = request.PipelineName,
ActualStatus = pipelineRun.Status,
RunId = request.RunId,
ResponseCount = queryResponse.Value.Count
};

_logger.LogInformation("Pipeline status: " + pipelineRun.Status);
_logger.LogInformation("Activities found in pipeline response: " + queryResponse.Value.Count.ToString());

//Loop over activities in pipeline run
foreach (ActivityRun activity in queryResponse.Value)
{
if (activity.Error == null)
{
continue; //only want errors
}

//Parse error output to customise output
var json = JsonConvert.SerializeObject(activity.Error);
Dictionary<string, object> errorContent = JsonConvert.DeserializeObject<Dictionary<string, object>>(json);

_logger.LogInformation("Activity run id: " + activity.ActivityRunId);
_logger.LogInformation("Activity name: " + activity.ActivityName);
_logger.LogInformation("Activity type: " + activity.ActivityType);
_logger.LogInformation("Error message: " + errorContent["message"].ToString());

output.Errors.Add(new FailedActivity()
{
ActivityRunId = activity.ActivityRunId,
ActivityName = activity.ActivityName,
ActivityType = activity.ActivityType,
ErrorCode = errorContent["errorCode"].ToString(),
ErrorType = errorContent["failureType"].ToString(),
ErrorMessage = errorContent["message"].ToString()
});
}
return output;
}

public override void Dispose()
Expand Down

0 comments on commit 43effbe

Please sign in to comment.