Reputation: 2958
While trying to move data from S3 to Mongo via the spark-mongo connector, using SparkSQL for transformations, I'm stuck on having to convert a column from string to UUID. The column is stored as a string in S3, and I'm looking for the appropriate transformation function to call so that it is stored as a UUID when saving to Mongo.
I tried using a UDF, but I'm not able to read the specific column from the DataFrame and convert its string value into a UUID. Any advice on how to write such a Spark UDF?
Sample input from the S3 file: key1 string, key2 string, key2_type int
Expected output into Mongo: key1 UUID, key2 string, key2_type int
Currently we use a SparkSQL transformation, reading from S3 and saving into Mongo:
// Build the source configuration and read the input from S3
sourceMap = sourceMap ++ jsonObjectPropertiesToMap(List("s3path", "fileformat", "awsaccesskeyid", "awssecretaccesskey"), source)
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
setAWSCredentials(sparkSession, sourceMap)
df = s3ToDataFrame(sourceMap("s3path"), sourceMap("fileformat"), sparkSession)

// Apply the SQL transformation
val dft = sparkSession.sql(mappingsToTransformedSQL(mappings))

// Build the destination configuration and write the result to Mongo
destinationMap = destinationMap ++ jsonObjectPropertiesToMap(List("cluster", "database", "authenticationdatabase", "collection", "login", "password"), destination)
dataFrameToMongodb(destinationMap("cluster"), destinationMap("database"), destinationMap("authenticationdatabase"), destinationMap("collection"), destinationMap("login"), destinationMap("password"), dft)
Here is the stringToUUID function, as recommended in the answer below:
def stringToUUID(uuid: String): String = {
  java.util.UUID.fromString(
    uuid.replaceFirst(
      "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)",
      "$1-$2-$3-$4-$5"
    )
  ).toString
}
val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))
dft.withColumn("key1", stringToUUIDUdf(df("key1")))
Here is the error we get:
17/07/01 17:51:05 INFO SparkSqlParser: Parsing command: Select key1 AS key1,key1_type_id AS key1_type_id,key2 AS key2,key2_type_id AS key2_type_id,site AS site,updated AS updated FROM tmp
org.apache.spark.sql.AnalysisException: resolved attribute(s) key1#1 missing from key2#19,updated#22,site#21,key1#17,key1_type_id#18,key2_type_id#20 in operator !Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22];;
!Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22]
+- Project [key1#1 AS key1#17, key1_type_id#2 AS key1_type_id#18, key2#3 AS key2#19, key2_type_id#4 AS key2_type_id#20, site#5 AS site#21, updated#6 AS updated#22]
+- SubqueryAlias tmp, `tmp`
+- Relation[key1#1,key1_type_id#2,key2#3,key2_type_id#4,site#5,updated#6,pdateid#7] parquet
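(For what it's worth, the "resolved attribute(s) key1#1 missing" message suggests that the column expression df("key1") still points at the original df, while withColumn is applied to dft; referencing the column on the DataFrame actually being transformed avoids that mismatch. A minimal sketch of what that call would look like:)

import org.apache.spark.sql.functions.col

// Resolve key1 against the DataFrame actually being transformed
val dftWithUuid = dft.withColumn("key1", stringToUUIDUdf(col("key1")))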
Upvotes: 2
Views: 5112
Reputation: 372
Use the logic below to get it working.
Dependency:
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>bson</artifactId>
    <version>3.4.2</version>
</dependency>
Function:
import java.util.UUID

import org.bson.{BsonDocument, BsonDocumentWriter, UuidRepresentation}
import org.bson.codecs.{EncoderContext, UuidCodec}
import org.bson.types.Binary

// Encode a UUID string as a BSON binary value (subtype 4, the standard
// UUID representation) by writing it through a temporary document
def test(uuids: String): Binary = {
  val uuid = UUID.fromString(uuids)
  val holder = new BsonDocument
  val writer = new BsonDocumentWriter(holder)
  writer.writeStartDocument()
  writer.writeName("uuid")
  new UuidCodec(UuidRepresentation.STANDARD).encode(writer, uuid, EncoderContext.builder().build())
  writer.writeEndDocument()
  val bsonBinary = holder.getBinary("uuid")
  new Binary(bsonBinary.getType, bsonBinary.getData)
}
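Note that a Spark SQL UDF cannot return org.bson.types.Binary directly, since Spark has no schema mapping for that type. One way to apply the function on the write path (a sketch, assuming the mongo-spark connector's MongoSpark.save with the output URI already configured, and the column names from the question):

import com.mongodb.spark.MongoSpark
import org.bson.Document

// Build a BSON Document per row, encoding key1 as a UUID binary (subtype 4)
val docs = dft.rdd.map { row =>
  new Document()
    .append("key1", test(row.getAs[String]("key1")))
    .append("key2", row.getAs[String]("key2"))
    .append("key2_type", row.getAs[Int]("key2_type"))
}
MongoSpark.save(docs) // writes using the spark.mongodb.output.* configuration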
Upvotes: 1
Reputation: 689
Start by defining a Scala function:
def stringToUUID(uuid: String): String = {
  java.util.UUID.fromString(
    uuid.replaceFirst(
      "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)",
      "$1-$2-$3-$4-$5"
    )
  ).toString
}
Create a UDF based on the above function:
val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))
Add a new uuid column using the withColumn transformation:
df.withColumn("uuid", stringToUUIDUdf(df("text")))
You can also use the select transformation:
df.select(stringToUUIDUdf(df("text")).alias("uuid"))
Example:
import session.implicits._ // provides the Encoder needed by createDataset

val df = session.createDataset(Seq(
"7158e7a4c1284697bcab58dfb8c80e66",
"cf251f4c667c46b3a9f67681f3be2338",
"42d3ee515d8c4268b47b579170c88e4c",
"6b7e3222292d4dc5a8a369f7fede7dc4",
"b371896d39d04fbb8a8646a176e60d17",
"e2b57f1677154c5bbe181a575aba4684",
"2a2e11c4cc604673bbd13b22f029dabb",
"fcad3f649a114336a721fc3eaefd6ce1",
"f3f6fcfd16394e1e9c98aae0bd062432",
"8b0e1929e335489997bfca20bb021d62"
)).toDF("text")
df.withColumn("uuid", stringToUUIDUdf(df("text"))).show(false)
Result:
+--------------------------------+------------------------------------+
|text |uuid |
+--------------------------------+------------------------------------+
|7158e7a4c1284697bcab58dfb8c80e66|7158e7a4-c128-4697-bcab-58dfb8c80e66|
|cf251f4c667c46b3a9f67681f3be2338|cf251f4c-667c-46b3-a9f6-7681f3be2338|
|42d3ee515d8c4268b47b579170c88e4c|42d3ee51-5d8c-4268-b47b-579170c88e4c|
|6b7e3222292d4dc5a8a369f7fede7dc4|6b7e3222-292d-4dc5-a8a3-69f7fede7dc4|
|b371896d39d04fbb8a8646a176e60d17|b371896d-39d0-4fbb-8a86-46a176e60d17|
|e2b57f1677154c5bbe181a575aba4684|e2b57f16-7715-4c5b-be18-1a575aba4684|
|2a2e11c4cc604673bbd13b22f029dabb|2a2e11c4-cc60-4673-bbd1-3b22f029dabb|
|fcad3f649a114336a721fc3eaefd6ce1|fcad3f64-9a11-4336-a721-fc3eaefd6ce1|
|f3f6fcfd16394e1e9c98aae0bd062432|f3f6fcfd-1639-4e1e-9c98-aae0bd062432|
|8b0e1929e335489997bfca20bb021d62|8b0e1929-e335-4899-97bf-ca20bb021d62|
+--------------------------------+------------------------------------+
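Since the pipeline in the question builds its transformation as a SQL string, the same function could also be registered so it is callable from sparkSession.sql; a minimal sketch (the name string_to_uuid and the column list are illustrative):

// Register the Scala function as a SQL-callable UDF
sparkSession.udf.register("string_to_uuid", (s: String) => stringToUUID(s))

val dft = sparkSession.sql(
  "SELECT string_to_uuid(key1) AS key1, key2, key2_type FROM tmp")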
Upvotes: 1