Reputation: 971
I have a DataFrame of the form:
+--------------+------------+----+
| s|variant_hash|call|
+--------------+------------+----+
|C1046::HG02024| 83779208| 0|
|C1046::HG02025| 83779208| 1|
|C1046::HG02026| 83779208| 0|
|C1047::HG00731| 83779208| 0|
|C1047::HG00732| 83779208| 1|
...
I was hoping to leverage collect_list() to transform it into:
+--------------------+-------------------------------------+
| s| feature_vector|
+--------------------+-------------------------------------+
| C1046::HG02024|[(83779208, 0), (68471259, 2)...]|
+--------------------+-------------------------------------+
where the feature_vector column is a list of tuples of the form (variant_hash, call). I was planning on leveraging groupBy and agg(collect_list()) to accomplish this, but I am receiving the following error:
Traceback (most recent call last):
File "/tmp/ba6a891c-529b-4c75-a76f-8ab20f4377ba/ml_on_vds.py", line 43, in <module>
vector_df = svc_df.groupBy('s').agg(func.collect_list(('variant_hash', 'call')))
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 39, in _
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 323, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.collect_list. Trace:
py4j.Py4JException: Method collect_list([class java.util.ArrayList]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
The code below shows my imports. I didn't think it was necessary to import HiveContext and call enableHiveSupport() in 2.0.2, but I had hoped doing so would resolve the issue. Sadly, no luck. Does anyone have any recommendations to resolve this import issue?
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext, HiveContext
from pyspark.sql.functions import udf, hash, collect_list
from pyspark.sql.types import *
from hail import *
# Initialize the SparkSession
spark = (SparkSession.builder.appName("PopulationGenomics")
         # Huge open cost / partition size: effectively one partition per input file
         .config("spark.sql.files.openCostInBytes", "1099511627776")
         .config("spark.sql.files.maxPartitionBytes", "1099511627776")
         # Register Hail's BGzip codec alongside the standard Hadoop codecs
         .config("spark.hadoop.io.compression.codecs", "org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,org.apache.hadoop.io.compress.GzipCodec")
         .enableHiveSupport()
         .getOrCreate())
I am attempting to run this code on a Google Cloud Dataproc cluster.
Upvotes: 1
Views: 1571
Reputation: 3619
So it throws an error at this line:
vector_df = svc_df.groupBy('s').agg(func.collect_list(('variant_hash', 'call')))
You are calling collect_list as func.collect_list, but you are importing the functions as:
from pyspark.sql.functions import udf, hash, collect_list
Maybe you meant to import the functions module as func, like:
from pyspark.sql import functions as func
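For what it's worth, here is a minimal sketch of how the whole call could then look (svc_df and the column names are taken from the question). Note that collect_list takes a single column, so passing the tuple ('variant_hash', 'call') would still fail even with the import fixed; wrapping the two columns in struct() is one way to build the (variant_hash, call) pairs:
from pyspark.sql import functions as func

# Combine the two columns into one struct column, then collect one
# list of structs per sample 's'; each struct stands in for a
# (variant_hash, call) tuple.
vector_df = (svc_df.groupBy('s')
             .agg(func.collect_list(func.struct('variant_hash', 'call'))
                  .alias('feature_vector')))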
Upvotes: 1