<dfn id="7pnvb"><del id="7pnvb"><form id="7pnvb"></form></del></dfn>

      <form id="7pnvb"></form>

        <b id="7pnvb"><strike id="7pnvb"></strike></b>

          <form id="7pnvb"></form>
          現在的位置: 首頁 > 黃專家專欄 > 正文

          作業的提交和監控(一)

          2014年11月03日 黃專家專欄 ⁄ 共 4672字 ⁄ 字號 評論關閉

          整體流程

          簡單的代碼就可以運行一個作業

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          // Create a new JobConf
          JobConf job = new JobConf(new Configuration(), MyJob.class);
          
          // Specify various job-specific parameters     
          job.setJobName("myjob");
          
          job.setInputPath(new Path("in"));
          job.setOutputPath(new Path("out"));
          
          job.setMapperClass(MyJob.MyMapper.class);
          job.setReducerClass(MyJob.MyReducer.class);
          
          // Submit the job, then poll for progress until the job is complete
          JobClient.runJob(job);

          runJob 做的最主要兩個過程是提交監控。 提交在函數 submitJobInternal 中執行,監控在函數 monitorAndPrintJob 中執行。我們看看 submitJobInternal 里面的代碼

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          48
          49
          50
          51
          52
          53
          54
          55
          56
          57
          58
          59
          60
          61
          62
          63
          64
          65
          66
          67
          68
          69
          70
          71
          72
          73
          74
          75
          76
          77
          78
          79
          80
          81
          82
          83
          84
          85
          86
          87
          88
          89
          90
          91
          92
          93
          94
          95
          96
          97
          98
          99
          100
          101
          102
          103
          104
          105
          106
          107
          108
          109
          110
          111
          112
          113
          114
          115
          116
          117
          118
          119
          120
          121
          122
          123
          124
          125
          126
          127
          128
          return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
            public RunningJob run() throws FileNotFoundException, 
            ClassNotFoundException,
            InterruptedException,
            IOException {
              JobConf jobCopy = job;
          
              // 指定作業文件的上傳路徑
              // 默認在 mapreduce.jobtracker.staging.root.dir 中指定
              // 比如 mapreduce.jobtracker.staging.root.dir = /user
              // 后面加上用戶名作為文件夾名字
              // 一般是這樣 hdfs://host:port/user/${user}/.staging
              Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
                  jobCopy);
          
              // 得到一個 jobID
              // jobID 是根據日期和時間生成的
              // 假設是 job_201410312221_0001
              JobID jobId = jobSubmitClient.getNewJobId();
          
              // 得到目錄的路徑
              // 可能是這樣 hdfs://host:port/user/${user}/.staging/job_201410312221_0001
              Path submitJobDir = new Path(jobStagingArea, jobId.toString());
          
              // 設置配置文件,將 mapreduce.job.dir 設置為作業文件上傳目錄
              jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
              JobStatus status = null;
              try {
                // 得到 token
                populateTokenCache(jobCopy, jobCopy.getCredentials());
          
                // copy 文件到上傳目錄中
                // 支持以下這幾種格式
                // -files -libjars -archives
                // 在如下的三種目錄里:
                // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/files
                // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/archives
                // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/libjars
                copyAndConfigureFiles(jobCopy, submitJobDir);
          
                // get delegation token for the dir
                TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                                    new Path [] {submitJobDir},
                                                    jobCopy);
          
                Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
                    // 得到 reduces 的個數
                int reduces = jobCopy.getNumReduceTasks();
          
                // 設置本機 ip 地址
                InetAddress ip = InetAddress.getLocalHost();
                if (ip != null) {
                  job.setJobSubmitHostAddress(ip.getHostAddress());
                  job.setJobSubmitHostName(ip.getHostName());
                }
                JobContext context = new JobContext(jobCopy, jobId);
          
                jobCopy = (JobConf)context.getConfiguration();
          
                // Check the output specification
                // 檢查指定的輸出目錄
                // 如果輸出文件夾存在, 那么會拋出異常
                // checkOutputSpecs 在 FileOutputFormat.java 中
                if (reduces == 0 ? jobCopy.getUseNewMapper() : 
                  jobCopy.getUseNewReducer()) {
                    // 如果 readuce 數目是 0,但是 mapper 的數目不為 0
                    // 得到指定的輸入類型
                    // 默認是 TextOutputFormat
                  org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
                    ReflectionUtils.newInstance(context.getOutputFormatClass(),
                        jobCopy);
                  output.checkOutputSpecs(context);
                } else {
                  jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
                }
          
                // Create the splits for the job
                // 計算 job 的輸入文件的分片
                FileSystem fs = submitJobDir.getFileSystem(jobCopy);
                LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
                // 設置 map
                int maps = writeSplits(context, submitJobDir);
                jobCopy.setNumMapTasks(maps);
          
                // write "queue admins of the queue to which job is being submitted"
                // to job file.
                // 得到 queue 的名字
                String queue = jobCopy.getQueueName();
                // 然后根據這個 queue 名字獲得訪問控制列表
                AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
                jobCopy.set(QueueManager.toFullPropertyName(queue,
                    QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());
          
                // Write job file to JobTracker's fs  
                // 將重新配置過的 JobConf 寫入到 submitJobDir/job.xml 文件
                FSDataOutputStream out = 
                  FileSystem.create(fs, submitJobFile,
                      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
          
                // removing jobtoken referrals before copying the jobconf to HDFS
                // as the tasks don't need this setting, actually they may break
                // because of it if present as the referral will point to a
                // different job.
                TokenCache.cleanUpTokenReferral(jobCopy);
          
                try {
                  jobCopy.writeXml(out);
                } finally {
                  out.close();
                }
                //
                // Now, actually submit the job (using the submit name)
                //
                // 提交 job
                // 如果不是 local 模式,那么這個 submitJob 是一個 rpc 調用
                // 調用的是遠程機子上的 JobTracker.submitJob
                // 如果是 local 模式, 就是調用 LocalJobRunner.submitJob
                printTokens(jobId, jobCopy.getCredentials());
                status = jobSubmitClient.submitJob(
                    jobId, submitJobDir.toString(), jobCopy.getCredentials());
                if (status != null) {
                  return new NetworkedJob(status);
                } else {
                  throw new IOException("Could not launch job");
                }
              } finally {
                if (status == null) {
                  LOG.info("Cleaning up the staging area " + submitJobDir);
                  if (fs != null && submitJobDir != null)
                    fs.delete(submitJobDir, true);
                }
              }
            }
          }

          主要的流程如下:

          1. 找到 hdfs 上的 Staging 路徑
          2. 向 JobTracker 申請一個 job id
          3. 根據 job id 在 hdfs 上生成上傳文件的目錄
          4. copy 指定文件到上傳的目錄中去
          5. 得到 reduce 個數,設置本機 ip
          6. 檢查輸出目錄是否存在, 如果不存在,則拋出異常。這其實有一點不太合理的地方,因為是先上傳作業文件,再判斷是否輸入目錄存在。如果正常運行完作業,上傳的文件是能被清理的,但是,如果輸出文件異常,那么這些上傳的文件就得不到清理?
          7. 計算 job 的輸入文件的分片, 根據這個設置 map 的數量
          8. 得到 queue name 和相關授權
          9. 將重新配置過的 JobConf 寫入到 submitJobDir/job.xml 文件
          10. 提交 job

          抱歉!評論已關閉.

          新婚之夜我被十几个男人一起_60歳の熟女セックス_肚兜下的浑圆被揉捏np_漂亮的小峓子4在钱免费

              <dfn id="7pnvb"><del id="7pnvb"><form id="7pnvb"></form></del></dfn>

              <form id="7pnvb"></form>

                <b id="7pnvb"><strike id="7pnvb"></strike></b>

                  <form id="7pnvb"></form>