Shay

Reputation: 505

Hive dynamic partitioning - concurrent writes from Spark corrupt data

We have set up a Spark job that inserts into Hive (using DataFrames). The Hive table is configured to create dynamic partitions. Everything works perfectly as long as we run a single Spark job to insert data into Hive.

The problem is that we expect to run concurrent Spark jobs that load data into Hive, and this does not seem to work. I have read that dynamic partitioning takes a SHARED lock rather than an EXCLUSIVE lock. In our case, if we run 4-5 Spark jobs at the same time, the data is corrupted and some records are lost. This is very easily reproducible and happens almost every time.

Has anyone solved this, i.e. inserted into a Hive table using dynamic partitioning from concurrent jobs while still ensuring that no data corruption happens? Any input is greatly appreciated!

Snippet of the Spark code:

// Set Hive conf to allow dynamic partitions to be created
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

// Create temp table to load data into Hive
parsedDataDF.registerTempTable("parsedDatatempTable")

// Insert data into Hive using dynamic partitioning
sqlContext.sql("insert into table " + hiveDBToLoad + "." + hiveTableToLoad + " partition (partition_1, partition_2, partition_3) " +
    "select * from parsedDatatempTable")

Upvotes: 5

Views: 1846

Answers (1)

Kishan

Reputation: 1

$partition_tbl=array();
$partition_tbl[]=array(
    'tbl'=>$wpdb->prefix.'postmeta',
    'field'=>'meta_id',
    'limit'=>2,
    'part_key'=>'p',
);
$partition_tbl[]=array(
    'tbl'=>$wpdb->prefix.'posts',
    'field'=>'ID',
    'limit'=>2,
    'part_key'=>'pos',
);
        
if(!empty($partition_tbl)){
    foreach($partition_tbl as $ppptt){
        $field=$ppptt['field'];
        $tbl=$ppptt['tbl'];
        $limit=$ppptt['limit'];
        $part_key=$ppptt['part_key'];
        // Read the highest value of the partition column from the table being processed
        $get_max = $wpdb->get_results("SELECT * FROM ".$tbl." ORDER BY $field DESC LIMIT 0, 1");
        if(!empty($get_max)){
            $max=$get_max[0]->$field;
            if($max > $limit){                              
                $partcc=ceil($max/$limit);          
                $has_partitions = $wpdb->get_results("EXPLAIN partitions SELECT * FROM ".$tbl);     
                $haspartarr=array('none');
                if(!empty($has_partitions)){
                    if(!empty($has_partitions[0]->partitions)){
                        $haspartarr=explode(",",$has_partitions[0]->partitions);
                        if(count($haspartarr) > 1){
                             unset($haspartarr[count($haspartarr)-1]);
                        }
                    }
                }
                $part_sql='';
                $part_arr=array();
                $first=true;
                for($i=0;$i<$partcc;$i++){
                    $cpart=$part_key.$i;
                    $reclim=$limit*($i+1);
                    if(!in_array($cpart,$haspartarr)){
                        if(empty($has_partitions[0]->partitions)){
                            $part_arr[]="PARTITION $cpart VALUES LESS THAN ($reclim)";
                        }else{
                            // Open the REORGANIZE statement once, on the first partition name not already present
                            if($first){
                                $part_sql.="ALTER TABLE $tbl REORGANIZE PARTITION $cpart INTO (";
                                $first=false;
                            }
                            $part_arr[]="PARTITION $cpart VALUES LESS THAN ($reclim)";                                          
                        }
                    }
                }
                if(empty($has_partitions[0]->partitions) && !empty($part_arr)){
                    $cpart=$part_key.$i;                
                    $part_arr[]="PARTITION $cpart VALUES LESS THAN MAXVALUE";               
                    $part_sql.="ALTER TABLE $tbl PARTITION BY RANGE($field)(";
                    $part_sql.=implode(",",$part_arr);
                    $part_sql.=")";
                    $wpdb->get_results($part_sql);
                }else{
                    if(!empty($part_arr)){
                        $cpart=$part_key.$i;
                        $part_arr[]="PARTITION $cpart VALUES LESS THAN MAXVALUE";
                        $part_sql.=implode(",",$part_arr);      
                        $part_sql.=");";
                        $wpdb->get_results($part_sql);
                    }           
                }
            }
        }

    }
}
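
As a rough illustration of what the code above builds, here is the statement it would assemble for the `posts` entry when `$max` is 5 and the table has no partitions yet. The `wp_` table prefix is an assumption (it is the WordPress default and is not stated in the post), and the line breaks are added for readability; the code itself emits a single line. With `limit` set to 2, `ceil(5/2)` gives three range partitions plus a catch-all:

```sql
-- Assumed: $wpdb->prefix is 'wp_', $max is 5, table not yet partitioned
ALTER TABLE wp_posts PARTITION BY RANGE(ID)(
    PARTITION pos0 VALUES LESS THAN (2),
    PARTITION pos1 VALUES LESS THAN (4),
    PARTITION pos2 VALUES LESS THAN (6),
    PARTITION pos3 VALUES LESS THAN MAXVALUE
)
```

On later runs, when partitions already exist, the code takes the other branch and builds an `ALTER TABLE ... REORGANIZE PARTITION ... INTO (...)` statement instead.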

Upvotes: -3
