Reputation: 337
CentOS 7.9
GNU bash, version 4.2.46(2)-release (x86_64-redhat-linux-gnu)
I am migrating S3 objects via a shell script (curl to a local MinIO endpoint).
The script spawns multiple workers (as background processes with &), uses a FIFO as the job queue, and uses flock
to control reading jobs from the queue; each worker then downloads objects, uploads objects, and so on.
Part of the script looks like this:
# ...
# read params, show info ...
# setting up locks, fifo ...
# define the job() function run by worker.
#
# ...
# the FIFO (job queue) is FD 3
# the FIFO lock is FD 4
function work () {
    wid=$1
    # setup FD and locks
    # ...
    while true; do
        flock 4                   # get the fifo lock
        read -r -su 3 job_id obj
        read_status=$?
        flock -u 4                # release the fifo lock
        if [[ $read_status -eq 0 ]]; then
            : # if a job was read from the queue, run it in a subprocess (elided)
        fi
    done
    # clean up FDs
}
echo "Spawning workers..."
for ((i=1; i<="${WORKERS:-4}"; i++)); do
    printf "%s" "_"
    work "$i" &
done
echo
function append_job() {
    job_id=$1
    target_object=$2
    printf "\r(%d s) Append job [# %s] %.110s\e[K" "$(( $(date +%s) - ts_start ))" "${job_id}" "${target_object}"
    echo "${job_id}" "${target_object}" 1>&3   # push the job line into the FIFO (FD 3)
}

while read -r obj_name ; do
    append_job "$i" "${obj_name}"
    i=$((i+1))
done < <(the_function_that_list_all_objects_from_bucket "${TARGET_BUCKET}")
echo
The script works fine until a bucket has more than 7,000 objects: it appends exactly 7,000 jobs and then stops.
So the workers only migrated 7,000 objects from that bucket and finished.
It should finish all of the migration jobs (> 7,000) without any problem.
What I thought (could be very wrong):
The process substitution <(the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET}) would store the list of all objects somewhere in memory, and has nothing to do with the FIFO at this point.
Even if the result of something like <(gen_huge_result) were huge, say > 16 GB, the OS would still take care of it and I would not need to worry about it, so the loop would still work, just taking more time.
I checked the length and size of the object listing:
# the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET} | wc -l
8043
# the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET} | wc -c
434387
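A quick way to check that first assumption (illustrative commands, not part of the migration script): process substitution actually exposes a pipe under /dev/fd rather than an in-memory copy, so the producer is throttled by whoever reads from it:
# <(...) expands to a /dev/fd path backed by a pipe, not a temp file or buffer:
echo <(seq 1 1000000)            # prints something like /dev/fd/63
# The producer only runs as fast as the consumer reads; stopping early is fine:
while read -r n; do
    [ "$n" -ge 3 ] && break      # consume a few lines, then quit
done < <(seq 1 1000000)          # seq is simply blocked/terminated once the pipe closes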
As far as I know, I can get the maximum pipe size via cat /proc/sys/fs/pipe-max-size:
# cat /proc/sys/fs/pipe-max-size
1048576
So these 8,043 objects (434,387 bytes) should be fine to migrate.
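Note that /proc/sys/fs/pipe-max-size is only the maximum capacity a pipe can be resized to via fcntl(F_SETPIPE_SZ); a freshly created pipe or FIFO defaults to 64 KiB (65,536 bytes) on any reasonably modern kernel. A rough way to observe the actual capacity of a FIFO (a throwaway sketch; the /tmp path is hypothetical):
q=/tmp/pipecap.$$                 # hypothetical scratch FIFO
mkfifo -- "$q"
exec 3<>"$q"                      # hold the FIFO open so dd's open() does not block
dd if=/dev/zero of="$q" bs=1 oflag=nonblock 2>&1
# dd stops with "Resource temporarily unavailable" after ~65536 bytes:
# that is the real queue capacity, regardless of pipe-max-size.
exec 3<&-
rm -f -- "$q"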
I also tried forcing the job appending to slow down, but it still "finished" at 7,000 objects.
My guess is that the appending stops when it hits some FIFO size limit, but I don't know how or why.
As @pts suggested in the comments, I quadrupled the job parameters:
work() {
    # ...
    while true; do
        ## try to read the queue
        flock 4   # obtain the fifo lock
        read -r -su 3 work_id work_item tmp1 tmp2 tmp3 tmp4 tmp5 tmp6   # read into work_id and work_item
        read_status=$?   # save the exit status of read
        # ...
}

append_job() {
    # ...
    echo "${work_id}" "$work_item" "${work_id}" "$work_item" "${work_id}" "$work_item" "${work_id}" "$work_item" 1>&3   ## the fifo is fd 3
}
This time it got past 2,000 jobs before I hit CTRL+C. If it were hitting some line-count limit, it should have stopped at about 1,750 (7,000 / 4).
It does not look like a poisoned job either: the jobs produce no stderr output, and even when I set the job body to just true, the result is the same.
I believe there must be some limit on the side that sends (appends) the jobs.
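If this happens again, one way to confirm where it is stuck (a hedged diagnostic, not something I ran at the time) is to check what the appending process is sleeping on and which descriptor points at the FIFO:
# PUSHER_PID is a placeholder for the PID of the shell running append_job.
cat /proc/"$PUSHER_PID"/wchan; echo           # typically something like pipe_wait when blocked on a full FIFO
ls -l /proc/"$PUSHER_PID"/fd                  # FD 3 should point at the FIFO (job queue)
grep -i '^State' /proc/"$PUSHER_PID"/status   # "S (sleeping)" while waiting for a reader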
Update: to get the work done yesterday, I did the migration with a "move" method (copy and delete) instead of just "copy".
Since this environment is a VM provided by my company, I asked the infra team to restore a snapshot taken right before the migration work started, so that I could test the script provided by @pts below and try to find the root cause.
After booting, I ran the same script (above) again, expecting it to stop at 7,000 jobs.
This time it showed 8,043 jobs (objects) appended.
Maybe rebooting (restoring the VM image) flushed some limit in the OS.
Since I cannot reproduce the issue any more, I will just leave this update here.
If I hit it again, I will try the script below.
Thank you very much!
Upvotes: 4
Views: 177
Reputation: 87271
This works for me on Linux with many workers:
#! /bin/bash --
set -e  # Exit on first non-zero status.
cat /proc/sys/fs/pipe-max-size >&2  # Doesn't matter.
trap '' PIPE
function work() {
  local wi="$1"
  ls -l /proc/"$BASHPID"/fd >&2
  echo "start work $wi $$ $BASHPID" >&2
  while true; do
    echo "pop $wi" >&2
    if ! flock 0; then
      echo "flock 0 failed in $wi" >&2
      break
    fi
    if ! read -rs cmd i; then
      echo "pop $wi failed" >&2
      flock -u 0 ||:
      break
    fi
    if ! flock -u 0; then
      echo "flock -u 0 failed in $wi" >&2
      break
    fi
    echo "popped $i in $wi" >&2
    #sleep 10000
    sleep ".0$((RANDOM%10))"  # Simulate slow work.
    #test $i = 100 && break
  done
  echo "end work $wi" >&2
}
function workers() {
  local twc="$1"  # Total worker count.
  exec 3<&0
  local wc=0 wx
  while test "$wc" != "$twc"; do
    let wc=wc+1 ||:
    work "$wc" <&3- &  # Start worker in the background.
  done
  while test "$wc" != 0; do
    wx=0; wait >&2 || wx="$?"
    echo "worker exit $wx" >&2  # We don't know which worker.
    let wc=wc-1 ||:
  done
  echo "end workers" >&2
}
function pusher() {
  i=0
  echo "start push $$ $BASHPID" >&2
  rm -f queue.job.log
  while true; do
    let i=i+1 ||:
    echo "push $i" >&2
    #if ! flock 1; then  # This would eventually cause a deadlock.
    #  echo "flock 1 failed" >&2
    #  break
    #fi
    if ! echo "this-is-a-long-string-for-a-job-command $i"; then
      echo "push failed" >&2
      rm -f -- "$ff"  # Indicate to workers that they can exit.
      break
    fi
    echo "JOB $i" >>queue.job.log
    #if ! flock -u 1; then
    #  echo "flock -u 1 failed" >&2
    #  break
    #fi
    echo "pushed $i" >&2
    test $i = 2000 && break
  done
  echo "end pusher" >&2
}
pusher | workers "${1:-4}"
echo "end program" >&2
The actual work is done in the sleep command. Please note that if that command fails, the entire worker fails (because of set -e) and won't process any more jobs. If you want the workers to continue anyway, add ||:, like this: my-process-job-command "$i" ||:.
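For example, the job-running part of the worker loop could look like this (my-process-job-command is the same placeholder as above):
# Inside the while loop of work(), after "popped $i" has been read:
if ! my-process-job-command "$i"; then
  echo "job $i failed in worker $wi, continuing" >&2  # worker survives despite set -e
fi
# or, if you don't need the log line:
# my-process-job-command "$i" ||: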
It was very hard to get right.
It's also possible to make it work with named pipes (i.e. mkfifo(1)), but preventing the redirects from blocking indefinitely is much more tricky.
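For reference, the usual trick in the named-pipe variant is to open the FIFO read-write, so that neither side's open() blocks waiting for the other end (a minimal sketch under that assumption, not tested production code):
q=$(mktemp -u)        # hypothetical path for the queue FIFO
mkfifo -- "$q"
exec 3<>"$q"          # read-write open never blocks and never delivers a premature EOF
rm -f -- "$q"         # the name can go; fd 3 keeps the pipe alive
echo "job 1" >&3      # push
read -r job <&3       # pop
echo "got: $job"
exec 3>&-             # closing the last descriptor destroys the pipe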
Please note that the pipe queue buffer size seems to be a bit less than 65536 bytes (at least 65511 bytes) on my Linux system, even though /proc/sys/fs/pipe-max-size is 1048576.
Upvotes: 2