Jim Macaulay

Reputation: 5141

Stream CSV data in Kafka-Python

I am sending CSV data to a Kafka topic using kafka-python. The data is sent and received by the consumer successfully. Now I am trying to stream a CSV file continuously: any new entry added to the file should automatically be sent to the Kafka topic. Any suggestions on continuous streaming of a CSV file would be helpful.

Below is my existing code:

    from kafka import KafkaProducer
    import logging
    from json import dumps
    import csv

    logging.basicConfig(level=logging.INFO)

    # Serialize each row (a list of strings) as UTF-8 encoded JSON
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',
                             value_serializer=lambda k: dumps(k).encode('utf-8'))

    with open('C:/Hadoop/Data/Job.csv', 'r') as file:
        reader = csv.reader(file, delimiter='\t')
        for messages in reader:
            producer.send('Jim_Topic', messages)
        # Block until all buffered records have been delivered
        producer.flush()

Upvotes: 0

Views: 6125

Answers (1)

Robin Moffatt

Reputation: 32090

Kafka Connect (part of Apache Kafka) is a good way to handle ingest and egress between Kafka and other systems, including flat files.

You can use the Kafka Connect SpoolDir connector to stream CSV files into Kafka. Install it from Confluent Hub, and then provide it with configuration for your source file:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "csv.first.row.as.header":"true"
        }'
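
Since the question uses Python, the same request can probably be issued without curl. This is only a sketch using the standard library: it assumes the Kafka Connect REST endpoint, connector name, and settings from the curl call above, and the helper names `build_connector_request` and `submit` are made up for illustration.

```python
import json
import urllib.request

# Kafka Connect REST endpoint, taken from the curl call above
CONNECT_URL = "http://localhost:8083"


def build_connector_request(name, config, base_url=CONNECT_URL):
    """Build a PUT request to /connectors/<name>/config (hypothetical helper)."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json",
                 "Content-Type": "application/json"},
        method="PUT",
    )


def submit(request):
    """Send the request; needs a running Connect worker with SpoolDir installed."""
    with urllib.request.urlopen(request) as response:
        return response.status, response.read().decode("utf-8")


# Same settings as the JSON body passed to curl
spooldir_config = {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "topic": "orders_spooldir_00",
    "input.path": "/data/unprocessed",
    "finished.path": "/data/processed",
    "error.path": "/data/error",
    "input.file.pattern": ".*\\.csv",
    "schema.generation.enabled": "true",
    "csv.first.row.as.header": "true",
}

request = build_connector_request("source-csv-spooldir-00", spooldir_config)
```

Calling `submit(request)` against a running worker should then have the same effect as the curl command.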

See my blog for more examples and details.
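
If Kafka Connect is not an option and you want to stay with kafka-python, one rough alternative is to keep the file open and poll it for appended lines. The following is a minimal sketch, not a robust tailer: `follow_csv` and `stream_to_kafka` are hypothetical helpers, the tab delimiter, broker address, and topic come from the question, and it assumes rows are only ever appended, each row sits on a single line, and losing the read position on restart is acceptable.

```python
import csv
import time


def follow_csv(path, delimiter="\t", poll_interval=1.0, max_idle_polls=None):
    """Yield existing rows of `path`, then keep polling for appended rows.

    max_idle_polls=None polls forever; an integer stops after that many
    consecutive empty polls (useful for testing).
    Assumes every row fits on one line (no quoted embedded newlines).
    """
    idle = 0
    pending = ""  # holds a partially written line until its newline arrives
    with open(path, "r", newline="") as f:
        while max_idle_polls is None or idle < max_idle_polls:
            chunk = f.readline()
            if not chunk:
                # Nothing new yet: wait and try again
                idle += 1
                time.sleep(poll_interval)
                continue
            pending += chunk
            if not pending.endswith("\n"):
                continue  # line is still being written
            idle = 0
            line, pending = pending, ""
            if line.strip():
                yield next(csv.reader([line], delimiter=delimiter))


def stream_to_kafka(path, topic, bootstrap_servers="127.0.0.1:9092"):
    """Send every current and future row of `path` to `topic`."""
    # Imported here so follow_csv works without kafka-python installed
    from json import dumps
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda row: dumps(row).encode("utf-8"),
    )
    try:
        for row in follow_csv(path):
            producer.send(topic, row)
    finally:
        producer.close()  # flushes any buffered records before closing
```

With the values from the question this would be called as `stream_to_kafka('C:/Hadoop/Data/Job.csv', 'Jim_Topic')`. It runs until interrupted, and because the read position is not persisted, a restart would resend the whole file.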

Upvotes: 1
