使用 Hadoop Mapreduce 进行数据处理

1. 综述

  使用HDP(下载: http://zh.hortonworks.com/products/releases/hdp-2-3/#install)搭建环境,进行分布式数据处理。

  项目文件下载,解压文件后将看到项目文件夹。该程序将读取 cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件, 文件中的每一行文本都是来自于 wikipedia 的一个标题, 读取每个标题,并使用 cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 中指定的特殊符号分割标题成独立单词,然后将单词转换为全小写,然后将出现在
cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 中的单词全部删除,最后统计剩余单词的出现次数,并输出。

  程序的编译过程需要在“~/.bashrc”文件内定义自己的环境变量“$hadoop_CLASSPATH”,在“~/.bashrc”文件中添加一行:  

export hadoop_CLASSPATH="/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/lib/*:/usr/hdp/2.3.2.0-2950/hadoop/.//*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/./:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/.//*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/.//*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/.//*:::/usr/share/java/mysql-connector-java-5.1.17.jar:/usr/share/java/mysql-connector-java-5.1.31-bin.jar:/usr/share/java/mysql-connector-java.jar:/usr/hdp/2.3.2.0-2950/tez/*:/usr/hdp/2.3.2.0-2950/tez/lib/*:/usr/hdp/2.3.2.0-2950/tez/conf:/usr/hdp/current/hadoop-yarn-client/.//*:/usr/hdp/current/hadoop-yarn-client/lib/*"

  

 

2. 运行过程

Step (1): 将项目文件夹放入HDP虚拟机,进入cloudMR文件夹,运行下列命令启动:

./start.sh

要求输入账号,随意输入10位数字即可。再运行下列命令检查 hadoop 是否正常运行:

hadoop version

 

Step (2): 编写 TitleCount.java 文件,完成相应功能。完成后的 TitleCount.java 如下:

 

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.*;
import java.util.*;
/**
 * Classic "Word Count"
 */
public class TitleCount extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TitleCount(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), "Title Count");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setMapperClass(TitleCountMap.class);
        job.setReducerClass(TitleCountReduce.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(TitleCount.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static String readHDFSFile(String path, Configuration conf) throws IOException{
        Path pt=new Path(path);
        FileSystem fs = FileSystem.get(pt.toUri(), conf);
        FSDataInputStream file = fs.open(pt);
        BufferedReader buffIn=new BufferedReader(new InputStreamReader(file));

        StringBuilder everything = new StringBuilder();
        String line;
        while( (line = buffIn.readLine()) != null) {
            everything.append(line);
            everything.append("\n");
        }
        return everything.toString();
    }
    
    public static class TitleCountMap extends Mapper<Object, Text, Text, IntWritable> {
        Set<String> stopWords = new HashSet<String>();
        String delimiters;

        @Override
        protected void setup(Context context) throws IOException,InterruptedException {

            Configuration conf = context.getConfiguration();
            
            String delimitersPath = conf.get("delimiters");
            delimiters = readHDFSFile(delimitersPath, conf);
            
            String stopWordsPath = conf.get("stopwords");
            List<String> stopWordsList = Arrays.asList(readHDFSFile(stopWordsPath, conf).split("\n"));
            for(String e : stopWordsList){
                stopWords.add(e);
            }
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer stk = new StringTokenizer(value.toString(),delimiters);
            while(stk.hasMoreTokens()){
                String e = stk.nextToken().trim().toLowerCase();
                if(stopWords.contains(e) == false){
                    context.write(new Text(e),new IntWritable(1));
                }
            }
        }
    }

    public static class TitleCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for(IntWritable e : values){
                sum += e.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}

 

Step (3): 编译java源文件。为了方便,在cloudMR文件夹中新建output文件夹,用来存放编译生成的.class文件。使用以下命令(在cloudMR文件夹下执行):

mkdir output
javac -classpath $Hadoop_CLASSPATH -d output TitleCount.java

进入output文件夹会看到3个.class文件。

 

Step (4): 将编译生成的类文件打包。

首先在 cloudMR 文件夹下新建文本文件 manifest.mf,使用以下命令(在 cloudMR 文件夹下执行):

touch manifest.mf

编辑内容为

Main-Class: TitleCount.class

manifest.mf 内是一些关于这个包的信息,这里定义了主类。

再使用下面命令打包(在cloudMR文件夹下执行):

jar cvfM TitleCount.jar manifest.mf -C output/ .

这条命令的含义是;

jar        打包命令

cvfM       

TitleCount.jar   打成的包的名字

manifest.mf     将这个文件打进包里

-C        -C之后的文件夹内的所有文件打进包里

output/      将output文件夹内的文件全部打进包里

.          打成的包TitleCount.jar放在当前文件夹

注意:打包过程很重要而且易错,请务必按照上文所述步骤进行。

 

Step (5): 将 TitleCount.jar 发布。

在发布(yarn)之前,还要完成准备工作。

将相关文件:cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件、cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 、
cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 上传到hdfs.

在 hdfs 的 /user/root/ 文件夹内新建 data 文件夹,将 delimiters.txt、stopwords.txt 放入 data 文件夹,再在 data 文件夹中新建 titles 文件夹,将cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件放入 titles 文件夹。

介绍相关的命令:

hadoop fs -ls                                         列出hdfs目录,由于没有参数,列出的是当前用户的主目录

hadoop fs -ls /                                         列出hdfs根目录

hadoop fs -mkdir data                                       在默认目录下新建data目录

hadoop fs -mkdir data/titles                               在data目录中新建 titles目录

hadoop fs -copyFromLocal ./abc.txt  data           上传当前目录(本地)中的 abc.txt 到 hdfs 上的 data 目录

  之后便可以发布了,使用命令:  

yarn jar TitleCount.jar TitleCount -D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt" data/titles output

这条命令的含义是;

yarn                              发布内容

jar                                要发布的内容为jar包

TitleCount.jar                发布的内容

TitleCount                 TitleCount.jar的入口

-D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt"       -D后跟参数,这里定义了两个参数

data/titles                     输入文件夹,其内的文件作为Map 的输入

output                          输出文件存放的位置

  yarn 命令执行完毕后,即可查看运行结果。