Reputation: 156
I have an RDD extracted from files containing source and destination URLs in the following format:
google.de/2011/10/Extract-host link.de/2011/10/extact-host
facebook.de/2014/11/photos facebook.de/2014/11/name.jpg
community.cloudera.com/t5/ community.cloudera.com/t10/
I would like to extract only the host name from each URL, for example:
google.de link.de
facebook.de facebook.de
community.cloudera.com community.cloudera.com
I know how to extract the host name if there is only one column in the file, using:
file.flatMap(_.split("/").take(1))
I'm not sure how to apply it to both the source and destination URLs. Here is what I have tried:
file.flatMap{case(src + "\t" + dst) =>
((split.take(1).flatMap(line => line.split("/").take(1))),
(split.takeRight(1).flatMap(line => line.split("/").take(1))))}
Please tell me how to extract this format using Scala.
Thank you!!
Upvotes: 2
Views: 1664
Reputation: 1584
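Assuming the source and destination URLs in the input are separated by "\t":
val result = file.flatMap { line =>
  // Split each line into the source and destination URL
  val urls = line.split("\t")
  if (urls.length >= 2)
    // Keep only the part before the first '/' (the host name) of each URL
    Some(urls(0).takeWhile(_ != '/') + "\t" + urls(1).takeWhile(_ != '/'))
  else
    None // skip malformed lines instead of emitting null
}
result.collect().foreach(println)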
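To check the per-line logic without a cluster, here is a minimal sketch that runs on plain Scala collections with the sample lines from the question. `HostPairDemo` and `hostPair` are hypothetical names, and the sketch splits on any whitespace rather than only on tabs, which is an assumption about the input.

```scala
object HostPairDemo {
  // Hypothetical helper: returns (sourceHost, destinationHost),
  // or None when the line does not contain exactly two URLs.
  def hostPair(line: String): Option[(String, String)] =
    line.trim.split("\\s+") match {
      case Array(src, dst) =>
        // The host name is everything before the first '/'
        Some((src.takeWhile(_ != '/'), dst.takeWhile(_ != '/')))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "google.de/2011/10/Extract-host\tlink.de/2011/10/extact-host",
      "facebook.de/2014/11/photos\tfacebook.de/2014/11/name.jpg",
      "community.cloudera.com/t5/\tcommunity.cloudera.com/t10/",
      "malformed-line-without-destination"
    )
    // flatMap drops the None produced by the malformed line
    lines.flatMap(hostPair).foreach { case (src, dst) => println(s"$src $dst") }
  }
}
```

On an RDD, the same helper could probably be applied with something like `file.flatMap(line => HostPairDemo.hostPair(line))`, which yields pairs instead of a tab-joined string.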
Upvotes: 1
Reputation: 967
You can use pattern matching:
val pattern = """([a-zA-Z0-9\.]+)/\S*\s+([a-zA-Z0-9\.]+)/.*""".r
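val srcAndDest = rdd.flatMap {
  // A line matches only if both URLs contain a '/' after the host name
  case pattern(src, dest) => Some((src, dest))
  // Lines that do not match the pattern are dropped
  case _ => None
}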
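Note that the pattern above requires a "/" after each host and does not accept hyphens in host names. If the data may contain such lines, a more lenient variant could look like the sketch below. The adjusted regex and the names `LenientHostPattern` and `hosts` are assumptions for illustration, not part of the original answer; it runs on plain strings so it can be tried without Spark.

```scala
object LenientHostPattern {
  // Assumed variant: hyphens allowed in host names, path optional on both sides.
  // The (?:...) groups are non-capturing, so only the two hosts are extracted.
  val pattern = """([a-zA-Z0-9.-]+)(?:/\S*)?\s+([a-zA-Z0-9.-]+)(?:/\S*)?""".r

  // Hypothetical helper: Some((src, dest)) when the whole line matches, else None.
  def hosts(line: String): Option[(String, String)] = line match {
    case pattern(src, dest) => Some((src, dest))
    case _                  => None
  }

  def main(args: Array[String]): Unit = {
    Seq(
      "google.de/2011/10/Extract-host link.de/2011/10/extact-host",
      "my-site.org/a other-site.org", // hyphenated hosts, no path on the destination
      "only-one-url.com/x"            // malformed: no destination
    ).foreach(line => println(hosts(line)))
  }
}
```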
Upvotes: 0
Reputation: 1553
A DataFrame-oriented answer:
val df_raw = spark.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "\t")
  .option("header", "true")
  .load("your_file.txt")
// If the file has no header, set "header" to "false" and specify a schema:
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("src", StringType, true) ::
  StructField("dst", StringType, true) :: Nil)
// and add this line to the spark.read chain:
// .schema(schema)
Using a UDF (I don't know whether this is optimized):
import org.apache.spark.sql.functions.{col, udf}
val get_domain = spark.udf.register("get_domain", (value: String) => value.takeWhile(_ != '/'))
or
val get_domain = udf((value: String) => value.takeWhile(_ != '/'))
And selecting the new columns:
val df_final = df_raw
  .withColumn("src_domain", get_domain(col("src")))
  .withColumn("dst_domain", get_domain(col("dst")))
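If the UDF turns out to be a concern, one possible alternative is Spark's built-in `substring_index` function, which keeps everything before the first occurrence of a delimiter. The sketch below is an illustration only: the local session, the app name, the object name `BuiltinDomainDemo`, and the in-memory sample rows are assumptions standing in for the real file.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, substring_index}

object BuiltinDomainDemo {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only; master and app name are assumptions.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("domain-demo")
      .getOrCreate()
    import spark.implicits._

    // In-memory stand-in for the tab-separated file from the question
    val df_raw = Seq(
      ("google.de/2011/10/Extract-host", "link.de/2011/10/extact-host"),
      ("facebook.de/2014/11/photos", "facebook.de/2014/11/name.jpg")
    ).toDF("src", "dst")

    // substring_index(column, "/", 1) returns the text before the first "/"
    val df_final = df_raw
      .withColumn("src_domain", substring_index(col("src"), "/", 1))
      .withColumn("dst_domain", substring_index(col("dst"), "/", 1))

    df_final.show(false)
    spark.stop()
  }
}
```

Built-in column functions are visible to the query optimizer, whereas a UDF is treated as a black box, so this form may be preferable when it expresses the same logic.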
Upvotes: 1