Reputation: 437
I am trying to write a PySpark dataframe to the data lake. This is the code I have:
spark.conf.set("fs.azure.account.oauth.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", linked_service)
spark.sparkContext._jsc.hadoopConfiguration().set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", linked_service)
spark.sparkContext._jsc.hadoopConfiguration().set("spark.storage.synapse.linkedServiceName", "LS_abs_idodatalakedev")
adls_path = f"abfs://{container}@{storage_account}.dfs.core.windows.net/"
file = adls_path + file_location
spark.read.format('csv').load(file)
This results in the following error:
Py4JJavaError                             Traceback (most recent call last)
Cell In [53], line 7
      5 adls_path = f"abfs://{container}@{storage_account}.dfs.core.windows.net/"
      6 file = adls_path + file_location
----> 7 spark.read.format('csv').load(file)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:177, in DataFrameReader.load(self, path, format, schema, **options)
    175 self.options(**options)
    176 if isinstance(path, str):
--> 177     return self._df(self._jreader.load(path))
    178 elif path is not None:
    179     if type(path) != list:

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o4037.load.
: Status code: -1 error code: null error message: Auth failure: HTTP Error -1
CustomTokenProvider getAccessToken threw org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException : No LinkedServiceName provided in configuration. Please set spark.storage.synapse.linkedServiceName
org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator$HttpException: HTTP Error -1
CustomTokenProvider getAccessToken threw org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException : No LinkedServiceName provided in configuration. Please set spark.storage.synapse.linkedServiceName
	at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:274) ...
The error says that no linked service name is set. However, when I run:
spark.conf.get(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName")
I get the linked service name I provided earlier.
Also, when I run:
spark.sparkContext.getConf().get(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName")
I get nothing.
Is the linked service name not set correctly?
Upvotes: 0
Views: 416
Reputation: 8040
There are two ways you can use a linked service for reading data, depending on how the linked service authenticates. If your linked service uses SAS authentication, configure the LinkedServiceBasedSASProvider:
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")
# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')
df.show()
If your linked service authenticates with credentials (for example a managed identity or service principal), configure the LinkedServiceBasedTokenProvider, which is the provider your code uses:
%%pyspark
# Python code
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')
df.show()
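Since the question is ultimately about writing a dataframe to the data lake, note that the same session configuration is used for writes. A minimal sketch, assuming one of the configurations above has already been applied in the session; the container, account, and output path are placeholders:

%%pyspark
# Write the dataframe back through the same ABFSS authentication;
# <CONTAINER>, <ACCOUNT>, and <OUTPUT DIRECTORY> are placeholders.
df.write.mode('overwrite').csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<OUTPUT DIRECTORY>')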
The error occurs because the linked service name and token provider are not configured properly for the storage account.
Stop your current session and re-run your notebook cells with one of the code blocks above, chosen according to your linked service configuration.
Also, ensure that you have provided the correct linked service name.
Creating a new session removes all of the current (partially applied) configuration, and re-running the code sets the authentication values correctly.
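To confirm the values survived into the new session before reading, you can read back the keys the token provider depends on. A small check, assuming the placeholder account name from the examples above:

%%pyspark
# Sanity check: both keys should return non-empty values in the new session.
# "teststorage.dfs.core.windows.net" is a placeholder account name.
acct = "teststorage.dfs.core.windows.net"
print(spark.conf.get(f"spark.storage.synapse.{acct}.linkedServiceName", None))
print(sc._jsc.hadoopConfiguration().get(f"fs.azure.account.oauth.provider.type.{acct}"))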
Upvotes: 1