Gabriele Sciurti

Reputation: 71

Read streaming data from Azure Blob Storage into Databricks

I'm trying to read files from a blob storage container in Databricks, run some computation through a DataFrame, and write the DataFrame to Cassandra. At the moment the files are append blobs, so, as stated in the documentation page https://learn.microsoft.com/en-us/azure/databricks/kb/data-sources/wasb-check-blob-types, I cannot mount them in DBFS.

When working on my local machine I'm able to download the file through the Azure Storage SDK for Python, but if I try to do the same thing on Databricks I receive the following error:

Unable to stream download: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))

The code I'm running is the following:

from azure.storage.blob import BlobServiceClient

# "data" is a datetime object defined earlier in the notebook
file_dir = data.strftime("%Y%m")
file_name = data.strftime("%Y%m%d") + ".csv"

local_path = f"dbfs:/FileStore/{file_dir}"
local_file = f"{local_path}/{file_name}"
blob_name = f"{file_dir}/{file_name}"

blob_service_client_instance = BlobServiceClient(account_url="https://"+STORAGEACCOUNTURL+".blob.core.windows.net", credential=STORAGEACCOUNTKEY)
blob_client_instance = blob_service_client_instance.get_blob_client(CONTAINERNAME, blob_name, snapshot=None)

with open(local_file, "wb") as my_blob:
    blob_data = blob_client_instance.download_blob(max_concurrency=10)
    blob_data.readinto(my_blob)

On another note, if I write not to dbfs:/FileStore/{file_dir} but to the Databricks working directory (os.getcwd()), I'm able to write the file, but then I'm not able to read it back (and reading from there could probably be a good workaround).
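One likely culprit here is the path scheme: on Databricks, local (non-Spark) file APIs such as Python's `open()` see DBFS through the `/dbfs` FUSE mount, not through `dbfs:/` URIs. A minimal sketch of that translation, with a hypothetical helper name:

```python
# Hypothetical helper: Spark-style "dbfs:/..." URIs must be rewritten to
# the /dbfs FUSE path before being passed to local file APIs like open().
def dbfs_to_local(path: str) -> str:
    """Translate a dbfs:/ URI into the /dbfs FUSE path usable by open()."""
    if path.startswith("dbfs:/"):
        return "/dbfs/" + path[len("dbfs:/"):]
    return path

print(dbfs_to_local("dbfs:/FileStore/202301/20230101.csv"))
# /dbfs/FileStore/202301/20230101.csv
```

With `local_file` rewritten this way, `open(local_file, "wb")` would target the DBFS-backed path rather than a literal relative file named `dbfs:/...`.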

Upvotes: 0

Views: 708

Answers (1)

Gabriele Sciurti

Reputation: 71

I changed approach and streamed the file directly into a DataFrame. The solution comes from this post: Read csv from Azure blob Storage and store in a DataFrame, specifically the answer from "lsl__".

from io import BytesIO

import pandas as pd
from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient(account_url="https://"+STORAGEACCOUNTURL+".blob.core.windows.net", credential=STORAGEACCOUNTKEY)
container_client = blob_service_client.get_container_client(CONTAINERNAME)
blob_client = container_client.get_blob_client(blob_name)

with BytesIO() as input_blob:
    # Stream the blob into an in-memory buffer, then rewind before reading
    blob_client.download_blob().download_to_stream(input_blob)
    input_blob.seek(0)
    df = pd.read_csv(input_blob)
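The key detail is the buffer-then-rewind pattern: the download writes to the end of the buffer, so the reader must `seek(0)` back to the start before parsing. The same pattern can be sketched with only the standard library, using stand-in bytes in place of a real blob download:

```python
import csv
from io import BytesIO, TextIOWrapper

# Stand-in for the bytes that download_to_stream() would write; in the
# real code these come from Azure Blob Storage.
raw = b"date,value\n20230101,42\n20230102,7\n"

with BytesIO() as buf:
    buf.write(raw)   # the download fills the buffer, leaving the cursor at the end
    buf.seek(0)      # rewind so the CSV reader starts from the beginning
    rows = list(csv.DictReader(TextIOWrapper(buf, encoding="utf-8")))

print(rows[0]["value"])  # 42
```

Without the `seek(0)`, the reader would start at the end of the buffer and parse nothing, which is exactly why `pd.read_csv` needs the rewind in the answer above.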

Upvotes: 1
