Reputation: 882
I am trying to read a section of a BigQuery table using a query in Azure Databricks Spark.
table_id = str(project_id) + "." + str(schema) + "." + str(table_name)
I am able to read the complete table data with the following code.
_data = (spark.read.format("bigquery")
         .option("credentials", ans)
         .option("parentProject", project_id)
         .option("project", project_id)
         .option("table", table_id)
         .option("dataset", schema)
         .load())
But when I try to do the same with a SQL query in the following way,
_query = """select * from `{}` limit 2""".format(table_id)
_data = spark.read.format("bigquery").option("credentials", ans). \
option("parentProject", project_id). \
option("project", project_id). \
option("dataset", schema). \
load(_query)
total = _data.count()
I get the following error:
IllegalArgumentException: Invalid Table ID 'select col1 from `proj-164408.schema.mytable` limit 2'. Must match '^(((\S+)[:.])?(\w+).)?([\S&&[^.:]]+)$$'
I tried different forms of the table ID, such as proj-164408:schema.mytable and proj-164408:schema:mytable, without success.
Attaching the stack trace:
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<command-755248569207678> in <module>
88 option("parentProject", project_id). \
89 option("project", project_id).option("dataset", schema). \
---> 90 load(_query)
91
92 total = _data.count()
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
176 self.options(**options)
177 if isinstance(path, basestring):
--> 178 return self._df(self._jreader.load(path))
179 elif path is not None:
180 if type(path) != list:
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
131 # Hide where the exception came from that shows a non-Pythonic
132 # JVM exception message.
--> 133 raise_from(converted)
134 else:
135 raise
/databricks/spark/python/pyspark/sql/utils.py in raise_from(e)
Databricks Runtime version I used: 7.3 LTS (includes Apache Spark 3.0.1, Scala 2.12).
There is already a question that discusses a similar issue, but I could not apply it to my case. Refer to BigQuery: Invalid table ID.
Update 1:
Found the source code that throws the mentioned error: com.google.cloud.bigquery.connector.common.BigQueryUtil, line 106.
It seems we can only give a fully qualified table name:
private static final String PROJECT_PATTERN = "\\S+";
private static final String DATASET_PATTERN = "\\w+";
// Allow all non-whitespace beside ':' and '.'.
// These confuse the qualified table parsing.
private static final String TABLE_PATTERN = "[\\S&&[^.:]]+";
/**
* Regex for an optionally fully qualified table.
*
* <p>Must match 'project.dataset.table' OR the legacy 'project:dataset.table' OR 'dataset.table'
* OR 'table'.
*/
private static final Pattern QUALIFIED_TABLE_REGEX =
    Pattern.compile(
        format("^(((%s)[:.])?(%s)\\.)?(%s)$$", PROJECT_PATTERN, DATASET_PATTERN, TABLE_PATTERN));

Matcher matcher = QUALIFIED_TABLE_REGEX.matcher(rawTable);
if (!matcher.matches()) {
  throw new IllegalArgumentException(
      format("Invalid Table ID '%s'. Must match '%s'", rawTable, QUALIFIED_TABLE_REGEX));
}
Since I gave a SQL query instead of a fully qualified table name, I am getting the mentioned error.
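To double-check, here is a rough Python translation of that pattern (my own sketch; Java's character-class intersection [\S&&[^.:]] becomes the negated class [^\s.:]). Only plain table IDs match; anything containing whitespace, such as a SQL statement, is rejected:

import re

# Python equivalent of QUALIFIED_TABLE_REGEX. The Java pattern ends in '$$',
# i.e. two end-of-string anchors; a single '$' is equivalent.
QUALIFIED_TABLE_REGEX = re.compile(r"^(((\S+)[:.])?(\w+)\.)?([^\s.:]+)$")

candidates = [
    "proj-164408.schema.mytable",  # matches: project.dataset.table
    "proj-164408:schema.mytable",  # matches: legacy project:dataset.table
    "schema.mytable",              # matches: dataset.table
    "mytable",                     # matches: table
    "proj-164408:schema:mytable",  # no match: ':' cannot separate dataset and table
    "select * from `t` limit 2",   # no match: whitespace is never allowed
]
for candidate in candidates:
    print(candidate, "->", bool(QUALIFIED_TABLE_REGEX.match(candidate)))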
Upvotes: 1
Views: 2415
Reputation: 882
As of now, we are using the BigQuery client together with Spark to read a section of the table: run the query with the client first, then load the job's destination table through the connector.
import json
import base64 as bs

from google.oauth2 import service_account
from google.cloud import bigquery

project_id = ""
schema = ""
table_name = ""
credentials_dict = {}

schema_table_name = str(project_id) + "." + str(schema) + "." + str(table_name)

# The spark-bigquery connector expects the service-account JSON as a
# base64-encoded string.
s = json.dumps(credentials_dict)
res = bs.b64encode(s.encode('utf-8'))
ans = res.decode("utf-8")

credentials = service_account.Credentials.from_service_account_info(credentials_dict)

# Run the query with the BigQuery client; the result is written to a
# temporary destination table that Spark can then read.
query = "SELECT * FROM `{}` WHERE col_1 > 50;".format(schema_table_name)
client = bigquery.Client(credentials=credentials, project=project_id)
query_job = client.query(query)
query_job.result()  # wait for the query to finish

# Load the query job's destination table through the connector.
df = (spark.read.format('bigquery')
      .option("credentials", ans)
      .option("parentProject", project_id)
      .option("project", project_id)
      .option('dataset', query_job.destination.dataset_id)
      .load(query_job.destination.table_id))
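A note for anyone on a newer setup: recent versions of the spark-bigquery connector (0.19+, as far as I know) can run the SQL for you through the "query" option, which requires viewsEnabled and a materializationDataset where the connector can stage results. A minimal sketch, assuming such a connector version is available on the cluster (the one bundled with DBR 7.3 LTS may predate it):

# Sketch: push the query down to the connector itself.
# Assumes a connector version that supports the "query" option.
_query = "select * from `{}` limit 2".format(schema_table_name)
df = (spark.read.format("bigquery")
      .option("credentials", ans)
      .option("parentProject", project_id)
      .option("viewsEnabled", "true")
      .option("materializationDataset", schema)  # dataset used to stage results
      .option("query", _query)
      .load())

This would avoid the extra round trip through the BigQuery client, but until we can upgrade, the client-based approach above is what works for us.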
Upvotes: 1