Skip to main content

Hadoop Distributed Cache example

Hadoop Distributed Cache Example
Distribute application-specific large, read-only files efficiently. DistributedCache is a facility provided by the Map-Reduce framework to cache files (text, archives, jars etc.) needed by applications. Applications specify the files, via urls (hdfs:// or http://) to be cached via the JobConf.
The DistributedCache assumes that the files specified via urls are already present on the FileSystem at the path specified by the url and are accessible by every machine in the cluster. The framework will copy the necessary files on to the slave node before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are only copied once per job and the ability to cache archives which are un-archived on the slaves.
Step 1 - Add all hadoop jar files to your java project. Add following jars.

/usr/local/hadoop/share/hadoop/common/*
/usr/local/hadoop/share/hadoop/common/lib/*
/usr/local/hadoop/share/hadoop/mapreduce/*
/usr/local/hadoop/share/hadoop/mapreduce/lib* 
/usr/local/hadoop/share/hadoop/yarn/*
/usr/local/hadoop/share/hadoop/yarn/lib/*
DistributedCacheExample.java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class DistributedCacheExample {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", true)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if (!(remainingArgs.length != 2 || remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(DistributedCacheExample.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
$ hadoop jar /path/dc.jar DistributedCacheExample -Dwordcount.case.sensitive=true hdfs://localhost:9000/user/hduser/dcin hdfs://localhost:9000/user/hduser/dcout -skip hdfs://localhost:9000/user/hduser/pat/patterns.txt
Step 2 - Change the directory to /usr/local/hadoop/sbin

$ cd /usr/local/hadoop/sbin
Step 3 - Start all hadoop daemons

$ start-all.sh
Step 4 - Create file01.txt and file02.txt files. In my case, i have stored these files in /user/hduser/dcin/ folder HDFS.

file01.txt
Hello World, Bye World!
file02.txt
Hello Hadoop, Goodbye to hadoop.
Step 5 - Create a patterns.txt file In my case, i have stored this file in /user/hduser/pat/ folder HDFS.

patterns.txt
\.
\,
\!
to
Step 6 - Run your DistributedCacheExample program by submitting java project jar file to hadoop. Creating jar file is left to you.

$ hadoop jar /path/dc.jar DistributedCacheExample -Dwordcount.case.sensitive=true hdfs://localhost:9000/user/hduser/dcin hdfs://localhost:9000/user/hduser/dcout -skip hdfs://localhost:9000/user/hduser/pat/patterns.txt
Step 7 - Output

Step 8 - Dont forget to stop hadoop daemons.

$ stop-all.sh

Comments

Popular posts from this blog

Apache Spark WordCount scala example

Apache Spark is an open source cluster computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance. Pre Requirements 1) A machine with Ubuntu 14.04 LTS operating system 2) Apache Hadoop 2.6.4 pre installed ( How to install Hadoop on Ubuntu 14.04 ) 3) Apache Spark 1.6.1 pre installed ( How to install Spark on Ubuntu 14.04 ) Spark WordCount Scala Example Step 1 - Change the directory to /usr/local/spark/sbin. $ cd /usr/local/spark/sbin Step 2 - Start all spark daemons. $ ./start-all. sh Step 3 - The JPS (Java Virtual Machine Process Status Tool) tool is limited to reporting information on JVMs for which it has the access permissions. $ jp...

Hive hiveserver2 and Web UI usage

Hive hiveserver2 and Web UI usage HiveServer2 (HS2) is a server interface that enables remote clients to execute queries against Hive and retrieve the results (a more detailed intro here). The current implementation, based on Thrift RPC, is an improved version of HiveServer and supports multi-client concurrency and authentication. It is designed to provide better support for open API clients like JDBC and ODBC. Step 1 - Change the directory to /usr/local/hive/bin $ cd $HIVE_HOME/bin Step 2 - Start hiveserver2 daemon $ hiveserver2 OR $ hive --service hiveserver2 & Step 3 - You can browse to hiveserver2 web ui at following url http: //localhost:10002/hiveserver2.jsp Step 4 - You can see the hive logs in /tmp/hduser/hive. log To kill hiveserver2 daemon $ ps -ef | grep -i hiveserver2 $ kill - 9 29707 OR $ rm -rf /var/run/hive/hive...

Apache Spark Shell Usage

Apache Spark is an open source cluster computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark provides an interface for programming entire clusters with implicit data parallelism and fault-tolerance. Pre Requirements 1) A machine with Ubuntu 14.04 LTS operating system 2) Apache Hadoop 2.6.4 pre installed ( How to install Hadoop on Ubuntu 14.04 ) 3) Apache Spark 1.6.1 pre installed ( How to install Spark on Ubuntu 14.04 ) Spark Shell Usage The Spark shell provides an easy and convenient way to prototype certain operations quickly, without having to develop a full program, packaging it and then deploying it. Step 1 - Change the directory to /usr/local/hadoop/sbin. $ cd /usr/local/hadoop/sbin Step 2 - Start all hadoop daemons. $ ./start-all. sh ...