Friday, 16 May 2014

Multiple threads in a mapper i.e. MultithreadedMapper


As the name suggests, it is a map task that spawns multiple threads. A map task can be considered a process that runs within its own JVM. MultithreadedMapper spawns multiple threads within the same map task. Don’t confuse this with running multiple tasks within the same JVM (that is achieved with JVM reuse). A multithreaded task still uses the input split as defined by the input format, and the record reader reads the input just as in a normal map task. The multithreading happens after this stage: once a record has been read, the map work is distributed across multiple threads (i.e. the input IO is not multithreaded; the threads come into the picture only after that).
MultithreadedMapper is a good fit if your operation is highly CPU intensive and multiple threads getting multiple cycles could help speed up the task. If the job is IO intensive, running multiple tasks is much better than multithreading, because with multiple tasks multiple IO reads happen in parallel.
Let us see how to use MultithreadedMapper. The old MapReduce API and the new API do this differently.
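The "read once, map in parallel" structure described above can be sketched outside Hadoop. The following Python snippet is only an illustration of that shape, not Hadoop code: `read_records`, `map_record` and `run_multithreaded_map` are hypothetical names, and Python threads will not actually speed up CPU-bound work the way JVM threads can.

```python
from concurrent.futures import ThreadPoolExecutor

def read_records(split):
    """Single-threaded record reader: yields (key, value) pairs one by one."""
    for offset, line in enumerate(split):
        yield offset, line

def map_record(key, value):
    """Stand-in for the user's map() logic; it must be thread safe."""
    return value.upper(), 1

def run_multithreaded_map(split, num_threads):
    # One reader feeds records to a pool of worker threads, so only the
    # map work runs in parallel; the input is still read sequentially.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        futures = [pool.submit(map_record, k, v) for k, v in read_records(split)]
        return [f.result() for f in futures]

print(run_multithreaded_map(["foo", "bar", "baz"], num_threads=2))
```

Results are collected in submission order here, which keeps the output deterministic for the example.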

Old API
Enable the multithreaded map runner with
-D mapred.map.runner.class=org.apache.hadoop.mapred.lib.MultithreadedMapRunner
or
jobConf.setMapRunnerClass(org.apache.hadoop.mapred.lib.MultithreadedMapRunner.class);

New API
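Set org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper as the job's mapper class and register your own mapper with MultithreadedMapper.setMapperClass(job, YourMapper.class). Your mapper still extends org.apache.hadoop.mapreduce.Mapper and must be thread safe. MultithreadedMapper has a different implementation of the run() method.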

You can set the number of threads within a mapper by

MultithreadedMapper.setNumberOfThreads(job, n); or
mapred.map.multithreadedrunner.threads=n


Note: Don’t assume that a multithreaded mapper is better than a normal map task just because it spawns fewer JVMs and fewer processes. If a mapper is loaded with lots of threads, the chances of that JVM crashing are higher, and the cost of re-executing such a Hadoop task would be terribly high.
            Don’t use MultithreadedMapper to control the number of JVMs spawned; if that is your goal, tweak the mapred.job.reuse.jvm.num.tasks parameter, whose default value of 1 means no JVM reuse across tasks.
            The threads live at the lowest level, i.e. within a map task; higher levels of the Hadoop framework, such as the job, are not aware of them.


Friday, 2 May 2014

Hadoop Commands

hadoop command [genericOptions] [commandOptions]

hadoop fs
Usage: java FsShell
 [-ls <path>]
 [-lsr <path>]
 [-df [<path>]]
 [-du <path>]
 [-dus <path>]
 [-count[-q] <path>]
 [-mv <src> <dst>]
 [-cp <src> <dst>]
 [-rm [-skipTrash] <path>]
 [-rmr [-skipTrash] <path>]
 [-expunge]
 [-put <localsrc> ... <dst>]
 [-copyFromLocal <localsrc> ... <dst>]
 [-moveFromLocal <localsrc> ... <dst>]
 [-get [-ignoreCrc] [-crc] <src> <localdst>]
 [-getmerge <src> <localdst> [addnl]]
 [-cat <src>]
 [-text <src>]
 [-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>]
 [-moveToLocal [-crc] <src> <localdst>]
 [-mkdir <path>]
 [-setrep [-R] [-w] <rep> <path/file>]
 [-touchz <path>]
 [-test -[ezd] <path>]
 [-stat [format] <path>]
 [-tail [-f] <file>]
 [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
 [-chown [-R] [OWNER][:[GROUP]] PATH...]
 [-chgrp [-R] GROUP PATH...]
 [-help [cmd]]

hadoop fs -ls /
hadoop fs -ls /kiran1/
hadoop fs -cat /kiran1/foo.txt
hadoop fs -rm /kiran1/foo.txt
hadoop fs -mkdir /kiran6/
hadoop fs -put foo.txt /kiran6/
hadoop fs -put /etc/note.txt /kiran2/note_fs.txt
hadoop fs -get /user/kiran/passwd ./
hadoop fs -setrep 5 -R /user/kiran/tmp/
hadoop fsck /user/kiran/tmp -files -blocks -locations
hadoop fs -chmod 1777 /tmp
hadoop fs -touchz /user/kiran/kiran/foo
hadoop fs -rmr /user/kiran/kiran/foo
hadoop fs -touchz /user/kiran/kiran/bar
hadoop fs -count -q /user/kiran

start-all.sh

hadoop job -list
hadoop job -kill jobID
hadoop job -list-attempt-ids jobID taskType taskState
hadoop job -kill-task taskAttemptId

hadoop namenode -format

hadoop jar <jar_file> wordcount <input_path> <output_path>
hadoop jar /opt/hadoop/hadoop-examples-1.0.4.jar wordcount <input_path> /out/wc_output

hadoop dfsadmin -report
hadoop dfsadmin -setSpaceQuota 10737418240 /user/esammer
hadoop dfsadmin -refreshNodes
hadoop dfsadmin -upgradeProgress status
hadoop dfsadmin -finalizeUpgrade

hadoop fsck
Usage: DFSck <path> [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]
 <path>  -- start checking from this path
 -move  -- move corrupted files to /lost+found
 -delete  -- delete corrupted files
 -files  -- print out files being checked
 -openforwrite  -- print out files opened for write
 -blocks  -- print out block report
 -locations  -- print out locations for every block
 -racks  -- print out network topology for data-node locations
By default fsck ignores files opened for write, use -openforwrite to report such files. They are usually tagged CORRUPT or HEALTHY depending on their block allocation status.
hadoop fsck / -files -blocks -locations
hadoop fsck /user/kiran -files -blocks -locations

hadoop distcp                -- Distributed Copy (distcp)
distcp [OPTIONS] <srcurl>* <desturl>
 OPTIONS:
 -p[rbugp] Preserve status
 r: replication number
 b: block size
 u: user
 g: group
 p: permission
 -p alone is equivalent to -prbugp
 -i Ignore failures
 -log <logdir> Write logs to <logdir>
 -m <num_maps> Maximum number of simultaneous copies
 -overwrite Overwrite destination
 -update Overwrite if src size different from dst size
 -skipcrccheck Do not use CRC check to determine if src is different from dest. Relevant only if -update is specified
 -f <urilist_uri> Use list at <urilist_uri> as src list
 -filelimit <n> Limit the total number of files to be <= n
 -sizelimit <n> Limit the total size to be <= n bytes
 -delete Delete the files existing in the dst but not in src
 -mapredSslConf <f> Filename of SSL configuration for mapper task

NOTE 1: if -overwrite or -update are set, each source URI is interpreted as an
isomorphic update to an existing directory.
For example:
hadoop distcp -p -update "hdfs://A:8020/user/foo/bar" "hdfs://B:8020/user/foo/baz"
would update all descendants of 'baz' also in 'bar'; it would *not* update /user/foo/baz/bar

NOTE 2: The parameter <n> in -filelimit and -sizelimit can be specified with symbolic representation. For example:
1230k = 1230 * 1024 = 1259520
891g = 891 * 1024^3 = 956703965184
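The symbolic representation in NOTE 2 can be checked with a small Python helper. This is a sketch rather than the parser distcp itself uses; `parse_size` is a hypothetical name, and the `m` and `t` suffixes are assumed by analogy with the `k` and `g` examples above.

```python
# Binary multipliers; only "k" and "g" appear in the note above,
# "m" and "t" are assumptions added for completeness.
SUFFIXES = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}

def parse_size(text):
    """Convert a value such as '1230k' or '891g' to a plain integer."""
    text = text.strip().lower()
    if text and text[-1] in SUFFIXES:
        return int(text[:-1]) * SUFFIXES[text[-1]]
    return int(text)

print(parse_size("1230k"))  # 1259520
print(parse_size("891g"))   # 956703965184
```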

hadoop distcp hdfs://A:8020/path/one hdfs://B:8020/path/two
hadoop distcp /path/one /path/two


Hadoop Space Quotas, HDFS Block Size, Replication and Small Files

A common way to put restrictions on the resource usage of your Hadoop cluster is to set quotas. Recently, I ran into a problem where I could not upload a small file to an empty HDFS directory for which a space quota of 200MB was set.
I won’t go into the details of using quotas in Hadoop, but here it is in a nutshell. Hadoop differentiates between two kinds of quotas: name quotas and space quotas. The former limits the number of file and (sub)directory names whereas the latter limits the HDFS “disk” space of the directory (i.e. the number of bytes used by files under the tree rooted at this directory).
You can set HDFS name quotas with the command


$ hadoop dfsadmin -setQuota <max_number> <directory>
and you can set HDFS space quotas with the command


$ hadoop dfsadmin -setSpaceQuota <max_size> <directory>
To clear quotas, use -clrQuota and -clrSpaceQuota, respectively.
So much for the introduction. Recently, I stumbled upon a problem where Hadoop (version 0.20.2) reported a quota violation for a directory for which a space quota of 200 MB (209,715,200 bytes) was set but no name quota:


$ hadoop fs -count -q /user/kiran
none    inf    209715200    209715200    5   1   0
The output columns for fs -count -q are: QUOTA, REMAINING_QUOTA, SPACE_QUOTA, REMAINING_SPACE_QUOTA, DIR_COUNT, FILE_COUNT, CONTENT_SIZE, FILE_NAME.
The directory /user/kiran was empty and not a single byte was used yet (according to the quota report above). However, when I tried to upload a very small file to the directory, the upload failed with a (space) quota violation.


$ hadoop fs -copyFromLocal small-file.txt /user/kiran
11/03/25 13:09:16 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: The DiskSpace quota of /user/kiran is exceeded: quota=209715200 diskspace consumed=384.0m
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:96)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:58)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2939)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
11/03/25 13:09:16 WARN hdfs.DFSClient: Error Recovery for block null bad datanode[0] nodes == null
11/03/25 13:09:16 WARN hdfs.DFSClient: Could not get block locations. Source file "/user/kiran/small-file.txt" - Aborting...
copyFromLocal: org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: The DiskSpace quota of /user/kiran is exceeded: quota=209715200 diskspace consumed=384.0m
11/03/25 13:09:16 ERROR hdfs.DFSClient: Exception closing file /user/kiran/small-file.txt :
org.apache.hadoop.hdfs.protocol.DSQuotaExceededException:
org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: The DiskSpace quota of /user/kiran is exceeded: quota=209715200 diskspace consumed=384.0m
Clearly, I thought, the small file could not have exceeded the space quota of 200MB. I also checked whether the Trash feature of Hadoop could be the culprit, but that wasn’t the case.
Eventually, the mystery was solved: Hadoop checks space quotas during space allocation. This means that HDFS block size (here: 128MB) and the replication factor of the file (here: 3, i.e. the default value in the cluster set by the dfs.replication property) play an important role. In my case, this is what seems to have happened: When I tried to copy the local file to HDFS, Hadoop figured it would require a single block of 128MB size to store the small file. With replication factored in, the total space would be 3 * 128MB = 384 MB. And this would violate the space quota of 200MB.


  required_number_of_HDFS blocks * HDFS_block_size * replication_count
= 1 * 128MB * 3 = 384MB > 200MB.
I verified this by manually overwriting the default replication factor of 3 to 1 via


$ hadoop fs -D dfs.replication=1 -copyFromLocal small-file.txt /user/kiran 
which worked successfully.
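The allocation-time arithmetic described above can be reproduced in a few lines of Python. This is only a sketch of the calculation, not Hadoop's actual quota code; the function names `space_needed` and `fits_quota` and the 4 KB file size are made up for the example, while the block size, replication factors and quota are the values from this post.

```python
import math

MB = 1024 * 1024

def space_needed(file_size, block_size, replication):
    """Raw bytes reserved at allocation time: whole blocks times replicas."""
    # Even a tiny file needs at least one block to be allocated.
    blocks = max(1, math.ceil(file_size / block_size))
    return blocks * block_size * replication

def fits_quota(file_size, block_size, replication, quota, used=0):
    """True if the reservation stays within the space quota."""
    return used + space_needed(file_size, block_size, replication) <= quota

quota = 200 * MB
small_file = 4 * 1024  # hypothetical 4 KB file

print(space_needed(small_file, 128 * MB, 3) // MB)  # 384
print(fits_quota(small_file, 128 * MB, 3, quota))   # False
print(fits_quota(small_file, 128 * MB, 1, quota))   # True
```

With replication 3 the reservation is 384MB and exceeds the 200MB quota; with replication 1 it is 128MB and fits, which matches the two upload attempts above.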
To be honest, I was a bit puzzled at first because – remembering Tom White’s Hadoop book – a file in HDFS that is smaller than a single HDFS block does not occupy a full block’s worth of underlying storage (i.e. a kind of “sparse” use of storage).


Reference:- http://apache.hadoop.org and Michael G. Noll

Understanding HDFS Quotas and Hadoop Fs and Fsck Tools

In my experience Hadoop users often confuse the file size numbers reported by commands such as hadoop fsck, hadoop fs -dus and hadoop fs -count -q when it comes to reasoning about HDFS space quotas. Here is a short summary of how the various filesystem tools in Hadoop work in unison.
In this blog post we will look at three commands: hadoop fsck, hadoop fs -dus and hadoop fs -count -q.

hadoop fsck and hadoop fs -dus

First, let’s start with hadoop fsck and hadoop fs -dus because they will report identical numbers.
$ hadoop fsck /path/to/directory
 Total size:    16565944775310 B    <=== see here
 Total dirs:    3922
 Total files:   418464
 Total blocks (validated):      502705 (avg. block size 32953610 B)
 Minimally replicated blocks:   502705 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    3
 Average block replication:     3.0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)
 Number of data-nodes:          18
 Number of racks:               1
FSCK ended at Thu Oct 20 20:49:59 CET 2011 in 7516 milliseconds

The filesystem under path '/path/to/directory' is HEALTHY

$ hadoop fs -dus /path/to/directory
hdfs://master:54310/path/to/directory        16565944775310    <=== see here
As you can see, hadoop fsck and hadoop fs -dus report the effective HDFS storage space used, i.e. they show the “normal” file size (as you would see on a local filesystem) and do not account for replication in HDFS. In this case, the directory /path/to/directory has stored data with a size of 16565944775310 bytes (15.1 TB). Now fsck tells us that the average replication factor for all files in /path/to/directory is exactly 3.0. This means that the total raw HDFS storage space used by these files – i.e. factoring in replication – is actually:
3.0 x 16565944775310 (15.1 TB) = 49697834325930 Bytes (45.2 TB)
This is how much HDFS storage is consumed by files in /path/to/directory.

hadoop fs -count -q

Now, let us inspect the HDFS quota set for /path/to/directory. If we run hadoop fs -count -q we get this result:
$ hadoop fs -count -q /path/to/directory
  QUOTA  REMAINING_QUOTA     SPACE_QUOTA  REMAINING_SPACE_QUOTA    DIR_COUNT  FILE_COUNT      CONTENT_SIZE FILE_NAME
   none              inf  54975581388800          5277747062870        3922       418464    16565944775310 hdfs://master:54310/path/to/directory
(I manually added the column headers like QUOTA to the output for making it easier to read.)
The seventh column CONTENT_SIZE is, again, the effective HDFS storage space used: 16565944775310 Bytes (15.1 TB).
The third column SPACE_QUOTA, however, with 54975581388800 is the raw HDFS space quota in bytes. The fourth column REMAINING_SPACE_QUOTA with 5277747062870 is the remaining raw HDFS space quota in bytes. Note that whereas hadoop fsck and hadoop fs -dus report the effective data size (= the same numbers you see on a local filesystem), the third and fourth columns of hadoop fs -count -q indirectly return how many bytes this data actually consumes across the hard disks of the distributed cluster nodes – and for this it is counting each of the 3.0 replicas of an HDFS block individually (here, the value 3.0 has been taken from the hadoop fsck output above and actually matches the default value of the replication count). So if we subtract the second of these two quota-related numbers from the first, we get back the number from above:
54975581388800 (50 TB) - 5277747062870 (4.8 TB) = 49697834325930 (45.2 TB)
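The same subtraction can be done directly on the command output. The Python sketch below assumes the eight-column layout listed earlier and a path without spaces; `parse_count_q` is a hypothetical helper, not part of Hadoop.

```python
COLUMNS = ["QUOTA", "REMAINING_QUOTA", "SPACE_QUOTA", "REMAINING_SPACE_QUOTA",
           "DIR_COUNT", "FILE_COUNT", "CONTENT_SIZE", "FILE_NAME"]

def parse_count_q(line):
    """Map one line of 'hadoop fs -count -q' output to named columns."""
    # Splitting on whitespace assumes the path itself contains no spaces.
    return dict(zip(COLUMNS, line.split()))

line = ("none inf 54975581388800 5277747062870 3922 418464 "
        "16565944775310 hdfs://master:54310/path/to/directory")
row = parse_count_q(line)

# Raw space consumed = quota minus what is still available.
raw_used = int(row["SPACE_QUOTA"]) - int(row["REMAINING_SPACE_QUOTA"])
print(raw_used)                             # 49697834325930
print(raw_used / int(row["CONTENT_SIZE"]))  # 3.0
```

Dividing the raw usage by CONTENT_SIZE gives back the average replication factor reported by fsck.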
Now keep in mind that the Hadoop space quota always counts against the raw HDFS disk space consumed. So if you have a quota of 10MB, you can store only a single 1MB file if you set its replication to 10. Or you can store up to three 1MB files if their replication is set to 3. Hadoop’s quotas work like that because the replication count of an HDFS file is a user-configurable setting. Though Hadoop ships with a default value of 3, it is up to the users to decide whether they want to keep this value or change it. And because Hadoop can’t anticipate how users might be playing around with the replication setting for their files, it was decided that the Hadoop quotas always operate on the raw HDFS disk space consumed.
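The 10MB example works out as follows in Python. This is a simplified sketch that only counts replicated bytes at rest; it deliberately ignores any block-level reservation made while a file is being written, and `max_files` is a made-up name.

```python
MB = 1024 * 1024

def max_files(quota, file_size, replication):
    """How many equally sized files fit, counting every replica."""
    return quota // (file_size * replication)

print(max_files(10 * MB, 1 * MB, 10))  # 1
print(max_files(10 * MB, 1 * MB, 3))   # 3
```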

Summary

If you never change the default value of 3 for the HDFS replication count of any files you store in your Hadoop cluster, this means in a nutshell that you should always multiply the numbers reported by hadoop fsck or hadoop fs -dus by 3 when you want to reason about HDFS space quotas.

Reported file size      | Local filesystem | hadoop fsck and hadoop fs -dus | hadoop fs -count -q (if replication factor == 3)
If a file was of size…  | 1GB              | 1GB                            | 3GB

I hope this clears things up a bit!
Reference  apache Hadoop, Michael G. Noll reference guide.