Reputation: 1202
I have cygnus working automatically just fine in row mode. But I want to switch to column mode, this means that I now have to manually set up my tables with appropriate column types (right?).
I figured that the column type of the attributes should correspond to the ones I specified in context broker (i.e Temperature: float; pressure: integer; and so on), am I correct?.
BUT what about these types:
recvTime (Im guessing datetime?); fiwareservicepath (string?); entityId (integer?); entityType (string?); temperature_md (is float same as Temperature or what?); pressure_md (is it integer same as pressure or what?);
_md columns; fiwareservicepath
Can I remove those?
EDIT1: I tried puting the types as I speculated in step 3 and got the following output in the log:
11 Dec 2015 15:22:12,783 INFO [conf-file-poller-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:167) - Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.take.attempt == 1
11 Dec 2015 15:22:12,784 INFO [conf-file-poller-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:167) - Shutdown Metric for type: CHANNEL, name: mysql-channel. channel.event.take.success == 0
11 Dec 2015 15:22:12,785 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:138) - Starting new configuration:{ sourceRunners:{http-source=EventDrivenSourceRunner: { source:org.apache.flume.source.http.HTTPSource{name:http-source,state:START} }} sinkRunners:{mysql-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2e5babc counterGroup:{ name:null counters:{runner.interruptions=1, runner.backoffs.consecutive=1, runner.backoffs=1} } }} channels:{mysql-channel=org.apache.flume.channel.MemoryChannel{name: mysql-channel}} }
11 Dec 2015 15:22:12,787 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:145) - Starting Channel mysql-channel
11 Dec 2015 15:22:12,789 INFO [lifecycleSupervisor-1-4] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94) - Component type: CHANNEL, name: mysql-channel started
11 Dec 2015 15:22:12,792 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:173) - Starting Sink mysql-sink
11 Dec 2015 15:22:12,793 INFO [lifecycleSupervisor-1-2] (com.telefonica.iot.cygnus.sinks.OrionMySQLSink.start:152) - [mysql-sink] Startup completed
11 Dec 2015 15:22:12,794 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:184) - Starting Source http-source
11 Dec 2015 15:22:12,800 INFO [lifecycleSupervisor-1-1] (com.telefonica.iot.cygnus.interceptors.GroupingInterceptor.initialize:92) - Grouping rules read:
11 Dec 2015 15:22:12,802 ERROR [lifecycleSupervisor-1-1] (com.telefonica.iot.cygnus.interceptors.GroupingInterceptor.parseGroupingRules:165) - Error while parsing the Json-based grouping rules file. Details=null
11 Dec 2015 15:22:12,803 WARN [lifecycleSupervisor-1-1] (com.telefonica.iot.cygnus.interceptors.GroupingInterceptor.initialize:98) - Grouping rules syntax has errors
11 Dec 2015 15:22:12,804 INFO [lifecycleSupervisor-1-1] (org.mortbay.log.Slf4jLog.info:67) - jetty-6.1.26
11 Dec 2015 15:22:12,809 INFO [lifecycleSupervisor-1-1] (org.mortbay.log.Slf4jLog.info:67) - Started [email protected]:5050
11 Dec 2015 15:22:12,810 INFO [lifecycleSupervisor-1-1] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:94) - Component type: SOURCE, name: http-source started
11 Dec 2015 15:22:46,806 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:246) - Batch accumulation time reached, the batch will be processed as it is
EDIT2: here is the create table script I am using:
CREATE TABLE sensor_room1_room (
sensor_room1_roomID INT NOT NULL AUTO_INCREMENT,
recvTime varchar(40),
fiwareservicepath varchar(40),
entityId int (10),
entityType varchar (40),
pressure int (3),
pressure_md int(3),
temperature float (5),
temperature_md float(5),
PRIMARY KEY (sensor_room1_roomID));
EDIT3: Something is very wrong here, and I have nothing to go on. Take a look at my table structure:
mysql> describe sensor;
+-------------------+------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+------------+------+-----+---------+-------+
| recvTime | mediumtext | YES | | NULL | |
| fiwareservicepath | text | YES | | NULL | |
| entityId | text | YES | | NULL | |
| entityType | text | YES | | NULL | |
| pressure | text | YES | | NULL | |
| pressure_md | text | YES | | NULL | |
| temperature | text | YES | | NULL | |
| temperature_md | text | YES | | NULL | |
+-------------------+------------+------+-----+---------+-------+
8 rows in set (0.00 sec)
I am passing this NGSI command:
(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' \
--header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF
{
"contextElements": [
{
"type": "Room",
"isPattern": "false",
"id": "Room1",
"attributes": [
{
"name": "temperature",
"type": "float",
"value": "321"
},
{
"name": "pressure",
"type": "integer",
"value": "123"
}
]
}
],
"updateAction": "APPEND"
}
EOF
And I get this error in the log:
14 Dec 2015 14:42:46,248 INFO [1161924167@qtp-1635328039-1] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:255) - Event put in the channel (id=110985483, ttl=10)
14 Dec 2015 14:43:09,258 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:246) - Batch accumulation time reached, the batch will be processed as it is
14 Dec 2015 14:43:09,266 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionMySQLSink.persistAggregation:429) - [mysql-sink] Persisting data at OrionMySQLSink. Database (trace_data), Table (sensor), Fields ((recvTime,fiwareservicepath,entityId,entityType,temperature,temperature_md)), Values (('2015-12-14T13:42:46.206Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.170Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.220Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.223Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.225Z','sensor','Room1','Room','321','[]'),('2015-12-14T13:42:46.228Z','sensor','Room1','Room','123','[]','321','[]'),('2015-12-14T13:42:46.248Z','sensor','Room1','Room','123','[]','321','[]'))
14 Dec 2015 14:43:09,300 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:318) - Bad context data (Column count doesn't match value count at row 6)
14 Dec 2015 14:43:09,300 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:323) - Finishing transaction (1450100555-186-0000000001,1450100555-186-0000000000,1450100555-186-0000000002,1450100555-186-0000000003,1450100555-186-0000000005,1450100555-186-0000000004,1450100555-186-0000000006)
14 Dec 2015 14:43:39,305 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:246) - Batch accumulation time reached, the batch will be processed as it is
14 Dec 2015 14:43:39,305 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:295) - Finishing transaction ()
7. There is a conflict here (Column count doesn't match value count at row 6) I cannot solve. Any suggestions?
8. I still need to include the tableID in there, but dont know how?
I should here stress that my DB of choice here is only MySql.
Upvotes: 4
Views: 191
Reputation: 3798
I'll answer your questions bullet by bullet:
OrionMySQLSink
the tables must be provisioned in advance. You can find an explanation on this here.OrionMySQLSink
cannot automatically create the tables. Anyway, the attribute values will be "casted" by the MySQL driver to the MySQL types you stated in the table definition. Can you edit the question showing your table creation command, please?OrionMySQLSink
will expect to find those metadata fields.OrionMySQLSink
does not add any primary key by its own. That's something you can add by using an auot-increment key, as you propose.EDIT
the column mode is only recommended if your subscription is designed for always sending the same attributes, event if they were not updated since the last notification.
Another solution could be to move to a configured batch_size
of 1; your performance may not be so good, but each notification will be processed independently of the others and, thus, each MySQL query will have the query fields matching the query values.
Upvotes: 2