12.1 A Very Close Look
How to change the default block size
If you find that your map tasks are inefficient while parsing a particular set of files, you may consider changing the default block size of files in HDFS. You can achieve this by changing the value of dfs.block.size.
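As a minimal sketch (the 256 MB value and the file path are illustrative assumptions), the block size can be overridden through the client configuration or chosen per file at creation time. The classic property name is dfs.block.size, while newer releases spell it dfs.blocksize; the cluster-wide default normally lives in hdfs-site.xml.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Override the default block size for this client only (256 MB).
        conf.setLong("dfs.block.size", 256L * 1024 * 1024);   // classic property name
        conf.setLong("dfs.blocksize", 256L * 1024 * 1024);    // newer property name

        // The block size can also be chosen per file at creation time.
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("/data/large-input.txt");          // hypothetical path
        FSDataOutputStream stream = fs.create(
                out,
                true,                       // overwrite if the file exists
                4096,                       // I/O buffer size in bytes
                (short) 3,                  // replication factor
                256L * 1024 * 1024);        // block size for this file
        stream.writeUTF("sample record");
        stream.close();
        fs.close();
    }
}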
Explain Data Splits and Record Reader with example
Let us first understand this scenario involving Hadoop HDFS. Suppose we have a file that contains 100 lines, and each line is of the form <k,v> where:
- k is the offset of the line from the beginning of the file
- v is the content of the line
Now suppose we want to run N map tasks. In this case, does the framework divide the input file into N splits and run each map task on one of those splits? Do we need to write a partitioning function that performs the N splits and runs each map task on the generated split? Let us understand what exactly happens here.
First of all, remember that Hadoop is a rack-aware system, so map tasks are scheduled close to the data. When you initiate a job execution, there will be N mappers by default, and the mapper on each machine will process the part of the data that is stored on that node. If you explore further, you will see that the InputFormat takes care of splitting the file; to see how, look at the default TextInputFormat class. The file-based InputFormat classes are subclasses of FileInputFormat, and it is FileInputFormat that takes care of the split. Its getSplits function generates a list of InputSplit objects from the list of input files defined in the JobContext. FileInputFormat is essentially the abstract class that defines how the input files are read and split up. FileInputFormat provides the following functionality:
- Selects the files that should be used as input
- Defines the InputSplits that divide a file into tasks
Note that, as per Hadoop's basic behavior, if there are N splits there will be N mappers. In short, we just need to upload the data to HDFS and start a MapReduce job, and Hadoop will take care of optimized execution.
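To make this concrete, here is a minimal, hedged sketch of a map-only driver; the class names, paths, and split-size values are illustrative assumptions. TextInputFormat hands each mapper records of the form <k,v> described above, where k is the byte offset (a LongWritable) and v is the line (a Text). The minimum/maximum split-size settings only bound what FileInputFormat's getSplits computes; the framework still decides the splits itself. Job.getInstance is the Hadoop 2.x API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SplitDemo {

    // TextInputFormat delivers <byte offset, line> pairs to the mapper.
    public static class OffsetMapper
            extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(offset, new IntWritable(line.getLength()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-demo");
        job.setJarByClass(SplitDemo.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(OffsetMapper.class);
        job.setNumReduceTasks(0);                 // map-only job
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // The framework, not the user, computes the splits; these knobs only
        // bound the split size that FileInputFormat.getSplits() will choose.
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Running this with an input directory and an output directory as arguments launches one mapper per computed split, exactly as described above.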
Explain Rack Awareness code
A rack consists of a sequence of DataNodes. A single Hadoop cluster may have one or more racks, depending on the size of the cluster. Let us understand how racks are arranged for small and large Hadoop clusters.
Small Cluster
This is a Hadoop cluster that is considerably smaller in size, where all machines are connected to a single switch. In this type of cluster we normally have a single rack. For block replication (with a replication factor of 3), the first copy of a block is written to one DataNode and the other two DataNodes are selected randomly.
Larger Clusters
This Hadoop cluster is of a bigger size and involves many DataNodes, scattered over many racks. While implementing block replication (with a replication factor of 3), Hadoop places the first copy of a block on one of the nearby racks and then places the other two copies on another rack (on two different DataNodes chosen at random). In this scenario our data is available on two different racks.
In Hadoop, the NameNode and the JobTracker obtain the list of rack IDs associated with their slave nodes; this is what makes HDFS rack aware. Using this information, HDFS creates a mapping between IP addresses and the received rack IDs, and it applies this knowledge when replicating blocks across different racks.
To implement rack awareness in Hadoop, we need to configure the core-site.xml file and set the topology.script.file.name property.
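As a minimal sketch (the script path and the argument count are illustrative assumptions), the same settings can be expressed through the Configuration API; in a real cluster they belong in core-site.xml. The property is named topology.script.file.name in classic Hadoop, while Hadoop 2.x renames it to net.topology.script.file.name. The script itself receives IP addresses or hostnames as arguments and must print one rack ID (for example /rack1) per argument.

import org.apache.hadoop.conf.Configuration;

public class RackAwarenessConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Classic property name; Hadoop 2.x calls it
        // net.topology.script.file.name. The script path is a placeholder.
        conf.set("topology.script.file.name", "/etc/hadoop/conf/topology.sh");

        // Optional: how many arguments the script is fed per invocation.
        conf.setInt("topology.script.number.args", 100);

        System.out.println("Rack script: " + conf.get("topology.script.file.name"));
    }
}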
Writing Complex Pig Programs
Just like simple Pig programs, we can write complex Pig programs; a complex Pig program combines one or more Pig operators and functions. Consider the examples below:
- You can implement the NOT IN subquery clause with the help of outer join syntax (see the sketch after this list)
- If you want to group the data, you can do it as follows:
B = GROUP A BY (Employee, Department);
C = FOREACH B GENERATE group, SUM(A.Name) AS Name, SUM(A.Class) AS Class;
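The NOT IN pattern mentioned above can be sketched as follows, here through Pig's embedded PigServer API in Java; the relation names, file names, and fields (employees.tsv, excluded_ids.tsv, id, name) are illustrative assumptions. The idea is a LEFT OUTER JOIN followed by a filter that keeps only rows with no match on the right-hand side.

import java.util.Iterator;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class NotInViaOuterJoin {
    public static void main(String[] args) throws Exception {
        // Local mode is convenient for trying the pattern out.
        PigServer pig = new PigServer(ExecType.LOCAL);

        // Hypothetical tab-delimited inputs: employees and a list of excluded ids.
        pig.registerQuery("emp = LOAD 'employees.tsv' AS (id:int, name:chararray);");
        pig.registerQuery("excluded = LOAD 'excluded_ids.tsv' AS (id:int);");

        // SQL:  SELECT * FROM emp WHERE id NOT IN (SELECT id FROM excluded)
        // Pig:  left outer join, then keep rows with no match on the right.
        pig.registerQuery("joined = JOIN emp BY id LEFT OUTER, excluded BY id;");
        pig.registerQuery("not_in = FILTER joined BY excluded::id IS NULL;");
        pig.registerQuery("result = FOREACH not_in GENERATE emp::id, emp::name;");

        Iterator<Tuple> it = pig.openIterator("result");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
        pig.shutdown();
    }
}

The same four Pig Latin statements can of course be run directly from a Pig script or the Grunt shell.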
User Defined Functions in Pig
A User Defined Function (UDF) is something you load and use. Once created, UDFs can be reused in the future whenever we want. We can write them comfortably in the Java programming language. Here are the steps to implement a User Defined Function and make it work in your environment:
- Add the Pig jar to your classpath. This is the pig-VERSION-core.jar in the Pig directory; add it to the tool in which you are writing your Java code
- Inherit your class from EvalFunc (returns a tuple)
- The exec method is our main method and it should return the proper type
- Use a try/catch block to handle errors, and make sure that null is returned only when there is a problem. Once you return null, the result is treated as null in the tuple and you can easily filter it out using standard Pig operators
- Use the TupleFactory singleton in order to get a tuple
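Putting the steps together, here is a minimal sketch of such a UDF; the class name FormatName and its two input fields are illustrative assumptions. It extends EvalFunc<Tuple>, builds its output through the TupleFactory singleton, and returns null from exec on any problem so that bad rows can be filtered out later.

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Hypothetical UDF that takes (first_name, last_name) and returns a
// one-field tuple containing "last_name, first_name".
public class FormatName extends EvalFunc<Tuple> {

    // Use the TupleFactory singleton to build output tuples.
    private static final TupleFactory tupleFactory = TupleFactory.getInstance();

    // exec() is the "main" method of the UDF and must return the declared type.
    @Override
    public Tuple exec(Tuple input) throws IOException {
        try {
            if (input == null || input.size() < 2) {
                return null;                 // bad input -> null, filter later
            }
            String first = (String) input.get(0);
            String last = (String) input.get(1);
            if (first == null || last == null) {
                return null;
            }
            Tuple output = tupleFactory.newTuple(1);
            output.set(0, last + ", " + first);
            return output;
        } catch (Exception e) {
            // On any problem return null instead of failing the task; null
            // rows can be removed downstream with a standard FILTER.
            return null;
        }
    }
}

In a Pig script the compiled jar would then be registered with REGISTER and the class invoked like any built-in function.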
Optimization in Pig
Performance improvement, or optimization, in Pig is altogether a different process. Always remember that you can often achieve optimization by changing the flow of the program, that is, by adjusting the amount of data flowing through it. Make sure that you are not processing a huge chunk of data in every section of the program; rather, filter it out (as the requirements allow) at an early stage. Think of it as pushing filters higher in your script so as to reduce the amount of data you store in HDFS between MapReduce jobs. Pig's logical optimizer will push filters up whenever it can.
Joins are widely used in the language, and Pig has the capability to adjust the order in which the parts of a filter execute to achieve a better execution time. Note that Pig pushes parts of a filter when it is not able to push the filter as a whole. Also, when a column is not required in the final dataset and is not needed for further processing, it is a good idea to drop it so as to save the overhead of carrying an unnecessary column; a small sketch of both ideas follows.
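Here is a minimal sketch of early filtering and column pruning, again through the embedded PigServer API; the input file, field names, and aliases are illustrative assumptions. The filter is applied immediately after the load, and the FOREACH keeps only the two columns needed downstream.

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class EarlyFilterExample {
    public static void main(String[] args) throws Exception {
        PigServer pig = new PigServer(ExecType.LOCAL);

        pig.registerQuery(
            "logs = LOAD 'access_logs.tsv' "
          + "AS (user:chararray, url:chararray, status:int, bytes:long);");

        // Filter as early as possible so later MapReduce jobs shuffle and
        // store less data between stages.
        pig.registerQuery("errors = FILTER logs BY status >= 500;");

        // Drop columns that are not needed in the final result or in any
        // later processing step.
        pig.registerQuery("slim = FOREACH errors GENERATE user, url;");

        pig.registerQuery("by_user = GROUP slim BY user;");
        pig.registerQuery("counts = FOREACH by_user GENERATE group, COUNT(slim);");

        pig.store("counts", "error_counts_by_user");
        pig.shutdown();
    }
}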
Where operations can be combined into a multiquery, Pig is able to combine them and process them in a single pass. It is also very important to choose data types wisely, since they sometimes affect the execution time.
It is always better to choose the correct level of parallelism so as not to compromise the effectiveness of the optimization. When processing data there is a chance that at least one row is corrupt or may cause an unexpected result. Take division by zero as an example, even though no record is supposed to have a zero in the denominator. Failing an entire job over one bad record is definitely not good. To avoid this kind of failure, Pig:
- Inserts a null
- Issues a warning
- Continues processing
Pig is able to tell the loader which fields it needs and which keys in a map it needs. Pig can also push down certain types of filters to the loader.
Note that it is always good to have plenty of memory on your machines, since some calculations require your job to hold a lot of information in memory. There are various adjustments that Pig can make in order to reach the correct level of optimization and improve execution:
- Result Size
- Shuffle Size
- Input Size
- Output Size
12.2 Performance Tuning in Hadoop
Hadoop provides a lot of functionality and features (a set of options) for performance tuning. Basically, you need to tune every part of the Hadoop ecosystem. Note that most Hadoop tasks are not CPU bound, hence we usually look to optimize disk and memory usage. Always remember that triggering swapping is not good when tuning memory; by monitoring memory usage on the server, you can find the best memory parameters for tuning. Disk I/O is usually the performance bottleneck. We can set mapred.compress.map.output in the configuration file to compress the map output. Here are some of the tips that you can follow for performance tuning in Hadoop:
- Select the optimal number of Mappers and Reducers per node
- The Mapper to Reducer ratio should be somewhere close to 4:3
- Increase the buffers used for shuffling and sorting
- Avoid disk spills by keeping the buffer sizes high
- Compress the intermediate data
- We should have enough hard disk drives to satisfy the expected storage requirements of the ongoing and near-future workloads
- Our configuration of the MapReduce slots should be such that:
- The Map phase receives as much of the CPU resources as possible
- All the processor cores get utilized during the Reduce phase
- Once we fulfill the Map and Reduce tasks' Java heap requirements, we should still be left with enough memory for caches and OS buffers. For example, if we have 7 JVM processes per hardware core at any given time, together consuming as much as 7 GB of heap per hardware core, and we have 8 GB of RAM per hardware core available on the system, then the remaining 1 GB of memory per hardware core is left for caches and OS buffers
- Compression is supported in Hadoop at three different levels:
- input data
- intermediate Map output data
- Reduce output data
There are multiple codecs that can be used to perform compression and decompression. Some of them achieve a comparatively better compression factor but take longer to compress and decompress, while others provide a good balance between the compression factor and the compression/decompression overhead (a configuration sketch appears at the end of this list)
- The configuration parameter mapred.job.reuse.jvm.num.tasks governs whether MapReduce JVM processes, once spawned, are reused for running more than one task. This property lives in mapred-site.xml, and its default value is 1, which means a JVM is not reused for executing multiple tasks. Setting the value to -1 indicates that an unlimited number of tasks can be scheduled on a particular JVM instance. Note that enabling JVM reuse reduces the overhead of JVM startup and teardown and thereby improves performance. JVM reuse is expected to be especially beneficial in scenarios with a large number of very short-running tasks
- If you look at the overall architecture of Hadoop, you will find that the Reduce phase can be crucial in determining the overall execution time of Hadoop workloads. The Reduce phase is more network intensive and can be more I/O intensive than the Map phase. Depending on the total number of Reduce slots assigned to the Hadoop job, the Java heaps of the Reduce JVMs can be more demanding than those of the Map JVMs
- Consider a scenario where a long-running Reduce task fails and no additional Reduce slot is available; in this case the failed task will not be rescheduled by the Hadoop framework until a Reduce slot becomes free. To tolerate such failures, we can set the mapred.max.tracker.failures property to a sufficiently large value
- Finally, choose every Hadoop cluster component carefully
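As a closing sketch, the classic (pre-YARN) property names mentioned in this list can be collected in one place through the Configuration API; in practice they would go into mapred-site.xml, and the specific values below (the Snappy codec, 20 tracker failures, a 256 MB sort buffer) are illustrative assumptions rather than recommendations. Newer releases use renamed equivalents such as mapreduce.map.output.compress.

import org.apache.hadoop.conf.Configuration;

public class TuningDefaults {
    public static Configuration tunedConf() {
        Configuration conf = new Configuration();

        // Compress intermediate map output to cut shuffle-phase disk and
        // network I/O (requires the native Snappy libraries to be installed).
        conf.setBoolean("mapred.compress.map.output", true);
        conf.set("mapred.map.output.compression.codec",
                 "org.apache.hadoop.io.compress.SnappyCodec");

        // Reuse JVMs for an unlimited number of tasks per job; helps jobs
        // made up of many very short tasks (default is 1 = no reuse).
        conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);

        // Tolerate more task failures per TaskTracker before the tracker is
        // blacklisted for the job, so long-running reduces are not lost to a
        // temporary shortage of reduce slots.
        conf.setInt("mapred.max.tracker.failures", 20);

        // Larger map-side sort/shuffle buffer, in MB (example value only).
        conf.setInt("io.sort.mb", 256);

        return conf;
    }
}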