psort: Parallel sorting on the command line. An example.
[This is a copy of the post from http://lwsffhs.wordpress.com/ at http://lwsffhs.wordpress.com/2012/08/29/psort-parallel-sorting-on-the-command-line-an-example/ ]
I am in the process of understanding Hadoop and the map-reduce framework.
This introductory line will be clarified in the next post, but keep in mind that in this post I am not seeking the fastest sort but rather a sort within a parallel framework. I need to sort a lot of data 😉
I needed a simple piece of code that would work on my Q6600 processor and also on my two nodes with 16×2-core CPUs. Sorting seemed a good example: easy to understand, easy to implement with the sort command, and a pretty typical problem. Moreover, Hadoop recently (maybe years ago, on the IT time scale) won one of the sorting competitions (see here or here; google it for up-to-date data). It sounded like a good starting point for a simple, dummy comparison.
Here is my example.
What I want is:
cat myfile.dat | sort -n -k 1,1 -k 2,2 > result.txt
but done in parallel. Search a bit and you will find a lot of solutions. What I write here is already on the internet in pieces, but I cannot cite any original work: I picked up many ideas and lines while surfing, yet none matched my target right away. With one exception: psort.sh, which I apparently cannot find anymore (I will add the reference ASAP).
Is it really so? Does the Unix/Linux infrastructure have no standardized multi-core sort? GNU, the FSF, and the like should be moving into the multi-core era.
I start with the first full script right away:
#! /bin/sh
#
# Please note: this code works only up to 10 cpus
# for more CPUs please play with seq -w 0 11 and split -a 2
# It is parallel sorting!!
ncpu=4
ncpum1=$(( $ncpu - 1 ))
#
if [ -z "$1" ]; then
  echo "I need a file to process! Please give it as parameter."
  exit;
fi;
#
infile=$1
if [ ! -f ${infile} ]; then
  echo "I need a file to process! File ${infile} doesn't exist."
  exit;
fi;
#
if [ -f ${infile}.Sorted ]; then
  echo "output file existing... please remove: ${infile}.Sorted"
  exit;
fi;
#
fnl=`wc -l ${infile} | awk '{print $1;}'`
echo "File: ${infile} with lines: $fnl"
snl=$(( $fnl / $ncpu ))
snl=$(( $snl + 1 ))
#
echo " File: ${infile}"
echo "Lines: $fnl divided in blocks of $snl"
#
parallel -i mkfifo tmp.fin{} -- `seq 0 ${ncpum1}`
split -a 1 -l ${snl} -d ${infile} tmp.fin &
parallel -i mkfifo tmp.ftmp{} -- `seq 0 ${ncpum1}`
parallel -i sort -n -k 1,1 -k 2,2 tmp.fin{} -o tmp.ftmp{} -- `seq 0 ${ncpum1}` &
ls tmp.ftmp* | xargs sort -n -k 1,1 -k 2,2 -m > ${infile}.Sorted
#
rm tmp.f*
#
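For reference, this is how I call it, assuming you saved the script as psort.sh (the file name is just my choice here):

chmod +x psort.sh
./psort.sh myfile.dat      # the sorted output ends up in myfile.dat.Sorted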
The first lines are just file checks, which I always appreciate in test scripts. For example, I check that I do not overwrite an existing output file: it is your responsibility to remove it.
The first important part of the script evaluates the number of lines of the input file. This is used to define the size of the splits we will create; these will then be processed by parallel instances of the sort command. Between lines 25 and 28 we count the lines and calculate how many lines go to each sort process. The "+1" (line 28) is a little trick to get a roughly uniform distribution: it is not the best trick, but it works here with a low number of CPUs.
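As a quick sanity check of the arithmetic (the numbers below are made up for illustration):

# 1000003 lines on 4 CPUs:
# without +1: snl = 1000003 / 4 = 250000, so the 4th split gets 250003 lines
# with    +1: snl = 250001, so the splits get 250001, 250001, 250001, 250000 lines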
These few lines are the actual core of the code:
parallel -i mkfifo tmp.fin{} -- `seq 0 ${ncpum1}`
split -a 1 -l ${snl} -d ${infile} tmp.fin &
parallel -i mkfifo tmp.ftmp{} -- `seq 0 ${ncpum1}`
parallel -i sort -n -k 1,1 -k 2,2 tmp.fin{} -o tmp.ftmp{} -- `seq 0 ${ncpum1}` &
ls tmp.ftmp* | xargs sort -n -k 1,1 -k 2,2 -m > ${infile}.Sorted
Here it is again, written out for 4 CPUs (sequence 0..3) and an input file INfile:
parallel -i mkfifo tmp.fin{} -- `seq 0 3`
split -a 1 -l ${snl} -d INfile tmp.fin &
parallel -i mkfifo tmp.fou{} -- `seq 0 3`
parallel -i sort -n -k 1,1 -k 2,2 tmp.fin{} -o tmp.fou{} -- `seq 0 3` &
ls tmp.fou* | xargs sort -n -k 1,1 -k 2,2 -m > INfile.Sorted
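Before dissecting these lines, a quick way to check afterwards that the output really is ordered, using GNU sort's -c flag with the same keys:

sort -n -k 1,1 -k 2,2 -c INfile.Sorted && echo "INfile.Sorted is sorted"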
We use the shell and a few special command options. I want to emulate a stream of data, so instead of running single processes on plain files, I create FIFOs, or named pipes. The main characteristic of the named pipes I rely on is that they are blocking: nothing happens until there is a data flow through the named pipe. (A nice side effect is the error: broken pipe.)
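A minimal sketch of this blocking behaviour (the file name is just for the demo):

mkfifo tmp.demo
cat tmp.demo &            # the reader blocks: nothing happens yet
echo "hello" > tmp.demo   # a writer appears; the reader prints "hello" and exits
rm tmp.demo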
This is what happens: line 1 (of the latest code block) creates 4 FIFOs (named pipes). The creation is done in parallel (the parallel command returns only once they are all created). On line 2 the split command is asked to write to these FIFOs, and it will block until it can write. This is why you need the ampersand (send to background) at the end of that line: otherwise the script would block.
Line 3 then creates the FIFOs (named pipes) where the next command will write.
On line 4 the sort command is instructed to read from the "in" FIFOs and to write to the (again 4) "ou" FIFOs. At this point everything is blocked once more, because sort cannot yet write to its own output. This is again why the command gets an ampersand (put it in the background).
Line 5 finally frees the whole pipeline by reading from the four FIFOs with "sort -m": this last line is what actually starts the sorting process. Note that this final step is a merge, not a sort anymore.
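To see the difference, here is a tiny sketch of sort -m on its own: it only merges inputs that are already sorted (made-up file names):

printf "1\n3\n5\n" > a.txt
printf "2\n4\n6\n" > b.txt
sort -n -m a.txt b.txt    # prints 1..6; the inputs are merged, not re-sorted
rm a.txt b.txt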
This script is not your best parallel sorting tool, but it might be useful for some ideas. I created it in order to understand Hadoop and its possible implications. I will post a short blog about the performance of this script shortly.
Notes:
There is a little race between the concurrent processes of lines 4 and 5. In this example the "ls" command seems to give the previous command enough time to complete the FIFO opening. Otherwise the script might start to read from a FIFO before the writing process opens it (a broken pipe error). Pretty unlikely in this example, but a real problem.
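A hypothetical way to sidestep this timing dependence is to build the list of FIFO names explicitly instead of globbing with ls, so the merge step does not depend on what ls happens to see:

# replaces the 'ls tmp.fou* | xargs ...' line; same merge, explicit names
sort -n -k 1,1 -k 2,2 -m `seq 0 3 | sed 's/^/tmp.fou/'` > INfile.Sorted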