ohh

Reputation: 193

PySpark stored procedure in BigQuery throws a syntax error

I'm trying to create a stored procedure in BigQuery with inline PySpark code, following the example in the documentation:

https://cloud.google.com/bigquery/docs/spark-procedures#use-inline-code

This is the statement I run:

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

But it throws an error:

Syntax error: Expected end of input but got keyword FROM at [8:1]

Not sure why, any ideas?

Upvotes: 0

Views: 144

Answers (1)

Println

Reputation: 594

I was able to create the same stored procedure with the code above. The only changes are in the procedure name (with the project ID quoted in backticks) and the connection name; the PySpark body is identical. Create the Spark connection before running the query, and follow the documentation for the required permissions: https://cloud.google.com/bigquery/docs/spark-procedures

CREATE OR REPLACE PROCEDURE `project_name`.sample.replace_spark_proc()
WITH CONNECTION `project_name.us-central1.spark_connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""
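
If the error persists, it may help to check which line the parser is actually pointing at. Below is a minimal sketch, assuming the [8:1] in the message is a 1-based [line:column] position counted from the start of the submitted query text. The helper show_error_position and the sample statement are hypothetical names for illustration, not part of any BigQuery API. Note that in the statement as posted in the question, line 8 is blank, so the text that was actually run may differ from the one shown (for example, extra lines above the CREATE statement).

```python
def show_error_position(sql_text: str, line: int, column: int) -> str:
    """Return the line at a 1-based [line:column] position with a caret under the column."""
    lines = sql_text.splitlines()
    if not 1 <= line <= len(lines):
        raise ValueError(f"line {line} is outside the statement ({len(lines)} lines)")
    prefix = f"{line}: "
    # Shift the caret past the "N: " prefix so it lines up with the reported column.
    caret = " " * (len(prefix) + column - 1) + "^"
    return f"{prefix}{lines[line - 1]}\n{caret}"


# Hypothetical stand-in for the exact text pasted into the query editor.
statement = "CREATE OR REPLACE PROCEDURE my_dataset.spark_proc()\nBEGIN\nEND;"
print(show_error_position(statement, 2, 1))
```

Paste the exact contents of the query editor into statement and call the helper with the position from the error message. If the reported line is the first line of the PySpark code, the parser is reading the Python body as SQL, which suggests the statement that was run does not match the one posted.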

Upvotes: 0
