dilsingi

Reputation: 2958

Spark dataframe from String to UUID

While trying to move data from S3 to Mongo via the spark-mongo connector, using SparkSQL for transformations, I'm stuck on converting a column from string to UUID. The column is stored as a string in S3, and I'm looking for the appropriate transformation function to call so that it is stored as a UUID when saving to Mongo.

I tried using a UDF, but I'm not able to read the specific column from the DataFrame and convert its string value into a UUID. Any advice on how to write such a Spark UDF?

Sample input from S3 file: key1 string, key2 string, key2_type int

Expected output into Mongo: key1 UUID, key2 string, key2_type int

Currently we use a SparkSQL transformation, reading from S3 and saving into Mongo:

sourceMap = sourceMap ++ jsonObjectPropertiesToMap(List("s3path", "fileformat", "awsaccesskeyid", "awssecretaccesskey"), source)
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
setAWSCredentials(sparkSession, sourceMap)
df = s3ToDataFrame(sourceMap("s3path"), sourceMap("fileformat"), sparkSession)

val dft = sparkSession.sql(mappingsToTransformedSQL(mappings))

destinationMap = destinationMap ++ jsonObjectPropertiesToMap(List("cluster", "database", "authenticationdatabase", "collection", "login", "password"), destination)
dataFrameToMongodb(destinationMap("cluster"), destinationMap("database"), destinationMap("authenticationdatabase"), destinationMap("collection"), destinationMap("login"), destinationMap("password"), dft)

Here is the stringToUUID function, as recommended in the answer below:

def stringToUUID(uuid: String): String = {
  java.util.UUID.fromString(
    uuid.replaceFirst(
      "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)",
      "$1-$2-$3-$4-$5"
    )
  ).toString
}

val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))

dft.withColumn("key1", stringToUUIDUdf(df("key1")))

Here is the error we get:

17/07/01 17:51:05 INFO SparkSqlParser: Parsing command: Select key1 AS key1,key1_type_id AS key1_type_id,key2 AS key2,key2_type_id AS key2_type_id,site AS site,updated AS updated FROM tmp
org.apache.spark.sql.AnalysisException: resolved attribute(s) key1#1 missing from key2#19,updated#22,site#21,key1#17,key1_type_id#18,key2_type_id#20 in operator !Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22];;
!Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22]
+- Project [key1#1 AS key1#17, key1_type_id#2 AS key1_type_id#18, key2#3 AS key2#19, key2_type_id#4 AS key2_type_id#20, site#5 AS site#21, updated#6 AS updated#22]
   +- SubqueryAlias tmp, `tmp`
      +- Relation[key1#1,key1_type_id#2,key2#3,key2_type_id#4,site#5,updated#6,pdateid#7] parquet

Upvotes: 2

Views: 5112

Answers (2)

knowledge_seeker

Reputation: 372

Use the logic below to get it working.

Dependency:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>bson</artifactId>
    <version>3.4.2</version>
</dependency>
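
If the build uses sbt rather than Maven, the equivalent coordinate for the same org.mongodb bson 3.4.2 artifact would be:

libraryDependencies += "org.mongodb" % "bson" % "3.4.2"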

Function:

import java.util.UUID

import org.bson.{BsonDocument, BsonDocumentWriter, UuidRepresentation}
import org.bson.codecs.{EncoderContext, UuidCodec}
import org.bson.types.Binary

def test(uuids: String): Binary = {
  val uuid = UUID.fromString(uuids)
  // Encode the UUID into a temporary BSON document using the standard
  // UUID representation (BSON binary subtype 4).
  val holder = new BsonDocument
  val writer = new BsonDocumentWriter(holder)
  writer.writeStartDocument()
  writer.writeName("uuid")
  new UuidCodec(UuidRepresentation.STANDARD).encode(writer, uuid, EncoderContext.builder().build())
  writer.writeEndDocument()
  // Pull the encoded bytes back out as a Binary value that Mongo will
  // store as a UUID rather than a plain string.
  val bsonBinary = holder.getBinary("uuid")
  new Binary(bsonBinary.getType, bsonBinary.getData)
}
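
For reference, a minimal sketch of calling this helper on a canonical UUID string (note the input must already contain dashes, since UUID.fromString is used directly); the resulting Binary should carry subtype 4, the standard BSON UUID subtype, and 16 bytes of data:

val binary = test("7158e7a4-c128-4697-bcab-58dfb8c80e66")
println(binary.getType)          // 4: BSON binary subtype for a standard UUID
println(binary.getData.length)   // 16 bytes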

Upvotes: 1

Piotr Kalański

Reputation: 689

Start from defining Scala function:

def stringToUUID(uuid: String): String = {
  java.util.UUID.fromString(
    uuid
      .replaceFirst(
        "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)", "$1-$2-$3-$4-$5"
      )
  ).toString
}

Create a UDF based on the above function:

val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))

Add a new uuid column using the withColumn transformation:

df.withColumn("uuid", stringToUUIDUdf(df("text")))

You can also use the select transformation:

df.select(stringToUUIDUdf(df("text")).alias("uuid"))

Example:

val df = session.createDataset(Seq(
  "7158e7a4c1284697bcab58dfb8c80e66",
  "cf251f4c667c46b3a9f67681f3be2338",
  "42d3ee515d8c4268b47b579170c88e4c",
  "6b7e3222292d4dc5a8a369f7fede7dc4",
  "b371896d39d04fbb8a8646a176e60d17",
  "e2b57f1677154c5bbe181a575aba4684",
  "2a2e11c4cc604673bbd13b22f029dabb",
  "fcad3f649a114336a721fc3eaefd6ce1",
  "f3f6fcfd16394e1e9c98aae0bd062432",
  "8b0e1929e335489997bfca20bb021d62"
)).toDF("text")

df.withColumn("uuid", stringToUUIDUdf(df("text"))).show(false)

Result:

+--------------------------------+------------------------------------+
|text                            |uuid                                |
+--------------------------------+------------------------------------+
|7158e7a4c1284697bcab58dfb8c80e66|7158e7a4-c128-4697-bcab-58dfb8c80e66|
|cf251f4c667c46b3a9f67681f3be2338|cf251f4c-667c-46b3-a9f6-7681f3be2338|
|42d3ee515d8c4268b47b579170c88e4c|42d3ee51-5d8c-4268-b47b-579170c88e4c|
|6b7e3222292d4dc5a8a369f7fede7dc4|6b7e3222-292d-4dc5-a8a3-69f7fede7dc4|
|b371896d39d04fbb8a8646a176e60d17|b371896d-39d0-4fbb-8a86-46a176e60d17|
|e2b57f1677154c5bbe181a575aba4684|e2b57f16-7715-4c5b-be18-1a575aba4684|
|2a2e11c4cc604673bbd13b22f029dabb|2a2e11c4-cc60-4673-bbd1-3b22f029dabb|
|fcad3f649a114336a721fc3eaefd6ce1|fcad3f64-9a11-4336-a721-fc3eaefd6ce1|
|f3f6fcfd16394e1e9c98aae0bd062432|f3f6fcfd-1639-4e1e-9c98-aae0bd062432|
|8b0e1929e335489997bfca20bb021d62|8b0e1929-e335-4899-97bf-ca20bb021d62|
+--------------------------------+------------------------------------+
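
If the goal is then to persist this DataFrame with the MongoDB Spark connector, a minimal sketch would look like the following (assuming mongo-spark-connector 2.x is on the classpath; the uri and collection values are placeholders). Note that the uuid column produced above is still a formatted string, so it is stored as a string field in Mongo unless you convert it to a BSON binary as in the other answer:

df.withColumn("uuid", stringToUUIDUdf(df("text")))
  .write
  .format("com.mongodb.spark.sql.DefaultSource")   // mongo-spark-connector DataFrame sink
  .option("uri", "mongodb://host:27017/mydb")       // placeholder connection string
  .option("collection", "uuids")                    // placeholder collection name
  .mode("append")
  .save()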

Upvotes: 1
