Sorting Cats with Hadoop and psort

[This post will also be published on http://lwsffhs.wordpress.com/]

This is my first “self” tutorial on hadoop mapreduce streaming. If you are really IT oriented you probably want to read http://hadoop.apache.org/docs/r0.15.2/streaming.html (or any newer version). This post doesn’t add much to that document with respect to hadoop mapreduce streaming. Here I play a bit with the “sort” on the command line. Probably you might want to read first my previous notes: psort: parallel sorting …. I will run these examples in a virtual cluster (libvirt/qemu/KVM) composed of 1 master node with 4 CPUs and 10 computing nodes with 2 CPUs each. The virtual nodes are distributed in two physical machines (I will post here in the future some details about this virtual cluster).

The question I had was: what hadoop mapreduce streaming actually does?

The best I could do was to run it with the minimum coding: using /bin/cat.

Here is the first script I wrote:

#! /bin/sh
#
#
if [ -z $1 ]; then
 echo "I need a file to process! Please give it as parameter."
 exit;
fi;
#
infile=$1
if [ ! -f ${infile} ]; then
 echo "I need a file to process! File ${infile} does't exists."
 exit;
fi;
#
if [ -f ${infile}.Sorted ]; then
 echo "output file existing... please remove: ${infile}.Sorted"
 exit;
fi;
#
hadoop fs -mkdir SortExample
hadoop fs -mkdir SortExample/input
#
hdfs dfs -copyFromLocal ${infile} SortExample/input
#
echo "Copied files: hadoop fs -ls SortExample ..."
hadoop fs -ls SortExample
#
echo "Copied files: hadoop fs -ls PythonDemo/input ..."
hadoop fs -ls SortExample/input
#
echo "Running map reduce cat sort..."
#
time \
 hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar \
 -input SortExample/input \
 -output SortExample/output \
 -mapper /bin/cat \
 -reducer /bin/cat
#
#
hdfs dfs -copyToLocal /user/${USER}/SortExample/output/part-00000 ${infile}.Sorted
#
echo "Cleaning up ..."
hadoop fs -rm -r SortExample
#

You might have guessed where I got the idea for the title…

Here we feed hadoop streaming with the command /bin/cat as mapper and as reducer (lines 37 and 38).

The result of this script, or better the result of simply passing a file within the mapreduce machinery is to obtain a sorted file. The final file is sorted “alphabetically”.  I wanted to have an idea about the scaling. So I got my usual file:

[mariotti@buffalo SortIng]$ ls -alh MLhetrec11.dat
-rw-r--r-- 1 mariotti lws 12M Aug 8 14:07 MLhetrec11.dat

This file is obtained from the Movielens ratings which can be found at this grouplens group page. It actually contains only users IDs, items IDs and ratings.


[mariotti@buffalo SortIng]$ head -4 MLhetrec11.dat
34101 4006 3.5
65308 3358 3
8726 57526 5
45049 1073 4.5

I needed also a bit bigger files, so I did something very simple:

cat MLhetrec11.dat MLhetrec11.dat | shuf > x.2m.dat
cat x.2m.dat x.2m.dat | shuf > x.4m.dat
cat x.4m.dat x.4m.dat | shuf > x.8m.dat
...etc...

..and I run the script on these files:

time ./hadoop_cat_sort_simple.sh MLhetrec11.dat

Here are the results (time outputs):

Hadoop simple
-------------

 1: real 0m28.161s user 0m3.287s sys 0m0.195s L: real 0m40.044s user 0m19.298s sys 0m1.232s
 2: real 0m33.243s user 0m3.537s sys 0m0.225s L: real 0m46.832s user 0m20.212s sys 0m1.478s
 4: real 0m36.215s user 0m3.368s sys 0m0.213s L: real 0m54.426s user 0m20.111s sys 0m1.582s
 8: real 0m49.241s user 0m3.379s sys 0m0.231s L: real 1m8.506s user 0m20.188s sys 0m1.758s
16: real 1m17.278s user 0m3.442s sys 0m0.232s L: real 2m0.970s user 0m27.995s sys 0m3.445s
32: real 1m51.419s user 0m3.653s sys 0m0.226s L: real 2m48.795s user 0m25.924s sys 0m3.652s

Here 1: means 1x MLhetrec11.dat, 2: 2x MLhetrec11.dat (x.2m.dat) etc… The first set of times (left most) are the execution times for the hadoop command line (see line 33 in the script above) while the second set of times are the total script run time which include the copy of the input and output files (you should consider only the “real” wall times).

The scaling is already impressive if we consider the very simple example.

For a first impression that was pretty ok.

But I want to compare with my simple parallel sort version from my previous post. So I need something a tiny bit more complex. From this link I realized that I can use “sort” like options. I wrote a new script which looks like this:

#! /bin/sh
#
#
if [ -z $1 ]; then
 echo "I need a file to process! Please give it as parameter."
 exit;
fi;
#
infile=$1
if [ ! -f ${infile} ]; then
 echo "I need a file to process! File ${infile} does't exists."
 exit;
fi;
#
if [ -f ${infile}.Sorted ]; then
 echo "output file existing... please remove: ${infile}.Sorted"
 exit;
fi;
#
hadoop fs -mkdir SortExample
hadoop fs -mkdir SortExample/input
#
hdfs dfs -copyFromLocal ${infile} SortExample/input
#
echo "Copied files: hadoop fs -ls SortExample ..."
hadoop fs -ls SortExample
#
echo "Copied files: hadoop fs -ls PythonDemo/input ..."
hadoop fs -ls SortExample/input
#
echo "Running map reduce cat sort..."
#
time \
 hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar \
 -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
 -D stream.map.output.field.separator=" " \
 -D map.output.key.field.separator=" " \
 -D stream.num.map.output.key.fields=3 \
 -D mapred.text.key.comparator.options='-k1,1n -k2,2n' \
 -input SortExample/input \
 -output SortExample/output \
 -mapper /bin/cat \
 -reducer "/bin/sed 's/\t/ /'"
#
#
hdfs dfs -copyToLocal /user/${USER}/SortExample/output/part-00000 ${infile}.Sorted
#
echo "Cleaning up ..."
hadoop fs -rm -r SortExample

Hadoop mapreduce, by default, uses the concept of key/value pairs to address the data. When it reads an ASCII file it stores the text line as a key with a null value. In order to sort our file using two columns of numbers we need to instruct mapreduce to use 3 fields in the key (in line 38). We also define the fields separator in line 37. Line 39 defines the actual sorting using parameters like the command “sort”. note that mapreduce, by default, uses the character TAB as key/value separator. This explains why I changed one of my cat into a “sed” command (probably there is a better way but this was simply convenient and a good reminder for the future).

What about performances then:

Hadoop kn
---------

  1: real 0m31.151s user 0m3.289s sys 0m0.219s L: real 0m43.017s user 0m19.018s sys 0m1.309s
  2: real 0m37.268s user 0m3.603s sys 0m0.227s L: real 0m50.977s user 0m19.588s sys 0m1.369s
  4: real 0m55.273s user 0m3.444s sys 0m0.243s L: real 1m14.012s user 0m20.704s sys 0m1.746s
  8: real 1m28.416s user 0m3.382s sys 0m0.261s L: real 1m47.450s user 0m20.708s sys 0m1.791s
 16: real 2m31.421s user 0m3.612s sys 0m0.282s L: real 3m17.217s user 0m24.851s sys 0m3.001s
 32: real 3m59.697s user 0m3.925s sys 0m0.320s L: real 4m57.764s user 0m29.104s sys 0m4.320s
 64: real 4m54.943s user 0m4.122s sys 0m0.380s L: real 7m2.802s  user 0m36.107s sys 0m7.698s
128: real 7m42.483s user 0m4.806s sys 0m0.526s L: real 12m2.697s user 0m48.466s sys 0m17.385s

Also pretty impressive. But maybe we want to compare with some other numbers. Here is the timing for a single CPU (also on the virtual cluster) command line sort (time cat <FILE> | sort -n -k 1,1 -k 2,2 > out):


1 CPU Plain:

 1:1 real  0m18.932s user  0m18.866s sys 0m0.069s
 2:1 real  0m40.498s user  0m40.366s sys 0m0.138s
 4:1 real  1m31.671s user  1m30.569s sys 0m0.246s
 8:1 real  3m7.435s  user  3m6.992s  sys 0m0.441s
16:1 real  7m3.383s  user  7m2.056s  sys 0m1.338s
32:1 real 14m31.948s user 14m29.185s sys 0m2.687s
64:1 real 29m42.870s user 29m36.367s sys 0m5.615s

Now comes a second question: Is it really so convenient? The best I could do was to run my little parallel sort script. Here are the results:

TABLE real time (wall time)

NCPUS  1           2           4           8          16          32
FS
 1:    0m18.932s   0m10.599s   0m8.749s    0m9.284s    0m10.052s   0m11.814s
 2:    0m40.498s   0m39.730s   0m18.905s   0m18.675s   0m22.118s   0m24.074s
 4:    1m31.671s   1m20.683s   1m27.838s   0m37.300s   0m42.086s   0m49.270s
 8:    3m7.435s    2m42.529s   2m50.724s   3m8.948s    1m20.471s   1m40.009s
16:    7m3.383s    5m36.974s   5m35.425s   6m1.593s    6m42.248s   3m8.407s
32:   14m31.948s  12m36.733s  11m14.520s  11m16.672s  12m33.215s  12m13.892s

Here the numbers are from the virtual cluster up to 4 CPUs (first three columns), and from the host machine when we use more CPUs. You can see that with the increasing size of the file the parallel sort needs more and more CPUs to compete with hadoop. For example with 32 times the MLhetrec11.dat size (about 355Mb), using 32 CPUs doesn’t improve the single CPU result.

Can we try to improve the command line sorting? One of the slow steps seems to be the merging. Why not to try with a second chain of merging?

This is a modified version of the parsort.sh from the the previous post:

#! /bin/sh
#
# Please note: this code works only up to 10 cpus
# for more then ten CPUs play with seq -w 0 11
# It is a parallel sorting!!
ncpu=4
ncpum1=$(( $ncpu - 1 ))
hncpu=$(( $ncpu/2 ))
hncpum1=$(( $hncpu - 1 ))
#
if [ -z $1 ]; then
 echo "I need a file to process! Please give it as parameter."
 exit;
fi;
#
infile=$1
if [ ! -f ${infile} ]; then
 echo "I need a file to process! File ${infile} does't exists."
 exit;
fi;
#
if [ -f ${infile}.Sorted ]; then
 echo "output file existing... please remove: ${infile}.Sorted"
 exit;
fi;
#
fnl=`wc -l ${infile} | awk '{print $1;}'`
echo "File: ${infile} with lines: $fnl"
snl=$(($fnl / $ncpu))
snl=$(( $snl + 1))
#
echo " File: ${infile}"
echo "Lines: $fnl divided in blocks of $snl"
#
#
echo "working on $ncpu CPUs splitted in 2 reducers each with $hncpu"
echo "creating in fifos: "
parallel -i echo x.tmp.fin{} -- `seq -w 0 ${ncpum1}`
parallel -i mkfifo x.tmp.fin{} -- `seq -w 0 ${ncpum1}`
echo "Splitting in files:"
ls -al x.tmp.fin*
split -a 1 -l ${snl} -d ${infile} x.tmp.fin &
echo "creating tmp fifos: "
parallel -i echo x.tmp.ftmp{} -- `seq -w 0 ${ncpum1}`
parallel -i mkfifo x.tmp.ftmp{} -- `seq -w 0 ${ncpum1}`
echo "Running part sorts"
parallel -i echo sort -n -k 1,1 -k 2,2 x.tmp.fin{} -o x.tmp.ftmp{} -- `seq -w 0 ${ncpum1}` &
parallel -i sort -n -k 1,1 -k 2,2 x.tmp.fin{} -o x.tmp.ftmp{} -- `seq -w 0 ${ncpum1}` &
echo "Creating partial 2r fifos"
mkfifo x.tmp.f_1
mkfifo x.tmp.f_2
ls -al x.tmp.f_1 x.tmp.f_2
echo "Running partial merging"
echo "part 1"
parallel -i echo x.tmp.ftmp{} -- `seq -w 0 ${hncpum1}`
echo "part 2"
parallel -i echo x.tmp.ftmp{} -- `seq -w ${hncpu} ${ncpum1}`
parallel -i echo x.tmp.ftmp{} -- `seq -w 0 ${hncpum1}` | xargs sort -n -k 1,1 -k 2,2 -m > x.tmp.f_1 &
parallel -i echo x.tmp.ftmp{} -- `seq -w ${hncpu} ${ncpum1}` | xargs sort -n -k 1,1 -k 2,2 -m > x.tmp.f_2 &
echo "Running final merging"
echo x.tmp.f_1 x.tmp.f_2 | xargs sort -n -k 1,1 -k 2,2 -m > ${infile}.Sorted
#
rm x.tmp.f*

This script executes a number of sort commands on an equivalent number of file splits, half of the sorts are sent to a merge and an other half to a second merge (sort -m). A final “sort -m” merges the results all together. Below you can see the results the results:

Multi Merges
TABLE wall time (real)
NCPUS    4              8               32
FS
 1:     0m 6.499s      0m 5.188s        0m 6.036s
 2:     0m13.857s      0m10.929s        0m13.129s
 4:     1m21.897s      0m23.298s        0m25.432s
 8:     2m40.529s      2m44.893s        0m51.860s
16:     5m25.471s      5m20.891s        1m38.839s
32:    10m55.388s     10m51.074s       11m35.632s

The command line sorting now starts to be fast! But as we can see as soon as the file size increases we start to have again a drop in performances.

We obtain our best sorting performances for file sizes 8x and 16x using 32 CPUs. For example:

 8:32 CPU real  0m51.860s user  4m48.468s sys 0m1.269s
16:32 CPU real  1m38.839s user  9m57.542s sys 0m2.088s

these above are the time outputs for file size 8x and 16x using 32 CPUs with the multi merging script. As you can see the “user” time is more then 4 times the “wall” (real) time.

Remarks/Conclusions

This post is a pure exercise to try to understand a bit the mechanism of hadoop and a bit of simple embarrassing parallel implementations. There is no attempt to create a benchmark for sort or hadoop mapreduce.

We can see that hadoop solves a lot of problems which would be a bit difficult to solve in a different way: we would have to run benchmarks to find the optimal file size for sort and eventually tune other technical details for example in sort itself.

If somebody is really interested in the command line I would suggest to check carefully the man page of “sort”, which for example offers itself already a mechanism to regulate the number of merging processes.

3 comments

  1. Pingback: hd bbc – our lab local virtual hadoop cluster | Laboratory for Web Science
  2. Pingback: hd bbc – our lab local virtual hadoop cluster « FM Techottis

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.