Reputation: 73
I have a very simple script to persist a dataframe with two columns in MongoDB:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf
from datetime import datetime
sparkConf = SparkConf().setMaster("local").setAppName("Wiki-Analyzer").set("spark.app.id", "Wiki-Analyzer")
sparkConf.set("spark.mongodb.input.uri", "...")
sparkConf.set("spark.mongodb.output.uri", "...")
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)
charactersRdd = sc.parallelize([("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)])
characters = sqlContext.createDataFrame(charactersRdd, ["name", "age"])
characters.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()
But I get the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o91.apply.
: org.apache.spark.sql.AnalysisException: Cannot resolve column name "write" among (name, age);
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:162)
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:162)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:161)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:447)
at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:437)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
I am runing the script with:
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0 wiki-analyzer.py
Thank you in advance!
Upvotes: 0
Views: 332
Reputation: 18101
Spark 1.3.x is not supported but the MongoDB Spark Connector.
See the documentation:
+-----------------------------+---------------+-----------------+
| MongoDB Connector for Spark | Spark Version | MongoDB Version |
+-----------------------------+---------------+-----------------+
| 2.0.0 | 2.0.x | 2.6 or later |
| 1.1.0 | 1.6.x | 2.6 or later |
+-----------------------------+---------------+-----------------+
I would strongly suggest upgrading your Spark installation as there have been many improvements since 1.3
Upvotes: 1
Reputation: 13801
The problem here is that in
characters.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()
the .write
is being interpreted as selecting a column named "write". The reason for this is that you're using Spark 1.3.1, which doesn't support the .write
syntax in its generic load/save functions (see Spark 1.3.1 docs); that syntax is only supported in Spark 1.4.0+ (see Spark 1.4.0 docs).
If you must use Spark 1.3.x, try
characters.save(source="com.mongodb.spark.sql.DefaultSource", mode="overwrite")
(based on the DataFrame.save() Python API docs for Spark 1.3.x).
If at all possible, though, I'd recommend upgrading to a newer Spark version (1.6.x or 2.1.x).
Upvotes: 2