Sanket Tarun Shah
Sanket Tarun Shah

Reputation: 637

Is there any way to monitor Azure Synapse Pipelines execution?

In my project, I've a need where I need to show how Pipeline is progressing on custom Web Portal built in PHP. Is there any way in any language such as C# or Java through which I can list pipelines and monitor the progress or even log into Application Insights?

Upvotes: 1

Views: 1533

Answers (3)

user15117824
user15117824

Reputation:

Powershell

Using Powershell scripts you can trace all the pipelines which are in running/queued state and also it confirm whether they are Running/Queued more than 30 minutes

Import-Module AzureAD
Install-module Az
Connect-AzAccount  -UseDeviceAuthentication

$workspaceName = "your_synapse_workspaceName"
$runStartedAfter = (Get-Date).AddMinutes(-30)
$runStartedBefore = (Get-Date)
$pipelineNames = @("pipeline_name1", "pipeline_name2", "pipeline_name3")

# Get pipeline runs for each pipeline
$filteredPipelineRuns = foreach ($pipelineName in $pipelineNames) {
    Get-AzSynapsePipelineRun -PipelineName $pipelineName -WorkspaceName $workspaceName -RunStartedAfter $runStartedAfter -RunStartedBefore $runStartedBefore |
    Where-Object { $_.Status -in @("InProgress", "Queued") -and $_.RunStart -lt (Get-Date).AddMinutes(-30) } |
    Select-Object PipelineName, RunId, Status, @{
        Name='Duration';
        Expression={($_.RunStart - (Get-Date)).TotalMinutes}
    } |
    Sort-Object PipelineName |
    Format-Table
}

# Display the results
$filteredPipelineRuns

Java

Using Java Code

import com.microsoft.azure.AzureEnvironment;
import com.microsoft.azure.credentials.ApplicationTokenCredentials;
import com.microsoft.azure.management.synapse.v2021_11_01.Run;
import com.microsoft.azure.management.synapse.v2021_11_01.SynapseManagementClientBuilder;
import com.microsoft.azure.management.synapse.v2021_11_01.implementation.PipelineRunInner;
import com.microsoft.rest.LogLevel;

import java.io.IOException;
import java.util.List;

public class AzureSynapsePipeline {

    public static void main(String[] args) {
        // Azure AD Authentication
        String clientId = "your_client_id";
        String tenantId = "your_tenant_id";
        String clientSecret = "your_client_secret";
        ApplicationTokenCredentials credentials = new ApplicationTokenCredentials(clientId, tenantId, clientSecret, AzureEnvironment.AZURE);

        // Synapse Pipeline parameters
        String workspaceName = "your_synapse_workspaceName";
        String[] pipelineNames = {"pipeline_name1", "pipeline_name2", "pipeline_name3"};
        String runStartedAfter = "some_iso_datetime"; // Replace with the actual datetime format

        // Create Synapse Management Client
        SynapseManagementClientBuilder synapseManagementClientBuilder = SynapseManagementClientBuilder
                .newBuilder()
                .withLogLevel(LogLevel.BODY_AND_HEADERS)
                .withCredentials(credentials);

        // Get pipeline runs for each pipeline
        for (String pipelineName : pipelineNames) {
            List<PipelineRunInner> pipelineRuns = synapseManagementClientBuilder.build().getPipelineRuns().listByWorkspace(workspaceName)
                    .listByPipeline(workspaceName, pipelineName, runStartedAfter);

            // Filter and display the results
            for (PipelineRunInner pipelineRun : pipelineRuns) {
                if (("InProgress".equals(pipelineRun.status()) || "Queued".equals(pipelineRun.status())) &&
                        pipelineRun.runStart().isBefore(runStartedAfter)) {
                    System.out.println("PipelineName: " + pipelineRun.pipelineName() +
                            ", RunId: " + pipelineRun.runId() +
                            ", Status: " + pipelineRun.status() +
                            ", Duration: " + calculateDuration(pipelineRun.runStart()));
                }
            }
        }
    }

    private static String calculateDuration(String runStart) {
        // Implement your logic to calculate duration based on runStart and current time
        return "SomeDuration"; // Replace with the actual duration
    }
}

Upvotes: 0

Muggin
Muggin

Reputation: 1291

        try
        {
            PipelineRunClient pipelineRunClient = new(new Uri(_Settings.SynapseExtractEndpoint), new DefaultAzureCredential());
            run = await pipelineRunClient.GetPipelineRunAsync(runId);

            while(run.Status == "InProgress" || run.Status == "Queued")
            {
                _Logger.LogInformation($"!!Pipeline {run.PipelineName} {runId} Status: {run.Status}");
                Task.Delay(30000).Wait();
                run = await pipelineRunClient.GetPipelineRunAsync(runId);
            }
            _Logger.LogInformation($"!!Pipeline {run.PipelineName} {runId} Status: {run.Status} Runtime: {run.DurationInMs} Message: {run.Message}");

        }

Upvotes: 1

Ron Dunn
Ron Dunn

Reputation: 3078

Are you labelling your queries with the OPTION (LABEL='MY LABEL') syntax?

This will make it easy to monitor the progress of your pipeline by querying sys.dm_pdw_exec_requests to pick individual queries (see last paragraph under link heading), and if you use a naming convention like 'pipeline_query' you can probably achieve what you want.

Upvotes: 1

Related Questions