opus111

Reputation: 2844

Loading large data from MySQL into Spark

Looking for Spark understanding...

I am loading large amounts of data from MySQL into Spark, and it keeps dying :-(

org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)

Here is my code

val query =
  s"""
     (
      select 
      mod(act.AccountID, ${parts}) part,
      p.Value name, event.EventTime eventTime, act.AccountID accountID, act.UserGoal goalID,event.ActivityID activityID, id.CountryID countryID, arr.ConsumerID consumerID
      from DimIdentity as id
      join FactArrival as arr on  arr.IdentityID=id.IdentityID
      join FactActivityEvent as event on event.ArrivalID=arr.ArrivalID
      join DimAccount as act on  act.AccountID=event.AccountID
      join DimAccountRoleTypeMatch as role on role.AccountID=act.AccountID
      join DimDateTime as d on event.DateTimeID=d.DateTimeID
      join DimProperty as p on p.PropertyID=event.EventTypeID
      where
        id.Botness=0 and 
        d.DayOfYear>=${from} and d.DayOfYear<${to} and d.Year=${year} and
        (role.AccountRoleTypeID=1 or role.AccountRoleTypeID=2)
  ) a
  """.stripMargin

val events = sqlContext.read.format("jdbc").
  option("url", sqlURL).
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("zeroDateTimeBehavior", "round").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("dbtable", query).
  option("user", sqlUser).
  option("password", sqlPassword).
  option("partitionColumn", "part").
  option("lowerBound", "0").
  option("upperBound", s"${parts - 1}").
  option("numPartitions", s"${parts}").
  load().as[Activity].toDF

Note that I am using partitionColumn, lowerBound, upperBound, and numPartitions, as recommended in other answers.

I tried setting the number of partitions anywhere from 4 to 512, but it always dies. Reading the same amount of data from a file or from Mongo causes no problems. Is this an issue with the MySQL connector? Is there a solution?

Note that I found one answer that suggests I avoid Spark for the read, write the query results into a file on HDFS, and then load that file:

Multiple Partitions in Spark RDD

Is this really the best way?

Upvotes: 2

Views: 4842

Answers (3)

user1899888

Reputation: 59

You can read the data in chunks by changing the SQL query to use LIMIT and OFFSET, and then use a shell script with a for loop to automate the task. This worked for me.
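
A rough sketch of that chunking idea, translated into the question's Spark/Scala setup rather than a shell script; the chunk size, total row count, and ORDER BY key below are placeholders, not values from this answer:

// Illustrative only: pull the rows in LIMIT/OFFSET chunks and union the pieces.
val chunkSize = 1000000L        // placeholder chunk size
val totalRows = 50000000L       // placeholder, e.g. from a prior COUNT(*)

val chunks = (0L until totalRows by chunkSize).map { offset =>
  // Order by a stable key so consecutive chunks do not overlap or skip rows.
  val chunkQuery =
    s"(select * from FactActivityEvent order by ActivityID limit $chunkSize offset $offset) c"
  sqlContext.read.format("jdbc").
    option("url", sqlURL).
    option("driver", "com.mysql.jdbc.Driver").
    option("user", sqlUser).
    option("password", sqlPassword).
    option("dbtable", chunkQuery).
    load()
}

val events = chunks.reduce(_ unionAll _)

Keep in mind that MySQL still has to scan past the skipped rows for each OFFSET, so later chunks get slower as the offset grows.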

Upvotes: 0

Arvind Kumar

Reputation: 1335

You can try increasing the fetch size instead of using dynamic partitioning for the read:

val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("user", sqlUser)
connectionProperties.setProperty("password", sqlPassword)
connectionProperties.setProperty("fetchsize", "1000000") // rows fetched per round trip

sqlContext.read.jdbc(sqlURL, query, connectionProperties)
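
Note that with the stock MySQL Connector/J driver a positive fetch size is normally only honored when useCursorFetch=true is added to the JDBC URL; otherwise the driver buffers the entire result set in memory, and true row-by-row streaming requires a fetch size of Integer.MIN_VALUE on a forward-only, read-only statement.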

Upvotes: 1

opus111

Reputation: 2844

Here is the answer I got to...

For me, the answer is to avoid the MySQL connector for Spark :-( I found it too difficult to avoid the crashes caused by partitioning. The MySQL connector requires hand-tuning of the partitions, and does not yield any increase in speed. It is much easier to write non-Spark code that reads the data into large text files, and then call Spark on those files. Spark is really good with most data sources, but not MySQL... at least not yet.
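
A minimal sketch of that two-step approach, assuming a plain-JDBC export to a tab-delimited file followed by an ordinary Spark text read; the file paths and the innerQuery placeholder are illustrative, not taken from the original code:

import java.io.PrintWriter
import java.sql.{DriverManager, ResultSet}

// Step 1 (no Spark): stream the query result out of MySQL into a delimited text file.
// Integer.MIN_VALUE asks Connector/J to stream rows instead of buffering the whole result set.
val conn = DriverManager.getConnection(sqlURL, sqlUser, sqlPassword)
val stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(Integer.MIN_VALUE)

val innerQuery = "..."                        // the select/join from the question, without the outer ( ) a wrapper
val out = new PrintWriter("/data/events.tsv") // illustrative local path; copy to HDFS afterwards
val rs = stmt.executeQuery(innerQuery)
val cols = rs.getMetaData.getColumnCount
while (rs.next()) {
  out.println((1 to cols).map(i => rs.getString(i)).mkString("\t"))
}
out.close(); rs.close(); stmt.close(); conn.close()

// Step 2 (Spark): read the exported file; Spark splits large text files into partitions on its own.
val events = sc.textFile("hdfs:///data/events.tsv").map(_.split("\t", -1))

Once the data sits in plain files on HDFS, Spark's own file splitting takes over and none of the JDBC partition tuning applies anymore.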

Upvotes: 1
