Sander Vanstaen

Reputation: 1

@azure/storage-blob AbortError("the operation was aborted") while streaming file from Azure Storage

Recently we've been having some issues while streaming a file from our Azure Storage. It has happened twice in the past week, and never before that.

We have a large .csv file that we stream in batches of 1000 records. It takes about 7 seconds per batch.

Now this fails, throwing an "AbortError: The operation was aborted." error, without any details or further error handling. This happens randomly: sometimes after batch 2, other times after batch 22, etc.

Does anyone have any idea why this is?

I'm using the latest version of "@azure/storage-file-datalake" at the time of writing.

I added a few retries and extra error handling in my stream, but nothing is caught since the AbortError stops the process.
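
For context, this is roughly what our setup looks like (simplified, with placeholder names and URL; the real code differs):

const { DataLakeFileClient } = require("@azure/storage-file-datalake");
const { DefaultAzureCredential } = require("@azure/identity");
const csv = require("csv-parser");

// Stand-in for our real batch handler (takes roughly 7 seconds per batch).
async function processBatch(batch) {
  console.log(`Processing ${batch.length} records...`);
}

async function streamInBatches() {
  const fileClient = new DataLakeFileClient(
    "https://youraccount.dfs.core.windows.net/yourfilesystem/yourfile.csv", // placeholder
    new DefaultAzureCredential()
  );

  const response = await fileClient.read();
  const rows = response.readableStreamBody.pipe(csv());

  let batch = [];
  // The AbortError is thrown while iterating the stream here, so the
  // error handling around the batch processing never catches it.
  for await (const row of rows) {
    batch.push(row);
    if (batch.length >= 1000) {
      await processBatch(batch);
      batch = [];
    }
  }
  if (batch.length > 0) await processBatch(batch);
}

streamInBatches();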

Upvotes: 0

Views: 77

Answers (1)

Venkatesan

Reputation: 10490

throwing the "AbortError: The operation was aborted." error, without any details or further error handling. This happens randomly. Sometimes after batch 2, other times batch 22, et

The "AbortError: The operation was aborted." error in Azure Storage streaming often indicates that the connection was forcibly closed due to network instability, throttling, or an issue in how the stream is being consumed.

You can use the code below to stream a CSV file from Azure Data Lake Storage while handling retries and errors properly.

Code:

const { DataLakeFileClient } = require("@azure/storage-file-datalake");
const { DefaultAzureCredential } = require("@azure/identity");
const csv = require("csv-parser");
// Note: AbortController is a global in modern Node.js, so no extra package is needed.

const AZURE_STORAGE_ACCOUNT_URL = "https://xxxx.dfs.core.windows.net";
const FILE_SYSTEM_NAME = "xxx";
const FILE_PATH = "xxxx.csv";
const credential = new DefaultAzureCredential();
const BATCH_SIZE = 1000;
const MAX_RETRIES = 5;

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function processBatch(batch, batchNumber) {
  console.log(`Processing batch ${batchNumber} with ${batch.length} records...`);
  // Simulate processing time
  await delay(2000);
  console.log(`Batch ${batchNumber} processed successfully.`);
}

async function streamFileWithRetries() {
  const fileClient = new DataLakeFileClient(
    `${AZURE_STORAGE_ACCOUNT_URL}/${FILE_SYSTEM_NAME}/${FILE_PATH}`,
    credential
  );

  let offset = 0;
  let batchNumber = 1;
  let retries = 0;
  let controller = new AbortController();

  while (true) {
    try {
      console.log(`Fetching batch ${batchNumber} (offset: ${offset})...`);
      
      // Open the download stream; maxRetryRequests lets the SDK re-establish
      // the body stream if it ends unexpectedly (Node.js only)
      const response = await fileClient.read(offset, undefined, {
        abortSignal: controller.signal,
        maxRetryRequests: 5,
      });

      const stream = response.readableStreamBody;
      if (!stream) throw new Error("Failed to read file stream.");

      let batch = [];
      const csvStream = stream.pipe(csv());

      for await (const row of csvStream) {
        batch.push(row);
        if (batch.length >= BATCH_SIZE) {
          await processBatch(batch, batchNumber);
          batch = [];
          batchNumber++;
        }
      }

      // Process remaining records
      if (batch.length > 0) {
        await processBatch(batch, batchNumber);
      }

      console.log("File processing completed.");
      break; // Exit loop on success

    } catch (error) {
      console.error(`Error processing batch ${batchNumber}: ${error.message}`);

      if (retries >= MAX_RETRIES) {
        console.error("Max retries reached. Aborting process.");
        throw error;
      }

      retries++;
      console.log(`Retrying from the start of the file (attempt ${retries})...`);
      controller.abort(); // Abort the previous request
      controller = new AbortController(); // Create a fresh controller for the retry
      batchNumber = 1; // The stream restarts at the original offset, so reset the batch counter
      await delay(1000 * 2 ** (retries - 1)); // Exponential backoff: 1s, 2s, 4s, ...
    }
  }
}

// Run the function
streamFileWithRetries().catch((error) => {
  console.error("File processing failed:", error);
});

Output:

Fetching batch 1 (offset: 0)...
Processing batch 1 with 1000 records...
Batch 1 processed successfully.
Processing batch 2 with 1000 records...
.....
.....
..
Batch 100 processed successfully.
File processing completed.


The code above retries up to 5 times before failing, processes the CSV in batches, and avoids memory overload.
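
If the aborts are caused by transient network instability or throttling, it can also help to tune the client's built-in retry policy through the pipeline options when constructing the client. A minimal sketch, reusing the placeholder account, file system, and file names from above (the retry values are examples, not recommendations):

const { DataLakeServiceClient, StorageRetryPolicyType } = require("@azure/storage-file-datalake");
const { DefaultAzureCredential } = require("@azure/identity");

// Example retry settings; tune these to your workload.
const serviceClient = new DataLakeServiceClient(
  "https://xxxx.dfs.core.windows.net",
  new DefaultAzureCredential(),
  {
    retryOptions: {
      retryPolicyType: StorageRetryPolicyType.EXPONENTIAL,
      maxTries: 5,              // total attempts per request
      tryTimeoutInMs: 60000,    // timeout for each individual attempt
      retryDelayInMs: 1000,     // initial delay between attempts
      maxRetryDelayInMs: 30000, // cap on the delay between attempts
    },
  }
);

const fileClient = serviceClient
  .getFileSystemClient("xxx")
  .getFileClient("xxxx.csv");

This way transient failures are retried by the SDK itself before they ever surface as an AbortError in your streaming loop.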

Upvotes: 0
