免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 4773 | 回复: 0

[Hadoop&HBase] hadoop map/reduce文章阅读笔记 [复制链接]

论坛徽章:
0
发表于 2010-01-22 15:37 |显示全部楼层

                                文章来自
这里
Map/Reduce - User Interfaces
This section provides a reasonable amount of detail on every user-facing
      aspect of the Map/Reduce framwork. This should help users implement,
      configure and tune their jobs in a fine-grained manner. However, please
      note that the javadoc for each class/interface remains the most
      comprehensive documentation available; this is only meant to be a tutorial.
      
Let us first take the Mapper and Reducer
      interfaces. Applications typically implement them to provide the
      map and reduce methods.
We will then discuss other core interfaces including
      JobConf, JobClient, Partitioner,
      OutputCollector, Reporter,
      InputFormat, OutputFormat,
      OutputCommitter and others.
Finally, we will wrap up by discussing some useful features of the
      framework such as the DistributedCache,
      IsolationRunner etc.
Payload
Applications typically implement the Mapper and
        Reducer interfaces to provide the map and
        reduce methods. These form the core of the job.
Mapper
          Mapper maps input key/value pairs to a set of intermediate
          key/value pairs.
/*map产生的中间对和原始的对不一定是相同的*/
Maps are the individual tasks that transform input records into
          intermediate records. The transformed intermediate records do not need
          to be of the same type as the input records. A given input pair may
          map to zero or many output pairs.
/*InputSplit将输入文件切分,将切分后的数据传给map,至于怎样在这些数据中产生对,是由自定义的map类来实现的,*/
The Hadoop Map/Reduce framework spawns one map task for each
          InputSplit generated by the InputFormat for
          the job.
Overall, Mapper implementations are passed the
          JobConf for the job via the
         
          JobConfigurable.configure(JobConf) method and override it to
          initialize themselves. The framework then calls
         
          map(WritableComparable, Writable, OutputCollector, Reporter) for
          each key/value pair in the InputSplit for that task.        
          Applications can then override the
         
          Closeable.close() method to perform any required cleanup.
/*怎样输出pair对*/
Output pairs do not need to be of the same types as input pairs. A
          given input pair may map to zero or many output pairs.  Output pairs
          are collected with calls to
         
          OutputCollector.collect(WritableComparable,Writable).
Applications can use the Reporter to report
          progress, set application-level status messages and update
          Counters, or just indicate that they are alive.
/*OutputKeyComparatorClass的作用,在Reduce之前进行Group操作。这里应该还有一个Region数据按Key的合并操作*/
All intermediate values associated with a given output key are
          subsequently grouped by the framework, and passed to the
          Reducer(s) to  determine the final output. Users can
          control the grouping by specifying a Comparator via
         
          JobConf.setOutputKeyComparatorClass(Class).
/*map的输出是排序的,每个节点上的Region数目与Reducer的数目相等,每个Reducer处理相同Region的数据(也就是包含相同Key的数据)。*/
The Mapper outputs are sorted and then
          partitioned per Reducer. The total number of partitions is
          the same as the number of reduce tasks for the job. Users can control
          which keys (and hence records) go to which Reducer by
          implementing a custom Partitioner.
/*Combiner的作用,执行Map之后在本地进行aggregation,就是从多个对生成这样的Key list,这样可以减少在Map和Reducer之间传输的数据量*/
Users can optionally specify a combiner, via
         
          JobConf.setCombinerClass(Class), to perform local aggregation of
          the intermediate outputs, which helps to cut down the amount of data
          transferred from the Mapper to the Reducer.
/*中间数据的存储形式,以及压缩方法*/
The intermediate, sorted outputs are always stored in a simple
          (key-len, key, value-len, value) format.
          Applications can control if, and how, the
          intermediate outputs are to be compressed and the
         
          CompressionCodec to be used via the JobConf.
/*设置Maper Num的原则*/
How Many Maps?
The number of maps is usually driven by the total size of the
            inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100
            maps per-node, although it has been set up to 300 maps for very
            cpu-light map tasks. Task setup takes awhile, so it is best if the
            maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of
            128MB, you'll end up with 82,000 maps, unless
            
            setNumMapTasks(int) (which only provides a hint to the framework)
            is used to set it even higher.
Reducer
          Reducer reduces a set of intermediate values which share a key to
          a smaller set of values.
The number of reduces for the job is set by the user
          via
          JobConf.setNumReduceTasks(int).
Overall, Reducer implementations are passed the
          JobConf for the job via the
         
          JobConfigurable.configure(JobConf) method and can override it to
          initialize themselves. The framework then calls   
         
          reduce(WritableComparable, Iterator, OutputCollector, Reporter)
          method for each  
          pair in the grouped inputs. Applications can then override the           
         
          Closeable.close() method to perform any required cleanup.
/*Reduce的三个子操作,混洗,排序,规约*/
Reducer has 3 primary phases: shuffle, sort and reduce.
/*混洗,Map的排序输出作为Reduce的输入,所有Mapper中的相应Region会被送到相应的Reducer去执行,通过http协议*/
Shuffle
Input to the Reducer is the sorted output of the
            mappers. In this phase the framework fetches the relevant partition
            of the output of all the mappers, via HTTP.
/*对于不用Mapper传来的数据,Reducer还要进行一次分组,保证相同的Key被Group到一起,假如Partitioner执行Hash,那么不同的Key在Hash之后可能被放到一个Region里面,然后被传送到同一个Reducer,所以在这里还要进行一个Group*/
Sort
The framework groups Reducer inputs by keys (since
            different mappers may have output the same key) in this stage.
/*shuffle和sort是同时执行的*/
The shuffle and sort phases occur simultaneously; while
            map-outputs are being fetched they are merged.
/*有点晕了,这里!!!指的是单独用于Group的操作?*/
Secondary Sort
If equivalence rules for grouping the intermediate keys are
              required to be different from those for grouping keys before
              reduction, then one may specify a Comparator via
              
              JobConf.setOutputValueGroupingComparator(Class). Since
              
              JobConf.setOutputKeyComparatorClass(Class) can be used to
              control how intermediate keys are grouped, these can be used in
              conjunction to simulate secondary sort on values.
Reduce
In this phase the
            
            reduce(WritableComparable, Iterator, OutputCollector, Reporter)
            method is called for each  
            pair in the grouped inputs.
The output of the reduce task is typically written to the
            
            FileSystem via
            
            OutputCollector.collect(WritableComparable, Writable).
Applications can use the Reporter to report
            progress, set application-level status messages and update
            Counters, or just indicate that they are alive.
The output of the Reducer is not sorted.
How Many Reduces?
The right number of reduces seems to be 0.95 or
            1.75 multiplied by (no. of nodes> *
            mapred.tasktracker.reduce.tasks.maximum).
With 0.95 all of the reduces can launch immediately
            and start transfering map outputs as the maps finish. With
            1.75 the faster nodes will finish their first round of
            reduces and launch a second wave of reduces doing a much better job
            of load balancing.
Increasing the number of reduces increases the framework overhead,
            but increases load balancing and lowers the cost of failures.
The scaling factors above are slightly less than whole numbers to
            reserve a few reduce slots in the framework for speculative-tasks and
            failed tasks.
Reducer NONE
It is legal to set the number of reduce-tasks to zero if
            no reduction is desired.
In this case the outputs of the map-tasks go directly to the
            FileSystem, into the output path set by
            
            setOutputPath(Path). The framework does not sort the
            map-outputs before writing them out to the FileSystem.
            
/*partition的输入是map-outputs*/
Partitioner
          Partitioner partitions the key space.
Partitioner controls the partitioning of the keys of the
          intermediate map-outputs. The key (or a subset of the key) is used to
          derive the partition, typically by a hash function. The total
          number of partitions is the same as the number of reduce tasks for the
          job. Hence this controls which of the m reduce tasks the
          intermediate key (and hence the record) is sent to for reduction.
          HashPartitioner is the default Partitioner.
Reporter
          Reporter is a facility for Map/Reduce applications to report
          progress, set application-level status messages and update
          Counters.
Mapper and Reducer implementations can use
          the Reporter to report progress or just indicate
          that they are alive. In scenarios where the application takes a
          significant amount of time to process individual key/value pairs,
          this is crucial since the framework might assume that the task has
          timed-out and kill that task. Another way to avoid this is to
          set the configuration parameter mapred.task.timeout to a
          high-enough value (or even set it to zero for no time-outs).
         
Applications can also update Counters using the
          Reporter.
/*map reduce以及中间结果都可以用该类来写文件*/
OutputCollector
          OutputCollector is a generalization of the facility provided by
          the Map/Reduce framework to collect data output by the
          Mapper or the Reducer (either the
          intermediate outputs or the output of the job).
Hadoop Map/Reduce comes bundled with a
        
        library of generally useful mappers, reducers, and partitioners.
/*里面讲到了Debugging的一些方法*/
Job Configuration
        JobConf represents a Map/Reduce job configuration.
JobConf is the primary interface for a user to describe
        a Map/Reduce job to the Hadoop framework for execution. The framework
        tries to faithfully execute the job as described by JobConf,
        however:
  • f
                Some configuration parameters may have been marked as
                
                final by administrators and hence cannot be altered.
             

  •             While some job parameters are straight-forward to set (e.g.
                
                setNumReduceTasks(int)), other parameters interact subtly with
                the rest of the framework and/or job configuration and are
                more complex to set (e.g.
                
                setNumMapTasks(int)).
             

JobConf is typically used to specify the
        Mapper, combiner (if any), Partitioner,
        Reducer, InputFormat,
        OutputFormat and OutputCommitter
        implementations. JobConf also
        indicates the set of input files
        (
setInputPaths(JobConf, Path...)
        /
addInputPath(JobConf, Path)
)
        and (
setInputPaths(JobConf, String)
        /
addInputPaths(JobConf, String)
)
        and where the output files should be written
        (
setOutputPath(Path)
).
Optionally, JobConf is used to specify other advanced
        facets of the job such as the Comparator to be used, files
        to be put in the DistributedCache, whether intermediate
        and/or job outputs are to be compressed (and how), debugging via
        user-provided scripts
        (
setMapDebugScript(String)
/
setReduceDebugScript(String)
)
        , whether job tasks can be executed in a speculative manner
        (
setMapSpeculativeExecution(boolean)
)/(
setReduceSpeculativeExecution(boolean)
)
        , maximum number of attempts per task
        (
setMaxMapAttempts(int)
/
setMaxReduceAttempts(int)
)
        , percentage of tasks failure which can be tolerated by the job
        (
setMaxMapTaskFailuresPercent(int)
/
setMaxReduceTaskFailuresPercent(int)
)
        etc.
Of course, users can use
        
set(String, String)
/
get(String, String)
        to set/get arbitrary parameters needed by applications. However, use the
        DistributedCache for large amounts of (read-only) data.
Task Execution & Environment
The TaskTracker executes the Mapper/
        Reducer  task as a child process in a separate jvm.
        
The child-task inherits the environment of the parent
        TaskTracker. The user can specify additional options to the
        child-jvm via the mapred.child.java.opts configuration
        parameter in the JobConf such as non-standard paths for the
        run-time linker to search shared libraries via
        -Djava.library.path= etc. If the
        mapred.child.java.opts contains the symbol @taskid@
        it is interpolated with value of taskid of the map/reduce
        task.
Here is an example with multiple arguments and substitutions,
        showing jvm GC logging, and start of a passwordless JVM JMX agent so that
        it can connect with jconsole and the likes to watch child memory,
        threads and get thread dumps. It also sets the maximum heap-size of the
        child jvm to 512MB and adds an additional path to the
        java.library.path of the child-jvm.
         
            mapred.child.java.opts
            
              
                    -Xmx512M -Djava.library.path=/home/mycompany/lib
                    -verbose:gc -Xloggc:/tmp/@taskid@.gc
              
                    -Dcom.sun.management.jmxremote.authenticate=false
                    -Dcom.sun.management.jmxremote.ssl=false
            
         
        
Memory management
Users/admins can also specify the maximum virtual memory
        of the launched child-task, and any sub-process it launches
        recursively, using mapred.child.ulimit. Note that
        the value set here is a per process limit.
        The value for mapred.child.ulimit should be specified
        in kilo bytes (KB). And also the value must be greater than
        or equal to the -Xmx passed to JavaVM, else the VM might not start.
        
Note: mapred.child.java.opts are used only for
        configuring the launched child tasks from task tracker. Configuring
        the memory options for daemons is documented in
        
        cluster_setup.html
The memory available to some parts of the framework is also
        configurable. In map and reduce tasks, performance may be influenced
        by adjusting parameters influencing the concurrency of operations and
        the frequency with which data will hit disk. Monitoring the filesystem
        counters for a job- particularly relative to byte counts from the map
        and into the reduce- is invaluable to the tuning of these
        parameters.
Users can choose to override default limits of Virtual Memory and RAM
          enforced by the task tracker, if memory management is enabled.
          Users can set the following parameter per job:
         
NameTypeDescription
         
mapred.task.maxvmemint
            A number, in bytes, that represents the maximum Virtual Memory
            task-limit for each task of the job. A task will be killed if
            it consumes more Virtual Memory than this number.
         
         
mapred.task.maxpmemint
            A number, in bytes, that represents the maximum RAM task-limit
            for each task of the job. This number can be optionally used by
            Schedulers to prevent over-scheduling of tasks on a node based
            on RAM needs.  
         
        
/*map的数据序列化,以及缓冲和存储*/
Map Parameters
A record emitted from a map will be serialized into a buffer and
          metadata will be stored into accounting buffers. As described in the
          following options, when either the serialization buffer or the
          metadata exceed a threshold, the contents of the buffers will be
          sorted and written to disk in the background while the map continues
          to output records. If either buffer fills completely while the spill
          is in progress, the map thread will block. When the map is finished,
          any remaining records are written to disk and all on-disk segments
          are merged into a single file. Minimizing the number of spills to
          disk can decrease map time, but a larger buffer also decreases the
          memory available to the mapper.
            
NameTypeDescription
            
io.sort.mbint
                The cumulative size of the serialization and accounting
                buffers storing records emitted from the map, in megabytes.
               
            
io.sort.record.percentfloat
                The ratio of serialization to accounting space can be
                adjusted. Each serialized record requires 16 bytes of
                accounting information in addition to its serialized size to
                effect the sort. This percentage of space allocated from
                io.sort.mb affects the probability of a spill to
                disk being caused by either exhaustion of the serialization
                buffer or the accounting space. Clearly, for a map outputting
                small records, a higher value than the default will likely
                decrease the number of spills to disk.
            
io.sort.spill.percentfloat
                This is the threshold for the accounting and serialization
                buffers. When this percentage of either buffer has filled,
                their contents will be spilled to disk in the background. Let
                io.sort.record.percent be r,
                io.sort.mb be x, and this value be
                q. The maximum number of records collected before the
                collection thread will spill is r * x * q * 2^16.
                Note that a higher value may decrease the number of- or even
                eliminate- merges, but will also increase the probability of
                the map task getting blocked. The lowest average map times are
                usually obtained by accurately estimating the size of the map
                output and preventing multiple spills.
         
Other notes
  • If either spill threshold is exceeded while a spill is in
                progress, collection will continue until the spill is finished.
                For example, if io.sort.buffer.spill.percent is set
                to 0.33, and the remainder of the buffer is filled while the spill
                runs, the next spill will include all the collected records, or
                0.66 of the buffer, and will not generate additional spills. In
                other words, the thresholds are defining triggers, not
                blocking.
  • A record larger than the serialization buffer will first
                trigger a spill, then be spilled to a separate file. It is
                undefined whether or not this record will first pass through the
                combiner.

Shuffle/Reduce Parameters
As described previously, each reduce fetches the output assigned
          to it by the Partitioner via HTTP into memory and periodically
          merges these outputs to disk. If intermediate compression of map
          outputs is turned on, each output is decompressed into memory. The
          following options affect the frequency of these merges to disk prior
          to the reduce and the memory allocated to map output during the
          reduce.
            
NameTypeDescription
            
io.sort.factorint
                Specifies the number of segments on disk to be merged at
                the same time. It limits the number of open files and
                compression codecs during the merge. If the number of files
                exceeds this limit, the merge will proceed in several passes.
                Though this limit also applies to the map, most jobs should be
                configured so that hitting this limit is unlikely
                there.
            
mapred.inmem.merge.thresholdint
                The number of sorted map outputs fetched into memory
                before being merged to disk. Like the spill thresholds in the
                preceding note, this is not defining a unit of partition, but
                a trigger. In practice, this is usually set very high (1000)
                or disabled (0), since merging in-memory segments is often
                less expensive than merging from disk (see notes following
                this table). This threshold influences only the frequency of
                in-memory merges during the shuffle.
            
mapred.job.shuffle.merge.percentfloat
                The memory threshold for fetched map outputs before an
                in-memory merge is started, expressed as a percentage of
                memory allocated to storing map outputs in memory. Since map
                outputs that can't fit in memory can be stalled, setting this
                high may decrease parallelism between the fetch and merge.
                Conversely, values as high as 1.0 have been effective for
                reduces whose input can fit entirely in memory. This parameter
                influences only the frequency of in-memory merges during the
                shuffle.
            
mapred.job.shuffle.input.buffer.percentfloat
                The percentage of memory- relative to the maximum heapsize
                as typically specified in mapred.child.java.opts-
                that can be allocated to storing map outputs during the
                shuffle. Though some memory should be set aside for the
                framework, in general it is advantageous to set this high
                enough to store large and numerous map outputs.
            
mapred.job.reduce.input.buffer.percentfloat
                The percentage of memory relative to the maximum heapsize
                in which map outputs may be retained during the reduce. When
                the reduce begins, map outputs will be merged to disk until
                those that remain are under the resource limit this defines.
                By default, all map outputs are merged to disk before the
                reduce begins to maximize the memory available to the reduce.
                For less memory-intensive reduces, this should be increased to
                avoid trips to disk.
         
Other notes
  • If a map output is larger than 25 percent of the memory
                allocated to copying map outputs, it will be written directly to
                disk without first staging through memory.
  • When running with a combiner, the reasoning about high merge
                thresholds and large buffers may not hold. For merges started
                before all map outputs have been fetched, the combiner is run
                while spilling to disk. In some cases, one can obtain better
                reduce times by spending resources combining map outputs- making
                disk spills small and parallelizing spilling and fetching- rather
                than aggressively increasing buffer sizes.
  • When merging in-memory map outputs to disk to begin the
                reduce, if an intermediate merge is necessary because there are
                segments to spill and at least io.sort.factor
                segments already on disk, the in-memory map outputs will be part
                of the intermediate merge.

Directory Structure
The task tracker has local directory,
         ${mapred.local.dir}/taskTracker/ to create localized
        cache and localized job. It can define multiple local directories
        (spanning multiple disks) and then each filename is assigned to a
        semi-random local directory. When the job starts, task tracker
        creates a localized job directory relative to the local directory
        specified in the configuration. Thus the task tracker directory
        structure looks the following:

  • ${mapred.local.dir}/taskTracker/archive/ :
            The distributed cache. This directory holds the localized distributed
            cache. Thus localized distributed cache is shared among all
            the tasks and jobs

  • ${mapred.local.dir}/taskTracker/jobcache/$jobid/ :
            The localized job directory
            

    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/
              : The job-specific shared directory. The tasks can use this space as
              scratch space and share files among them. This directory is exposed
              to the users through the configuration property  
              job.local.dir. The directory can accessed through
              api
              JobConf.getJobLocalDir(). It is available as System property also.
              So, users (streaming etc.) can call
              System.getProperty("job.local.dir") to access the
              directory.

    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/
              : The jars directory, which has the job jar file and expanded jar.
              The job.jar is the application's jar file that is
              automatically distributed to each machine. It is expanded in jars
              directory before the tasks for the job start. The job.jar location
              is accessible to the application through the api
               
              JobConf.getJar() . To access the unjarred directory,
              JobConf.getJar().getParent() can be called.

    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml
              : The job.xml file, the generic job configuration, localized for
              the job.

    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid
              : The task directory for each task attempt. Each task directory
              again has the following structure :
              

      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml
                : A job.xml file, task localized job configuration, Task localization
                means that properties have been set that are specific to
                this particular task within the job. The properties localized for
                each task are described below.

      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output
                : A directory for intermediate output files. This contains the
                temporary map reduce data generated by the framework
                such as map output files etc.

      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work
                : The curernt working directory of the task.
                With
        jvm reuse
        enabled for tasks, this
                directory will be the directory on which the jvm has started

      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp
                : The temporary directory for the task.
                (User can specify the property mapred.child.tmp to set
                the value of temporary directory for map and reduce tasks. This
                defaults to ./tmp. If the value is not an absolute path,
                it is prepended with task's working directory. Otherwise, it is
                directly assigned. The directory will be created if it doesn't exist.
                Then, the child java tasks are executed with option
                -Djava.io.tmpdir='the absolute path of the tmp dir'.
                Anp pipes and streaming are set with environment variable,
                TMPDIR='the absolute path of the tmp dir'). This
                directory is created, if mapred.child.tmp has the value
                ./tmp

              

            

Task JVM Reuse
Jobs can enable task JVMs to be reused by specifying the job
        configuration mapred.job.reuse.jvm.num.tasks. If the
        value is 1 (the default), then JVMs are not reused
        (i.e. 1 task per JVM). If it is -1, there is no limit to the number
        of tasks a JVM can run (of the same job). One can also specify some
        value greater than 1 using the api
        
        JobConf.setNumTasksToExecutePerJvm(int)
The following properties are localized in the job configuration
         for each task's execution:
         
NameTypeDescription
         
mapred.job.idStringThe job id
         
mapred.jarString
              job.jar location in job directory
         
job.local.dir String
               The job specific shared scratch space
         
mapred.tip.id String
               The task id
         
mapred.task.id String
               The task attempt id
         
mapred.task.is.map boolean
              Is this a map task
         
mapred.task.partition int
              The id of the task within the job
         
map.input.file String
               The filename that the map is reading from
         
map.input.start long
               The offset of the start of the map input split
         
map.input.length long
              The number of bytes in the map input split
         
mapred.work.output.dir String
              The task's temporary output directory
        
The standard output (stdout) and error (stderr) streams of the task
        are read by the TaskTracker and logged to
        ${HADOOP_LOG_DIR}/userlogs
The
DistributedCache
can also be used
        to distribute both jars and native libraries for use in the map
        and/or reduce tasks. The child-jvm always has its
        current working directory added to the
        java.library.path and LD_LIBRARY_PATH.
        And hence the cached libraries can be loaded via
        
        System.loadLibrary or
        
        System.load. More details on how to load shared libraries through
        distributed cache are documented at
        
        native_libraries.html
Job Submission and Monitoring
        JobClient is the primary interface by which user-job interacts
        with the JobTracker.
JobClient provides facilities to submit jobs, track their
        progress, access component-tasks' reports and logs, get the Map/Reduce
        cluster's status information and so on.
The job submission process involves:
  • Checking the input and output specifications of the job.
  • Computing the InputSplit values for the job.

  •             Setting up the requisite accounting information for the
                DistributedCache of the job, if necessary.
             

  •             Copying the job's jar and configuration to the Map/Reduce system
                directory on the FileSystem.
             

  •             Submitting the job to the JobTracker and optionally
                monitoring it's status.
             
    Job history files are also logged to user specified directory
            hadoop.job.history.user.location
            which defaults to job output directory. The files are stored in
            "_logs/history/" in the specified directory. Hence, by default they
            will be in mapred.output.dir/_logs/history. User can stop
            logging by giving the value none for
            hadoop.job.history.user.location
    User can view the history logs summary in specified directory
            using the following command
    /*查看日志信息*/
            
    $ bin/hadoop job -history output-dir

            This command will print job details, failed and killed tip
            details.
            More details about the job such as successful tasks and
            task attempts made for each task can be viewed using the  
            following command
          
    $ bin/hadoop job -history all output-dir
    User can use
            
    OutputLogFilter
            to filter log files from the output directory listing.
    Normally the user creates the application, describes various facets
            of the job via JobConf, and then uses the
            JobClient to submit the job and monitor its progress.
    Job Control
    Users may need to chain Map/Reduce jobs to accomplish complex
              tasks which cannot be done via a single Map/Reduce job. This is fairly
              easy since the output of the job typically goes to distributed
              file-system, and the output, in turn, can be used as the input for the
              next job.
    However, this also means that the onus on ensuring jobs are
              complete (success/failure) lies squarely on the clients. In such
              cases, the various job-control options are:

    •               
                    runJob(JobConf) : Submits the job and returns only after the
                    job has completed.
                  

    •               
                    submitJob(JobConf) : Only submits the job, then poll the
                    returned handle to the
                    
                    RunningJob to query status and make scheduling decisions.
                  

    •               
                    JobConf.setJobEndNotificationURI(String) : Sets up a
                    notification upon job-completion, thus avoiding polling.
                  

    Job Input
            InputFormat describes the input-specification for a Map/Reduce job.
            
    The Map/Reduce framework relies on the InputFormat of
            the job to:
  • Validate the input-specification of the job.

  •             Split-up the input file(s) into logical InputSplit
                instances, each of which is then assigned to an individual
                Mapper.
             

  •             Provide the RecordReader implementation used to
                glean input records from the logical InputSplit for
                processing by the Mapper.
             
    The default behavior of file-based InputFormat
            implementations, typically sub-classes of
            
            FileInputFormat, is to split the input into logical
            InputSplit instances based on the total size, in bytes, of
            the input files. However, the FileSystem blocksize of the
            input files is treated as an upper bound for input splits. A lower bound
            on the split size can be set via mapred.min.split.size.
    Clearly, logical splits based on input-size is insufficient for many
            applications since record boundaries must be respected. In such cases,
            the application should implement a RecordReader, who is
            responsible for respecting record-boundaries and presents a
            record-oriented view of the logical InputSplit to the
            individual task.
            TextInputFormat is the default InputFormat.
    If TextInputFormat is the InputFormat for a
            given job, the framework detects input-files with the .gz
            extensions and automatically decompresses them using the
            appropriate CompressionCodec. However, it must be noted that
            compressed files with the above extensions cannot be split and
            each compressed file is processed in its entirety by a single mapper.
    InputSplit
              InputSplit represents the data to be processed by an individual
              Mapper.
    Typically InputSplit presents a byte-oriented view of
              the input, and it is the responsibility of RecordReader
              to process and present a record-oriented view.
              FileSplit is the default InputSplit. It sets
              map.input.file to the path of the input file for the
              logical split.
    RecordReader
              RecordReader reads  pairs from an
              InputSplit.
    Typically the RecordReader converts the byte-oriented
              view of the input, provided by the InputSplit, and
              presents a record-oriented to the Mapper implementations
              for processing. RecordReader thus assumes the
              responsibility of processing record boundaries and presents the tasks
              with keys and values.
    Job Output
            OutputFormat describes the output-specification for a Map/Reduce
            job.
    The Map/Reduce framework relies on the OutputFormat of
            the job to:

  •             Validate the output-specification of the job; for example, check that
                the output directory doesn't already exist.
             

  •             Provide the RecordWriter implementation used to
                write the output files of the job. Output files are stored in a
                FileSystem.
             
    TextOutputFormat is the default
            OutputFormat.
    OutputCommitter
            OutputCommitter describes the commit of task output for a
            Map/Reduce job.
    The Map/Reduce framework relies on the OutputCommitter
            of the job to:

  •             Setup the job during initialization. For example, create
                the temporary output directory for the job during the
                initialization of the job.
                Job setup is done by a separate task when the job is
                in PREP state and after initializing tasks. Once the setup task
                completes, the job will be moved to RUNNING state.
             

  •             Cleanup the job after the job completion. For example, remove the
                temporary output directory after the job completion.
                Job cleanup is done by a separate task at the end of the job.
                Job is declared SUCCEDED/FAILED/KILLED after the cleanup
                task completes.
             

  •             Setup the task temporary output.
                Task setup is done as part of the same task, during task initialization.
             

  •             Check whether a task needs a commit. This is to avoid the commit
                procedure if a task does not need commit.
             

  •             Commit of the task output.
                Once task is done, the task will commit it's output if required.  
             

  •             Discard the task commit.
                If the task has been failed/killed, the output will be cleaned-up.
                If task could not cleanup (in exception block), a separate task
                will be launched with same attempt-id to do the cleanup.
             
    FileOutputCommitter is the default
            OutputCommitter. Job setup/cleanup tasks occupy
            map or reduce slots, whichever is free on the TaskTracker. And
            JobCleanup task, TaskCleanup tasks and JobSetup task have the highest
            priority, and in that order.
    Task Side-Effect Files
    In some applications, component tasks need to create and/or write to
              side-files, which differ from the actual job-output files.
    In such cases there could be issues with two instances of the same
              Mapper or Reducer running simultaneously (for
              example, speculative tasks) trying to open and/or write to the same
              file (path) on the FileSystem. Hence the
              application-writer will have to pick unique names per task-attempt
              (using the attemptid, say attempt_200709221812_0001_m_000000_0),
              not just per task.
    To avoid these issues the Map/Reduce framework, when the
              OutputCommitter is FileOutputCommitter,
              maintains a special
              ${mapred.output.dir}/_temporary/_${taskid} sub-directory
              accessible via ${mapred.work.output.dir}
              for each task-attempt on the FileSystem where the output
              of the task-attempt is stored. On successful completion of the
              task-attempt, the files in the
              ${mapred.output.dir}/_temporary/_${taskid} (only)
              are promoted to ${mapred.output.dir}. Of course,
              the framework discards the sub-directory of unsuccessful task-attempts.
              This process is completely transparent to the application.
    The application-writer can take advantage of this feature by
              creating any side-files required in ${mapred.work.output.dir}
              during execution of a task via
             
              FileOutputFormat.getWorkOutputPath(), and the framework will promote them
              similarly for succesful task-attempts, thus eliminating the need to
              pick unique paths per task-attempt.
    Note: The value of ${mapred.work.output.dir} during
              execution of a particular task-attempt is actually
              ${mapred.output.dir}/_temporary/_{$taskid}, and this value is
              set by the Map/Reduce framework. So, just create any side-files in the
              path  returned by
             
              FileOutputFormat.getWorkOutputPath() from map/reduce
              task to take advantage of this feature.
    The entire discussion holds true for maps of jobs with
               reducer=NONE (i.e. 0 reduces) since output of the map, in that case,
               goes directly to HDFS.
    RecordWriter
              RecordWriter writes the output  
              pairs to an output file.
    RecordWriter implementations write the job outputs to the
              FileSystem.
    Other Useful Features
    Submitting Jobs to Queues
    Users submit jobs to Queues. Queues, as collection of jobs,
              allow the system to provide specific functionality. For example,
              queues use ACLs to control which users
              who can submit jobs to them. Queues are expected to be primarily
              used by Hadoop Schedulers.
    Hadoop comes configured with a single mandatory queue, called
              'default'. Queue names are defined in the
              mapred.queue.names property of the Hadoop site
              configuration. Some job schedulers, such as the
             
    Capacity Scheduler
    ,
              support multiple queues.
    A job defines the queue it needs to be submitted to through the
              mapred.job.queue.name property, or through the
             
    setQueueName(String)
              API. Setting the queue name is optional. If a job is submitted
              without an associated queue name, it is submitted to the 'default'
              queue.
    Counters
    Counters represent global counters, defined either by
              the Map/Reduce framework or applications. Each Counter can
              be of any Enum type. Counters of a particular
              Enum are bunched into groups of type
              Counters.Group.
    Applications can define arbitrary Counters (of type
              Enum) and update them via
             
              Reporter.incrCounter(Enum, long) or
             
              Reporter.incrCounter(String, String, long)
              in the map and/or
              reduce methods. These counters are then globally
              aggregated by the framework.
    DistributedCache
              DistributedCache distributes application-specific, large, read-only
              files efficiently.
    DistributedCache is a facility provided by the
              Map/Reduce framework to cache files (text, archives, jars and so on)
              needed by applications.
    Applications specify the files to be cached via urls (hdfs://)
              in the JobConf. The DistributedCache
              assumes that the files specified via hdfs:// urls are already present
              on the FileSystem.
    The framework will copy the necessary files to the slave node
              before any tasks for the job are executed on that node. Its
              efficiency stems from the fact that the files are only copied once
              per job and the ability to cache archives which are un-archived on
              the slaves.
    DistributedCache tracks the modification timestamps of
              the cached files. Clearly the cache files should not be modified by
              the application or externally while the job is executing.
    DistributedCache can be used to distribute simple,
              read-only data/text files and more complex types such as archives and
              jars. Archives (zip, tar, tgz and tar.gz files) are
              un-archived at the slave nodes. Files
              have execution permissions set.
    The files/archives can be distributed by setting the property
              mapred.cache.{files|archives}. If more than one
              file/archive has to be distributed, they can be added as comma
              separated paths. The properties can also be set by APIs
             
              DistributedCache.addCacheFile(URI,conf)/
             
              DistributedCache.addCacheArchive(URI,conf) and
             
              DistributedCache.setCacheFiles(URIs,conf)/
             
              DistributedCache.setCacheArchives(URIs,conf)
              where URI is of the form
              hdfs://host:port/absolute-path#link-name.
              In Streaming, the files can be distributed through command line
              option -cacheFile/-cacheArchive.
    Optionally users can also direct the DistributedCache
              to symlink the cached file(s) into the current working
              directory of the task via the
             
              DistributedCache.createSymlink(Configuration) api. Or by setting
              the configuration property mapred.create.symlink
              as yes. The DistributedCache will use the
              fragment of the URI as the name of the symlink.
              For example, the URI
              hdfs://namenode:port/lib.so.1#lib.so
              will have the symlink name as lib.so in task's cwd
              for the file lib.so.1 in distributed cache.
    The DistributedCache can also be used as a
              rudimentary software distribution mechanism for use in the
              map and/or reduce tasks. It can be used to distribute both
              jars and native libraries. The
             
              DistributedCache.addArchiveToClassPath(Path, Configuration) or
             
              DistributedCache.addFileToClassPath(Path, Configuration) api
              can be used to cache files/jars and also add them to the
              classpath of child-jvm. The same can be done by setting
              the configuration properties
              mapred.job.classpath.{files|archives}. Similarly the
              cached files that are symlinked into the working directory of the
              task can be used to distribute native libraries and load them.
    Tool
    The
    Tool

              interface supports the handling of generic Hadoop command-line options.
             
    Tool is the standard for any Map/Reduce tool or
              application. The application should delegate the handling of
              standard command-line options to
             
              GenericOptionsParser via         
             
              ToolRunner.run(Tool, String[]) and only handle its custom
              arguments.
                The generic Hadoop command-line options are:
                
                  -conf
                
                
                
                  -D
                
                
                
                  -fs
                
                
                
                  -jt
                
             
    IsolationRunner
              IsolationRunner is a utility to help debug Map/Reduce programs.
    To use the IsolationRunner, first set
              keep.failed.tasks.files to true
              (also see keep.tasks.files.pattern).
                Next, go to the node on which the failed task ran and go to the
                TaskTracker's local directory and run the
                IsolationRunner:
                
    $ cd /taskTracker/${taskid}/work
                
                  $ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
                
             
    IsolationRunner will run the failed task in a single
              jvm, which can be in the debugger, over precisely the same input.
    Profiling
    Profiling is a utility to get a representative (2 or 3) sample
              of built-in java profiler for a sample of maps and reduces.
    User can specify whether the system should collect profiler
              information for some of the tasks in the job by setting the
              configuration property mapred.task.profile. The
              value can be set using the api
             
              JobConf.setProfileEnabled(boolean). If the value is set
              true, the task profiling is enabled. The profiler
              information is stored in the user log directory. By default,
              profiling is not enabled for the job.  
    Once user configures that profiling is needed, she/he can use
              the configuration property
              mapred.task.profile.{maps|reduces} to set the ranges
              of map/reduce tasks to profile. The value can be set using the api
             
              JobConf.setProfileTaskRange(boolean,String).
              By default, the specified range is 0-2.
    User can also specify the profiler configuration arguments by
              setting the configuration property
              mapred.task.profile.params. The value can be specified
              using the api
             
              JobConf.setProfileParams(String). If the string contains a
              %s, it will be replaced with the name of the profiling
              output file when the task runs. These parameters are passed to the
              task child JVM on the command line. The default value for
              the profiling parameters is
              -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s
             
    Debugging
    The Map/Reduce framework provides a facility to run user-provided
              scripts for debugging. When a map/reduce task fails, a user can run
              a debug script, to process task logs for example. The script is
              given access to the task's stdout and stderr outputs, syslog and
              jobconf. The output from the debug script's stdout and stderr is
              displayed on the console diagnostics and also as part of the
              job UI.
    In the following sections we discuss how to submit a debug script
              with a job. The script file needs to be distributed and submitted to
              the framework.
    How to distribute the script file:
              The user needs to use  
             
    DistributedCache
              to distribute and symlink the script file.
    How to submit the script:
    A quick way to submit the debug script is to set values for the
              properties mapred.map.task.debug.script and
              mapred.reduce.task.debug.script, for debugging map and
              reduce tasks respectively. These properties can also be set by using APIs
             
              JobConf.setMapDebugScript(String)  and
             
              JobConf.setReduceDebugScript(String) . In streaming mode, a debug
              script can be submitted with the command-line options
              -mapdebug and -reducedebug, for debugging
              map and reduce tasks respectively.
    The arguments to the script are the task's stdout, stderr,
              syslog and jobconf files. The debug command, run on the node where
              the map/reduce task failed, is:
             
    $script $stdout $stderr $syslog $jobconf  
    Pipes programs have the c++ program name as a fifth argument
              for the command. Thus for the pipes programs the command is

             
    $script $stdout $stderr $syslog $jobconf $program   
             
    Default Behavior:
    For pipes, a default script is run to process core dumps under
              gdb, prints stack trace and gives info about running threads.
    JobControl
              JobControl is a utility which encapsulates a set of Map/Reduce jobs
              and their dependencies.
    Data Compression
    Hadoop Map/Reduce provides facilities for the application-writer to
              specify compression for both intermediate map-outputs and the
              job-outputs i.e. output of the reduces. It also comes bundled with
             
              CompressionCodec implementation for the
             
    zlib
    compression
              algorithm. The
    gzip
    file format is also
              supported.
    Hadoop also provides native implementations of the above compression
              codecs for reasons of both performance (zlib) and non-availability of
              Java libraries. More details on their usage and availability are
              available
    here
    .
    Intermediate Outputs
    Applications can control compression of intermediate map-outputs
                via the
                
                JobConf.setCompressMapOutput(boolean) api and the
                CompressionCodec to be used via the
                
                JobConf.setMapOutputCompressorClass(Class) api.
    Job Outputs
    Applications can control compression of job-outputs via the
                
                FileOutputFormat.setCompressOutput(JobConf, boolean) api and the
                CompressionCodec to be used can be specified via the
                
                FileOutputFormat.setOutputCompressorClass(JobConf, Class) api.
    If the job outputs are to be stored in the
                
                SequenceFileOutputFormat, the required
                SequenceFile.CompressionType (i.e. RECORD /
                BLOCK - defaults to RECORD) can be
                specified via the
                
                SequenceFileOutputFormat.setOutputCompressionType(JobConf,
                SequenceFile.CompressionType) api.
    Skipping Bad Records
    Hadoop provides an option where a certain set of bad input
              records can be skipped when processing map inputs. Applications
              can control this feature through the  
             
              SkipBadRecords class.
    This feature can be used when map tasks crash deterministically
              on certain input. This usually happens due to bugs in the
              map function. Usually, the user would have to fix these bugs.
              This is, however, not possible sometimes. The bug may be in third
              party libraries, for example, for which the source code is not
              available. In such cases, the task never completes successfully even
              after multiple attempts, and the job fails. With this feature, only
              a small portion of data surrounding the
              bad records is lost, which may be acceptable for some applications
              (those performing statistical analysis on very large data, for
              example).
    By default this feature is disabled. For enabling it,
              refer to
              SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and
             
              SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).
             
    With this feature enabled, the framework gets into 'skipping
              mode' after a certain number of map failures. For more details,
              see
              SkipBadRecords.setAttemptsToStartSkipping(Configuration, int).
              In 'skipping mode', map tasks maintain the range of records being
              processed. To do this, the framework relies on the processed record
              counter. See
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS and
             
              SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS.
              This counter enables the framework to know how many records have
              been processed successfully, and hence, what record range caused
              a task to crash. On further attempts, this range of records is
              skipped.
    The number of records skipped depends on how frequently the
              processed record counter is incremented by the application.
              It is recommended that this counter be incremented after every
              record is processed. This may not be possible in some applications
              that typically batch their processing. In such cases, the framework
              may skip additional records surrounding the bad record. Users can
              control the number of skipped records through
             
              SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and
             
              SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).
              The framework tries to narrow the range of skipped records using a
              binary search-like approach. The skipped range is divided into two
              halves and only one half gets executed. On subsequent
              failures, the framework figures out which half contains
              bad records. A task will be re-executed till the
              acceptable skipped value is met or all task attempts are exhausted.
              To increase the number of task attempts, use
             
              JobConf.setMaxMapAttempts(int) and
             
              JobConf.setMaxReduceAttempts(int).
             
    Skipped records are written to HDFS in the sequence file
              format, for later analysis. The location can be changed through
             
              SkipBadRecords.setSkipOutputPath(JobConf, Path).
             
                   
                   
                   
                   
                   
                   
                   

    本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u3/99156/showart_2157843.html
  • 您需要登录后才可以回帖 登录 | 注册

    本版积分规则 发表回复

      

    北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
    未成年举报专区
    中国互联网协会会员  联系我们:huangweiwei@itpub.net
    感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

    清除 Cookies - ChinaUnix - Archiver - WAP - TOP