Fisher Coder
Fisher Coder

Reputation: 3576

Cannot run Spark jobs for large datasets

I wrote a Spark job to read from Hive data in S3 and generate HFiles.

This job works fine when reading only one ORC file (about 190 MB), however, when I used it to read the entire S3 directory, about 400 ORC files, so about 400*190 MB = 76 GB data, it keeps throwing this following error/stacktrace:

17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39149; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/06/12 01:48:03 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, ip-10-211-127-63.ap-northeast-2.compute.internal, executor 9): java.nio.channels.ClosedChannelException
    at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

17/06/12 01:48:03 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 0.0 (TID 541, ip-10-211-126-250.ap-northeast-2.compute.internal, executor 72, partition 6, PROCESS_LOCAL, 6680 bytes)
17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39151; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)

My cluster is big enough to process it: (This was verified already)

It has 40 nodes, over 800 GB memory available, 320 VCores.

And here's my Java code:

protected void sparkGenerateHFiles(JavaRDD<Row> rdd) throws IOException {
        System.out.println("In sparkGenerateHFiles....");
        JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRDD = rdd.mapToPair(
            new PairFunction<Row, ImmutableBytesWritable, KeyValue>() {
            public Tuple2<ImmutableBytesWritable, KeyValue> call(Row row) throws Exception {
                System.out.println("running call now ....");
                String key = (String) row.get(0);
                String value = (String) row.get(1);

                ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
                byte[] rowKeyBytes = Bytes.toBytes(key);
                rowKey.set(rowKeyBytes);

                KeyValue keyValue = new KeyValue(rowKeyBytes,
                    Bytes.toBytes("fam"),
                    Bytes.toBytes("qualifier"),
                    ProductJoin.newBuilder()
                        .setId(key)
                        .setSolrJson(value)
                        .build().toByteArray());

                return new Tuple2<ImmutableBytesWritable, KeyValue>(rowKey, keyValue);
            }
        });
        Partitioner partitioner = new IntPartitioner(2);
        // repartition and sort the data - HFiles want sorted data
        JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRDD =
            javaPairRDD.repartitionAndSortWithinPartitions(partitioner);


        Configuration baseConf = HBaseConfiguration.create();
        Configuration conf = new Configuration();
        conf.set(HBASE_ZOOKEEPER_QUORUM, importerParams.zkQuorum);
        Job job = new Job(baseConf, "map data");
        HTable table = new HTable(conf, importerParams.hbaseTargetTable);
        System.out.println("gpt table: " + table.getName());
        HFileOutputFormat2.configureIncrementalLoad(job, table);
        System.out.println("Done configuring incremental load....");

        Configuration config = job.getConfiguration();


        repartitionedRDD.saveAsNewAPIHadoopFile(
            "HFILE_OUTPUT_PATH",
            ImmutableBytesWritable.class,
            KeyValue.class,
            HFileOutputFormat2.class,
            config
            );
        System.out.println("Saved to HFILE_OUTPUT_PATH: " + HFILE_OUTPUT_PATH);
    }

protected JavaRDD<Row> readJsonTable() {
        System.out.println("In readJsonTable.....");
        SparkSession.Builder builder = SparkSession.builder().appName("Importer");
        String hiveTable = "";
        if (importerParams.local) {
            builder.master("local");
            hiveTable = HIVE_TABLE_S3A_DEV_SAMPLE;
        } else {
            hiveTable = importerParams.hiveSourceTable;
        }
        SparkSession spark = builder.getOrCreate();

        SparkContext sparkContext = spark.sparkContext();

        // this is important. need to set the endpoint to ap-northeast-2
        sparkContext.hadoopConfiguration()
            .set("fs.s3a.endpoint", "s3.ap-northeast-2.amazonaws.com");

        Dataset<Row> rows = null;
        if (importerParams.local) {
            rows = spark.read().format("orc").load(hiveTable);
        } else {
            rows = spark.read().format("orc").load(hiveTable);//use this one temporarily
//          rows = spark.read().format("orc").load(HIVE_TABLE_S3A_PREFIX
            // + importerParams.latestDateHour);
        }
        System.out.println("Finished loading hive table from S3, rows.count() = "
            + (rows != null ? rows.count() : 0));

        return rows.toJavaRDD();
    }

main program:

        long startTime = System.currentTimeMillis();
        JavaRDD<Row> rdd = readJsonTable();

        sparkGenerateHFiles(rdd);
        System.out.println("it took " + (System.currentTimeMillis() - startTime)/1000 + " seconds to generate HFiles...\n\n\n\n");

What I've tried:

I saw the closest one post on Stackoverflow. Then I've set this builder.config("spark.shuffle.blockTransferService", "nio"); but still no luck.

Any help is greatly appreciated!

Upvotes: 1

Views: 2436

Answers (1)

Fisher Coder
Fisher Coder

Reputation: 3576

As @Wang pointed out, it's really due to my data skewing problem.

To solve this problem, what I did is:

I re-create my HBase table, but this time, I used SPLITS, and split my HBase table into 80 regions. Then in my Spark code, I wrote a customized Partitioner to repartition each entry based on its key, this way, there will be not HOTSPOTTING issue, i.e. one region server is being overloaded while others are left idle.

Some other tricks learned along the way, when using SPLITS to create HBase tables, by default, the startkey of the first region and the endkey of the last region is empty string "", be sure to do the right thing there to avoid HOTSPOTTING too.

Here's a working example of my partitioner.

Thanks!

Upvotes: 1

Related Questions