ktc
ktc

Reputation: 337

Troubleshoot maximum FIFO size

Environment Info


Background

I am doing S3 objects migration via shell script (curl to local MinIO endpoint).

This script spawns multiple workers (via background process &), uses FIFO as job queue, and flock to control read job from queue, then downloads objects, uploads objects ... etc.

Part of the script looks like this:

# ...
# read params, show info ...
# setting up locks, fifo ...
# define the job() function run by worker.
#
# ...

# the FIFO (job queue) is FD 3
# the FIFO lock is FD 4


function work () {
  wid=$1  
  # setup FD and locks
  # ...

  while true; do
    flock 4                          # get the fifo lock
    read -r -su 3 job_id obj
    read_status=$?
    flock -u 4                       # release the fifo lock
    
    if [[ $read_status -eq 0 ]]; then
      # if read job from queue, run job in a subprocess.
    fi
  done
  
  # clean up FDs
}

echo "Spawning workers..."
for ((i=1;i<="${WORKERS:-4}";i++)); do
  printf "%s" "_"
  work $i &
done
echo


function append_job() {
  job_id=$1
  target_object=$2
  printf "\r(%d s) Append job [# %s] %.110s\e[K" "$(( $(date +%s) - ts_start ))" "${job_id}" "${target_object}"
  echo "${job_id}" "${target_object}" 1>&3 
}


while read -r obj_name ; do
  append_job $i "${obj_name}"
  i=$((i+1))
done < <(the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET})
echo

Problem Encountered

The script works fine until one bucket's objects exceed 7,000 (It only appends exactly 7,000 jobs).

So the workers only migrated 7,000 objects from a bucket and finished.

What I expect

It should finish all the migration jobs (> 7,000) without any problem.

What I thought (could be very wrong):

  1. The stdin pass to the while loop (<(the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET})), would store the list of all objects somewhere in memory, nothing to do with the FIFO for now.
  2. Even the FIFO has max size, and the loop keeps appending jobs into it, once one of the workers reads a job from the FIFO, at that moment the FIFO size should be reduced.
  3. Assume the stdin is very huge (<(gen_huge_result)), like > 16GB, but the OS would still take care of it, I don't need to worry about this. So the loop still works, just takes time.

What I have tried

I checked the length and size of the object result:

# the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET} | wc -l 
8043

# the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET} | wc -c
434387

As far as I know, I can get pipe max size via cat /proc/sys/fs/pipe-max-size

# cat /proc/sys/fs/pipe-max-size
1048576

These 8,043 objects should be fine on migrating.

I also tried to force the appending job to slow down, but still "finished" on 7,000 objects.

I guess the appending job stops while it's hitting the FIFO max size, but I don't know how or why.


Update:

1. Some experiments

As @pts mentioned on comments

  1. byte limits or line limits?

I quadrupled the job param:


work() {
  # ...
  while true; do
    ## try to read the queue
    flock 4                          # obtain the fifo lock
    read -r -su 3 work_id work_item tmp1 tmp2 tmp3 tmp4 tmp5 tmp6 # read into work_id and work_item
    read_status=$?                   # save the exit status of read
    # ...
}


append_job() {
  # ...
  echo "${work_id}" "$work_item" "${work_id}" "$work_item" "${work_id}" "$work_item" "${work_id}" "$work_item" 1>&3 ## the fifo is fd 3
}

And I saw it exceeded 2,000, then CTRL+C. If it was hitting some line limit, it should finish on ~1,750 (7,000/4)

  1. poisonous jobs?

It does not look like poisonous job. There is no stderr from jobs and I even set the job just doing true; still same.

I believe it should be some limits on sending job.

2. I can not reproduce this issue today.

I tried to do migration with "move" method (copy and delete) instead of just "copy", to make the work done yesterday.

Since this environment is a VM from my company, I ask infra team help me to restore the snapshot, which is right before the migration work starts. In this way, I can test scripts provided from @pts below, try to find some root cause.

After booting, I run again same script (above), want to make sure it stops on 7,000.

Then it shows appending jobs (objects): 8,043.

Maybe it is the re-booting (restore VM image) that flushes some limits on OS.

Since I can not reproduce this issue, I would just leave updates here.

If I hit this next time, I will try scripts below.

Thank you very much!

Upvotes: 4

Views: 177

Answers (1)

pts
pts

Reputation: 87271

This works for me on Linux with many workers:

#! /bin/bash --

set -e  # Exit on first non-zero status.

cat /proc/sys/fs/pipe-max-size >&2  # Doesn't matter.
trap '' PIPE

function work() {
  local wi="$1"
  ls -l /proc/"$BASHPID"/fd >&2
  echo "start work $wi $$ $BASHPID" >&2
  while true; do
    echo "pop $wi" >&2
    if ! flock 0; then
      echo "flock 0 failed in $wi" >&2
      break
    fi
    if ! read -rs cmd i; then
      echo "pop $wi failed" >&2
      flock -u 0 ||:
      break
    fi
    if ! flock -u 0; then
      echo "flock -u 0 failed in $wi" >&2
      break
    fi
    echo "popped $i in $wi" >&2
    #sleep 10000
    sleep ".0$((RANDOM%10))"  # Simulate slow work.
    #test $i = 100 && break
  done
  echo "end work $wi" >&2
}

function workers() {
  local twc="$1"  # Total worker count.
  exec 3<&0
  local wc=0 wx
  while test "$wc" != "$twc"; do
    let wc=wc+1 ||:
    work "$wc" <&3- &  # Start worker in the background.
  done
  while test "$wc" != 0; do
    wx=0; wait >&2 || wx="$?"
    echo "worker exit $wx" >&2  # We don't know which worker.
    let wc=wc-1 ||:
  done
  echo "end workers" >&2
}

function pusher() {
  i=0
  echo "start push $$ $BASHPID" >&2
  rm -f queue.job.log
  while true; do
    let i=i+1 ||:
    echo "push $i" >&2
    #if ! flock 1; then  # This would eventually cause a deadlock.
    #  echo "flock 1 failed" >&2
    #  break
    #fi
    if ! echo "this-is-a-long-string-for-a-job-command $i"; then
      echo "push failed" >&2
      rm -f -- "$ff"  # Indicate to workers that they can exit.
      break
    fi
    echo "JOB $i" >>queue.job.log
    #if ! flock -u 1; then
    #  echo "flock -u 1 failed" >&2
    #  break
    #fi
    echo "pushed $i" >&2
    test $i = 2000 && break
  done
  echo "end pusher" >&2
}

pusher | workers "${1:-4}"
echo "end program" >&2

The actual work is done in the sleep commands. Please note that if that command fails, the entire worker fails (because of set -e), and it won't process more jobs. If you want the workers to continue, add ||:, like this: my-process-job-command "$i" ||:.

It was very hard to get right.

It's also possible to make it work with named pipes (i.e.. mkfifo(1)), but preventing the redirects from blocking indefinitely is much more tricky.

Please note that the pipe queue buffer size seems to be a bit less than 65536 bytes (at least 65511 bytes) on my Linux system, even though /proc/sys/fs/pipe-max-size is 1048576. –

Upvotes: 2

Related Questions