Reputation: 79
I was trying to read from a CSV file and insert the entries into a database. I noticed that internally Spark created two RDD partitions, i.e. rdd_0_0 and rdd_0_1, that work on the same data and each do all the processing. Can anyone help me figure out why the call method is invoked twice on the same data?
If two datasets/stages are created, why are both of them running the same logic? Please help me confirm whether this is how Spark works.
public final class TestJavaAggregation1 implements Serializable {
    private static final long serialVersionUID = 1L;
    static CassandraConfig config = null;
    static PreparedStatement statement = null;
    private transient SparkConf conf;
    private PersonAggregationRowWriterFactory aggregationWriter = new PersonAggregationRowWriterFactory();
    public Session session;

    private TestJavaAggregation1(SparkConf conf) {
        this.conf = conf;
    }

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("ReadFromCSVFile").setMaster("local[1]").set("spark.executor.memory", "1g");
        conf.set("spark.cassandra.connection.host", "localhost");
        TestJavaAggregation1 app = new TestJavaAggregation1(conf);
        app.run();
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        aggregateData(sc);
        sc.stop();
    }

    private JavaRDD<String> sparkConfig(JavaSparkContext sc) {
        JavaRDD<String> lines = sc.textFile("PersonAggregation1_500.csv", 1);
        System.out.println(lines.getCheckpointFile());
        lines.cache();
        final String heading = lines.first();
        System.out.println(heading);
        String headerValues = heading.replaceAll("\t", ",");
        System.out.println(headerValues);
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        Session session = connector.openSession();
        try {
            session.execute("DROP KEYSPACE IF EXISTS java_api5");
            session.execute("CREATE KEYSPACE java_api5 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE java_api5.person (hashvalue INT, id INT, state TEXT, city TEXT, country TEXT, full_name TEXT, PRIMARY KEY((hashvalue), id, state, city, country, full_name)) WITH CLUSTERING ORDER BY (id DESC);");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return lines;
    }

    @SuppressWarnings("serial")
    public void aggregateData(JavaSparkContext sc) {
        JavaRDD<String> lines = sparkConfig(sc);
        System.out.println("FirstRDD" + lines.partitions().size());
        JavaRDD<PersonAggregation> result = lines.map(new Function<String, PersonAggregation>() {
            int i = 0;
            public PersonAggregation call(String row) {
                PersonAggregation aggregate = new PersonAggregation();
                row = row + "," + this.hashCode();
                String[] parts = row.split(",");
                aggregate.setId(Integer.valueOf(parts[0]));
                aggregate.setFull_name(parts[1]);
                aggregate.setState(parts[4]);
                aggregate.setCity(parts[5]);
                aggregate.setCountry(parts[6]);
                aggregate.setHashValue(Integer.valueOf(parts[7]));
                // The save below inserts 200 entries into the database
                // while the CSV file has only 100 records.
                saveToJavaCassandra(aggregate);
                return aggregate;
            }
        });
        System.out.println(result.collect().size());
        List<PersonAggregation> personAggregationList = result.collect();
        JavaRDD<PersonAggregation> aggregateRDD = sc.parallelize(personAggregationList);
        javaFunctions(aggregateRDD).writerBuilder("java_api5", "person",
                aggregationWriter).saveToCassandra();
    }
}
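For reference, the `call()` method above indexes into rows of the shape shown in the sample log output, with the mapper's `hashCode()` appended as `parts[7]`. A standalone sketch of just that parsing step (the sample values are taken from the logs, not from my actual data file):

```java
public class RowParseSketch {
    public static void main(String[] args) {
        // Row shape taken from the logged sample output:
        // parts[0]=id, parts[1]=full_name, parts[4]=state, parts[5]=city, parts[6]=country
        String row = "1,FName1,MName1,LName1,state1,city1,country1";
        // call() appends this.hashCode() before splitting, so it becomes parts[7]
        row = row + "," + 797409046;
        String[] parts = row.split(",");
        System.out.println(parts[0] + " " + parts[1] + " " + parts[4] + " " + parts[7]);
        // prints: 1 FName1 state1 797409046
    }
}
```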
Please find the logs below too:
15/05/29 12:40:37 INFO FileInputFormat: Total input paths to process : 1
15/05/29 12:40:37 INFO SparkContext: Starting job: first at TestJavaAggregation1.java:89
15/05/29 12:40:37 INFO DAGScheduler: Got job 0 (first at TestJavaAggregation1.java:89) with 1 output partitions (allowLocal=true)
15/05/29 12:40:37 INFO DAGScheduler: Final stage: Stage 0(first at TestJavaAggregation1.java:89)
15/05/29 12:40:37 INFO DAGScheduler: Parents of final stage: List()
15/05/29 12:40:37 INFO DAGScheduler: Missing parents: List()
15/05/29 12:40:37 INFO DAGScheduler: Submitting Stage 0 (PersonAggregation_5.csv MappedRDD[1] at textFile at TestJavaAggregation1.java:84), which has no missing parents
15/05/29 12:40:37 INFO MemoryStore: ensureFreeSpace(2560) called with curMem=157187, maxMem=1009589944
15/05/29 12:40:37 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.5 KB, free 962.7 MB)
15/05/29 12:40:37 INFO MemoryStore: ensureFreeSpace(1897) called with curMem=159747, maxMem=1009589944
15/05/29 12:40:37 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1897.0 B, free 962.7 MB)
15/05/29 12:40:37 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54664 (size: 1897.0 B, free: 962.8 MB)
15/05/29 12:40:37 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/29 12:40:37 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/05/29 12:40:37 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (PersonAggregation_5.csv MappedRDD[1] at textFile at TestJavaAggregation1.java:84)
15/05/29 12:40:37 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/05/29 12:40:37 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1326 bytes)
15/05/29 12:40:37 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/05/29 12:40:37 INFO CacheManager: Partition rdd_1_0 not found, computing it
15/05/29 12:40:37 INFO HadoopRDD: Input split: file:/F:/workspace/apoorva/TestProject/PersonAggregation_5.csv:0+230
15/05/29 12:40:37 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/05/29 12:40:37 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/05/29 12:40:37 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/05/29 12:40:37 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/05/29 12:40:37 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/05/29 12:40:37 INFO MemoryStore: ensureFreeSpace(680) called with curMem=161644, maxMem=1009589944
15/05/29 12:40:37 INFO MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 680.0 B, free 962.7 MB)
15/05/29 12:40:37 INFO BlockManagerInfo: Added rdd_1_0 in memory on localhost:54664 (size: 680.0 B, free: 962.8 MB)
15/05/29 12:40:37 INFO BlockManagerMaster: Updated info of block rdd_1_0
15/05/29 12:40:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2335 bytes result sent to driver
15/05/29 12:40:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 73 ms on localhost (1/1)
15/05/29 12:40:37 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/05/29 12:40:37 INFO DAGScheduler: Stage 0 (first at TestJavaAggregation1.java:89) finished in 0.084 s
15/05/29 12:40:37 INFO DAGScheduler: Job 0 finished: first at TestJavaAggregation1.java:89, took 0.129536 s
1,FName1,MName1,LName1,state1,city1,country1
1,FName1,MName1,LName1,state1,city1,country1
15/05/29 12:40:37 INFO Cluster: New Cassandra host localhost/127.0.0.1:9042 added
15/05/29 12:40:37 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
FirstRDD1
SecondRDD1
15/05/29 12:40:47 INFO SparkContext: Starting job: collect at TestJavaAggregation1.java:147
15/05/29 12:40:47 INFO DAGScheduler: Got job 1 (collect at TestJavaAggregation1.java:147) with 1 output partitions (allowLocal=false)
15/05/29 12:40:47 INFO DAGScheduler: Final stage: Stage 1(collect at TestJavaAggregation1.java:147)
15/05/29 12:40:47 INFO DAGScheduler: Parents of final stage: List()
15/05/29 12:40:47 INFO DAGScheduler: Missing parents: List()
15/05/29 12:40:47 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[2] at map at TestJavaAggregation1.java:117), which has no missing parents
15/05/29 12:40:47 INFO MemoryStore: ensureFreeSpace(3872) called with curMem=162324, maxMem=1009589944
15/05/29 12:40:47 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.8 KB, free 962.7 MB)
15/05/29 12:40:47 INFO MemoryStore: ensureFreeSpace(2604) called with curMem=166196, maxMem=1009589944
15/05/29 12:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.5 KB, free 962.7 MB)
15/05/29 12:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:54664 (size: 2.5 KB, free: 962.8 MB)
15/05/29 12:40:47 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/05/29 12:40:47 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/05/29 12:40:47 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[2] at map at TestJavaAggregation1.java:117)
15/05/29 12:40:47 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/05/29 12:40:47 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1326 bytes)
15/05/29 12:40:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/05/29 12:40:47 INFO BlockManager: Found block rdd_1_0 locally
com.local.myProj1.TestJavaAggregation1$1@2f877f16,797409046,state1,city1,country1
15/05/29 12:40:47 INFO DCAwareRoundRobinPolicy: Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
15/05/29 12:40:47 INFO Cluster: New Cassandra host localhost/127.0.0.1:9042 added
Connected to cluster: Test Cluster
Datacenter: datacenter1; Host: localhost/127.0.0.1; Rack: rack1
com.local.myProj1.TestJavaAggregation1$1@2f877f16,797409046,state2,city2,country1
com.local.myProj1.TestJavaAggregation1$1@2f877f16,797409046,state3,city3,country1
com.local.myProj1.TestJavaAggregation1$1@2f877f16,797409046,state4,city4,country1
com.local.myProj1.TestJavaAggregation1$1@2f877f16,797409046,state5,city5,country1
15/05/29 12:40:47 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2343 bytes result sent to driver
15/05/29 12:40:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 184 ms on localhost (1/1)
15/05/29 12:40:47 INFO DAGScheduler: Stage 1 (collect at TestJavaAggregation1.java:147) finished in 0.185 s
15/05/29 12:40:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/05/29 12:40:47 INFO DAGScheduler: Job 1 finished: collect at TestJavaAggregation1.java:147, took 0.218779 s
______________________________5_______________________________
15/05/29 12:40:47 INFO SparkContext: Starting job: collect at TestJavaAggregation1.java:150
15/05/29 12:40:47 INFO DAGScheduler: Got job 2 (collect at TestJavaAggregation1.java:150) with 1 output partitions (allowLocal=false)
15/05/29 12:40:47 INFO DAGScheduler: Final stage: Stage 2(collect at TestJavaAggregation1.java:150)
15/05/29 12:40:47 INFO DAGScheduler: Parents of final stage: List()
15/05/29 12:40:47 INFO DAGScheduler: Missing parents: List()
15/05/29 12:40:47 INFO DAGScheduler: Submitting Stage 2 (MappedRDD[2] at map at TestJavaAggregation1.java:117), which has no missing parents
15/05/29 12:40:47 INFO MemoryStore: ensureFreeSpace(3872) called with curMem=168800, maxMem=1009589944
15/05/29 12:40:47 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.8 KB, free 962.7 MB)
15/05/29 12:40:47 INFO MemoryStore: ensureFreeSpace(2604) called with curMem=172672, maxMem=1009589944
15/05/29 12:40:47 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.5 KB, free 962.7 MB)
15/05/29 12:40:47 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:54664 (size: 2.5 KB, free: 962.8 MB)
15/05/29 12:40:47 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
15/05/29 12:40:47 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:838
15/05/29 12:40:47 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (MappedRDD[2] at map at TestJavaAggregation1.java:117)
15/05/29 12:40:47 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/05/29 12:40:47 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1326 bytes)
15/05/29 12:40:47 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
15/05/29 12:40:47 INFO BlockManager: Found block rdd_1_0 locally
com.local.myProj1.TestJavaAggregation1$1@17b560af,397762735,state1,city1,country1
com.local.myProj1.TestJavaAggregation1$1@17b560af,397762735,state2,city2,country1
com.local.myProj1.TestJavaAggregation1$1@17b560af,397762735,state3,city3,country1
com.local.myProj1.TestJavaAggregation1$1@17b560af,397762735,state4,city4,country1
com.local.myProj1.TestJavaAggregation1$1@17b560af,397762735,state5,city5,country1
15/05/29 12:40:47 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 2343 bytes result sent to driver
15/05/29 12:40:47 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 16 ms on localhost (1/1)
15/05/29 12:40:47 INFO DAGScheduler: Stage 2 (collect at TestJavaAggregation1.java:150) finished in 0.016 s
15/05/29 12:40:47 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/05/29 12:40:47 INFO DAGScheduler: Job 2 finished: collect at TestJavaAggregation1.java:150, took 0.026302 s
Upvotes: 0
Views: 114
Reputation: 26
When you run a Spark job on a cluster, Spark distributes the data across the cluster as RDDs, and the partitioning of that data is handled by Spark itself. When you create the lines RDD in your sparkConfig method by reading a file, Spark partitions the data and creates RDD partitions internally, so that in-memory computation happens over data distributed across those partitions in your cluster. Your JavaRDD lines is therefore internally a union of several RDD partitions. Hence, when you run a map over JavaRDD lines, it runs over all the data spread among the internal partitions that make up that RDD. In your case Spark created two internal partitions of the JavaRDD lines, which is why the map function is called twice, once per internal partition. Please tell me if you have any other questions.
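It is also worth noting that Spark transformations are lazy: the logs show two separate collect jobs (at lines 147 and 150 of the file) each re-running MappedRDD[2], so a side effect inside a map can fire once per action. Plain Java streams are lazy in the same way, so the effect can be reproduced without Spark at all. In this illustrative sketch (class and variable names are mine, not from the original code), a mapping with a side effect is consumed by two terminal operations, standing in for the two collect() calls:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyReEvaluation {
    public static void main(String[] args) {
        AtomicInteger sideEffectCount = new AtomicInteger();

        // Like an RDD lineage: a recipe that is re-evaluated for every action.
        Supplier<Stream<String>> mapped = () -> Stream.of("row1", "row2", "row3")
                .map(row -> {
                    // Stand-in for a side effect such as a database insert inside map()
                    sideEffectCount.incrementAndGet();
                    return row.toUpperCase();
                });

        // Two terminal operations, like the two collect() calls in aggregateData():
        List<String> first = mapped.get().collect(Collectors.toList());
        List<String> second = mapped.get().collect(Collectors.toList());

        // The side effect ran once per row per action: 3 rows x 2 actions = 6.
        System.out.println(sideEffectCount.get()); // prints 6
    }
}
```

Caching the mapped result (or performing the side effect in exactly one action) is the usual way to keep such effects from running more than once per record.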
Upvotes: 0