Friday, 16 May 2014

Multiple threads in a mapper i.e. MultithreadedMapper


 As the name suggests, MultithreadedMapper is a map task that spawns multiple threads. A map task can be considered a process that runs within its own JVM boundary. MultithreadedMapper spawns multiple threads within that single map task. Don't confuse this with running multiple tasks in the same JVM (that is achieved with JVM reuse). Even when a task has multiple threads, the task still consumes the input split as defined by the input format, and the record reader reads the input just like a normal map task. The multithreading happens after this stage: once a record has been read, the map() calls on the records are spread across multiple threads. (That is, the input IO is not multithreaded; the multiple threads come into the picture only after it.)
MultithreadedMapper is a good fit if your operation is highly CPU intensive, where multiple threads getting multiple CPU cycles can speed up the task. If the job is IO intensive, running multiple tasks is much better than multithreading, since with multiple tasks multiple IO reads happen in parallel.
Let us see how we can use MultithreadedMapper. The old MapReduce API and the new API do this in different ways.

Old API
Enable the multithreaded map runner either on the command line
-D mapred.map.runner.class=org.apache.hadoop.mapred.lib.MultithreadedMapRunner
or in code
jobConf.setMapRunnerClass(org.apache.hadoop.mapred.lib.MultithreadedMapRunner.class);
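
For instance, a minimal old-API sketch (MyJob and MyMapper are hypothetical placeholders for your own classes):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultithreadedMapRunner;

JobConf jobConf = new JobConf(MyJob.class);
jobConf.setMapperClass(MyMapper.class);                       // your existing old-API mapper
jobConf.setMapRunnerClass(MultithreadedMapRunner.class);      // fan map() calls out to threads
jobConf.setInt("mapred.map.multithreadedrunner.threads", 10); // threads per map task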

New API
Your mapper class should subclass (extend) org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper instead of org.apache.hadoop.mapreduce.Mapper. MultithreadedMapper provides a different implementation of the run() method, which is what fans the records out to multiple threads.

You can set the number of threads within a mapper in MultithreadedMapper by

MultithreadedMapper.setNumberOfThreads(job, n); or
mapred.map.multithreadedrunner.threads = n
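
Besides subclassing, you can also plug MultithreadedMapper in directly and tell it which mapper to delegate to. A minimal sketch of the new-API setup (WordCountMapper is a hypothetical mapper of yours; its map() must be thread-safe, since all threads run inside the same task):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

Job job = new Job(conf, "multithreaded wordcount");
job.setMapperClass(MultithreadedMapper.class);                  // the mapper the framework runs
MultithreadedMapper.setMapperClass(job, WordCountMapper.class); // your real map logic
MultithreadedMapper.setNumberOfThreads(job, 10);                // threads per map task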


Note: Don't assume that a multithreaded mapper is better than a normal map task just because it spawns fewer JVMs and fewer processes. If a mapper is loaded with lots of threads, the chance of that JVM crashing is higher, and the cost of re-executing such a Hadoop task would be terribly high.
            Don't use MultithreadedMapper to control the number of JVMs spawned; if that is your goal, tweak the mapred.job.reuse.jvm.num.tasks parameter instead (see the snippet after this note), whose default value is 1, meaning no JVM reuse across tasks.
            The threads live at the bottom level, i.e. within a map task; the higher levels of the Hadoop framework, such as the job, have no knowledge of them.
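
For example, a minimal snippet (assuming an old-API JobConf) that lets one JVM run any number of tasks of the same job:

jobConf.setInt("mapred.job.reuse.jvm.num.tasks", -1); // -1 = unlimited reuse; default 1 = no reuse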


Friday, 2 May 2014

Hadoop Commands

hadoop command [genericOptions] [commandOptions]

hadoop fs
Usage: java FsShell
 [-ls <path>]
 [-lsr <path>]
 [-df [<path>]]
 [-du <path>]
 [-dus <path>]
 [-count[-q] <path>]
 [-mv <src> <dst>]
 [-cp <src> <dst>]
 [-rm [-skipTrash] <path>]
 [-rmr [-skipTrash] <path>]
 [-expunge]
 [-put <localsrc> ... <dst>]
 [-copyFromLocal <localsrc> ... <dst>]
 [-moveFromLocal <localsrc> ... <dst>]
 [-get [-ignoreCrc] [-crc] <src> <localdst>]
 [-getmerge <src> <localdst> [addnl]]
 [-cat <src>]
 [-text <src>]
 [-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>]
 [-moveToLocal [-crc] <src> <localdst>]
 [-mkdir <path>]
 [-setrep [-R] [-w] <rep> <path/file>]
 [-touchz <path>]
 [-test -[ezd] <path>]
 [-stat [format] <path>]
 [-tail [-f] <file>]
 [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
 [-chown [-R] [OWNER][:[GROUP]] PATH...]
 [-chgrp [-R] GROUP PATH...]
 [-help [cmd]]

hadoop fs -ls /
hadoop fs -ls /kiran1/
hadoop fs -cat /kiran1/foo.txt
hadoop fs -rm /kiran1/foo.txt
hadoop fs -mkdir /kiran6/
hadoop fs -put foo.txt /kiran6/
hadoop fs -put /etc/note.txt /kiran2/note_fs.txt
hadoop fs -get /user/kiran/passwd ./
hadoop fs -setrep -R 5 /user/kiran/tmp/
hadoop fsck /user/kiran/tmp -files -blocks -locations
hadoop fs -chmod 1777 /tmp
hadoop fs -touchz /user/kiran/kiran/foo
hadoop fs -rmr /user/kiran/kiran/foo
hadoop fs -touchz /user/kiran/kiran/bar
hadoop fs -count -q /user/kiran

start-all.sh                 -- start all HDFS and MapReduce daemons

hadoop job -list
hadoop job -kill jobID
hadoop job -list-attempt-ids jobID taskType taskState
hadoop job -kill-task taskAttemptId

hadoop namenode -format

hadoop jar <jar_file> wordcount <input_dir> <output_dir>
hadoop jar /opt/hadoop/hadoop-examples-1.0.4.jar wordcount /wc_input /out/wc_output

hadoop dfsadmin -report
hadoop dfsadmin -setSpaceQuota 10737418240 /user/esammer    -- 10737418240 bytes = 10 * 1024^3 = 10 GB
hadoop dfsadmin -refreshNodes
hadoop dfsadmin -upgradeProgress status
hadoop dfsadmin -finalizeUpgrade

hadoop fsck
Usage: DFSck <path> [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]
 <path>         -- start checking from this path
 -move          -- move corrupted files to /lost+found
 -delete        -- delete corrupted files
 -files         -- print out files being checked
 -openforwrite  -- print out files opened for write
 -blocks        -- print out block report
 -locations     -- print out locations for every block
 -racks         -- print out network topology for data-node locations
By default fsck ignores files opened for write; use -openforwrite to report such files. They are usually tagged CORRUPT or HEALTHY depending on their block allocation status.
hadoop fsck / -files -blocks -locations
hadoop fsck /user/kiran -files -blocks -locations

hadoop distcp                -- Distributed Copy (distcp)
distcp [OPTIONS] <srcurl>* <desturl>
 OPTIONS:
 -p[rbugp] Preserve status
 r: replication number
 b: block size
 u: user
 g: group
 p: permission
 -p alone is equivalent to -prbugp
 -i Ignore failures
 -log <logdir> Write logs to <logdir>
 -m <num_maps> Maximum number of simultaneous copies
 -overwrite Overwrite destination
 -update Overwrite if src size different from dst size
 -skipcrccheck Do not use CRC check to determine if src is different from dest. Relevant only if -update is specified
 -f <urilist_uri> Use list at <urilist_uri> as src list
 -filelimit <n> Limit the total number of files to be <= n
 -sizelimit <n> Limit the total size to be <= n bytes
 -delete Delete the files existing in the dst but not in src
 -mapredSslConf <f> Filename of SSL configuration for mapper task

NOTE 1: if -overwrite or -update are set, each source URI is interpreted as an
isomorphic update to an existing directory.
For example:
hadoop distcp -p -update "hdfs://A:8020/user/foo/bar" "hdfs://B:8020/user/foo/baz"
would update all descendants of 'baz' also in 'bar'; it would *not* update /user/foo/baz/bar

NOTE 2: The parameter <n> in -filelimit and -sizelimit can be specified with a symbolic representation. For example:
1230k = 1230 * 1024 = 1259520
891g = 891 * 1024^3 = 956703965184

hadoop distcp hdfs://A:8020/path/one hdfs://B:8020/path/two
hadoop distcp /path/one /path/two
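
Combining the options above (hosts and paths are illustrative), an incremental copy that preserves file attributes and uses at most 20 parallel map tasks could look like:
hadoop distcp -update -p -m 20 hdfs://A:8020/path/one hdfs://B:8020/path/two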