Hadoop Distributed Cache Example
Distribute application-specific large, read-only files efficiently. DistributedCache is a facility provided by the Map-Reduce framework to cache files (text, archives, jars etc.) needed by applications. Applications specify the files, via urls (hdfs:// or http://) to be cached via the JobConf.
The DistributedCache assumes that the files specified via urls are already present on the FileSystem at the path specified by the url and are accessible by every machine in the cluster. The framework will copy the necessary files on to the slave node before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are only copied once per job and the ability to cache archives which are un-archived on the slaves.
Step 1 - Add all hadoop jar files to your java project. Add following jars.
DistributedCacheExample.java
Step 2 - Change the directory to /usr/local/hadoop/sbin
Step 3 - Start all hadoop daemons
Step 4 - Create file01.txt and file02.txt files. In my case, i have stored these files in /user/hduser/dcin/ folder HDFS.
Step 5 - Create a patterns.txt file In my case, i have stored this file in /user/hduser/pat/ folder HDFS.
Step 6 - Run your DistributedCacheExample program by submitting java project jar file to hadoop. Creating jar file is left to you.
Step 7 - Output
Step 8 - Dont forget to stop hadoop daemons.
Distribute application-specific large, read-only files efficiently. DistributedCache is a facility provided by the Map-Reduce framework to cache files (text, archives, jars etc.) needed by applications. Applications specify the files, via urls (hdfs:// or http://) to be cached via the JobConf.
The DistributedCache assumes that the files specified via urls are already present on the FileSystem at the path specified by the url and are accessible by every machine in the cluster. The framework will copy the necessary files on to the slave node before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are only copied once per job and the ability to cache archives which are un-archived on the slaves.
Step 1 - Add all hadoop jar files to your java project. Add following jars.
/usr/local/hadoop/share/hadoop/common/* /usr/local/hadoop/share/hadoop/common/lib/* /usr/local/hadoop/share/hadoop/mapreduce/* /usr/local/hadoop/share/hadoop/mapreduce/lib* /usr/local/hadoop/share/hadoop/yarn/* /usr/local/hadoop/share/hadoop/yarn/lib/*
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.StringUtils; public class DistributedCacheExample { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ static enum CountersEnum { INPUT_WORDS } private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private boolean caseSensitive; private Set<String> patternsToSkip = new HashSet<String>(); private Configuration conf; private BufferedReader fis; @Override public void setup(Context context) throws IOException, InterruptedException { conf = context.getConfiguration(); caseSensitive = conf.getBoolean("wordcount.case.sensitive", true); if (conf.getBoolean("wordcount.skip.patterns", true)) { URI[] patternsURIs = Job.getInstance(conf).getCacheFiles(); for (URI patternsURI : patternsURIs) { Path patternsPath = new Path(patternsURI.getPath()); String patternsFileName = patternsPath.getName().toString(); parseSkipFile(patternsFileName); } } } private void parseSkipFile(String fileName) { try { fis = new BufferedReader(new FileReader(fileName)); String pattern = null; while ((pattern = fis.readLine()) != null) { patternsToSkip.add(pattern); } } catch (IOException ioe) { System.err.println("Caught exception while parsing the cached file '" + StringUtils.stringifyException(ioe)); } } @Override public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase(); for (String pattern : patternsToSkip) { line = line.replaceAll(pattern, ""); } StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); Counter counter = context.getCounter(CountersEnum.class.getName(), CountersEnum.INPUT_WORDS.toString()); counter.increment(1); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); String[] remainingArgs = optionParser.getRemainingArgs(); if (!(remainingArgs.length != 2 || remainingArgs.length != 4)) { System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(DistributedCacheExample.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); List<String> otherArgs = new ArrayList<String>(); for (int i=0; i < remainingArgs.length; ++i) { if ("-skip".equals(remainingArgs[i])) { job.addCacheFile(new Path(remainingArgs[++i]).toUri()); job.getConfiguration().setBoolean("wordcount.skip.patterns", true); } else { otherArgs.add(remainingArgs[i]); } } FileInputFormat.addInputPath(job, new Path(otherArgs.get(0))); FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1))); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
$ hadoop jar /path/dc.jar DistributedCacheExample -Dwordcount.case.sensitive=true hdfs://localhost:9000/user/hduser/dcin hdfs://localhost:9000/user/hduser/dcout -skip hdfs://localhost:9000/user/hduser/pat/patterns.txt
$ cd /usr/local/hadoop/sbin
$ start-all.sh
file01.txt
Hello World, Bye World!
file02.txt
Hello Hadoop, Goodbye to hadoop.
patterns.txt
\. \, \! to
$ hadoop jar /path/dc.jar DistributedCacheExample -Dwordcount.case.sensitive=true hdfs://localhost:9000/user/hduser/dcin hdfs://localhost:9000/user/hduser/dcout -skip hdfs://localhost:9000/user/hduser/pat/patterns.txt
Step 8 - Dont forget to stop hadoop daemons.
$ stop-all.sh
Comments
Post a Comment