Hadoop 2 MapReduce in Detail

This article is a detailed walkthrough of MapReduce in Hadoop 2.2.0. Please pay attention to the version: the source code can differ between Hadoop releases.

Here is the outline of this article:

1. Getting the source code
2. The WordCount example
3. Client-side source code analysis
4. Recap
5. The Mapper in detail
   5.1 map input
   5.2 map output
   5.3 map recap
6. The Reducer in detail
7. Conclusion

If anything here is incorrect, please bear with me, and corrections are welcome. Please respect the author's work and include the blog address when reposting: https://www.cnblogs.com/hongten/p/hongten_hadoop_mapreduce.html

1. Getting the source code

You can download HBase (hbase-0.98.9-hadoop2-bin.tar.gz); it contains the Hadoop 2.2.0 jar files and their source code.

2. The WordCount example

Before going into the details, let's look at an example. Suppose a file has the following content:

hello hongten 1
hello hongten 2
hello hongten 3
hello hongten 4
hello hongten 5
......
......

Every line contains one "hello", one "hongten", and ends with a number that increases line by line. We want to count how many times each word appears in this file (you can find many variations of this example online).

First we have to generate the file. You can use the following Java code to produce it (the second half of this class was garbled in the original post; the loop body and the file write below are reconstructed from the description above):

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class GenerateWord {

    public static void main(String[] args) throws Exception {

        double num = 12000000;

        StringBuilder sb = new StringBuilder();
        for (int i = 1; i < num; i++) {
            // reconstructed: append one "hello hongten " + i line per iteration
            sb.append("hello hongten ").append(i).append("\n");
        }

        // reconstructed: the output path is an assumption, matching the
        // /root/word.txt used in the shell commands later in the post
        BufferedWriter bw = new BufferedWriter(new FileWriter(new File("/root/word.txt")));
        bw.write(sb.toString());
        bw.close();
    }
}

The custom Mapper (the package declaration, imports and class header did not survive in this copy of the post; they are reconstructed here, and the class name MyMapper is assumed, mirroring MyReducer below):

package com.b510.hongten.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// class name assumed; mirrors MyReducer below
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }

}

The custom Reducer:

package com.b510.hongten.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }

}
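The shell commands in the next section run a driver class, com.b510.hongten.hadoop.WordCount, which wires the Mapper and Reducer together but is not shown in this copy of the post. Below is a minimal sketch of what such a driver looks like with the new (org.apache.hadoop.mapreduce) API; the HDFS paths are taken from the shell commands in the next section, and everything else is standard boilerplate rather than the author's exact code:

package com.b510.hongten.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // paths match the hdfs commands used later in the post (assumption)
        FileInputFormat.addInputPath(job, new Path("/usr/input/wordcount1"));
        FileOutputFormat.setOutputPath(job, new Path("/usr/output/wordcount"));
        // Submit the job, then poll for progress until the job is complete
        job.waitForCompletion(true);
    }
}

The last line, job.waitForCompletion(true), is exactly the call whose internals are analysed in section 3.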
Run it and check the result:

cd /home/hadoop-2.5/bin/

-- create the test directory
./hdfs dfs -mkdir -p /usr/input/wordcount1

-- put the test file into the test directory
./hdfs dfs -put /root/word.txt /usr/input/wordcount1

-- run the job
./hadoop jar /root/wordcount.jar com.b510.hongten.hadoop.WordCount

-- download the output from hdfs
./hdfs dfs -get /usr/output/wordcount/* ~/

-- look at the last 5 lines of the output file
tail -n5 /root/part-r-00000

From the YARN web UI you can see how long the job ran: it started at 11:47:46 and finished at 11:56:48, 9 min 2 s in total. (This was run inside a virtual machine on my own machine; on a real cluster it should be much faster.)

Number of records: 12000000 - 1 lines.

3. Client-side source code analysis

After the distributed job has been configured on the client, the last thing we execute is:

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

So what exactly happens inside waitForCompletion()?

// we passed verbose = true
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
    if (state == JobState.DEFINE) {
        // the actual submission happens here
        submit();
    }
    // verbose = true
    if (verbose) {
        // monitor the job and print its progress.
        // this is where all the output we see on the client while the
        // distributed job runs comes from; with verbose = false we would
        // see none of it
        monitorAndPrintJob();
    } else {
        // get the completion poll interval from the client.
        int completionPollIntervalMillis =
            Job.getCompletionPollInterval(cluster.getConf());
        while (!isComplete()) {
            try {
                Thread.sleep(completionPollIntervalMillis);
            } catch (InterruptedException ie) {
            }
        }
    }
    // return whether the job succeeded
    return isSuccessful();
}

So with verbose = true the client keeps printing the job's status, and with verbose = false it simply sleeps between isComplete() checks until the job finishes.
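The length of that sleep is read from the client configuration by Job.getCompletionPollInterval(). As far as I recall, in Hadoop 2.x the key behind it is mapreduce.client.completion.pollinterval with a default of 5000 ms (the key name comes from my reading of the Job class, not from this post), so a client that submits non-verbosely could shorten the polling like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// ... inside a driver's main(), before the job is submitted ...
Configuration conf = new Configuration();
// poll every second instead of the default 5 seconds (key name assumed, see above)
conf.setInt("mapreduce.client.completion.pollinterval", 1000);
Job job = Job.getInstance(conf, "wordcount");
// ... same Mapper/Reducer/path setup as in the WordCount driver sketch above ...
boolean ok = job.waitForCompletion(false);   // no progress output, just blocks
System.exit(ok ? 0 : 1);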
The most important call in waitForCompletion() is submit(), which submits the distributed job, so we need to step into submit():

public void submit()
        throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    // use the new API; I am using the Hadoop 2.2.0 API, which differs from the old one
    setUseNewAPI();
    // connect to the cluster; the cluster responds and a job ID is allocated
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
                ClassNotFoundException {
            // submit the job
            /* Internal method for submitting jobs to the system.
               The job submission process involves:
               1. Checking the input and output specifications of the job.
               2. Computing the InputSplits for the job.
               3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.
               4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
               5. Submitting the job to the JobTracker and optionally monitoring it's status.
            */
            // this method does five things:
            // 1. check the input and output specifications
            // 2. compute the input splits for the job
            // 3./4. copy the job's resources (jar, configuration) to the staging directory
            // 5. submit the job and monitor its status
            // note that in 2.x there is no JobTracker any more:
            // "JobTracker is no longer used since M/R 2.x.
            //  This is a dummy JobTracker class, which is used to be compatible with M/R 1.x applications."
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}

So we need to step into submitter.submitJobInternal() to see how it is implemented:

// this method does five things:
// 1. check the input and output specifications
// 2. compute the input splits for the job
// 3./4. copy the job's resources (jar, configuration) to the staging directory
// 5. submit the job and monitor its status
// remember: there is no JobTracker any more in 2.x
JobStatus submitJobInternal(Job job, Cluster cluster)
        throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs
    checkSpecs(job);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
        job.getConfiguration());
    //configure the command line options correctly on the submitting dfs
    Configuration conf = job.getConfiguration();
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
        submitHostAddress = ip.getHostAddress();
        submitHostName = ip.getHostName();
        conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    // set the job ID
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
        conf.set(MRJobConfig.USER_NAME,
            UserGroupInformation.getCurrentUser().getShortUserName());
        conf.set("hadoop.http.filter.initializers",
            "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
        conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
        LOG.debug("Configuring job " + jobId + " with " + submitJobDir
            + " as the submit dir");
        // get delegation token for the dir
        TokenCache.obtainTokensForNamenodes(job.getCredentials(),
            new Path[] { submitJobDir }, conf);

        populateTokenCache(conf, job.getCredentials());

        // generate a secret to authenticate shuffle transfers
        if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
            KeyGenerator keyGen;
            try {
                keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
                keyGen.init(SHUFFLE_KEY_LENGTH);
            } catch (NoSuchAlgorithmException e) {
                throw new IOException("Error generating shuffle secret key", e);
            }
            SecretKey shuffleKey = keyGen.generateKey();
            TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
                job.getCredentials());
        }

        copyAndConfigureFiles(job, submitJobDir);
        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

        // Create the splits for the job
        LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
        // write the split information -- this is the call we mainly care about :))
        int maps = writeSplits(job, submitJobDir);
        conf.setInt(MRJobConfig.NUM_MAPS, maps);
        LOG.info("number of splits:" + maps);

        // write "queue admins of the queue to which job is being submitted"
        // to job file.
        String queue = conf.get(MRJobConfig.QUEUE_NAME,
            JobConf.DEFAULT_QUEUE_NAME);
        AccessControlList acl = submitClient.getQueueAdmins(queue);
        conf.set(toFullPropertyName(queue,
            QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

        // removing jobtoken referrals before copying the jobconf to HDFS
        // as the tasks don't need this setting, actually they may break
        // because of it if present as the referral will point to a
        // different job.
        TokenCache.cleanUpTokenReferral(conf);

        if (conf.getBoolean(
                MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
                MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
            // Add HDFS tracking ids
            ArrayList<String> trackingIds = new ArrayList<String>();
            for (Token<? extends TokenIdentifier> t : job.getCredentials().getAllTokens()) {
                trackingIds.add(t.decodeIdentifier().getTrackingId());
            }
            conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
                trackingIds.toArray(new String[trackingIds.size()]));
        }

        // Write job file to submit dir
        writeConf(conf, submitJobFile);

        //
        // Now, actually submit the job (using the submit name)
        //
        // only at this point is the job really submitted
        printTokens(jobId, job.getCredentials());
        status = submitClient.submitJob(
            jobId, submitJobDir.toString(), job.getCredentials());
        if (status != null) {
            return status;
        } else {
            throw new IOException("Could not launch job");
        }
    } finally {
        if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
                jtFs.delete(submitJobDir, true);
        }
    }
}

The line we care about here is:

int maps = writeSplits(job, submitJobDir);

Stepping into writeSplits():

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
        Path jobSubmitDir) throws IOException,
        InterruptedException, ClassNotFoundException {
    // the configuration can be obtained from the job
    JobConf jConf = (JobConf) job.getConfiguration();
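From here on, based on my reading of the Hadoop 2.x JobSubmitter source rather than on the excerpt above, the method continues by dispatching between the new and the old mapper API; since submit() called setUseNewAPI(), the new-API branch is taken:

    // sketch of the remainder of writeSplits(), reconstructed from the Hadoop 2.x source
    int maps;
    if (jConf.getUseNewMapper()) {
        // new API: writeNewSplits() asks the job's InputFormat for its splits
        maps = writeNewSplits(job, jobSubmitDir);
    } else {
        // old (mapred) API
        maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
}

writeNewSplits() instantiates the job's InputFormat (TextInputFormat by default for a job like WordCount) and calls its getSplits() method. For FileInputFormat the split size is essentially Math.max(minSize, Math.min(maxSize, blockSize)), so with default settings one map task is created per HDFS block of word.txt.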