Reputation: 2844
Looking for Spark understanding...
I am loading large amounts of data from MySQL into Spark, and it keeps dying :-(
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
Here is my code
val query =
s"""
(
select
mod(act.AccountID, ${parts}) part,
p.Value name, event.EventTime eventTime, act.AccountID accountID, act.UserGoal goalID,event.ActivityID activityID, id.CountryID countryID, arr.ConsumerID consumerID
from DimIdentity as id
join FactArrival as arr on arr.IdentityID=id.IdentityID
join FactActivityEvent as event on event.ArrivalID=arr.ArrivalID
join DimAccount as act on act.AccountID=event.AccountID
join DimAccountRoleTypeMatch as role on role.AccountID=act.AccountID
join DimDateTime as d on event.DateTimeID=d.DateTimeID
join DimProperty as p on p.PropertyID=event.EventTypeID
where
id.Botness=0 and
d.DayOfYear>=${from} and d.DayOfYear<${to} and d.Year=${year} and
(role.AccountRoleTypeID=1 or role.AccountRoleTypeID=2)
) a
""".stripMargin
val events = sqlContext.read.format("jdbc").
option("url", sqlURL).
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("zeroDateTimeBehavior", "round").
option("continueBatchOnError", "true").
option("useSSL", "false").
option("dbtable", query).
option("user", sqlUser).
option("password", sqlPassword).
option("partitionColumn", "part").
option("lowerBound", "0").
option("upperBound", s"${parts - 1}").
option("numPartitions", s"${parts}").
load().as[Activity].toDF
Note that I am using partitionColumn, lowerBound, upperBound, and numPartitions as recommended in other answers.
I tried setting the number of partitions anywhere from 4 to 512, but the job always dies. Reading the same amount of data from files or from Mongo causes no problems. Is this an issue with the MySQL connector? Is there a solution?
Note that I found one answer that suggests avoiding Spark for the read: write the query results to a file on HDFS, then load the file into Spark:
Multiple Partitions in Spark RDD
Is this really the best way?
Upvotes: 2
Views: 4842
Reputation: 59
You can read the data in chunks by adding LIMIT/OFFSET to the SQL query, then use a shell script with a for loop to automate the task. This worked for me.
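For illustration, the same LIMIT/OFFSET chunking idea can also be sketched in Scala from inside Spark instead of a shell script. This is only a sketch: chunkSize, rowCount, and the simplified inner query are placeholders you would adapt to the question's setup.
// Hypothetical sketch: read fixed-size LIMIT/OFFSET chunks and union them.
// chunkSize, rowCount, and the inner query are placeholders, not working values.
val chunkSize = 1000000L
val chunks = (0L until rowCount by chunkSize).map { offset =>
  val chunkQuery = s"(select * from FactActivityEvent limit $chunkSize offset $offset) a"
  sqlContext.read.format("jdbc").
    option("url", sqlURL).
    option("driver", "com.mysql.jdbc.Driver").
    option("dbtable", chunkQuery).
    option("user", sqlUser).
    option("password", sqlPassword).
    load()
}
val events = chunks.reduce(_ unionAll _)
Keep in mind that MySQL still scans past the skipped rows for each OFFSET, so later chunks get progressively slower.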
Upvotes: 0
Reputation: 1335
You can try increasing the JDBC fetch size instead of using dynamic partitioning for the read:
val props = new java.util.Properties()
props.setProperty("user", sqlUser)
props.setProperty("password", sqlPassword)
props.setProperty("fetchsize", "1000000") // rows fetched per round trip
sqlContext.read.jdbc(sqlURL, query, props) // single, unpartitioned read
Upvotes: 1
Reputation: 2844
Here is the answer I arrived at...
For me, the answer is to avoid the MySQL connector for Spark :-( I found it too difficult to avoid the crashes caused by partitioning. The MySQL connector requires hand-tuning of the partitions and does not yield any increase in speed. It is much easier to write non-Spark code that reads the data into large text files, and then call Spark on those text files. Spark is really good with most data sources, but not MySQL... at least not yet.
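Here is a rough sketch of that pattern (a simplified illustration, not my exact code): plain JDBC dumps the query into a text file, then Spark reads the file. The query, the output path, and the column names are placeholders; in practice the file would live on HDFS so the cluster can see it. The Integer.MIN_VALUE fetch size is the MySQL Connector/J convention for streaming rows instead of buffering the whole result set.
import java.sql.DriverManager
import java.io.PrintWriter
// Step 1: plain JDBC (no Spark) dumps the query result into a tab-separated text file.
// The query and output path are simplified placeholders.
val conn = DriverManager.getConnection(sqlURL, sqlUser, sqlPassword)
val stmt = conn.createStatement()
stmt.setFetchSize(Integer.MIN_VALUE) // MySQL Connector/J: stream rows instead of buffering them all
val rs = stmt.executeQuery("select AccountID, UserGoal from DimAccount")
val out = new PrintWriter("/data/export/events.tsv") // placeholder path; use HDFS in practice
while (rs.next()) {
  out.println(s"${rs.getLong("AccountID")}\t${rs.getLong("UserGoal")}")
}
out.close(); rs.close(); stmt.close(); conn.close()
// Step 2: Spark reads the text file; this partitions and scales without any JDBC tuning.
val events = sqlContext.read.text("/data/export/events.tsv")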
Upvotes: 1