<dfn id="7pnvb"><del id="7pnvb"><form id="7pnvb"></form></del></dfn>

      <form id="7pnvb"></form>

        <b id="7pnvb"><strike id="7pnvb"></strike></b>

          <form id="7pnvb"></form>
          現在的位置: 首頁 > 黃專家專欄 > 正文

          作業的提交和監控(二)

          2014年11月03日 黃專家專欄 ⁄ 共 4632字 ⁄ 字號 評論關閉

          文件分片

          函數

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
              Path jobSubmitDir) throws IOException,
              InterruptedException, ClassNotFoundException {
            JobConf jConf = (JobConf)job.getConfiguration();
            int maps;
            if (jConf.getUseNewMapper()) {
              maps = writeNewSplits(job, jobSubmitDir);
            } else {
              maps = writeOldSplits(jConf, jobSubmitDir);
            }
            return maps;
          }

          執行文件分片,并得到需要的 map 數目

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          48
          49
          50
          51
          52
          53
          54
          55
          56
          57
          58
          public InputSplit[] getSplits(JobConf job, int numSplits)
            throws IOException {
            // 得到輸入文件的各種狀態
            FileStatus[] files = listStatus(job);
          
            // Save the number of input files in the job-conf
            // conf 中設置輸入文件的數目
            job.setLong(NUM_INPUT_FILES, files.length);
          
            // 計算總的大小
            long totalSize = 0;                           // compute total size
            for (FileStatus file: files) {                // check we have valid files
              if (file.isDir()) {
                throw new IOException("Not a file: "+ file.getPath());
              }
              totalSize += file.getLen();
            }
          
            // numSplits 傳進來的是 map 的數目
            // 獲得每一個分片的期望大小
            long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
          
            // 獲得最小的分片大小,這個可以在 mapred.min.split.size 中設置
            long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                                    minSplitSize);
          
            // generate splits
            // 以下是生成分片的計算
            ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
            NetworkTopology clusterMap = new NetworkTopology();
            for (FileStatus file: files) {
              Path path = file.getPath();
              FileSystem fs = path.getFileSystem(job);
              long length = file.getLen();
              BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
              // isSplitable 是判斷該文件是否可以分片
              // 一般情況下都是可以的,但是如果是 stream compressed 的方式,那么是不可以的
              if ((length != 0) && isSplitable(fs, path)) { 
                long blockSize = file.getBlockSize();
          
                // 計算每一個分片大小的實際函數
                // 得到真實的分片大小
                long splitSize = computeSplitSize(goalSize, minSize, blockSize);
          
                long bytesRemaining = length;
          
                // 允許最后一個分片在 SPLIT_SLOP(默認 1.1) 比例之下
                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                  String[] splitHosts = getSplitHosts(blkLocations, 
                      length-bytesRemaining, splitSize, clusterMap);
                  // 加入分片
                  splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                      splitHosts));
                  bytesRemaining -= splitSize;
                }
          
                // 加入最后一個分片
                // 這個比例最大不超過期望分片的 1.1
                if (bytesRemaining != 0) {
                  splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
                             blkLocations[blkLocations.length-1].getHosts()));
                }
              } else if (length != 0) {
                String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
                splits.add(new FileSplit(path, 0, length, splitHosts));
              } else { 
                //Create empty hosts array for zero length files
                splits.add(new FileSplit(path, 0, length, new String[0]));
              }
            }
            LOG.debug("Total # of splits: " + splits.size());
            return splits.toArray(new FileSplit[splits.size()]);
          }
          
          protected long computeSplitSize(long goalSize, long minSize,
                                               long blockSize) {
            // 計算分片大小,很明顯
            // 這里設定了最大最小值,每一個分片大小在 minSize 和 blockSize 之間
            return Math.max(minSize, Math.min(goalSize, blockSize));
          }

          這樣看,要想設置超過大于 block size 的也是可以的,只要將 minSize 設置很大即可 以上分片算法只是單純計算需要多少個 map ,根據設定的 mapred.map.tasks 計算出這個任務需要多少個 map 最終的 map 數目,可能和 mapred.map.tasks 不同

          但是這樣仍然會有一個問題,就是這個只是按照輸入文件的大小做邏輯的切分,但是如果文件中含有邊界(比如 Text 文件就是以行作為邊界),那么實際的劃分就不一定是這樣的。

          這個是由 RecordReader 實現的,它將某一個 split 解析成一個個 key 和 value 對

          我們看看實際的 TextInputFormat 類,它其實生成了 LineRecordReader

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          48
          public LineRecordReader(Configuration job, FileSplit split,
              byte[] recordDelimiter) throws IOException {
            this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                            Integer.MAX_VALUE);
          
            // 得到文件開始和結束的位置
            start = split.getStart();
            end = start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);
          
            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
          
            // skipFirstLine 表示跳過第一行
            boolean skipFirstLine = false;
            if (codec != null) {
              in = new LineReader(codec.createInputStream(fileIn), job,
                    recordDelimiter);
              end = Long.MAX_VALUE;
            } else {
              if (start != 0) {
                // 如果開始的位置不是整個文件的開始
                // 那么,有可能是在行的中間, LineRecordReader 的處理方式是跳過這行,從下一行處理起
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
              }
              in = new LineReader(fileIn, job, recordDelimiter);
            }
            if (skipFirstLine) {  // skip first line and re-establish "start".
                // 跳過第一行
              start += in.readLine(new Text(), 0,
                                   (int)Math.min((long)Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
          }
          
          public synchronized boolean next(LongWritable key, Text value)
            throws IOException {
          
            while (pos < end) {
              key.set(pos);
          
              // 在這里, 會處理一個完整行
              // 但是有可能最后一行的另外一個部分在另一個 split 里面
              // 但是 FSDataInputStream fileIn 作為一個抽象,這樣的操作使得對 Reader 透明了
              int newSize = in.readLine(value, maxLineLength,
                                        Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                                 maxLineLength));
              if (newSize == 0) {
                return false;
              }
              pos += newSize;
              if (newSize < maxLineLength) {
                return true;
              }
          
              // line too long. try again
              LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
            }
          
            return false;
          }

          以上代碼我們可以知道,TextInputFormat 生成的 LineRecordReader 會根據行邊界來切分,避免了 split 邏輯分片不考慮邊界的情況。

          其實 SequenceFileInputFormat 輸入也同樣有邊界問題,這是根據創建時候的序列點來實現的。 具體代碼可以看 SequenceFileRecordReader 里面的實現

          抱歉!評論已關閉.

          新婚之夜我被十几个男人一起_60歳の熟女セックス_肚兜下的浑圆被揉捏np_漂亮的小峓子4在钱免费

              <dfn id="7pnvb"><del id="7pnvb"><form id="7pnvb"></form></del></dfn>

              <form id="7pnvb"></form>

                <b id="7pnvb"><strike id="7pnvb"></strike></b>

                  <form id="7pnvb"></form>