mongolol

Reputation: 971

Spark 2.0.2 PySpark failing to Import collect_list

I have a DataFrame of the form:

+--------------+------------+----+
|             s|variant_hash|call|
+--------------+------------+----+
|C1046::HG02024|    83779208|   0|
|C1046::HG02025|    83779208|   1|
|C1046::HG02026|    83779208|   0|
|C1047::HG00731|    83779208|   0|
|C1047::HG00732|    83779208|   1|
              ...

I was hoping to leverage collect_list() to transform it into:

+--------------------+-------------------------------------+
|                   s|                       feature_vector|
+--------------------+-------------------------------------+
|      C1046::HG02024|[(83779208,   0), (68471259,   2)...]|
+--------------------+-------------------------------------+

Where the feature vector column is a list of tuples of the form (variant_hash, call). I was planning on leveraging groupBy and agg(collect_list()) to accomplish this result, but am receiving the following error:

Traceback (most recent call last):
  File "/tmp/ba6a891c-529b-4c75-a76f-8ab20f4377ba/ml_on_vds.py", line 43, in <module>
    vector_df = svc_df.groupBy('s').agg(func.collect_list(('variant_hash', 'call')))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 39, in _
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 323, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.collect_list. Trace:
py4j.Py4JException: Method collect_list([class java.util.ArrayList]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)

The below code shows my imports. I didn't think it was necessary to import HiveContext and enableHiveSupport in 2.0.2, but I had hoped doing so would resolve the issue. Sadly, no luck. Does anyone have any recommendations to resolve this import issue?

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext, HiveContext
from pyspark.sql.functions import udf, hash, collect_list
from pyspark.sql.types import *
from hail import *
# Initialize the SparkSession
spark = (SparkSession.builder.appName("PopulationGenomics")
        .config("spark.sql.files.openCostInBytes", "1099511627776")
        .config("spark.sql.files.maxPartitionBytes", "1099511627776")
        .config("spark.hadoop.io.compression.codecs", "org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,org.apache.hadoop.io.compress.GzipCodec")
        .enableHiveSupport()
        .getOrCreate())

I am attempting to run this code on a Google Cloud Dataproc cluster.

Upvotes: 1

Views: 1571

Answers (1)

Pushkr

Reputation: 3619

The error is thrown at this line:

vector_df = svc_df.groupBy('s').agg(func.collect_list(('variant_hash', 'call')))

You are calling collect_list as func.collect_list, but you are importing it as:

from pyspark.sql.functions import udf, hash, collect_list

Maybe you meant to import the functions module under the alias 'func', like:

from pyspark.sql import functions as func

Upvotes: 1
