mback2k
mback2k

Reputation: 441

Windows API: Wait for data to be available on non-GUI console input (PIPE-based STDIN)

Background

I am currently working on a Windows select-like function that not only supports SOCKET handles, but also other kinds of waitable handles. My goal is to wait on standard console handles in order to provide select-functionality to the curl testsuite.

The related program can be found in the curl git repository: sockfilt.c

Question

Is it possible to wait for data to be available on a non-GUI-based console input? The issue is that WaitFor* methods do not support PIPE handles and therefore STDIN is not supported if the process input is fed from another process, e.g. using the pipe | functionality of cmd.


The following example program illustrates the problem: select_ws.c

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#include <windows.h>
#include <winsock2.h>
#include <malloc.h>

#include <conio.h>
#include <fcntl.h>

/* store an error code as the WinSock last-error value (WSASetLastError) */
#define SET_SOCKERRNO(x)  (WSASetLastError((int)(x)))

/* descriptor type used by select_ws for sockets and std* file numbers */
typedef SOCKET curl_socket_t;

/*
 * select function with support for WINSOCK2 sockets and all
 * other handle types supported by WaitForMultipleObjectsEx.
 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms687028.aspx
 * http://msdn.microsoft.com/en-us/library/windows/desktop/ms741572.aspx
 *
 * nfds      - highest descriptor value to examine, plus one
 * readfds   - in: descriptors watched for read/accept/close,
 *             out: the subset that was found ready
 * writefds  - same, for write/connect
 * exceptfds - same, for out-of-band data
 *             (all three sets are handed to FD_ISSET/FD_CLR
 *              unconditionally and therefore must not be NULL)
 * timeout   - maximum time to wait, NULL means wait without limit
 *
 * Returns the number of descriptors that remain set in at least one of
 * the sets. Returns -1 and sets the WinSock last error to EINVAL for a
 * negative nfds or to ENOMEM if an internal array cannot be allocated.
 */
static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
                     fd_set *exceptfds, struct timeval *timeout)
{
  long networkevents;
  DWORD milliseconds, wait, idx, avail, events, inputs;
  WSAEVENT wsaevent, *wsaevents;
  WSANETWORKEVENTS wsanetevents;
  INPUT_RECORD *inputrecords;
  HANDLE handle, *handles;
  curl_socket_t sock, *fdarr, *wsasocks;
  int error, fds;
  DWORD nfd = 0, wsa = 0;
  int ret = 0;

  /* check if the input value is valid */
  if(nfds < 0) {
    SET_SOCKERRNO(EINVAL);
    return -1;
  }

  /* convert struct timeval to milliseconds; done up front so that the
     "no descriptors" path below can also cope with a NULL timeout */
  if(timeout) {
    milliseconds = ((timeout->tv_sec * 1000) + (timeout->tv_usec / 1000));
  }
  else {
    milliseconds = INFINITE;
  }

  /* check if we got descriptors, sleep in case we got none */
  if(!nfds) {
    Sleep(milliseconds);
    return 0;
  }

  /* allocate the internal arrays:
     fdarr     - the original input descriptors
     handles   - the handles that are actually waited on
     wsasocks  - sockets that got a WINSOCK2 event attached
     wsaevents - the WINSOCK2 events belonging to wsasocks */
  fdarr = malloc(nfds * sizeof(curl_socket_t));
  handles = malloc(nfds * sizeof(HANDLE));
  wsasocks = malloc(nfds * sizeof(curl_socket_t));
  wsaevents = malloc(nfds * sizeof(WSAEVENT));
  if(fdarr == NULL || handles == NULL ||
     wsasocks == NULL || wsaevents == NULL) {
    /* release whatever was obtained; free(NULL) is a no-op */
    free(wsaevents);
    free(wsasocks);
    free(handles);
    free(fdarr);
    /* set the error last so the frees above cannot overwrite it */
    SET_SOCKERRNO(ENOMEM);
    return -1;
  }

  /* loop over the handles in the input descriptor sets */
  for(fds = 0; fds < nfds; fds++) {
    networkevents = 0;
    handles[nfd] = 0;

    if(FD_ISSET(fds, readfds))
      networkevents |= FD_READ|FD_ACCEPT|FD_CLOSE;

    if(FD_ISSET(fds, writefds))
      networkevents |= FD_WRITE|FD_CONNECT;

    if(FD_ISSET(fds, exceptfds))
      networkevents |= FD_OOB;

    /* only wait for events for which we actually care */
    if(networkevents) {
      fdarr[nfd] = (curl_socket_t)fds;
      if(fds == fileno(stdin)) {
        handles[nfd] = GetStdHandle(STD_INPUT_HANDLE);
      }
      else if(fds == fileno(stdout)) {
        handles[nfd] = GetStdHandle(STD_OUTPUT_HANDLE);
      }
      else if(fds == fileno(stderr)) {
        handles[nfd] = GetStdHandle(STD_ERROR_HANDLE);
      }
      else {
        /* treat the descriptor as a socket and attach an event to it */
        wsaevent = WSACreateEvent();
        if(wsaevent != WSA_INVALID_EVENT) {
          error = WSAEventSelect(fds, wsaevent, networkevents);
          if(error != SOCKET_ERROR) {
            handles[nfd] = wsaevent;
            wsasocks[wsa] = (curl_socket_t)fds;
            wsaevents[wsa] = wsaevent;
            wsa++;
          }
          else {
            /* not a socket: wait on the raw value as a handle instead */
            handles[nfd] = (HANDLE)fds;
            WSACloseEvent(wsaevent);
          }
        }
      }
      nfd++;
    }
  }

  /* wait for one of the internal handles to trigger */
  wait = WaitForMultipleObjectsEx(nfd, handles, FALSE, milliseconds, FALSE);

  /* loop over the internal handles returned in the descriptors */
  for(idx = 0; idx < nfd; idx++) {
    fds = fdarr[idx];
    handle = handles[idx];
    sock = (curl_socket_t)fds;

    /* check if the current internal handle was triggered.
       WaitForMultipleObjectsEx reports the LOWEST signaled index, so
       every handle at or above that index may be signaled as well and
       is polled individually; handles below it cannot be signaled.
       For WAIT_TIMEOUT and the other non-object results the comparison
       is false for every idx and all descriptors are cleared. */
    if(wait != WAIT_FAILED && (wait - WAIT_OBJECT_0) <= idx &&
       WaitForSingleObjectEx(handle, 0, FALSE) == WAIT_OBJECT_0) {
      /* try to handle the event with STD* handle functions */
      if(fds == fileno(stdin)) {
        /* check if there is no data in the input buffer
           (_cnt is a CRT-internal field of FILE) */
        if(!stdin->_cnt) {
          /* check if we are getting data from a PIPE */
          if(!GetConsoleMode(handle, &avail)) {
            /* check if there is no data from PIPE input */
            if(!PeekNamedPipe(handle, NULL, 0, NULL, &avail, NULL))
              avail = 0;
            if(!avail)
              FD_CLR(sock, readfds);
          } /* check if there is no data from keyboard input */
          else if (!_kbhit()) {
            /* check if there are INPUT_RECORDs in the input buffer */
            if(GetNumberOfConsoleInputEvents(handle, &events)) {
              if(events > 0) {
                /* nothing consumed yet; also the value used below when
                   the record buffer cannot be allocated */
                inputs = 0;

                /* remove INPUT_RECORDs from the input buffer */
                inputrecords = (INPUT_RECORD*)malloc(events *
                                                     sizeof(INPUT_RECORD));
                if(inputrecords) {
                  if(!ReadConsoleInput(handle, inputrecords,
                                       events, &inputs))
                    inputs = 0;
                  free(inputrecords);
                }

                /* check if we got all inputs, otherwise clear buffer */
                if(events != inputs)
                  FlushConsoleInputBuffer(handle);
              }
            }

            /* remove from descriptor set since there is no real data */
            FD_CLR(sock, readfds);
          }
        }

        /* stdin is never ready for write or exceptional */
        FD_CLR(sock, writefds);
        FD_CLR(sock, exceptfds);
      }
      else if(fds == fileno(stdout) || fds == fileno(stderr)) {
        /* stdout and stderr are never ready for read or exceptional */
        FD_CLR(sock, readfds);
        FD_CLR(sock, exceptfds);
      }
      else {
        /* try to handle the event with the WINSOCK2 functions */
        error = WSAEnumNetworkEvents(fds, NULL, &wsanetevents);
        if(error != SOCKET_ERROR) {
          /* remove from descriptor set if not ready for read/accept/close */
          if(!(wsanetevents.lNetworkEvents & (FD_READ|FD_ACCEPT|FD_CLOSE)))
            FD_CLR(sock, readfds);

          /* remove from descriptor set if not ready for write/connect */
          if(!(wsanetevents.lNetworkEvents & (FD_WRITE|FD_CONNECT)))
            FD_CLR(sock, writefds);

          /* remove from descriptor set if not exceptional */
          if(!(wsanetevents.lNetworkEvents & FD_OOB))
            FD_CLR(sock, exceptfds);
        }
      }

      /* check if the event has not been filtered using specific tests */
      if(FD_ISSET(sock, readfds) || FD_ISSET(sock, writefds) ||
         FD_ISSET(sock, exceptfds)) {
        ret++;
      }
    }
    else {
      /* remove from all descriptor sets since this handle did not trigger */
      FD_CLR(sock, readfds);
      FD_CLR(sock, writefds);
      FD_CLR(sock, exceptfds);
    }
  }

  /* detach and destroy the events that were attached to sockets above */
  for(idx = 0; idx < wsa; idx++) {
    WSAEventSelect(wsasocks[idx], NULL, 0);
    WSACloseEvent(wsaevents[idx]);
  }

  free(wsaevents);
  free(wsasocks);
  free(handles);
  free(fdarr);

  return ret;
}

/*
 * Example driver for select_ws: opens two outgoing TCP connections and one
 * listening socket on 127.0.0.1:1337, then loops forever waiting on those
 * sockets together with stdin and prints which descriptor became ready.
 *
 * Returns 1 if the read buffer cannot be allocated or WinSock cannot be
 * started; otherwise the loop never terminates.
 */
int main(void)
{
  WORD wVersionRequested;
  WSADATA wsaData;
  SOCKET sock[4];
  /* zero-initialized so sin_zero and unused members hold no garbage */
  struct sockaddr_in sockaddr[4] = {{0}};
  fd_set readfds;
  fd_set writefds;
  fd_set exceptfds;
  SOCKET maxfd = 0;
  int selfd = 0;
  int addrlen;
  void *buffer;
  ssize_t nread;

  buffer = malloc(1024);
  if(buffer == NULL) {
    fprintf(stderr, "out of memory\n");
    return 1;
  }

  /* read stdin unmodified, without text-mode translation */
  setmode(fileno(stdin), O_BINARY);

  wVersionRequested = MAKEWORD(2, 2);
  if(WSAStartup(wVersionRequested, &wsaData) != 0) {
    fprintf(stderr, "WSAStartup failed\n");
    free(buffer);
    return 1;
  }

  sock[0] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  sockaddr[0].sin_family = AF_INET;
  sockaddr[0].sin_addr.s_addr = inet_addr("74.125.134.26");
  sockaddr[0].sin_port = htons(25);
  connect(sock[0], (struct sockaddr *) &sockaddr[0], sizeof(sockaddr[0]));

  sock[1] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  sockaddr[1].sin_family = AF_INET;
  sockaddr[1].sin_addr.s_addr = inet_addr("74.125.134.27");
  sockaddr[1].sin_port = htons(25);
  connect(sock[1], (struct sockaddr *) &sockaddr[1], sizeof(sockaddr[1]));

  sock[2] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  sockaddr[2].sin_family = AF_INET;
  sockaddr[2].sin_addr.s_addr = inet_addr("127.0.0.1");
  sockaddr[2].sin_port = htons(1337);
  printf("bind = %d\n", bind(sock[2], (struct sockaddr *) &sockaddr[2], sizeof(sockaddr[2])));
  printf("listen = %d\n", listen(sock[2], 5));

  /* slot for the most recently accepted connection */
  sock[3] = INVALID_SOCKET;

  while(1) {
    /* the sets are modified by select_ws, so rebuild them every round */
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);
    FD_ZERO(&exceptfds);

    FD_SET(sock[0], &readfds);
    FD_SET(sock[0], &exceptfds);
    maxfd = maxfd > sock[0] ? maxfd : sock[0];

    FD_SET(sock[1], &readfds);
    FD_SET(sock[1], &exceptfds);
    maxfd = maxfd > sock[1] ? maxfd : sock[1];

    FD_SET(sock[2], &readfds);
    FD_SET(sock[2], &exceptfds);
    maxfd = maxfd > sock[2] ? maxfd : sock[2];

    FD_SET((SOCKET)fileno(stdin), &readfds);
    maxfd = maxfd > (SOCKET)fileno(stdin) ? maxfd : (SOCKET)fileno(stdin);

    /* SOCKET and ssize_t are not int; cast explicitly for the %d format */
    printf("maxfd = %d\n", (int)maxfd);
    selfd = select_ws((int)(maxfd + 1), &readfds, &writefds, &exceptfds, NULL);
    printf("selfd = %d\n", selfd);

    if(FD_ISSET(sock[0], &readfds)) {
      printf("read sock[0]\n");
      nread = recv(sock[0], buffer, 1024, 0);
      printf("read sock[0] = %d\n", (int)nread);
    }
    if(FD_ISSET(sock[0], &exceptfds)) {
      printf("exception sock[0]\n");
    }

    if(FD_ISSET(sock[1], &readfds)) {
      printf("read sock[1]\n");
      nread = recv(sock[1], buffer, 1024, 0);
      printf("read sock[1] = %d\n", (int)nread);
    }
    if(FD_ISSET(sock[1], &exceptfds)) {
      printf("exception sock[1]\n");
    }

    if(FD_ISSET(sock[2], &readfds)) {
      /* keep only one accepted connection at a time */
      if(sock[3] != INVALID_SOCKET)
        closesocket(sock[3]);

      printf("accept sock[2] = %d\n", (int)sock[2]);
      /* accept expects an int length, not a ssize_t */
      addrlen = (int)sizeof(sockaddr[3]);
      printf("WSAGetLastError = %d\n", WSAGetLastError());
      sock[3] = accept(sock[2], (struct sockaddr *) &sockaddr[3], &addrlen);
      printf("WSAGetLastError = %d\n", WSAGetLastError());
      printf("accept sock[2] = %d\n", (int)sock[3]);
    }
    if(FD_ISSET(sock[2], &exceptfds)) {
      printf("exception sock[2]\n");
    }

    if(FD_ISSET(fileno(stdin), &readfds)) {
      printf("read fileno(stdin)\n");
      nread = read(fileno(stdin), buffer, 1024);
      printf("read fileno(stdin) = %d\n", (int)nread);
    }
  }

  /* not reached: the loop above has no exit */
  WSACleanup();
  free(buffer);
  return 0;
}

Compile using MinGW with the following command:

mingw32-gcc select_ws.c -Wl,-lws2_32 -g -o select_ws.exe

Running the program directly from the console using the following command works:

select_ws.exe

But doing the same with a pipe makes WaitForMultipleObjectsEx return immediately on every call, because the pipe handle is always reported as signaled:

ping -t 8.8.8.8 | select_ws.exe

The pipe is reported as ready to read until the process feeding it has finished, e.g.:

ping 8.8.8.8 | select_ws.exe

Is there a compatible way to simulate a blocking wait on the PIPE-based console input handle together with the other handles? The use of threads should be avoided.

You are welcome to contribute changes to the example program in this gist.

Thanks in advance!

Upvotes: 5

Views: 2006

Answers (2)

mback2k
mback2k

Reputation: 441

I actually found a way to make it work using a separate waiting-thread. Please see the following commit in the curl repository on github.com.

Thanks for your comments!

Upvotes: 3

Remy Lebeau
Remy Lebeau

Reputation: 596206

Use GetStdHandle(STD_INPUT_HANDLE) to get the STDIN pipe handle, then use ReadFile/Ex() with an OVERLAPPED structure whose hEvent member is set to a manual-reset event from CreateEvent(). You can then use any of the WaitFor*() functions to wait on the event. If it times out, call CancelIo() to abort the read operation.

Upvotes: 2

Related Questions