wawanopoulos

Reputation: 9804

Ingest data into warp10 - Performance tip

We're looking for the best way to ingest data into Warp 10. We are on a microservices architecture that mainly uses Kafka. We have two candidate solutions:

As described here, we currently use the Ingress solution: we aggregate data for x seconds and call the Ingress API to send the data in packets (instead of calling the API each time we need to insert something).

For a few days we have been experimenting with the Kafka plugin. We successfully set up the plugin and created an .mc2 script responsible for consuming data from a given topic and inserting it into Warp 10 with UPDATE.

Questions:

  1. Using the Kafka plugin, would it be better to apply the same buffering mechanism as the one we use with the Ingress endpoint? Or is there a specific implementation in the Warp 10 Kafka plugin that makes it efficient to consume the topic message by message and call the UPDATE function for each record?
  2. Both solutions work today, so we are trying to identify their differences to get the best ingestion performance, ideally without any buffering mechanism, because we want to stay as close to real time as possible.

MC2 file:

    {
        'topics' [ 'our_topic_name' ] // List of Kafka topics to subscribe to
        'parallelism' 1 // Number of threads to start for processing the incoming messages. Each thread will handle a certain number of partitions.
        'config' { // Map of Kafka consumer parameters
            'bootstrap.servers' 'kafka-headless:9092'
            'group.id' 'senx-consumer'
            'enable.auto.commit' 'true'
        }
        'macro' <%
            // Macro executed each time a Kafka record is consumed
            /*
                // Received record format:
                {
                    'timestamp' 123        // The record timestamp
                    'timestampType' 'type' // The type of timestamp, can be one of 'NoTimestampType', 'CreateTime', 'LogAppendTime'
                    'topic' 'topic_name'   // Name of the topic which received the message
                    'offset' 123           // Offset of the message in 'topic'
                    'partition' 123        // Id of the partition which received the message
                    'key' ...              // Byte array of the message key
                    'value' ...            // Byte array of the message value
                    'headers' { }          // Map of message headers
                }
            */
            "recordArray" STORE

            "preprod.write" "token" STORE

            // The macro can be called on timeout with an empty record map
            $recordArray SIZE 0 !=
            <%
                $recordArray 'value' GET // The Kafka record value is retrieved as bytes
                'UTF-8' BYTES-> // Convert the bytes to a string (Warp 10 ingress format)
                JSON->
                "value" STORE
                "Records received through Kafka" LOGMSG
                $value LOGMSG
                $value
                <%
                    DROP // Drop the element index pushed by LMAP
                    PARSE
                    // PARSE outputs a list of GTS, here containing a single GTS
                    0 GET
                    // Renaming the GTS is required before calling UPDATE
                    "gts" STORE
                    $gts $gts NAME RENAME
                %>
                LMAP

                // Store the GTS in Warp 10
                $token
                UPDATE
            %>
            IFT
        %> // end macro
        'timeout' 10000 // Polling timeout (in ms); if no message is received within this delay, the macro is called with an empty map as input
    }

Upvotes: 0

Views: 156

Answers (1)

Pi-r-p

Reputation: 32

If you want to cache something in Warp 10 to avoid lots of UPDATE calls per second, you can use SHM (shared memory). It is a built-in extension that you need to activate.
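Activation goes through the Warp 10 configuration. A minimal sketch, assuming the standard extension class shipped with recent Warp 10 releases (check the exact class name in your distribution):

    # In the Warp 10 configuration (e.g. a file under etc/conf.d/)
    warpscript.extension.shm = io.warp10.script.ext.shm.SharedMemoryWarpScriptExtension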

Once activated, use SHMSTORE and SHMLOAD to keep objects in RAM between two WarpScript executions.

In your example, you can push all the incoming GTS into a list, or a list of lists of GTS, using +! to append elements to an existing list, as sketched below.
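For instance, the end of your Kafka macro could append to a shared cache instead of calling UPDATE directly. This is only a sketch: 'ingest.cache' and 'ingest.lock' are illustrative names, and it assumes the cache was initialized once with an empty list ([] 'ingest.cache' SHMSTORE) and that MUTEX takes a macro and a lock name:

    // After LMAP, instead of '$token UPDATE':
    'incoming' STORE // List of GTS built by LMAP
    <%
        'ingest.cache' SHMLOAD  // Shared list of GTS lists
        $incoming +!            // Append in place, the list stays on the stack
        'ingest.cache' SHMSTORE // Store it back
    %> 'ingest.lock' MUTEX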

The MERGE of all the GTS in the cache (by name + labels) and the UPDATE into the database can then be done in a runner (don't forget to use a MUTEX):
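A runner along these lines could flush the cache periodically. Again a sketch with the same illustrative names; for simplicity it assumes all cached records belong to a single series (with several series, group them by class + labels, e.g. with PARTITION, before the MERGE):

    // Runner executed periodically (e.g. every 10 s)
    'preprod.write' 'token' STORE
    <%
        'ingest.cache' SHMLOAD     // Take the cached batch...
        [] 'ingest.cache' SHMSTORE // ...and reset the cache atomically
    %> 'ingest.lock' MUTEX
    'batch' STORE

    $batch SIZE 0 !=
    <%
        $batch FLATTEN // Collapse the list of GTS lists into a flat list of GTS
        MERGE          // Merge into a single GTS (single series assumed)
        $token UPDATE  // One database call for the whole batch
    %>
    IFT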

Don't forget the total cost of the operations:

  • The ingress format can be optimized for ingestion speed if you do not repeat the classname and labels, and if you gather lines per GTS (see the example below). See here.
  • PARSE deserializes data from the Warp 10 ingress format.
  • UPDATE serializes data to the Warp 10 optimized ingress format (and pushes it to the update endpoint).
  • The update endpoint deserializes it again.

It makes sense to do these deserialize/serialize/deserialize operations if your input data is far from the optimal ingress format. It also makes sense if you want to RANGECOMPACT your data to save disk space, or do any other preprocessing.
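For illustration, here is a sketch of that optimized ingress payload (the series name and values are made up): the classname and labels appear on the first line only, and the following lines of the same GTS start with '=' to reuse them:

    1440000000000000// sensor.temperature{room=A} 21.5
    =1440000001000000// 21.6
    =1440000002000000// 21.7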

Upvotes: 0
