Vrankela

Reputation: 1202

How to properly set up column mode in Cygnus?

  1. I have Cygnus working automatically just fine in row mode. But I want to switch to column mode, which means I now have to set up my tables manually with the appropriate column types (right?).

  2. I figured that the column types of the attributes should correspond to the ones I specified in the context broker (i.e. temperature: float, pressure: integer, and so on). Am I correct?

  3. BUT what about the types of these columns:

    • recvTime (I'm guessing DATETIME?)
    • fiwareservicepath (string?)
    • entityId (integer?)
    • entityType (string?)
    • temperature_md (is it a float like temperature, or what?)
    • pressure_md (is it an integer like pressure, or what?)

  4. Furthermore, I could really do without the following columns:

the _md columns; fiwareservicepath

Can I remove those?

  5. And finally, where is the primary key in this scenario? Can I just manually add an ID column and set it to auto-increment without running into any conflicts with Cygnus?

EDIT1: I tried setting the types as I speculated in point 3 and got the following output in the log:

11 Dec 2015 15:22:12,783 INFO  [conf-file-poller-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:167)  - Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.take.attempt == 1
11 Dec 2015 15:22:12,784 INFO  [conf-file-poller-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:167)  - Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.take.success == 0
11 Dec 2015 15:22:12,785 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:138)  - Starting new configuration:{ sourceRunners:{http-source=EventDrivenSourceRunner: { source:org.apache.flume.source.http.HTTPSource{name:http-source,state:START} }} sinkRunners:{mysql-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2e5babc counterGroup:{ name:null counters:{runner.interruptions=1, runner.backoffs.consecutive=1, runner.backoffs=1} } }} channels:{mysql-channel=org.apache.flume.channel.MemoryChannel{name: mysql-channel}} }
11 Dec 2015 15:22:12,787 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:145)  - Starting Channel mysql-channel
11 Dec 2015 15:22:12,789 INFO  [lifecycleSupervisor-1-4] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94)  - Component type: CHANNEL, name: mysql-channel started
11 Dec 2015 15:22:12,792 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:173)  - Starting Sink mysql-sink
11 Dec 2015 15:22:12,793 INFO  [lifecycleSupervisor-1-2] (com.telefonica.iot.cygnus.sinks.OrionMySQLSink.start:152)  - [mysql-sink] Startup completed
11 Dec 2015 15:22:12,794 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:184)  - Starting Source http-source
11 Dec 2015 15:22:12,800 INFO  [lifecycleSupervisor-1-1] (com.telefonica.iot.cygnus.interceptors.GroupingInterceptor.initialize:92)  - Grouping rules read: 
11 Dec 2015 15:22:12,802 ERROR [lifecycleSupervisor-1-1] (com.telefonica.iot.cygnus.interceptors.GroupingInterceptor.parseGroupingRules:165)  - Error while parsing the Json-based grouping rules file. Details=null
11 Dec 2015 15:22:12,803 WARN  [lifecycleSupervisor-1-1] (com.telefonica.iot.cygnus.interceptors.GroupingInterceptor.initialize:98)  - Grouping rules syntax has errors
11 Dec 2015 15:22:12,804 INFO  [lifecycleSupervisor-1-1] (org.mortbay.log.Slf4jLog.info:67)  - jetty-6.1.26
11 Dec 2015 15:22:12,809 INFO  [lifecycleSupervisor-1-1] (org.mortbay.log.Slf4jLog.info:67)  - Started [email protected]:5050
11 Dec 2015 15:22:12,810 INFO  [lifecycleSupervisor-1-1] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94)  - Component type: SOURCE, name: http-source started
11 Dec 2015 15:22:46,806 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:246)  - Batch accumulation time reached, the batch will be processed as it is
  6. So I am guessing that I should do something about the grouping rules, but I have no idea what. I tried using the link provided in grouping_rules.conf, but it returns a 404.

EDIT2: here is the create table script I am using:

CREATE TABLE sensor_room1_room (
    sensor_room1_roomID INT NOT NULL AUTO_INCREMENT,
    recvTime VARCHAR(40),
    fiwareservicepath VARCHAR(40),
    entityId INT(10),
    entityType VARCHAR(40),
    pressure INT(3),
    pressure_md INT(3),
    temperature FLOAT(5),
    temperature_md FLOAT(5),
    PRIMARY KEY (sensor_room1_roomID)
);

EDIT3: Something is very wrong here, and I have nothing to go on. Take a look at my table structure:

mysql> describe sensor;
+-------------------+------------+------+-----+---------+-------+
| Field             | Type       | Null | Key | Default | Extra |
+-------------------+------------+------+-----+---------+-------+
| recvTime          | mediumtext | YES  |     | NULL    |       |
| fiwareservicepath | text       | YES  |     | NULL    |       |
| entityId          | text       | YES  |     | NULL    |       |
| entityType        | text       | YES  |     | NULL    |       |
| pressure          | text       | YES  |     | NULL    |       |
| pressure_md       | text       | YES  |     | NULL    |       |
| temperature       | text       | YES  |     | NULL    |       |
| temperature_md    | text       | YES  |     | NULL    |       |
+-------------------+------------+------+-----+---------+-------+
8 rows in set (0.00 sec)

I am passing this NGSI command:

(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' \
    --header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF
{
    "contextElements": [
        {
            "type": "Room",
            "isPattern": "false",
            "id": "Room1",
            "attributes": [
                {
                    "name": "temperature",
                    "type": "float",
                    "value": "321"
                },
                {
                    "name": "pressure",
                    "type": "integer",
                    "value": "123"
                }
            ]
        }
    ],
    "updateAction": "APPEND"
} 
EOF

And I get this error in the log:

14 Dec 2015 14:42:46,248 INFO  [1161924167@qtp-1635328039-1] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:255)  - Event put in the channel (id=110985483, ttl=10)
14 Dec 2015 14:43:09,258 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:246)  - Batch accumulation time reached, the batch will be processed as it is
14 Dec 2015 14:43:09,266 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionMySQLSink.persistAggregation:429)  - [mysql-sink] Persisting data at OrionMySQLSink. Database (trace_data), Table (sensor), Fields ((recvTime,fiwareservicepath,entityId,entityType,temperature,temperature_md)), Values (('2015-12-14T13:42:46.206Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.170Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.220Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.223Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.225Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.228Z','sensor','Room1','Room','123','[]','321','[]'),('2015-12-14T13:42:46.248Z','sensor','Room1','Room','123','[]','321','[]'))
14 Dec 2015 14:43:09,300 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:318)  - Bad context data (Column count doesn't match value count at row 6)
14 Dec 2015 14:43:09,300 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:323)  - Finishing transaction (1450100555-186-0000000001,1450100555-186-0000000000,1450100555-186-0000000002,1450100555-186-0000000003,1450100555-186-0000000005,1450100555-186-0000000004,1450100555-186-0000000006)
14 Dec 2015 14:43:39,305 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:246)  - Batch accumulation time reached, the batch will be processed as it is
14 Dec 2015 14:43:39,305 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:295)  - Finishing transaction ()

7. There is a conflict here (Column count doesn't match value count at row 6) that I cannot solve. Any suggestions?

8. I still need to include the table ID in there, but I don't know how.

I should stress that my DB of choice here is MySQL only.

Upvotes: 4

Views: 191

Answers (1)

frb

Reputation: 3798

I'll answer your questions bullet by bullet:

  1. Yes, when working in column mode with OrionMySQLSink, the tables must be provisioned in advance. You can find an explanation of this here.
  2. It is not mandatory, because the types stated at Orion Context Broker don't mean anything to Cygnus; they have no real semantics, except for you, the user. I mean, you could say a temperature attribute has a "float" type, or a "centigrade degrees" type, or "potato". This is why the column mode of OrionMySQLSink cannot create the tables automatically. Anyway, the attribute values will be cast by the MySQL driver to the MySQL types you stated in the table definition. Can you edit the question to show your table creation command, please?
  3. Value per value:
    • recvTime --> usually DATETIME, but it could also be TEXT or VARCHAR, depending on how you want to use this value.
    • fiwareservicepath --> usually TEXT, but it could also be INTEGER if the value is a number rather than a string.
    • entityId --> usually TEXT, but it could also be INTEGER if the value is a number rather than a string.
    • entityType --> usually TEXT, but it could also be INTEGER if the value is a number rather than a string.
    • temperature_md --> usually TEXT, since this can be a JSON value; nevertheless, recent versions of MySQL support a native JSON type.
    • pressure_md --> usually TEXT, since this can be a JSON value; nevertheless, recent versions of MySQL support a native JSON type.
  4. You cannot remove them, since OrionMySQLSink will expect to find those metadata fields.
  5. OrionMySQLSink does not add any primary key on its own. That's something you can add yourself by using an auto-increment key, as you propose.
  6. Not necessarily. In fact, the grouping rules file can be empty despite that error (Cygnus will run anyway).
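Putting the type suggestions from point 3 together, a provisioned column-mode table for the question's `sensor` table might look like the following sketch. The column names must match what Cygnus sends; the types (and the optional `id` column) are only suggestions, not requirements:

```sql
-- Sketch of a column-mode table for the Room1 entity.
-- Types are suggestions; the id column is optional and never touched by Cygnus.
CREATE TABLE sensor (
    id INT NOT NULL AUTO_INCREMENT,   -- optional surrogate primary key
    recvTime VARCHAR(40),             -- or DATETIME, if you normalize the timestamp format
    fiwareservicepath TEXT,
    entityId TEXT,
    entityType TEXT,
    temperature FLOAT,
    temperature_md TEXT,              -- metadata arrives as a JSON string, e.g. '[]'
    pressure INT,
    pressure_md TEXT,
    PRIMARY KEY (id)
);
```

Note that Cygnus sends recvTime as an ISO 8601 string (e.g. '2015-12-14T13:42:46.206Z'), which MySQL may not cast cleanly to DATETIME; VARCHAR/TEXT is the safer choice if you are unsure.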

EDIT

  1. The problem is that not all the notifications Cygnus receives contain the same number of attributes. Thus, when aggregating them in a batch, the number of fields and the number of values do not match. If you have a look at the official documentation, you'll see that it is required that all the attributes are sent when working in column mode:

    the column mode is only recommended if your subscription is designed for always sending the same attributes, even if they were not updated since the last notification.

Another solution could be to configure a batch_size of 1; the performance may not be as good, but each notification will be processed independently of the others and, thus, each MySQL query will have its fields matching its values.
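The batch_size option lives in the Cygnus agent configuration file. A minimal sketch, assuming the agent is named "cygnusagent" (the sink name is taken from the question's log; adjust both to match your own agent.conf):

```properties
# Process each notification on its own instead of accumulating a batch.
# "cygnusagent" is an assumed agent name; "mysql-sink" comes from the log above.
cygnusagent.sinks.mysql-sink.batch_size = 1
```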

  2. Simply create the table with this auto-increment field; each time Cygnus inserts new rows, the field will be populated automatically.
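For instance, an insert like the ones Cygnus issues (the values below mirror the log in the question) only names the data columns, so MySQL fills in any AUTO_INCREMENT column you added by itself:

```sql
-- Cygnus never mentions the id column, so MySQL assigns it automatically:
INSERT INTO sensor (recvTime, fiwareservicepath, entityId, entityType,
                    temperature, temperature_md, pressure, pressure_md)
VALUES ('2015-12-14T13:42:46.206Z', 'sensor', 'Room1', 'Room',
        '321', '[]', '123', '[]');
```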

Upvotes: 2
