Herwini

Reputation: 437

From Azure synapse read/write from/to an Azure data lake

I am trying to write a PySpark dataframe to the data lake. This is the code I have:

spark.conf.set("fs.azure.account.oauth.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", linked_service)
spark.sparkContext._jsc.hadoopConfiguration().set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", linked_service)
spark.sparkContext._jsc.hadoopConfiguration().set("spark.storage.synapse.linkedServiceName", "LS_abs_idodatalakedev")

adls_path = f"abfs://{container}@{storage_account}.dfs.core.windows.net/"
file = adls_path + file_location
spark.read.format('csv').load(file)

Results in the following error:

Py4JJavaError                             Traceback (most recent call last)
Cell In [53], line 7
      5 adls_path = f"abfs://{container}@{storage_account}.dfs.core.windows.net/"
      6 file = adls_path + file_location
----> 7 spark.read.format('csv').load(file)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:177, in DataFrameReader.load(self, path, format, schema, **options)
    175 self.options(**options)
    176 if isinstance(path, str):
--> 177     return self._df(self._jreader.load(path))
    178 elif path is not None:
    179     if type(path) != list:

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o4037.load.
: Status code: -1 error code: null error message: Auth failure: HTTP Error -1
CustomTokenProvider getAccessToken threw org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException : No LinkedServiceName provided in configuration. Please set spark.storage.synapse.linkedServiceName
org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error -1
CustomTokenProvider getAccessToken threw org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException : No LinkedServiceName provided in configuration. Please set spark.storage.synapse.linkedServiceName
	at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:274) ...

The error says that no linked service name is set. However, when I run:

spark.conf.get(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName")

I get the linked service name I provided earlier.

Also when I run:

spark.sparkContext.getConf().get(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName")

I get nothing.

Is the linked service name not set correctly?

Upvotes: 0

Views: 416

Answers (1)

JayashankarGS

Reputation: 8040

There are two ways you can use a linked service for reading data.

  1. When the linked service's authentication method is set to Account key, access the storage with a SAS token by specifying the provider as LinkedServiceBasedSASProvider.
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()
  2. If the linked service's authentication method is set to Managed Identity or Service Principal, it will use the Managed Identity or Service Principal token with the LinkedServiceBasedTokenProvider provider.
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()
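
Since the question is also about writing a PySpark dataframe to the data lake, here is a minimal sketch of the write direction. It assumes the same linked-service configuration as in one of the snippets above; the container and output directory are placeholders, not values taken from the question.

%%pyspark
# Reuse the linked service configuration already set in the snippets above,
# then write the dataframe back to the storage account.
# <CONTAINER>, <ACCOUNT> and <OUTPUT DIRECTORY> are placeholders.
output_path = 'abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<OUTPUT DIRECTORY>'

df.write \
    .mode('overwrite') \
    .option('header', 'true') \
    .csv(output_path)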

The error occurs because the linked service name is not configured properly.

Stop your current session and re-run your notebook cells with one of the code snippets above, depending on your linked service's authentication type.

Also, ensure that you have provided the correct linked service name.
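
As a quick sanity check (a minimal sketch, assuming the Managed Identity / Service Principal option and the storage account variable from the snippet above), you can confirm that both the Spark conf and the Hadoop configuration hold the per-account keys before reading:

%%pyspark
# Verify that the per-account settings are present; both should print non-empty values.
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
print(spark.conf.get(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", None))
print(sc._jsc.hadoopConfiguration().get(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}"))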


Creating a new session removes all current configuration, and re-running the code configures the authentication values correctly.

Output:

(screenshot of the resulting dataframe)

Upvotes: 1
