Mattias

Reputation: 158

Hive - dynamic partitions: Long loading times with a lot of partitions when updating table

I run Hive via AWS EMR and have a jobflow that parses log data frequently into S3. I use dynamic partitions (date and log level) for my parsed Hive table.

One thing that takes forever now that I have several gigabytes of data and a lot of partitions is the step where Hive loads the data into the table after the parsing is done.

Loading data to table default.logs partition (dt=null, level=null)
    ...
    Loading partition {dt=2013-08-06, level=INFO}
    Loading partition {dt=2013-03-12, level=ERROR}
    Loading partition {dt=2013-08-03, level=WARN}
    Loading partition {dt=2013-07-08, level=INFO}
    Loading partition {dt=2013-08-03, level=ERROR}
    ...

    Partition default.logs{dt=2013-03-05, level=INFO} stats: [num_files: 1, num_rows: 0, total_size: 1905, raw_data_size: 0]
    Partition default.logs{dt=2013-03-06, level=ERROR} stats: [num_files: 1, num_rows: 0, total_size: 4338, raw_data_size: 0]
    Partition default.logs{dt=2013-03-06, level=INFO} stats: [num_files: 1, num_rows: 0, total_size: 828250, raw_data_size: 0]
    ...
    Partition default.logs{dt=2013-08-14, level=INFO} stats: [num_files: 5, num_rows: 0, total_size: 626629, raw_data_size: 0]
    Partition default.logs{dt=2013-08-14, level=WARN} stats: [num_files: 4, num_rows: 0, total_size: 4405, raw_data_size: 0]

Is there a way to overcome this problem and reduce the loading times for this step?

I have already tried archiving old logs to Glacier via a bucket lifecycle rule, hoping that Hive would skip loading the archived partitions. Since the file paths remain visible in S3, however, Hive recognizes the archived partitions anyway, so no performance is gained.

Update 1

The data is loaded by simply inserting it into the dynamically partitioned table

INSERT INTO TABLE logs PARTITION (dt, level)
SELECT time, thread, logger, identity, message, logtype, logsubtype, node, storageallocationstatus, nodelist, userid, nodeid, path, datablockid, hash, size, value, exception, server, app, version, dt, level
FROM new_logs ;
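For reference, an insert like this where all partition columns are dynamic typically requires the dynamic-partitioning settings below; the numeric limits shown are illustrative values, not taken from my actual job:

```sql
-- Enable dynamic partitions, and allow ALL partition columns to be dynamic
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
-- Raise the caps if one job may create many (dt, level) combinations
SET hive.exec.max.dynamic.partitions = 10000;
SET hive.exec.max.dynamic.partitions.pernode = 1000;
```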

from a table that contains the unparsed logs

CREATE EXTERNAL TABLE new_logs (
  dt STRING,
  time STRING,
  thread STRING,
  level STRING,
  logger STRING,
  identity STRING,
  message STRING,
  logtype STRING,
  logsubtype STRING,
  node STRING,
  storageallocationstatus STRING,
  nodelist STRING,
  userid STRING,
  nodeid STRING,
  path STRING,
  datablockid STRING,
  hash STRING,
  size STRING,
  value STRING,
  exception STRING,
  version STRING
)
PARTITIONED BY (
  server STRING,
  app STRING
)
ROW FORMAT
  DELIMITED
  FIELDS TERMINATED BY '\t'
STORED AS
  INPUTFORMAT 'org.maz.hadoop.mapred.LogFileInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://my-log/logs/${LOCATION}' ;

into the new (parsed) table

CREATE EXTERNAL TABLE logs (
  time STRING,
  thread STRING,
  logger STRING,
  identity STRING,
  message STRING,
  logtype STRING,
  logsubtype STRING,
  node STRING,
  storageallocationstatus STRING,
  nodelist STRING,
  userid STRING,
  nodeid STRING,
  path STRING,
  datablockid STRING,
  hash STRING,
  size STRING,
  exception STRING,
  value STRING,
  server STRING,
  app STRING,
  version STRING
)
PARTITIONED BY (
  dt STRING,
  level STRING
)
ROW FORMAT
  DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3://my-log/parsed-logs' ;

The input format (LogFileInputFormat) is responsible for parsing log entries into the desired format.

Update 2

When I try the following

INSERT INTO TABLE logs PARTITION (dt, level)
SELECT time, thread, logger, identity, message, logtype, logsubtype, node, storageallocationstatus, nodelist, userid, nodeid, path, datablockid, hash, size, value, exception, server, app, version, dt, level
FROM new_logs
WHERE dt > 'some old date';

Hive still loads all partitions in logs. If, on the other hand, I use static partitioning like

INSERT INTO TABLE logs PARTITION (dt='some date', level)
SELECT time, thread, logger, identity, message, logtype, logsubtype, node, storageallocationstatus, nodelist, userid, nodeid, path, datablockid, hash, size, value, exception, server, app, version, level
FROM new_logs
WHERE dt = 'some date';

Hive only loads the concerned partitions, but then I need to create one query for each date I think might be present in new_logs. Usually new_logs only contains log entries from today and yesterday, but it might contain older entries as well.

Static partitioning is my solution of choice at the moment, but isn't there any other (better) solution to my problem?
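For reference, Hive's multi-table insert can combine several static-partition inserts into a single scan of new_logs, which avoids writing one full query per date; this is only a sketch and the dates are placeholders:

```sql
-- One scan of new_logs feeding several static dt partitions at once
FROM new_logs
INSERT INTO TABLE logs PARTITION (dt='2013-08-14', level)
  SELECT time, thread, logger, identity, message, logtype, logsubtype,
         node, storageallocationstatus, nodelist, userid, nodeid, path,
         datablockid, hash, size, value, exception, server, app, version, level
  WHERE dt = '2013-08-14'
INSERT INTO TABLE logs PARTITION (dt='2013-08-13', level)
  SELECT time, thread, logger, identity, message, logtype, logsubtype,
         node, storageallocationstatus, nodelist, userid, nodeid, path,
         datablockid, hash, size, value, exception, server, app, version, level
  WHERE dt = '2013-08-13';
```

The list of dates still has to be generated per run, but at least the table is only read once.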

Upvotes: 4

Views: 4956

Answers (2)

Jay

Reputation: 1072

AWS has improved Hive partition recovery time by more than an order of magnitude on EMR 3.2.x and above.

We have a Hive table with more than 20,000 partitions on S3. With prior versions of EMR it used to take ~80 minutes to recover; with 3.2.x/3.3.x we can do it in under 5 minutes.

Upvotes: 0

Carter Shanklin

Reputation: 3047

During this slow phase, Hive takes the files it built for each partition and moves them from a temporary directory to a permanent directory. You can see this step in the output of "explain extended" as a Move Operator.

So for each partition it's one move plus an update to the metastore. I don't use EMR, but I presume this act of moving files to S3 has high latency for each file that needs to be moved.
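To inspect this yourself, you can prefix the insert with EXPLAIN EXTENDED; one of the stages in the resulting plan should contain the Move Operator described above:

```sql
EXPLAIN EXTENDED
INSERT INTO TABLE logs PARTITION (dt, level)
SELECT time, thread, logger, identity, message, logtype, logsubtype,
       node, storageallocationstatus, nodelist, userid, nodeid, path,
       datablockid, hash, size, value, exception, server, app, version,
       dt, level
FROM new_logs;
```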

What's not clear from what you wrote is whether you're doing a full load each time you run. For example, why do you have a 2013-03-05 partition? Are you getting new log data that contains this old date? If this data is already in your logs table, you should modify your insert statement like

SELECT fields
FROM new_logs
WHERE dt > 'date of last run';

This way you'll only write a few partitions and have only a few files to move. It's still wasteful to scan all the extra data in new_logs, but you can solve that by partitioning new_logs.
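One hypothetical way to partition new_logs along these lines is to add a load-date partition column so each incremental run only scans its own batch; the table name, column name, and S3 location below are made up for illustration:

```sql
-- Hypothetical staging table: same columns as new_logs, plus a load_dt
-- partition so each run's files land under their own S3 prefix.
CREATE EXTERNAL TABLE new_logs_staged (
  dt STRING,
  time STRING,
  level STRING,
  message STRING
  -- ...remaining columns as in new_logs
)
PARTITIONED BY (
  load_dt STRING,  -- ingestion date of the batch (hypothetical column)
  server STRING,
  app STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 's3://my-log/staged-logs';

-- Each run then reads only the current batch:
-- INSERT INTO TABLE logs PARTITION (dt, level)
-- SELECT ... FROM new_logs_staged WHERE load_dt = '2013-08-14';
```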

Upvotes: 1
