Reputation: 900
I have multiple schemas like the one below, each with different column names and data types.
I want to generate test/simulated data as a DataFrame
in Scala for each schema and save it to a Parquet file.
Below is an example schema (taken from a sample JSON) for which I need to generate data dynamically with dummy values.
val schema1 = StructType(
List(
StructField("a", DoubleType, true),
StructField("aa", StringType, true),
StructField("p", LongType, true),
StructField("pp", StringType, true)
)
)
I need an RDD/DataFrame like this, with 1000 rows each, matching the columns in the above schema.
val data = Seq(
Row(1d, "happy", 1L, "Iam"),
Row(2d, "sad", 2L, "Iam"),
Row(3d, "glad", 3L, "Iam")
)
Basically, there are 200 datasets like this for which I need to generate data dynamically; writing a separate program for each schema is nearly impossible for me.
Please help me with your ideas or an implementation, as I am new to Spark.
Is it possible to generate data dynamically based on schemas with different types?
Upvotes: 12
Views: 5579
Reputation: 2909
Using @JacekLaskowski's advice, you could generate dynamic data using ScalaCheck generators (Gen) based on the fields/types you are expecting.
It could look like this:
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}
import org.scalacheck._
import scala.collection.JavaConverters._
val dynamicValues: Map[(String, DataType), Gen[Any]] = Map(
("a", DoubleType) -> Gen.choose(0.0, 100.0),
("aa", StringType) -> Gen.oneOf("happy", "sad", "glad"),
("p", LongType) -> Gen.choose(0L, 10L),
("pp", StringType) -> Gen.oneOf("Iam", "You're")
)
val schemas = Map(
"schema1" -> StructType(
List(
StructField("a", DoubleType, true),
StructField("aa", StringType, true),
StructField("p", LongType, true),
StructField("pp", StringType, true)
)),
"schema2" -> StructType(
List(
StructField("a", DoubleType, true),
StructField("pp", StringType, true),
StructField("p", LongType, true)
)
)
)
val numRecords = 1000
schemas.foreach {
case (name, schema) =>
// create a data frame
spark.createDataFrame(
// of #numRecords records
(0 until numRecords).map { _ =>
// each of them a row
Row.fromSeq(schema.fields.map(field => {
// with fields based on the schema's fieldname & type else null
dynamicValues.get((field.name, field.dataType)).flatMap(_.sample).orNull
}))
}.asJava, schema)
// store to parquet
.write.mode(SaveMode.Overwrite).parquet(name)
}
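Because the map above is keyed by (field name, type), any field it does not list comes out as null. With around 200 schemas, a fallback keyed on the data type alone may be easier to maintain. The following is only a sketch under that assumption: `SchemaGen`, `genFor`, `genRow`, and `rows` are hypothetical names, and the value ranges and lengths are arbitrary choices rather than part of the answer above.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.scalacheck.Gen

// Hypothetical helper: derives a generator from the Spark DataType alone,
// so no per-field configuration is needed.
object SchemaGen {
  def genFor(dataType: DataType): Gen[Any] = dataType match {
    case DoubleType  => Gen.choose[Double](0.0, 100.0)
    case FloatType   => Gen.choose[Float](0.0f, 100.0f)
    case IntegerType => Gen.choose[Int](0, 1000)
    case LongType    => Gen.choose[Long](0L, 1000L)
    case BooleanType => Gen.oneOf[Boolean](true, false)
    // 8-character alphanumeric strings (arbitrary length, an assumption)
    case StringType  => Gen.listOfN(8, Gen.alphaNumChar).map(_.mkString)
    // fixed-size arrays of 3 elements (arbitrary size, an assumption)
    case ArrayType(elementType, _) => Gen.listOfN(3, genFor(elementType))
    // nested structs become nested Rows
    case struct: StructType => genRow(struct)
    // anything else falls back to null
    case _ => Gen.const(null)
  }

  // One generator per field, sequenced into a generator of whole rows.
  def genRow(schema: StructType): Gen[Row] =
    Gen.sequence[List[Any], Any](schema.fields.toList.map(f => genFor(f.dataType)))
      .map(Row.fromSeq)

  // Samples n rows; returns an empty list if sampling fails.
  def rows(schema: StructType, n: Int): List[Row] =
    Gen.listOfN(n, genRow(schema)).sample.getOrElse(Nil)
}
```

The resulting list can be passed to `spark.createDataFrame(rows.asJava, schema)` in the same way as in the loop above, and a name-keyed map like `dynamicValues` could still be consulted first for fields that need specific values.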
Upvotes: 15
Reputation: 3863
You could do something like this
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json4s
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import scala.util.Random
object Test extends App {
val structType: StructType = StructType(
List(
StructField("a", DoubleType, true),
StructField("aa", StringType, true),
StructField("p", LongType, true),
StructField("pp", StringType, true)
)
)
val spark = SparkSession
.builder()
.master("local[*]")
.config(new SparkConf())
.getOrCreate()
import spark.implicits._
val df = createRandomDF(structType, 1000)
def createRandomDF(structType: StructType, size: Int, rnd: Random = new Random()): DataFrame ={
// `until` yields exactly `size` rows; `0 to size` would produce size + 1
spark.read.schema(structType).json((0 until size).map { _ => compact(randomJson(rnd, structType))}.toDS())
}
def randomJson(rnd: Random, dataType: DataType): JValue = {
dataType match {
case v: DoubleType =>
json4s.JDouble(rnd.nextDouble())
case v: StringType =>
JString(rnd.nextString(10))
case v: IntegerType =>
JInt(rnd.nextInt())
case v: LongType =>
JInt(rnd.nextLong())
case v: FloatType =>
JDouble(rnd.nextFloat())
case v: BooleanType =>
JBool(rnd.nextBoolean())
case v: ArrayType =>
val size = rnd.nextInt(10)
JArray(
(0 to size).map(_ => randomJson(rnd, v.elementType)).toList
)
case v: StructType =>
JObject(
v.fields.flatMap {
f =>
if (f.nullable && rnd.nextBoolean())
None
else
Some(JField(f.name, randomJson(rnd, f.dataType)))
}.toList
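)
case _ =>
// any type not handled above is emitted as a JSON null instead of throwing a MatchError
JNull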
}
}
}
Upvotes: 1
Reputation: 74779
ScalaCheck is a framework for generating data; you can generate raw data based on the schema using your own custom generators.
See the ScalaCheck documentation.
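As a rough illustration of what such a custom generator might look like for the four-column schema in the question, the sketch below composes ScalaCheck's `Gen.choose`, `Gen.oneOf`, and `Gen.const` in a for-comprehension. The names `CustomGenerators`, `rowGen`, and `sampleRows`, as well as the value ranges, are illustrative assumptions and not something this answer prescribes.

```scala
import org.apache.spark.sql.Row
import org.scalacheck.Gen

object CustomGenerators {
  // One generator per column, combined into a generator of whole rows.
  val rowGen: Gen[Row] = for {
    a  <- Gen.choose(0.0, 100.0)             // column "a": Double
    aa <- Gen.oneOf("happy", "sad", "glad")  // column "aa": String
    p  <- Gen.choose(0L, 10L)                // column "p": Long
    pp <- Gen.const("Iam")                   // column "pp": String
  } yield Row(a, aa, p, pp)

  // Samples n rows; returns an empty list if sampling fails.
  def sampleRows(n: Int): List[Row] =
    Gen.listOfN(n, rowGen).sample.getOrElse(Nil)
}
```

Calling `CustomGenerators.sampleRows(1000)` would give the raw rows, which can then be turned into a DataFrame together with the matching schema.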
Upvotes: 3