SLIT
SLIT

Reputation: 33

Flume stream to mysql

I have been trying to stream a data into MySQL database using APACHE KAFKA and FLUME. (Here is my flume configuration file)

agent.sources=kafkaSrc
agent.channels=channel1
agent.sinks=jdbcSink

agent.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel1.brokerList=localhost:9092
agent.channels.channel1.topic=kafkachannel
agent.channels.channel1.zookeeperConnect=localhost:2181
agent.channels.channel1.capacity=10000
agent.channels.channel1.transactionCapacity=1000


agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSrc.channels = channel1
agent.sources.kafkaSrc.zookeeperConnect = localhost:2181
agent.sources.kafkaSrc.topic = kafka-mysql

***agent.sinks.jdbcSink.type = How to declare this?***
agent.sinks.jdbcSink.connectionString = jdbc:mysql://1.1.1.1:3306/test
agent.sinks.jdbcSink.username=user
agent.sinks.jdbcSink.password=password
agent.sinks.jdbcSink.batchSize = 10
agent.sinks.jdbcSink.channel =channel1
agent.sinks.jdbcSink.sqlDialect=MYSQL
agent.sinks.jdbcSink.driver=com.mysql.jdbc.Driver
agent.sinks.jdbcSink.sql=(${body:varchar})

I know how to stream data into hadoop or hbase (logger type or hdfs type), However can't find a type to stream into mysql DB. So my question is how do i declare the jdbcSink.type?

Upvotes: 1

Views: 2003

Answers (1)

frb
frb

Reputation: 3798

You could always create a custom sink for MySQL. This is what we did at FIWARE with Cygnus tool.

Feel free to get inspired from it: https://github.com/telefonicaid/fiware-cygnus/blob/master/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMySQLSink.java

It extends this other custom base class for all our sinks: https://github.com/telefonicaid/fiware-cygnus/blob/master/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java

Basically, you have to extend AbstractSink and implement the Configurable interface. That means to override al least the following methods:

public Status process() throws EventDeliveryException

and:

public void configure(Context context)

respectively.

Upvotes: 1

Related Questions