Sorting Cats with Hadoop and psort
[This post will also be published on http://lwsffhs.wordpress.com/]
This is my first “self” tutorial on hadoop mapreduce streaming. If you are really IT oriented you probably want to read http://hadoop.apache.org/docs/r0.15.2/streaming.html (or any newer version) instead; this post doesn’t add much to that document as far as hadoop mapreduce streaming is concerned. Here I mostly play a bit with “sort” on the command line, so you might want to read my previous notes first: psort: parallel sorting …. I will run these examples on a virtual cluster (libvirt/qemu/KVM) composed of 1 master node with 4 CPUs and 10 computing nodes with 2 CPUs each. The virtual nodes are distributed over two physical machines (I will post some details about this virtual cluster here in the future).
The question I had was: what does hadoop mapreduce streaming actually do?
The best I could do was to run it with the minimum coding: using /bin/cat.
Here is the first script I wrote:
#! /bin/sh
#
#
if [ -z $1 ]; then
  echo "I need a file to process! Please give it as parameter."
  exit;
fi;
#
infile=$1
if [ ! -f ${infile} ]; then
  echo "I need a file to process! File ${infile} doesn't exist."
  exit;
fi;
#
if [ -f ${infile}.Sorted ]; then
  echo "output file existing... please remove: ${infile}.Sorted"
  exit;
fi;
#
hadoop fs -mkdir SortExample
hadoop fs -mkdir SortExample/input
#
hdfs dfs -copyFromLocal ${infile} SortExample/input
#
echo "Copied files: hadoop fs -ls SortExample ..."
hadoop fs -ls SortExample
#
echo "Copied files: hadoop fs -ls SortExample/input ..."
hadoop fs -ls SortExample/input
#
echo "Running map reduce cat sort..."
#
time \
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar \
  -input SortExample/input \
  -output SortExample/output \
  -mapper /bin/cat \
  -reducer /bin/cat
#
#
hdfs dfs -copyToLocal /user/${USER}/SortExample/output/part-00000 ${infile}.Sorted
#
echo "Cleaning up ..."
hadoop fs -rm -r SortExample
#
You might have guessed where I got the idea for the title…
Here we feed hadoop streaming with the command /bin/cat both as mapper and as reducer (the -mapper and -reducer options of the streaming call).
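Conceptually, and only as a rough local analogy (this is not what hadoop actually executes internally), the job behaves like a pipeline in which the framework's own sort sits between the identity mapper and the identity reducer:

# local sketch only: identity map, framework-style sort, identity reduce
cat MLhetrec11.dat | /bin/cat | sort | /bin/cat > MLhetrec11.dat.LocalSorted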
The result of this script, or rather the result of simply passing a file through the mapreduce machinery, is a sorted file. The final file is sorted “alphabetically”. I wanted to get an idea of the scaling, so I took my usual file:
[mariotti@buffalo SortIng]$ ls -alh MLhetrec11.dat
-rw-r--r-- 1 mariotti lws 12M Aug 8 14:07 MLhetrec11.dat
This file is derived from the MovieLens ratings, which can be found at this grouplens group page. It actually contains only user IDs, item IDs and ratings.
[mariotti@buffalo SortIng]$ head -4 MLhetrec11.dat
34101 4006 3.5
65308 3358 3
8726 57526 5
45049 1073 4.5
I also needed some bigger files, so I did something very simple:
cat MLhetrec11.dat MLhetrec11.dat | shuf > x.2m.dat
cat x.2m.dat x.2m.dat | shuf > x.4m.dat
cat x.4m.dat x.4m.dat | shuf > x.8m.dat
...etc...
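The same doubling can of course be wrapped in a small loop, for example (just a sketch following the naming above):

# double and reshuffle repeatedly, starting from the original ratings file
prev=MLhetrec11.dat
for i in 2 4 8 16 32 64 128; do
    cat ${prev} ${prev} | shuf > x.${i}m.dat
    prev=x.${i}m.dat
done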
…and I ran the script on these files:
time ./hadoop_cat_sort_simple.sh MLhetrec11.dat
Here are the results (time outputs):
Hadoop simple
-------------
 1:   real 0m28.161s   user 0m3.287s    sys 0m0.195s
 L:   real 0m40.044s   user 0m19.298s   sys 0m1.232s
 2:   real 0m33.243s   user 0m3.537s    sys 0m0.225s
 L:   real 0m46.832s   user 0m20.212s   sys 0m1.478s
 4:   real 0m36.215s   user 0m3.368s    sys 0m0.213s
 L:   real 0m54.426s   user 0m20.111s   sys 0m1.582s
 8:   real 0m49.241s   user 0m3.379s    sys 0m0.231s
 L:   real 1m8.506s    user 0m20.188s   sys 0m1.758s
16:   real 1m17.278s   user 0m3.442s    sys 0m0.232s
 L:   real 2m0.970s    user 0m27.995s   sys 0m3.445s
32:   real 1m51.419s   user 0m3.653s    sys 0m0.226s
 L:   real 2m48.795s   user 0m25.924s   sys 0m3.652s
Here 1: means 1x MLhetrec11.dat, 2: means 2x MLhetrec11.dat (x.2m.dat), etc. The first set of times is the execution time of the hadoop command itself (the timed hadoop jar call in the script above), while the second set of times (L:) is the total script run time, which includes copying the input and output files (you should consider only the “real” wall times).
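For completeness, the whole series above can be produced by calling the script once per file, for instance like this (a sketch; the file names are the ones generated earlier):

# run the simple cat/cat streaming job on each file size in turn
for f in MLhetrec11.dat x.2m.dat x.4m.dat x.8m.dat x.16m.dat x.32m.dat; do
    echo "=== ${f} ==="
    time ./hadoop_cat_sort_simple.sh ${f}
done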
The scaling is already impressive, considering how simple the example is.
As a first impression, that was pretty ok.
But I want to compare it with my simple parallel sort version from my previous post, so I need something a tiny bit more complex. From this link I realized that I can use “sort”-like options. I wrote a new script, which looks like this:
#! /bin/sh
#
#
if [ -z $1 ]; then
  echo "I need a file to process! Please give it as parameter."
  exit;
fi;
#
infile=$1
if [ ! -f ${infile} ]; then
  echo "I need a file to process! File ${infile} doesn't exist."
  exit;
fi;
#
if [ -f ${infile}.Sorted ]; then
  echo "output file existing... please remove: ${infile}.Sorted"
  exit;
fi;
#
hadoop fs -mkdir SortExample
hadoop fs -mkdir SortExample/input
#
hdfs dfs -copyFromLocal ${infile} SortExample/input
#
echo "Copied files: hadoop fs -ls SortExample ..."
hadoop fs -ls SortExample
#
echo "Copied files: hadoop fs -ls SortExample/input ..."
hadoop fs -ls SortExample/input
#
echo "Running map reduce cat sort..."
#
time \
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D stream.map.output.field.separator=" " \
  -D map.output.key.field.separator=" " \
  -D stream.num.map.output.key.fields=3 \
  -D mapred.text.key.comparator.options='-k1,1n -k2,2n' \
  -input SortExample/input \
  -output SortExample/output \
  -mapper /bin/cat \
  -reducer "/bin/sed 's/\t/ /'"
#
#
hdfs dfs -copyToLocal /user/${USER}/SortExample/output/part-00000 ${infile}.Sorted
#
echo "Cleaning up ..."
hadoop fs -rm -r SortExample
Hadoop mapreduce, by default, uses the concept of key/value pairs to address the data. When it reads an ASCII file it stores each text line as a key with a null value. In order to sort our file using two columns of numbers we need to instruct mapreduce to use 3 fields as the key (stream.num.map.output.key.fields=3). We also define the field separator (the stream.map.output.field.separator and map.output.key.field.separator options). The mapred.text.key.comparator.options setting defines the actual sorting, using the same style of options as the command “sort”. Note that mapreduce, by default, uses the TAB character as key/value separator. This explains why I changed one of my cats into a “sed” command (there is probably a better way, but this was simply convenient and a good reminder for the future).
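As a local sanity check (not part of the hadoop job), the comparator options above ask for the same ordering that a plain numeric sort on the first two fields would give:

# numeric sort on fields 1 and 2, mirroring mapred.text.key.comparator.options
head -1000 MLhetrec11.dat | sort -k1,1n -k2,2n | head -4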
What about performance, then?
Hadoop kn
---------
  1:   real 0m31.151s   user 0m3.289s    sys 0m0.219s
  L:   real 0m43.017s   user 0m19.018s   sys 0m1.309s
  2:   real 0m37.268s   user 0m3.603s    sys 0m0.227s
  L:   real 0m50.977s   user 0m19.588s   sys 0m1.369s
  4:   real 0m55.273s   user 0m3.444s    sys 0m0.243s
  L:   real 1m14.012s   user 0m20.704s   sys 0m1.746s
  8:   real 1m28.416s   user 0m3.382s    sys 0m0.261s
  L:   real 1m47.450s   user 0m20.708s   sys 0m1.791s
 16:   real 2m31.421s   user 0m3.612s    sys 0m0.282s
  L:   real 3m17.217s   user 0m24.851s   sys 0m3.001s
 32:   real 3m59.697s   user 0m3.925s    sys 0m0.320s
  L:   real 4m57.764s   user 0m29.104s   sys 0m4.320s
 64:   real 4m54.943s   user 0m4.122s    sys 0m0.380s
  L:   real 7m2.802s    user 0m36.107s   sys 0m7.698s
128:   real 7m42.483s   user 0m4.806s    sys 0m0.526s
  L:   real 12m2.697s   user 0m48.466s   sys 0m17.385s
Also pretty impressive. But maybe we want to compare with some other numbers. Here is the timing for a single-CPU command line sort, also run on the virtual cluster (time cat <FILE> | sort -n -k 1,1 -k 2,2 > out):
1 CPU Plain:
 1:1   real 0m18.932s    user 0m18.866s    sys 0m0.069s
 2:1   real 0m40.498s    user 0m40.366s    sys 0m0.138s
 4:1   real 1m31.671s    user 1m30.569s    sys 0m0.246s
 8:1   real 3m7.435s     user 3m6.992s     sys 0m0.441s
16:1   real 7m3.383s     user 7m2.056s     sys 0m1.338s
32:1   real 14m31.948s   user 14m29.185s   sys 0m2.687s
64:1   real 29m42.870s   user 29m36.367s   sys 0m5.615s
Now comes a second question: Is it really so convenient? The best I could do was to run my little parallel sort script. Here are the results:
TABLE real time (wall time)
NCPUS       1            2            4            8            16           32
FS
 1:     0m18.932s    0m10.599s    0m8.749s     0m9.284s     0m10.052s    0m11.814s
 2:     0m40.498s    0m39.730s    0m18.905s    0m18.675s    0m22.118s    0m24.074s
 4:     1m31.671s    1m20.683s    1m27.838s    0m37.300s    0m42.086s    0m49.270s
 8:     3m7.435s     2m42.529s    2m50.724s    3m8.948s     1m20.471s    1m40.009s
16:     7m3.383s     5m36.974s    5m35.425s    6m1.593s     6m42.248s    3m8.407s
32:    14m31.948s   12m36.733s   11m14.520s   11m16.672s   12m33.215s   12m13.892s
Here the numbers come from the virtual cluster for up to 4 CPUs (first three columns) and from the host machine when more CPUs are used. You can see that, as the file size increases, the parallel sort needs more and more CPUs to compete with hadoop. For example, with 32 times the MLhetrec11.dat size (about 355 MB), using 32 CPUs doesn’t improve on the single-CPU result.
Can we try to improve the command line sorting? One of the slow steps seems to be the merging. Why not try a second stage of merging?
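In its simplest form, with four already-sorted chunks (the chunk and output file names here are just placeholders), the idea is:

# two partial merges run in parallel, then one final merge of the two streams
sort -n -k 1,1 -k 2,2 -m chunk0 chunk1 > merged_a &
sort -n -k 1,1 -k 2,2 -m chunk2 chunk3 > merged_b &
wait
sort -n -k 1,1 -k 2,2 -m merged_a merged_b > final.Sorted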
This is a modified version of parsort.sh from the previous post:
#! /bin/sh
#
# Please note: this code works only up to 10 CPUs
# for more than ten CPUs play with seq -w 0 11
# It is a parallel sorting!!
ncpu=4
ncpum1=$(( $ncpu - 1 ))
hncpu=$(( $ncpu/2 ))
hncpum1=$(( $hncpu - 1 ))
#
if [ -z $1 ]; then
  echo "I need a file to process! Please give it as parameter."
  exit;
fi;
#
infile=$1
if [ ! -f ${infile} ]; then
  echo "I need a file to process! File ${infile} doesn't exist."
  exit;
fi;
#
if [ -f ${infile}.Sorted ]; then
  echo "output file existing... please remove: ${infile}.Sorted"
  exit;
fi;
#
fnl=`wc -l ${infile} | awk '{print $1;}'`
echo "File: ${infile} with lines: $fnl"
snl=$(($fnl / $ncpu))
snl=$(( $snl + 1))
#
echo " File: ${infile}"
echo "Lines: $fnl divided in blocks of $snl"
#
#
echo "working on $ncpu CPUs split in 2 mergers each with $hncpu"
echo "creating in fifos: "
parallel -i echo x.tmp.fin{} -- `seq -w 0 ${ncpum1}`
parallel -i mkfifo x.tmp.fin{} -- `seq -w 0 ${ncpum1}`
echo "Splitting in files:"
ls -al x.tmp.fin*
split -a 1 -l ${snl} -d ${infile} x.tmp.fin &
echo "creating tmp fifos: "
parallel -i echo x.tmp.ftmp{} -- `seq -w 0 ${ncpum1}`
parallel -i mkfifo x.tmp.ftmp{} -- `seq -w 0 ${ncpum1}`
echo "Running part sorts"
parallel -i echo sort -n -k 1,1 -k 2,2 x.tmp.fin{} -o x.tmp.ftmp{} -- `seq -w 0 ${ncpum1}` &
parallel -i sort -n -k 1,1 -k 2,2 x.tmp.fin{} -o x.tmp.ftmp{} -- `seq -w 0 ${ncpum1}` &
echo "Creating partial 2r fifos"
mkfifo x.tmp.f_1
mkfifo x.tmp.f_2
ls -al x.tmp.f_1 x.tmp.f_2
echo "Running partial merging"
echo "part 1"
parallel -i echo x.tmp.ftmp{} -- `seq -w 0 ${hncpum1}`
echo "part 2"
parallel -i echo x.tmp.ftmp{} -- `seq -w ${hncpu} ${ncpum1}`
parallel -i echo x.tmp.ftmp{} -- `seq -w 0 ${hncpum1}` | xargs sort -n -k 1,1 -k 2,2 -m > x.tmp.f_1 &
parallel -i echo x.tmp.ftmp{} -- `seq -w ${hncpu} ${ncpum1}` | xargs sort -n -k 1,1 -k 2,2 -m > x.tmp.f_2 &
echo "Running final merging"
echo x.tmp.f_1 x.tmp.f_2 | xargs sort -n -k 1,1 -k 2,2 -m > ${infile}.Sorted
#
rm x.tmp.f*
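Assuming the modified script is saved as, say, parsort_2merge.sh (the name is just a placeholder), it is run the same way as before:

time ./parsort_2merge.sh x.8m.dat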
This script runs a number of sort commands on an equal number of file splits; half of the sorts are sent to one merge and the other half to a second merge (sort -m). A final “sort -m” then merges the results together. Below you can see the results:
Multi Merges
TABLE wall time (real)
NCPUS       4             8             32
FS
 1:     0m 6.499s     0m 5.188s     0m 6.036s
 2:     0m13.857s     0m10.929s     0m13.129s
 4:     1m21.897s     0m23.298s     0m25.432s
 8:     2m40.529s     2m44.893s     0m51.860s
16:     5m25.471s     5m20.891s     1m38.839s
32:    10m55.388s    10m51.074s    11m35.632s
The command line sorting now starts to be fast! But, as we can see, as soon as the file size increases we again see a drop in performance.
We obtain our best sorting performance for file sizes 8x and 16x using 32 CPUs. For example:
 8:32 CPU   real 0m51.860s   user 4m48.468s   sys 0m1.269s
16:32 CPU   real 1m38.839s   user 9m57.542s   sys 0m2.088s
These are the time outputs for file sizes 8x and 16x using 32 CPUs with the multi-merging script. As you can see, the “user” time is more than 4 times the “wall” (real) time.
Remarks/Conclusions
This post is a pure exercise to try to understand a bit of the mechanics of hadoop and a bit about simple embarrassingly parallel implementations. There is no attempt to create a benchmark for sort or hadoop mapreduce.
We can see that hadoop solves a lot of problems which would be somewhat difficult to solve in a different way: we would have to run benchmarks to find the optimal file split size for sort and possibly tune other technical details, for example in sort itself.
If somebody is really interested in the command line approach, I would suggest checking the man page of “sort” carefully: for example, it already offers a mechanism to regulate the number of merging processes on its own.
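For instance, reasonably recent GNU coreutils versions of sort provide options along these lines (check your local man page, since availability depends on the version):

# let sort itself run several threads and limit how many files are merged at once
sort --parallel=8 --batch-size=16 -n -k 1,1 -k 2,2 x.8m.dat > x.8m.dat.Sorted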
I also found this discussion, which might be interesting:
http://comments.gmane.org/gmane.comp.sysutils.parallel/180