Fabian Peña

Reputation: 73

Error in Spark 1.3.1 (PySpark) and MongoDB 3.4

I have a very simple script to persist a dataframe with two columns in MongoDB:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf
from datetime import datetime


sparkConf = SparkConf().setMaster("local").setAppName("Wiki-Analyzer").set("spark.app.id", "Wiki-Analyzer")
sparkConf.set("spark.mongodb.input.uri", "...")
sparkConf.set("spark.mongodb.output.uri", "...")

sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)    

charactersRdd = sc.parallelize([
    ("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195),
    ("Balin", 178), ("Kili", 77), ("Dwalin", 169), ("Oin", 167),
    ("Gloin", 158), ("Fili", 82), ("Bombur", None)])
characters = sqlContext.createDataFrame(charactersRdd, ["name", "age"])
characters.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()

But I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o91.apply.
: org.apache.spark.sql.AnalysisException: Cannot resolve column name "write" among (name, age);
        at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:162)
        at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:162)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:161)
        at org.apache.spark.sql.DataFrame.col(DataFrame.scala:447)
        at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:437)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

I am running the script with:

spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0 wiki-analyzer.py

Thank you in advance!

Upvotes: 0

Views: 332

Answers (2)

Ross

Reputation: 18101

Spark 1.3.x is not supported by the MongoDB Spark Connector.

See the documentation:

+-----------------------------+---------------+-----------------+
| MongoDB Connector for Spark | Spark Version | MongoDB Version |
+-----------------------------+---------------+-----------------+
|                       2.0.0 | 2.0.x         | 2.6 or later    |
|                       1.1.0 | 1.6.x         | 2.6 or later    |
+-----------------------------+---------------+-----------------+

I would strongly suggest upgrading your Spark installation, as there have been many improvements since 1.3.
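
For example, if you upgrade to Spark 1.6.x you can keep the exact connector package from your spark-submit command, since 1.1.0 is the row that pairs with 1.6.x in the table above (this assumes a Scala 2.10 build of Spark, which the _2.10 suffix implies):

spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0 wiki-analyzer.py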

Upvotes: 1

Josh Rosen

Reputation: 13801

The problem here is that in

characters.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()

the .write is being interpreted as selecting a column named "write". The reason for this is that you're using Spark 1.3.1, which doesn't support the .write syntax in its generic load/save functions (see Spark 1.3.1 docs); that syntax is only supported in Spark 1.4.0+ (see Spark 1.4.0 docs).
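As a minimal sketch of the lookup Spark 1.3.x performs here (the o91.apply call in your traceback is this column resolution):

# In Spark 1.3.x, an unknown attribute on a DataFrame falls through to
# column resolution, so this:
characters.write
# is resolved like a column selection:
characters["write"]
# and both raise the AnalysisException above, since no column "write" exists.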

If you must use Spark 1.3.x, try

characters.save(source="com.mongodb.spark.sql.DefaultSource", mode="overwrite")

(based on the DataFrame.save() Python API docs for Spark 1.3.x).

If at all possible, though, I'd recommend upgrading to a newer Spark version (1.6.x or 2.1.x).
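
On Spark 1.4.0+ the builder-style call from your script should then work as written; for example (a sketch, assuming the mongo-spark-connector 1.1.0 package from your spark-submit command on Spark 1.6.x):

characters.write \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .mode("overwrite") \
    .save()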

Upvotes: 2
