Nikita Dobryukha

Reputation: 31

How to decide if Spark application performance is close to the maximum (for given cores and memory)?

We use Cassandra 3.5 with Spark 1.6.1 in a 2-node cluster (8 cores and 16 GB of memory per node).

There is the following Cassandra table:

CREATE TABLE schema.trade (
    symbol text,
    date int,
    trade_time timestamp,
    reporting_venue text,
    trade_id bigint,
    ref_trade_id bigint,
    action_type text,
    price double,
    quantity int,
    condition_code text,
    PRIMARY KEY ((symbol, date), trade_time, trade_id)
) WITH compaction = {'class' : 'DateTieredCompactionStrategy', 'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'};

I want to calculate the percentage of volume: the sum of all traded volume in the relevant security during the time period, grouped by exchange and time bar (1 or 5 minutes). I've created an example:

void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom, Timestamp timeTill, Integer barWidth) {
    final long MILLISECOND_TO_MINUTE_MULTIPLIER = 60_000L;
    LOG.info("start");
    // Read the table, filter on the Spark side, key each trade by (bar start, exchange)
    // and sum the traded quantity per key.
    JavaPairRDD<Tuple2<Timestamp, String>, Integer> counts = javaFunctions(sparkContext).cassandraTable("schema", "trade")
            .filter(row ->
                        row.getString("symbol").equals(symbol) && row.getInt("date").equals(date) &&
                        row.getDateTime("trade_time").getMillis() >= timeFrom.getTime() &&
                        row.getDateTime("trade_time").getMillis() < timeTill.getTime())
            .mapToPair(row ->
                new Tuple2<>(
                    new Tuple2<>(
                            // truncate the trade time down to the start of its bar
                            new Timestamp(
                                    (row.getDateTime("trade_time").getMillis() / (barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER)) * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER
                            ),
                            row.getString("reporting_venue")),
                    row.getInt("quantity")
                )
            ).reduceByKey((a, b) -> a + b);
    LOG.info(counts.collect().toString());
    LOG.info("finish");
}
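
One hedged aside before the run log: all of the filtering above happens on the Spark side, so every row of the table is first shipped to the executors. The connector can evaluate at least the trade_time range inside Cassandra via where(), and select() limits the columns it fetches; selecting specific (symbol, date) partitions is what joinWithCassandraTable is intended for. A minimal sketch of the where()/select() variant, assuming the rest of the pipeline stays exactly as above:

// Sketch only: push the trade_time range and the column projection down to Cassandra;
// symbol/date are still checked on the Spark side here.
JavaPairRDD<Tuple2<Timestamp, String>, Integer> counts = javaFunctions(sparkContext)
        .cassandraTable("schema", "trade")
        .select("symbol", "date", "trade_time", "reporting_venue", "quantity")
        .where("trade_time >= ? AND trade_time < ?", timeFrom, timeTill)
        .filter(row -> row.getString("symbol").equals(symbol) && row.getInt("date").equals(date))
        .mapToPair(row -> new Tuple2<>(
                new Tuple2<>(
                        new Timestamp((row.getDateTime("trade_time").getMillis()
                                / (barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER))
                                * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER),
                        row.getString("reporting_venue")),
                row.getInt("quantity")))
        .reduceByKey((a, b) -> a + b);

Whether this actually helps depends on how much of the table falls outside the time window, so it is worth comparing the stage times in the Spark UI.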

[2016-06-15 09:25:27.014] [INFO ] [main] [EquityTCAAnalytics] start
[2016-06-15 09:25:28.000] [INFO ] [main] [NettyUtil] Found Netty's native epoll transport in the classpath, using it
[2016-06-15 09:25:28.518] [INFO ] [main] [Cluster] New Cassandra host /node1:9042 added
[2016-06-15 09:25:28.519] [INFO ] [main] [LocalNodeFirstLoadBalancingPolicy] Added host node1 (datacenter1)
[2016-06-15 09:25:28.519] [INFO ] [main] [Cluster] New Cassandra host /node2:9042 added
[2016-06-15 09:25:28.520] [INFO ] [main] [CassandraConnector] Connected to Cassandra cluster: Cassandra
[2016-06-15 09:25:29.115] [INFO ] [main] [SparkContext] Starting job: collect at EquityTCAAnalytics.java:88
[2016-06-15 09:25:29.385] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Registering RDD 2 (mapToPair at EquityTCAAnalytics.java:78)
[2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Got job 0 (collect at EquityTCAAnalytics.java:88) with 5 output partitions
[2016-06-15 09:25:29.388] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Final stage: ResultStage 1 (collect at EquityTCAAnalytics.java:88)
[2016-06-15 09:25:29.389] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Parents of final stage: List(ShuffleMapStage 0)
[2016-06-15 09:25:29.391] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Missing parents: List(ShuffleMapStage 0)
[2016-06-15 09:25:29.400] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at EquityTCAAnalytics.java:78), which has no missing parents
[2016-06-15 09:25:29.594] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_0 stored as values in memory (estimated size 10.8 KB, free 10.8 KB)
[2016-06-15 09:25:29.642] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.4 KB, free 16.3 KB)
[2016-06-15 09:25:29.647] [INFO ] [dispatcher-event-loop-7] [BlockManagerInfo] Added broadcast_0_piece0 in memory on node2:44871 (size: 5.4 KB, free: 2.4 GB)
[2016-06-15 09:25:29.650] [INFO ] [dag-scheduler-event-loop] [SparkContext] Created broadcast 0 from broadcast at DAGScheduler.scala:1006
[2016-06-15 09:25:29.658] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting 5 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at EquityTCAAnalytics.java:78)
[2016-06-15 09:25:29.661] [INFO ] [dag-scheduler-event-loop] [TaskSchedulerImpl] Adding task set 0.0 with 5 tasks
[2016-06-15 09:25:30.006] [INFO ] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend] Registered executor NettyRpcEndpointRef(null) (node1:41122) with ID 0
[2016-06-15 09:25:30.040] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 0.0 in stage 0.0 (TID 0, node1, partition 0,NODE_LOCAL, 11725 bytes)
[2016-06-15 09:25:30.051] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 1.0 in stage 0.0 (TID 1, node1, partition 1,NODE_LOCAL, 11317 bytes)
[2016-06-15 09:25:30.054] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 2.0 in stage 0.0 (TID 2, node1, partition 2,NODE_LOCAL, 11929 bytes)
[2016-06-15 09:25:30.057] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 3.0 in stage 0.0 (TID 3, node1, partition 3,NODE_LOCAL, 11249 bytes)
[2016-06-15 09:25:30.059] [INFO ] [dispatcher-event-loop-7] [TaskSetManager] Starting task 4.0 in stage 0.0 (TID 4, node1, partition 4,NODE_LOCAL, 11560 bytes)
[2016-06-15 09:25:30.077] [INFO ] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend] Registered executor NettyRpcEndpointRef(null) (CassandraCH4.ehubprod.local:33668) with ID 1
[2016-06-15 09:25:30.111] [INFO ] [dispatcher-event-loop-4] [BlockManagerMasterEndpoint] Registering block manager node1:36512 with 511.1 MB RAM, BlockManagerId(0, node1, 36512)
[2016-06-15 09:25:30.168] [INFO ] [dispatcher-event-loop-3] [BlockManagerMasterEndpoint] Registering block manager CassandraCH4.ehubprod.local:33610 with 511.1 MB RAM, BlockManagerId(1, CassandraCH4.ehubprod.local, 33610)
[2016-06-15 09:25:30.818] [INFO ] [dispatcher-event-loop-2] [BlockManagerInfo] Added broadcast_0_piece0 in memory on node1:36512 (size: 5.4 KB, free: 511.1 MB)
[2016-06-15 09:25:36.764] [INFO ] [pool-21-thread-1] [CassandraConnector] Disconnected from Cassandra cluster: Cassandra
[2016-06-15 09:25:48.914] [INFO ] [task-result-getter-0] [TaskSetManager] Finished task 4.0 in stage 0.0 (TID 4) in 18854 ms on node1 (1/5)
[2016-06-15 09:25:55.541] [INFO ] [task-result-getter-1] [TaskSetManager] Finished task 2.0 in stage 0.0 (TID 2) in 25489 ms on node1 (2/5)
[2016-06-15 09:25:57.837] [INFO ] [task-result-getter-2] [TaskSetManager] Finished task 1.0 in stage 0.0 (TID 1) in 27795 ms on node1 (3/5)
[2016-06-15 09:25:57.931] [INFO ] [task-result-getter-3] [TaskSetManager] Finished task 0.0 in stage 0.0 (TID 0) in 27919 ms on node1 (4/5)
[2016-06-15 09:26:01.357] [INFO ] [task-result-getter-0] [TaskSetManager] Finished task 3.0 in stage 0.0 (TID 3) in 31302 ms on node1 (5/5)
[2016-06-15 09:26:01.358] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] ShuffleMapStage 0 (mapToPair at EquityTCAAnalytics.java:78) finished in 31.602 s
[2016-06-15 09:26:01.360] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] looking for newly runnable stages
[2016-06-15 09:26:01.360] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] running: Set()
[2016-06-15 09:26:01.360] [INFO ] [task-result-getter-0] [TaskSchedulerImpl] Removed TaskSet 0.0, whose tasks have all completed, from pool
[2016-06-15 09:26:01.362] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] waiting: Set(ResultStage 1)
[2016-06-15 09:26:01.362] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] failed: Set()
[2016-06-15 09:26:01.365] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting ResultStage 1 (ShuffledRDD[3] at reduceByKey at EquityTCAAnalytics.java:87), which has no missing parents
[2016-06-15 09:26:01.373] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_1 stored as values in memory (estimated size 3.6 KB, free 19.9 KB)
[2016-06-15 09:26:01.382] [INFO ] [dag-scheduler-event-loop] [MemoryStore] Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.1 KB, free 21.9 KB)
[2016-06-15 09:26:01.383] [INFO ] [dispatcher-event-loop-1] [BlockManagerInfo] Added broadcast_1_piece0 in memory on node2:44871 (size: 2.1 KB, free: 2.4 GB)
[2016-06-15 09:26:01.384] [INFO ] [dag-scheduler-event-loop] [SparkContext] Created broadcast 1 from broadcast at DAGScheduler.scala:1006
[2016-06-15 09:26:01.385] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] Submitting 5 missing tasks from ResultStage 1 (ShuffledRDD[3] at reduceByKey at EquityTCAAnalytics.java:87)
[2016-06-15 09:26:01.386] [INFO ] [dag-scheduler-event-loop] [TaskSchedulerImpl] Adding task set 1.0 with 5 tasks
[2016-06-15 09:26:01.390] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 0.0 in stage 1.0 (TID 5, node1, partition 0,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.390] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 1.0 in stage 1.0 (TID 6, node1, partition 1,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.397] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 2.0 in stage 1.0 (TID 7, node1, partition 2,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.398] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 3.0 in stage 1.0 (TID 8, node1, partition 3,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.406] [INFO ] [dispatcher-event-loop-4] [TaskSetManager] Starting task 4.0 in stage 1.0 (TID 9, node1, partition 4,NODE_LOCAL, 2786 bytes)
[2016-06-15 09:26:01.429] [INFO ] [dispatcher-event-loop-4] [BlockManagerInfo] Added broadcast_1_piece0 in memory on node1:36512 (size: 2.1 KB, free: 511.1 MB)
[2016-06-15 09:26:01.452] [INFO ] [dispatcher-event-loop-6] [MapOutputTrackerMasterEndpoint] Asked to send map output locations for shuffle 0 to node1:41122
[2016-06-15 09:26:01.456] [INFO ] [dispatcher-event-loop-6] [MapOutputTrackerMaster] Size of output statuses for shuffle 0 is 161 bytes
[2016-06-15 09:26:01.526] [INFO ] [task-result-getter-1] [TaskSetManager] Finished task 4.0 in stage 1.0 (TID 9) in 128 ms on node1 (1/5)
[2016-06-15 09:26:01.575] [INFO ] [task-result-getter-3] [TaskSetManager] Finished task 2.0 in stage 1.0 (TID 7) in 184 ms on node1 (2/5)
[2016-06-15 09:26:01.580] [INFO ] [task-result-getter-2] [TaskSetManager] Finished task 0.0 in stage 1.0 (TID 5) in 193 ms on node1 (3/5)
[2016-06-15 09:26:01.589] [INFO ] [task-result-getter-3] [TaskSetManager] Finished task 1.0 in stage 1.0 (TID 6) in 199 ms on node1 (4/5)
[2016-06-15 09:26:01.599] [INFO ] [task-result-getter-2] [TaskSetManager] Finished task 3.0 in stage 1.0 (TID 8) in 200 ms on node1 (5/5)
[2016-06-15 09:26:01.599] [INFO ] [task-result-getter-2] [TaskSchedulerImpl] Removed TaskSet 1.0, whose tasks have all completed, from pool
[2016-06-15 09:26:01.599] [INFO ] [dag-scheduler-event-loop] [DAGScheduler] ResultStage 1 (collect at EquityTCAAnalytics.java:88) finished in 0.202 s
[2016-06-15 09:26:01.612] [INFO ] [main] [DAGScheduler] Job 0 finished: collect at EquityTCAAnalytics.java:88, took 32.496470 s
[2016-06-15 09:26:01.634] [INFO ] [main] [EquityTCAAnalytics] [((2016-06-10 13:45:00.0,DA),6944), ((2016-06-10 14:25:00.0,B),5241), ..., ((2016-06-10 10:55:00.0,QD),109080), ((2016-06-10 14:55:00.0,A),1300)]
[2016-06-15 09:26:01.641] [INFO ] [main] [EquityTCAAnalytics] finish

Is 32.5 s normal?

Upvotes: 0

Views: 811

Answers (1)

aleck

Reputation: 305

I'd say the % of CPU and/or memory usage would be a starting point. If your cores are underutilised, that can mean your process is not parallel enough. How much memory you need depends a lot on your data, but generally it's better to use more RAM than to fall back to I/O.
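
For example (a sketch, not a tuned recommendation): the "Registering block manager ... with 511.1 MB RAM" lines in the log suggest the executors are running with roughly the default 1 GB heap, a small fraction of the 16 GB available per node, and every task of both stages ran on node1. Setting the resources explicitly makes it easier to see how much CPU and memory the job is actually allowed to use:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: explicit resource settings for a standalone 2-node cluster
// (8 cores / 16 GB per node); the values are illustrative, not tuned.
SparkConf conf = new SparkConf()
        .setAppName("EquityTCAAnalytics")
        .set("spark.executor.memory", "8g")       // per-executor heap (the default is 1g)
        .set("spark.executor.cores", "8")         // cores given to each executor
        .set("spark.cores.max", "16")             // let the application use all cores in the cluster
        .set("spark.default.parallelism", "32");  // keep more partitions than cores in flight
JavaSparkContext sparkContext = new JavaSparkContext(conf);

With that in place, the executors tab of the Spark UI (and plain OS-level CPU monitoring on both nodes) will show whether the job is actually CPU-bound, memory-bound, or simply not spread across the cluster.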

Upvotes: 0
