Jé Queue
Jé Queue

Reputation: 10637

Isochronous data file delivery in multi-process, employ select | poll | other?

I have a large data file that gets constantly (and synchronously) appended to by measurement devices out in the field. I need to isochronously deliver the most recent data from this file to an online dashboard. I say isochronous because the "dashboard" doesn't care about displaying a stream of data (high latency situation), it just cares about the very last few data points in the file that get sent to it. I cannot guarantee that the rate of file growth is less than my effective outbound throughput.

So I have one ever-appending file but I have multiple processes that need to regularly send the very last block of information out of it. A loose pub-sub of sorts, I guess.

I can:

  1. Poll the file to see if there is growth, then seek back from the EOF for the last block(s) of data,
  2. select()-style and be notified of said change, but then don't I have to seek to the last bit of data anyway?
  3. Have a process shove the last bit of data into shared memory for reading, but would I not then have the same issue I need solving in #1 and #2, because the shared memory writer will behave the same way?

Any other suggestions or recommendations?

Upvotes: 0

Views: 279

Answers (1)

ninjalj
ninjalj

Reputation: 43718

If I've understood you correctly, you just want to have a dashboard periodically updated with the last (current) block. Then, an easy option would be:

  • sleep(), stat() file to see if it has changed, send data if appropriate.

On Linux, you can use inotify to be notified when a file changes. This way you can avoid the unnecessary wakeups of the previous approach. So this option would be:

  • wait for notification, if notification arrives stat(), if file has changed send data and sleep() to avoid too frequent updates.

This last one may look something like:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>

#include <sys/inotify.h>


#define BLOCK_SIZE 5


/*
 * Read up to `count` bytes from `fd` into `buf`, retrying after short
 * reads and after reads interrupted by a signal (EINTR).
 *
 * Each retry continues where the previous read stopped, so data already
 * received is never overwritten and no more than `count` bytes are ever
 * requested in total.
 *
 * Returns the number of bytes stored in `buf` (less than `count` only when
 * end-of-file was reached first), or a negative value if read() failed with
 * an error other than EINTR, in which case errno is as set by read().
 */
static ssize_t read_fully(int fd, void *buf, size_t count)
{
    char *dst = buf;
    size_t nread = 0;

    while (nread < count) {
        ssize_t ret = read(fd, dst + nread, count - nread);

        if (ret < 0) {
            if (errno == EINTR)
                continue;       /* interrupted before any data: retry */
            return ret;
        }
        if (ret == 0)
            break;              /* end of file */

        nread += (size_t) ret;
    }

    return (ssize_t) nread;
}

/*
 * Map one byte to a character that is safe to print.  Bytes below 0x20
 * (C0 controls and, because the parameter is signed, every byte above
 * 0x7f) and DEL (0x7f) are replaced with '.'.
 */
static char printable_or_dot(signed char c)
{
    return (c < 0x20 || c == 0x7f) ? '.' : (char) c;
}

/*
 * Print the most recent complete BLOCK_SIZE-byte block of the file open on
 * `fd`, but only if at least one new complete block has appeared since the
 * previous call.  After printing, sleeps for three seconds to rate-limit
 * updates.
 *
 * The size seen on the previous call is kept in a function-local static,
 * so this function is not thread-safe and supports a single file; move
 * `fd` and `size` into a context struct passed as an argument if that is
 * needed.
 *
 * Exits the process on fstat/lseek/read errors.
 */
static void show_current_block(int fd)
{
    static off_t size = 0;          /* block-aligned size already shown */
    signed char block[BLOCK_SIZE];  /* Assume only ASCII-compatible encoding */
    char text[BLOCK_SIZE];
    struct stat st;
    ssize_t ret;
    int i;

    if (fstat(fd, &st) < 0) {
        perror("fstat");
        exit(1);
    }

    /* Handle truncated file: start tracking from the beginning again. */
    if (st.st_size < size)
        size = 0;

    /* Nothing to do until a whole new block has been appended. */
    if (st.st_size < size + BLOCK_SIZE)
        return;

    size = st.st_size / BLOCK_SIZE * BLOCK_SIZE;

    /*
     * Seek to the start of the last *complete* block using the size we
     * just computed.  Seeking relative to SEEK_END would return a
     * misaligned window whenever the file length is not a multiple of
     * BLOCK_SIZE, and would race with concurrent appends.
     */
    if (lseek(fd, size - BLOCK_SIZE, SEEK_SET) < 0) {
        perror("lseek");
        exit(1);
    }

    ret = read_fully(fd, block, BLOCK_SIZE);
    if (ret < 0) {
        perror("read");
        exit(1);
    }

    if (ret != BLOCK_SIZE) {
        /*
         * Fewer bytes than fstat() promised: the file shrank between the
         * fstat() and the read.  Forget the stale size and let the next
         * call re-evaluate instead of printing uninitialised bytes.
         */
        size = 0;
        return;
    }

    /* Assume only ASCII-compatible encoding; print neither control
     * characters nor bytes above 0x7f.
     */
    for (i = 0; i < BLOCK_SIZE; i++)
        text[i] = printable_or_dot(block[i]);

    printf("Current block: %.*s\n", BLOCK_SIZE, text);

    /* Don't update too often.  sleep() is used because usleep() is only
     * required to accept values below 1,000,000 microseconds.
     */
    sleep(3);
}

/*
 * Watch "testfile" for modifications with inotify and print its most
 * recent complete block each time it changes.
 *
 * Runs until an error occurs; every error path prints a diagnostic and
 * exits with status 1.
 */
int main(void)
{
    int fd, ifd, wd;
    struct inotify_event ev;
    ssize_t ret;

    fd = open("testfile", O_RDONLY);
    if (fd < 0) {
        perror("open");
        exit(1);
    }

    ifd = inotify_init();
    if (ifd < 0) {
        perror("inotify_init");
        exit(1);
    }

    /* XXX race between open and inotify_add_watch */
    wd = inotify_add_watch(ifd, "testfile", IN_MODIFY);
    if (wd < 0) {
        perror("inotify_add_watch");
        exit(1);
    }

    /* Show whatever is already in the file before waiting for changes. */
    show_current_block(fd);

    for (;;) {
        ret = read(ifd, &ev, sizeof ev);

        if (ret < 0) {
            if (errno == EINTR)
                continue;   /* interrupted by a signal: just retry */
            perror("read inotify watch");
            exit(1);
        }
        if (ret == 0) {
            fprintf(stderr, "inotify watch closed!\n");
            exit(1);
        }
        if ((size_t) ret != sizeof ev) {
            /* %zd / %zu match ssize_t / size_t; %d would be undefined
             * behaviour where those types are wider than int.
             */
            fprintf(stderr, "bad inotify event size %zd (expected %zu)\n",
                    ret, sizeof ev);
            exit(1);
        }

        show_current_block(fd);
    }
}

Upvotes: 1

Related Questions