Matt Michala

Reputation: 1

NiFi Create Indexes after Inserting Records into table

I've got my first Process Group that drops the indexes on a table. That then routes to another Process Group that does the inserts into the table. After successfully inserting the half million rows, I want to create the indexes on the table and analyze it. This is typical Data Warehouse methodology. Can anyone please give advice on how to do this?
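For concreteness, the SQL I want the flow to run before and after the load is roughly this (Postgres-flavoured syntax; the table and index names are just placeholders, not my real schema):

    -- 1st Process Group: drop the index before the bulk load
    DROP INDEX idx_fact_sales_customer;

    -- 2nd Process Group: NiFi inserts the ~500,000 rows here

    -- The step I can't trigger: rebuild the index and refresh statistics
    CREATE INDEX idx_fact_sales_customer ON fact_sales (customer_id);
    ANALYZE fact_sales;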

I've tried setting counters, but I cannot reference counters in Expression Language. I've tried RouteOnAttribute but am getting nowhere. Now I'm digging into the Wait & Notify processors - maybe there's a solution there?

I have gotten Counters to count the FlowFile SQL insert statements, but cannot reference the Counter values via Expression Language. I.e., this always returns null: "${InsertCounter}", where InsertCounter appears to be set properly by the UpdateCounter processor in my flow.

So maybe this code can be used?

In the Wait processor, set the Target Signal Count to ${fragment.count}.

Set the Release Signal Identifier in both the Notify and Wait processors to ${fragment.identifier}.

Nothing works.

Upvotes: 0

Views: 809

Answers (2)

Koji

Reputation: 66

You can use Wait/Notify processors to do that. I assume you're using ExecuteSQL and SplitAvro? If so, the flow will look like this:

Split approach

At the 2nd ProcessGroup

  • ExecuteSQL: e.g. 1 output FlowFile containing 5,000 records
  • SplitAvro: creates 5,000 FlowFiles; this processor adds the fragment.identifier and fragment.count (= 5,000) attributes.
    • split:
      • XXXX: Do some conversion per record
      • PutSQL: Insert records individually
      • Notify: Increase count for the fragment.identifier (Release Signal Identifier) by 1. Executed 5,000 times.
    • original - to the next ProcessGroup

At the 3rd ProcessGroup

  • Wait: waits for the signal count stored under fragment.identifier (Release Signal Identifier) to reach fragment.count (Target Signal Count). This route processes the original FlowFile, so it is executed only once (see the property sketch after this list).
  • PutSQL: Execute a query to create indices and analyze tables
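For reference, the key properties on the two processors would look roughly like this (the Distributed Cache Service is whichever DistributedMapCacheClientService you have configured; it must be the same controller service on both processors):

    Notify (inside the split branch, runs once per record)
      Release Signal Identifier : ${fragment.identifier}
      Signal Counter Delta      : 1
      Distributed Cache Service : your DistributedMapCacheClientService

    Wait (on the original route, at the 3rd ProcessGroup)
      Release Signal Identifier : ${fragment.identifier}
      Target Signal Count       : ${fragment.count}
      Distributed Cache Service : the same cache service

Once the counter stored under ${fragment.identifier} reaches ${fragment.count}, Wait routes the original FlowFile to success and the index/analyze PutSQL runs exactly once.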

Alternatively, if possible, using Record aware processors would make the flow simpler and more efficient.

Record approach

  • ExecuteSQL: e.g. 1 output FlowFile containing 5,000 records
  • Perform record-level conversion: With UpdateRecord or LookupRecord, you can do data processing without splitting records into multiple FlowFiles (a minimal example follows this list).
  • PutSQL: Execute a query to create indices and analyze tables. Since a single FlowFile contains all the records, no Wait/Notify is required; the output FlowFile can be connected to the downstream flow.
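As a sketch of the record-level conversion step, an UpdateRecord processor could, for example, stamp a load date onto every record without splitting the FlowFile (the /load_date field and the reader/writer services are just illustrations, not part of your flow):

    UpdateRecord
      Record Reader              : an AvroReader matching the ExecuteSQL output
      Record Writer              : an AvroRecordSetWriter (or JSON, if preferred)
      Replacement Value Strategy : Literal Value
      /load_date  (dynamic prop) : ${now():format('yyyy-MM-dd')}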

Upvotes: 1

Up_One

Reputation: 5271

I think my suggestion on this question will fit your scenario as well:

How to execute a processor only when another processor is not executing?

Check it out

Upvotes: 0
