ewm

Reputation: 13

Live tail a CSV from an FTP server

I am trying to live tail a CSV file from the FTP server of a data logger using pandas. I am trying to implement the solution from "Live tail a CSV using Pandas to process", but am having issues reading new lines. The program reads all available lines, but after that it only reads empty lines. This is the output when printing every line read:

b'+1.000000000E+01,+1.98000E+01,+3.15500E-04,-4.23700E-03,-3.20300E-03,-1.13750E-03,-3.26900E-03,+1.47450E-03,-3.39800E-03,-1.05500E-04,-4.24500E-03,0\r\n'
b'+1.100000000E+01,+1.98000E+01,-2.63000E-04,-4.24300E-03,-3.23350E-03,-4.79000E-04,-3.26000E-03,+1.43450E-03,-3.49800E-03,-6.47500E-04,-4.11750E-03,0\r\n'
b'+1.200000000E+01,+1.98000E+01,-9.49500E-04,-3.80900E-03,-3.22450E-03,+1.14500E-04,-3.26800E-03,+1.26850E-03,-3.63450E-03,-1.32050E-03,-3.59800E-03,0\r\n'
...
b''
b''
b''
b''

Code:

from ftplib import FTP
import os
import pandas as pd
import time
from io import StringIO
from io import BytesIO

ftp = FTP('192.168.10.100')
ftp.login()

def follow(thefile):
    global ftp
    with BytesIO() as flo:
        ftp.retrbinary('RETR ' + thefile, flo.write)
        flo.seek(0)
        while True:
            line = flo.readline()
            print(line)
            if not line or not line.endswith(b'\n'):
                time.sleep(1)
                continue
            yield line.decode('utf-8')

if __name__ == "__main__":
    global df
    # set the file we want to log the current line to
    log_file = "./current_line"

    # check if the last line processed has been saved
    if os.path.exists(log_file):
        with open(log_file, 'r') as ifile:
            # get the last line processed
            start_line = int(ifile.read())
    else:
        # set the last line processed to be the first data row (not the header).  If there is no header then set to 0
        start_line = 1

    # set the file we are reading
    myfile = '/sdcard/HIOKI/LR8450/DATA/24-04-12/tmpWvData240412_172748.CSV'

    # remove this line if you don't need the header
    # flo.seek(0)
    # header = pd.read_csv(flo, nrows=0, on_bad_lines='skip', engine="python", encoding='unicode_escape')

    # initialize the list to store the lines in
    lines = []

    # loop through each line in the file
    for nline, line in enumerate(follow(myfile)):
        # if we have already processed this file
        if nline < start_line:
            continue
        # append to the lines list
        lines.append(line)

        # check if we have hit the number of lines we want to handle
        if len(lines) == 10:
            # read the csv from the lines we have processed
            df = pd.read_csv(StringIO(''.join(lines)), header=None, on_bad_lines='skip', engine="python", encoding='unicode_escape')
            # update the header.  Delete this row if there is no header
            df.columns = header.columns

            # do something with df
            print(df)

            # reset the lines list
            lines = []  

            # open the log file and note the line we have processed up to
            with open(log_file, 'w') as lfile:
                lfile.write(str(nline))  # only write the processed lines when we have actually done something

Upvotes: 1

Views: 107

Answers (2)

ewm

Reputation: 13

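I was able to do this using FTP.retrbinary with its rest parameter. prev_size is a global that is first set when the initial data DataFrame is created, and each call then downloads only the bytes added since the previous call: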

def get_data():
    global prev_size, data

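    offset = max(prev_size, 0)  # byte offset where the previous download ended
    prev_size = ftp.size(file_name)  # remember the current size for the next call
    flo2 = BytesIO()
    # rest=offset makes the server start the transfer at that byte, so only new data is sent
    ftp.retrbinary('RETR ' + file_name, flo2.write, rest=offset)
    flo2.seek(0)

    # the new chunk has no header row, so do not let pandas treat its first line as one
    new_data = pd.read_csv(flo2, header=None)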

Upvotes: 0

Martin Prikryl

Reputation: 202534

Your code downloads the state of the file as it is at the time of the retrbinary call. And that's it; it won't keep reading. Once the BytesIO buffer has been read to the end, every further readline returns b''.
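To illustrate why the loop only sees empty lines, here is a minimal sketch that needs no FTP server. The two hard-coded rows stand in for whatever a single retrbinary call happened to download; they are made up for the example.

```python
from io import BytesIO

buf = BytesIO()
buf.write(b'row1\r\nrow2\r\n')  # stands in for the one-off retrbinary download
buf.seek(0)

first = buf.readline()   # first row of the snapshot
second = buf.readline()  # second row of the snapshot
third = buf.readline()   # b'': end of the in-memory snapshot

# Nothing ever writes to buf again, so every later readline is also b''.
fourth = buf.readline()
```

The buffer is only a copy in memory; rows that the logger appends to the remote file afterwards never reach it.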

The FTP protocol does not have a tail-like API. All you can do is keep starting new downloads in a loop, yielding only the new data each time.

You can use the rest parameter of FTP.retrbinary to download only the new part of the file. It tells the server to start the transfer at the given byte offset.
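A minimal sketch of such a polling loop, under a few assumptions: the server supports restarting a transfer at an offset, and it answers with an empty transfer (rather than an error) when the offset equals the current file size. The helper names fetch_from and split_complete_lines are made up for illustration. follow mirrors the generator in the question, but takes the connection and path as arguments and holds back a partial last line until its newline arrives.

```python
import time
from io import BytesIO


def fetch_from(ftp, path, offset):
    """Download everything in `path` from byte `offset` onward."""
    buf = BytesIO()
    # rest=offset asks the server to start the transfer at that byte.
    ftp.retrbinary('RETR ' + path, buf.write, rest=offset)
    return buf.getvalue()


def split_complete_lines(pending, chunk):
    """Join leftover bytes with a new chunk; return (complete_lines, leftover)."""
    *complete, leftover = (pending + chunk).split(b'\n')
    return [part + b'\n' for part in complete], leftover


def follow(ftp, path, offset=0, poll_interval=1.0):
    """Yield each newly completed line of the remote file, polling forever."""
    pending = b''
    while True:
        chunk = fetch_from(ftp, path, offset)
        # Advance by the bytes actually received, not by a separate SIZE call,
        # so data appended between two commands is neither skipped nor repeated.
        offset += len(chunk)
        lines, pending = split_complete_lines(pending, chunk)
        for line in lines:
            yield line.decode('utf-8')
        if not chunk:
            # Nothing new yet: wait before starting the next download.
            time.sleep(poll_interval)
```

With the connection from the question, `for line in follow(ftp, myfile): ...` would then yield each row as the logger writes it. If the server rejects an offset at the end of the file, the ftplib error would have to be caught inside the loop and treated as "no new data".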

For a similar question and code, see:
Resume FTP download after timeout

Upvotes: 1
