Posted by jiangwen127 on 2010-01-22 15:37

Hadoop Map/Reduce article reading notes


The article below is taken from the official Hadoop Map/Reduce tutorial.
Map/Reduce - User Interfaces
This section provides a reasonable amount of detail on every user-facing
      aspect of the Map/Reduce framework. This should help users implement,
      configure and tune their jobs in a fine-grained manner. However, please
      note that the javadoc for each class/interface remains the most
      comprehensive documentation available; this is only meant to be a tutorial.
      
Let us first take the Mapper and Reducer
      interfaces. Applications typically implement them to provide the
      map and reduce methods.
We will then discuss other core interfaces including
      JobConf, JobClient, Partitioner,
      OutputCollector, Reporter,
      InputFormat, OutputFormat,
      OutputCommitter and others.
Finally, we will wrap up by discussing some useful features of the
      framework such as the DistributedCache,
      IsolationRunner etc.
Payload
Applications typically implement the Mapper and
      Reducer interfaces to provide the map and
      reduce methods. These form the core of the job.
Mapper
          Mapper maps input key/value pairs to a set of intermediate
          key/value pairs.
/* The intermediate pairs produced by map are not necessarily of the same type as the original input pairs */
Maps are the individual tasks that transform input records into
          intermediate records. The transformed intermediate records do not need
          to be of the same type as the input records. A given input pair may
          map to zero or many output pairs.
/* The InputSplit carves up the input files and the split data is passed to map; how key/value pairs are produced from that data is up to the user-defined map class. */
The Hadoop Map/Reduce framework spawns one map task for each
          InputSplit generated by the InputFormat for
          the job.
Overall, Mapper implementations are passed the
          JobConf for the job via the
          JobConfigurable.configure(JobConf) method and can override it to
          initialize themselves. The framework then calls
          map(WritableComparable, Writable, OutputCollector, Reporter) for
          each key/value pair in the InputSplit for that task.
          Applications can then override the
          Closeable.close() method to perform any required cleanup.
/* How output pairs are emitted */
Output pairs do not need to be of the same types as input pairs. A
          given input pair may map to zero or many output pairs. Output pairs
          are collected with calls to
          OutputCollector.collect(WritableComparable, Writable).
Applications can use the Reporter to report
          progress, set application-level status messages and update
          Counters, or just indicate that they are alive.
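As a concrete sketch (not code from the article), here is a minimal Mapper in the spirit of the classic WordCount example, written against the old org.apache.hadoop.mapred API that this tutorial describes; the class name is my own:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Maps each input line to a set of <word, 1> intermediate pairs.
public class TokenCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    StringTokenizer tokens = new StringTokenizer(value.toString());
    while (tokens.hasMoreTokens()) {
      word.set(tokens.nextToken());
      output.collect(word, ONE);   // emit an intermediate <key, value> pair
      reporter.progress();         // tell the framework the task is still alive
    }
  }
}

MapReduceBase supplies no-op configure(JobConf) and close() implementations, which the class could override if it needed per-task initialization or cleanup.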
/* The role of OutputKeyComparatorClass: grouping happens before the Reduce phase. There should also be a step here that merges the data of each partition by key. */
All intermediate values associated with a given output key are
          subsequently grouped by the framework, and passed to the
          Reducer(s) to determine the final output. Users can
          control the grouping by specifying a Comparator via
          JobConf.setOutputKeyComparatorClass(Class).
/* The map output is sorted; the number of partitions on each node equals the number of Reducers, and each Reducer processes the data of the corresponding partition (i.e. the data holding the same keys). */
The Mapper outputs are sorted and then
          partitioned per Reducer. The total number of partitions is
          the same as the number of reduce tasks for the job. Users can control
          which keys (and hence records) go to which Reducer by
          implementing a custom Partitioner.
/* The role of the Combiner: local aggregation after the map runs, i.e. collapsing many <key, value> pairs into a <key, value-list> style record, which cuts the amount of data transferred between the Mapper and the Reducer. */
Users can optionally specify a combiner, via
         
          JobConf.setCombinerClass(Class), to perform local aggregation of
          the intermediate outputs, which helps to cut down the amount of data
          transferred from the Mapper to the Reducer.
/* Storage format of the intermediate data, and how it can be compressed */
The intermediate, sorted outputs are always stored in a simple
          (key-len, key, value-len, value) format.
          Applications can control if, and how, the
          intermediate outputs are to be compressed and the
         
          CompressionCodec to be used via the JobConf.
/* Guidelines for choosing the number of map tasks */
How Many Maps?
The number of maps is usually driven by the total size of the
            inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100
            maps per-node, although it has been set up to 300 maps for very
            cpu-light map tasks. Task setup takes awhile, so it is best if the
            maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of
            128MB, you'll end up with roughly 82,000 maps
            (10 TB / 128 MB = 10 * 1024 * 1024 MB / 128 MB = 81,920 splits), unless
            setNumMapTasks(int) (which only provides a hint to the framework)
            is used to set it even higher.
Reducer
          Reducer reduces a set of intermediate values which share a key to
          a smaller set of values.
The number of reduces for the job is set by the user
          via
          JobConf.setNumReduceTasks(int).
Overall, Reducer implementations are passed the
          JobConf for the job via the
          JobConfigurable.configure(JobConf) method and can override it to
          initialize themselves. The framework then calls
          reduce(WritableComparable, Iterator, OutputCollector, Reporter)
          for each <key, (list of values)> pair in the grouped inputs.
          Applications can then override the
          Closeable.close() method to perform any required cleanup.
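Continuing the sketch above (again my own illustration, not code from the article), a matching Reducer that sums the counts for each key:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Reduces the grouped <word, [1, 1, ...]> input to a single <word, total> pair.
public class TokenCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}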
/* The three sub-phases of Reduce: shuffle, sort, and reduce */
Reducer has 3 primary phases: shuffle, sort and reduce.
/* Shuffle: the sorted output of the maps is the input to Reduce; the relevant partition from every Mapper is sent to the corresponding Reducer, over HTTP. */
Shuffle
Input to the Reducer is the sorted output of the
            mappers. In this phase the framework fetches the relevant partition
            of the output of all the mappers, via HTTP.
/* The Reducer still has to group the data coming from the different Mappers so that identical keys end up together: if the Partitioner hashes keys, different keys may land in the same partition after hashing and be shipped to the same Reducer, so a grouping step is still needed here. */
Sort
The framework groups Reducer inputs by keys (since
            different mappers may have output the same key) in this stage.
/* The shuffle and sort phases run at the same time */
The shuffle and sort phases occur simultaneously; while
            map-outputs are being fetched they are merged.
/* A bit confusing here!!! Does this refer to a comparator used only for grouping? */
Secondary Sort
If equivalence rules for grouping the intermediate keys are
            required to be different from those for grouping keys before
            reduction, then one may specify a Comparator via
            JobConf.setOutputValueGroupingComparator(Class). Since
            JobConf.setOutputKeyComparatorClass(Class) can be used to
            control how intermediate keys are sorted, these can be used in
            conjunction to simulate secondary sort on values.
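As a rough sketch of how the two comparators can be combined (my own example, assuming the intermediate key is a Text of the form "naturalKey#value"; none of these class names come from the article):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;

public class SecondarySortSetup {

  // Full sort order: compare the whole composite key (natural key, then value part).
  public static class FullKeyComparator extends WritableComparator {
    public FullKeyComparator() { super(Text.class, true); }
    public int compare(WritableComparable a, WritableComparable b) {
      return a.toString().compareTo(b.toString());
    }
  }

  // Grouping: only the part before '#' decides which values share one reduce() call.
  public static class NaturalKeyGroupingComparator extends WritableComparator {
    public NaturalKeyGroupingComparator() { super(Text.class, true); }
    public int compare(WritableComparable a, WritableComparable b) {
      String ka = a.toString().split("#", 2)[0];
      String kb = b.toString().split("#", 2)[0];
      return ka.compareTo(kb);
    }
  }

  public static void configure(JobConf conf) {
    conf.setOutputKeyComparatorClass(FullKeyComparator.class);                 // sort order
    conf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class); // grouping
  }
}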
Reduce
In this phase the
            reduce(WritableComparable, Iterator, OutputCollector, Reporter)
            method is called for each <key, (list of values)> pair in the
            grouped inputs.
The output of the reduce task is typically written to the
            FileSystem via
            OutputCollector.collect(WritableComparable, Writable).
Applications can use the Reporter to report
            progress, set application-level status messages and update
            Counters, or just indicate that they are alive.
The output of the Reducer is not sorted.
How Many Reduces?
The right number of reduces seems to be 0.95 or
            1.75 multiplied by (<no. of nodes> *
            mapred.tasktracker.reduce.tasks.maximum).
With 0.95 all of the reduces can launch immediately
            and start transferring map outputs as the maps finish. With
            1.75 the faster nodes will finish their first round of
            reduces and launch a second wave of reduces, doing a much better job
            of load balancing.
Increasing the number of reduces increases the framework overhead,
            but increases load balancing and lowers the cost of failures.
The scaling factors above are slightly less than whole numbers to
            reserve a few reduce slots in the framework for speculative-tasks and
            failed tasks.
Reducer NONE
It is legal to set the number of reduce-tasks to zero if
            no reduction is desired.
In this case the outputs of the map-tasks go directly to the
            FileSystem, into the output path set by
            setOutputPath(Path). The framework does not sort the
            map-outputs before writing them out to the FileSystem.
/* The input to the Partitioner is the map outputs */
Partitioner
          Partitioner partitions the key space.
Partitioner controls the partitioning of the keys of the
          intermediate map-outputs. The key (or a subset of the key) is used to
          derive the partition, typically by a hash function. The total
          number of partitions is the same as the number of reduce tasks for the
          job. Hence this controls which of the m reduce tasks the
          intermediate key (and hence the record) is sent to for reduction.
          HashPartitioner is the default Partitioner.
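A minimal custom Partitioner sketch under the old API (the class name and the "naturalKey#value" convention are my own, matching the secondary-sort sketch above): it derives the partition from a subset of the key, as described.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Routes each intermediate key to one of the numPartitions reduce tasks,
// using only the natural-key part so that all values of a key meet in one reducer.
public class NaturalKeyPartitioner implements Partitioner<Text, IntWritable> {

  public void configure(JobConf job) {
    // No per-job configuration needed for this example.
  }

  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String naturalKey = key.toString().split("#", 2)[0];
    // Mask the sign bit so the result is non-negative.
    return (naturalKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

It would be registered on the job with conf.setPartitionerClass(NaturalKeyPartitioner.class).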
Reporter
          Reporter is a facility for Map/Reduce applications to report
          progress, set application-level status messages and update
          Counters.
Mapper and Reducer implementations can use
          the Reporter to report progress or just indicate
          that they are alive. In scenarios where the application takes a
          significant amount of time to process individual key/value pairs,
          this is crucial since the framework might assume that the task has
          timed-out and kill that task. Another way to avoid this is to
          set the configuration parameter mapred.task.timeout to a
          high-enough value (or even set it to zero for no time-outs).
         
Applications can also update Counters using the
          Reporter.
/* Both map and reduce, as well as the intermediate results, use this class to write out their data */
OutputCollector
          OutputCollector is a generalization of the facility provided by
          the Map/Reduce framework to collect data output by the
          Mapper or the Reducer (either the
          intermediate outputs or the output of the job).
Hadoop Map/Reduce comes bundled with a
      library of generally useful mappers, reducers, and partitioners.
/* This part also mentions some debugging facilities */
Job Configuration
      JobConf represents a Map/Reduce job configuration.
JobConf is the primary interface for a user to describe
      a Map/Reduce job to the Hadoop framework for execution. The framework
      tries to faithfully execute the job as described by JobConf,
      however:
- Some configuration parameters may have been marked as
  final by administrators and hence cannot be altered.
- While some job parameters are straight-forward to set (e.g.
  setNumReduceTasks(int)), other parameters interact subtly with
  the rest of the framework and/or job configuration and are
  more complex to set (e.g. setNumMapTasks(int)).
JobConf is typically used to specify the
      Mapper, combiner (if any), Partitioner,
      Reducer, InputFormat,
      OutputFormat and OutputCommitter
      implementations. JobConf also
      indicates the set of input files
      (setInputPaths(JobConf, Path...) / addInputPath(JobConf, Path))
      and (setInputPaths(JobConf, String) / addInputPaths(JobConf, String))
      and where the output files should be written
      (setOutputPath(Path)).
Optionally, JobConf is used to specify other advanced
      facets of the job such as the Comparator to be used, files
      to be put in the DistributedCache, whether intermediate
      and/or job outputs are to be compressed (and how), debugging via
      user-provided scripts
      (setMapDebugScript(String) / setReduceDebugScript(String)),
      whether job tasks can be executed in a speculative manner
      (setMapSpeculativeExecution(boolean) / setReduceSpeculativeExecution(boolean)),
      the maximum number of attempts per task
      (setMaxMapAttempts(int) / setMaxReduceAttempts(int)),
      the percentage of task failures which can be tolerated by the job
      (setMaxMapTaskFailuresPercent(int) / setMaxReduceTaskFailuresPercent(int)),
      etc.
Of course, users can use
      set(String, String) / get(String, String)
      to set/get arbitrary parameters needed by applications. However, use the
      DistributedCache for large amounts of (read-only) data.
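Putting the earlier sketches together, a typical driver might look roughly like this (the input and output paths come from the command line; the mapper and reducer classes are the illustrative ones defined above):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class TokenCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(TokenCountDriver.class);
    conf.setJobName("token-count");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(TokenCountMapper.class);
    conf.setCombinerClass(TokenCountReducer.class);  // optional local aggregation
    conf.setReducerClass(TokenCountReducer.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);  // submit the job and block until it completes
  }
}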
Task Execution & Environment
The TaskTracker executes the Mapper/Reducer task as a child process
      in a separate JVM.
The child-task inherits the environment of the parent
      TaskTracker. The user can specify additional options to the
      child-jvm via the mapred.child.java.opts configuration
      parameter in the JobConf such as non-standard paths for the
      run-time linker to search shared libraries via
      -Djava.library.path= etc. If the
      mapred.child.java.opts contains the symbol @taskid@
      it is interpolated with value of taskid of the map/reduce
      task.
Here is an example with multiple arguments and substitutions,
      showing JVM GC logging, and the start of a passwordless JVM JMX agent so that
      it can connect with jconsole and the like to watch child memory,
      threads and get thread dumps. It also sets the maximum heap-size of the
      child JVM to 512MB and adds an additional path to the
      java.library.path of the child JVM.
         
      <property>
        <name>mapred.child.java.opts</name>
        <value>
          -Xmx512M -Djava.library.path=/home/mycompany/lib
          -verbose:gc -Xloggc:/tmp/@taskid@.gc
          -Dcom.sun.management.jmxremote.authenticate=false
          -Dcom.sun.management.jmxremote.ssl=false
        </value>
      </property>
Memory management
Users/admins can also specify the maximum virtual memory
      of the launched child-task, and any sub-process it launches
      recursively, using mapred.child.ulimit. Note that
      the value set here is a per process limit.
      The value for mapred.child.ulimit should be specified
      in kilo bytes (KB). And also the value must be greater than
      or equal to the -Xmx passed to JavaVM, else the VM might not start.
      
Note: mapred.child.java.opts is used only for
      configuring the launched child tasks from the task tracker. Configuring
      the memory options for daemons is documented in
      cluster_setup.html.
The memory available to some parts of the framework is also
      configurable. In map and reduce tasks, performance may be influenced
      by adjusting parameters influencing the concurrency of operations and
      the frequency with which data will hit disk. Monitoring the filesystem
      counters for a job (particularly relative to byte counts from the map
      and into the reduce) is invaluable to the tuning of these
      parameters.
Users can choose to override the default limits of virtual memory and RAM
          enforced by the task tracker, if memory management is enabled.
          Users can set the following parameters per job:

- mapred.task.maxvmem (int): A number, in bytes, that represents the maximum virtual memory task-limit for each task of the job. A task will be killed if it consumes more virtual memory than this number.
- mapred.task.maxpmem (int): A number, in bytes, that represents the maximum RAM task-limit for each task of the job. This number can optionally be used by schedulers to prevent over-scheduling of tasks on a node based on RAM needs.
/* How map output data is serialized, buffered, and stored */
Map Parameters
A record emitted from a map will be serialized into a buffer and
          metadata will be stored into accounting buffers. As described in the
          following options, when either the serialization buffer or the
          metadata exceed a threshold, the contents of the buffers will be
          sorted and written to disk in the background while the map continues
          to output records. If either buffer fills completely while the spill
          is in progress, the map thread will block. When the map is finished,
          any remaining records are written to disk and all on-disk segments
          are merged into a single file. Minimizing the number of spills to
          disk can decrease map time, but a larger buffer also decreases the
          memory available to the mapper.
            
- io.sort.mb (int): The cumulative size of the serialization and accounting buffers storing records emitted from the map, in megabytes.
- io.sort.record.percent (float): The ratio of serialization to accounting space can be adjusted. Each serialized record requires 16 bytes of accounting information in addition to its serialized size to effect the sort. This percentage of space allocated from io.sort.mb affects the probability of a spill to disk being caused by either exhaustion of the serialization buffer or the accounting space. Clearly, for a map outputting small records, a higher value than the default will likely decrease the number of spills to disk.
- io.sort.spill.percent (float): This is the threshold for the accounting and serialization buffers. When this percentage of either buffer has filled, their contents will be spilled to disk in the background. Let io.sort.record.percent be r, io.sort.mb be x, and this value be q. The maximum number of records collected before the collection thread will spill is r * x * q * 2^16. Note that a higher value may decrease the number of merges, or even eliminate them, but will also increase the probability of the map task getting blocked. The lowest average map times are usually obtained by accurately estimating the size of the map output and preventing multiple spills.
Other notes
- If either spill threshold is exceeded while a spill is in progress, collection will continue until the spill is finished. For example, if io.sort.spill.percent is set to 0.33, and the remainder of the buffer is filled while the spill runs, the next spill will include all the collected records, or 0.66 of the buffer, and will not generate additional spills. In other words, the thresholds are defining triggers, not blocking.
- A record larger than the serialization buffer will first trigger a spill, then be spilled to a separate file. It is undefined whether or not this record will first pass through the combiner.
Shuffle/Reduce Parameters
As described previously, each reduce fetches the output assigned
          to it by the Partitioner via HTTP into memory and periodically
          merges these outputs to disk. If intermediate compression of map
          outputs is turned on, each output is decompressed into memory. The
          following options affect the frequency of these merges to disk prior
          to the reduce and the memory allocated to map output during the
          reduce.
            
- io.sort.factor (int): Specifies the number of segments on disk to be merged at the same time. It limits the number of open files and compression codecs during the merge. If the number of files exceeds this limit, the merge will proceed in several passes. Though this limit also applies to the map, most jobs should be configured so that hitting this limit is unlikely there.
- mapred.inmem.merge.threshold (int): The number of sorted map outputs fetched into memory before being merged to disk. Like the spill thresholds in the preceding note, this is not defining a unit of partition, but a trigger. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk (see the notes following this table). This threshold influences only the frequency of in-memory merges during the shuffle.
- mapred.job.shuffle.merge.percent (float): The memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of memory allocated to storing map outputs in memory. Since map outputs that can't fit in memory can be stalled, setting this high may decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle.
- mapred.job.shuffle.input.buffer.percent (float): The percentage of memory, relative to the maximum heapsize as typically specified in mapred.child.java.opts, that can be allocated to storing map outputs during the shuffle. Though some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs.
- mapred.job.reduce.input.buffer.percent (float): The percentage of memory relative to the maximum heapsize in which map outputs may be retained during the reduce. When the reduce begins, map outputs will be merged to disk until those that remain are under the resource limit this defines. By default, all map outputs are merged to disk before the reduce begins, to maximize the memory available to the reduce. For less memory-intensive reduces, this should be increased to avoid trips to disk.
Other notes
- If a map output is larger than 25 percent of the memory allocated to copying map outputs, it will be written directly to disk without first staging through memory.
- When running with a combiner, the reasoning about high merge thresholds and large buffers may not hold. For merges started before all map outputs have been fetched, the combiner is run while spilling to disk. In some cases, one can obtain better reduce times by spending resources combining map outputs (making disk spills small and parallelizing spilling and fetching) rather than aggressively increasing buffer sizes.
- When merging in-memory map outputs to disk to begin the reduce, if an intermediate merge is necessary because there are segments to spill and at least io.sort.factor segments already on disk, the in-memory map outputs will be part of the intermediate merge.
Directory Structure
The task tracker has a local directory,
      ${mapred.local.dir}/taskTracker/, to create the localized
      cache and localized job. It can define multiple local directories
      (spanning multiple disks) and then each filename is assigned to a
      semi-random local directory. When the job starts, the task tracker
      creates a localized job directory relative to the local directory
      specified in the configuration. Thus the task tracker directory
      structure looks as follows:

- ${mapred.local.dir}/taskTracker/archive/ : The distributed cache. This directory holds the localized distributed cache. Thus the localized distributed cache is shared among all the tasks and jobs.
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : The localized job directory.
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ : The job-specific shared directory. The tasks can use this space as scratch space and share files among them. This directory is exposed to the users through the configuration property job.local.dir. The directory can be accessed through the API JobConf.getJobLocalDir(). It is also available as a system property, so users (streaming etc.) can call System.getProperty("job.local.dir") to access the directory.
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/ : The jars directory, which has the job jar file and expanded jar. The job.jar is the application's jar file that is automatically distributed to each machine. It is expanded in the jars directory before the tasks for the job start. The job.jar location is accessible to the application through the API JobConf.getJar(). To access the unjarred directory, JobConf.getJar().getParent() can be called.
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml : The job.xml file, the generic job configuration, localized for the job.
- ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid : The task directory for each task attempt. Each task directory again has the following structure:
  - ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml : A job.xml file, the task-localized job configuration. Task localization means that properties have been set that are specific to this particular task within the job. The properties localized for each task are described below.
  - ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output : A directory for intermediate output files. This contains the temporary map/reduce data generated by the framework, such as map output files.
  - ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work : The current working directory of the task. With jvm reuse enabled for tasks, this directory will be the directory in which the JVM was started.
  - ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp : The temporary directory for the task. (The user can specify the property mapred.child.tmp to set the value of the temporary directory for map and reduce tasks. This defaults to ./tmp. If the value is not an absolute path, it is prepended with the task's working directory; otherwise it is used directly. The directory will be created if it doesn't exist. The child java tasks are then executed with the option -Djava.io.tmpdir='the absolute path of the tmp dir'. Pipes and streaming are set with the environment variable TMPDIR='the absolute path of the tmp dir'.) This directory is created if mapred.child.tmp has the value ./tmp.
Task JVM Reuse
Jobs can enable task JVMs to be reused by specifying the job
      configuration mapred.job.reuse.jvm.num.tasks. If the
      value is 1 (the default), then JVMs are not reused
      (i.e. 1 task per JVM). If it is -1, there is no limit to the number
      of tasks a JVM can run (of the same job). One can also specify some
      value greater than 1 using the API
      JobConf.setNumTasksToExecutePerJvm(int).
The following properties are localized in the job configuration
         for each task's execution:

- mapred.job.id (String): The job id
- mapred.jar (String): job.jar location in the job directory
- job.local.dir (String): The job-specific shared scratch space
- mapred.tip.id (String): The task id
- mapred.task.id (String): The task attempt id
- mapred.task.is.map (boolean): Whether this is a map task
- mapred.task.partition (int): The id of the task within the job
- map.input.file (String): The filename that the map is reading from
- map.input.start (long): The offset of the start of the map input split
- map.input.length (long): The number of bytes in the map input split
- mapred.work.output.dir (String): The task's temporary output directory
The standard output (stdout) and error (stderr) streams of the task
      are read by the TaskTracker and logged to
      ${HADOOP_LOG_DIR}/userlogs
The DistributedCache can also be used
      to distribute both jars and native libraries for use in the map
      and/or reduce tasks. The child JVM always has its
      current working directory added to the
      java.library.path and LD_LIBRARY_PATH,
      and hence the cached libraries can be loaded via
      System.loadLibrary or
      System.load. More details on how to load shared libraries through
      the distributed cache are documented in
      native_libraries.html.
Job Submission and Monitoring
JobClient is the primary interface by which the user job interacts
      with the JobTracker.
JobClient provides facilities to submit jobs, track their
      progress, access component-tasks' reports and logs, get the Map/Reduce
      cluster's status information and so on.
The job submission process involves:
- Checking the input and output specifications of the job.
- Computing the InputSplit values for the job.
- Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
- Copying the job's jar and configuration to the Map/Reduce system directory on the FileSystem.
- Submitting the job to the JobTracker and optionally monitoring its status.
Job history files are also logged to the user-specified directory
      hadoop.job.history.user.location,
      which defaults to the job output directory. The files are stored in
      "_logs/history/" in the specified directory. Hence, by default they
      will be in mapred.output.dir/_logs/history. The user can stop
      logging by giving the value none for
      hadoop.job.history.user.location.
The user can view a summary of the history logs in the specified directory
      using the following command:
/* Viewing the log information */
$ bin/hadoop job -history output-dir
      This command will print job details, plus failed and killed tip
      details.
      More details about the job, such as successful tasks and
      task attempts made for each task, can be viewed using the
      following command:
$ bin/hadoop job -history all output-dir
The user can use OutputLogFilter
      to filter log files from the output directory listing.
Normally the user creates the application, describes various facets
      of the job via JobConf, and then uses the
      JobClient to submit the job and monitor its progress.
Job Control
Users may need to chain Map/Reduce jobs to accomplish complex
          tasks which cannot be done via a single Map/Reduce job. This is fairly
          easy since the output of the job typically goes to distributed
          file-system, and the output, in turn, can be used as the input for the
          next job.
However, this also means that the onus on ensuring jobs are
          complete (success/failure) lies squarely on the clients. In such
          cases, the various job-control options are:
- runJob(JobConf): Submits the job and returns only after the job has completed.
- submitJob(JobConf): Only submits the job; then poll the returned handle to the RunningJob to query status and make scheduling decisions (see the sketch after this list).
- JobConf.setJobEndNotificationURI(String): Sets up a notification upon job-completion, thus avoiding polling.
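A sketch of the polling approach from the second option above (the 5-second interval and the helper class are my own):

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class ChainedSubmit {
  // Submits a job asynchronously and waits for it, so a follow-up job
  // can be launched only if this one succeeded.
  public static boolean submitAndWait(JobConf conf) throws Exception {
    JobClient client = new JobClient(conf);
    RunningJob job = client.submitJob(conf);  // returns immediately
    while (!job.isComplete()) {
      Thread.sleep(5000);                     // poll the JobTracker every 5 seconds
    }
    return job.isSuccessful();
  }
}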
            
Job Input
      InputFormat describes the input-specification for a Map/Reduce job.
      
The Map/Reduce framework relies on the InputFormat of
      the job to:
- Validate the input-specification of the job.
- Split up the input file(s) into logical InputSplit instances, each of which is then assigned to an individual Mapper.
- Provide the RecordReader implementation used to glean input records from the logical InputSplit for processing by the Mapper.
The default behavior of file-based InputFormat
      implementations, typically sub-classes of
      
      FileInputFormat, is to split the input into logical
      InputSplit instances based on the total size, in bytes, of
      the input files. However, the FileSystem blocksize of the
      input files is treated as an upper bound for input splits. A lower bound
      on the split size can be set via mapred.min.split.size.
Clearly, logical splits based on input size are insufficient for many
      applications since record boundaries must be respected. In such cases,
      the application should implement a RecordReader, which is
      responsible for respecting record boundaries and presents a
      record-oriented view of the logical InputSplit to the
      individual task.
      TextInputFormat is the default InputFormat.
If TextInputFormat is the InputFormat for a
      given job, the framework detects input files with the .gz
      extension and automatically decompresses them using the
      appropriate CompressionCodec. However, it must be noted that
      compressed files with the above extension cannot be split and
      each compressed file is processed in its entirety by a single mapper.
InputSplit
          InputSplit represents the data to be processed by an individual
          Mapper.
Typically InputSplit presents a byte-oriented view of
          the input, and it is the responsibility of RecordReader
          to process and present a record-oriented view.
          FileSplit is the default InputSplit. It sets
          map.input.file to the path of the input file for the
          logical split.
RecordReader
RecordReader reads <key, value> pairs from an
          InputSplit.
Typically the RecordReader converts the byte-oriented
          view of the input, provided by the InputSplit, and
          presents a record-oriented view to the Mapper implementations
          for processing. RecordReader thus assumes the
          responsibility of processing record boundaries and presents the tasks
          with keys and values.
Job Output
      OutputFormat describes the output-specification for a Map/Reduce
      job.
The Map/Reduce framework relies on the OutputFormat of
      the job to:
- Validate the output-specification of the job; for example, check that the output directory doesn't already exist.
- Provide the RecordWriter implementation used to write the output files of the job. Output files are stored in a FileSystem.
TextOutputFormat is the default
      OutputFormat.
OutputCommitter
      OutputCommitter describes the commit of task output for a
      Map/Reduce job.
The Map/Reduce framework relies on the OutputCommitter
      of the job to:
- Set up the job during initialization. For example, create the temporary output directory for the job during the initialization of the job. Job setup is done by a separate task when the job is in PREP state and after initializing tasks. Once the setup task completes, the job will be moved to RUNNING state.
- Clean up the job after job completion. For example, remove the temporary output directory after job completion. Job cleanup is done by a separate task at the end of the job. The job is declared SUCCEEDED/FAILED/KILLED after the cleanup task completes.
- Set up the task temporary output. Task setup is done as part of the same task, during task initialization.
- Check whether a task needs a commit. This is to avoid the commit procedure if a task does not need it.
- Commit the task output. Once a task is done, the task will commit its output if required.
- Discard the task commit. If the task has failed or been killed, the output will be cleaned up. If the task could not clean up (in its exception block), a separate task will be launched with the same attempt-id to do the cleanup.
FileOutputCommitter is the default
      OutputCommitter. Job setup/cleanup tasks occupy
      map or reduce slots, whichever is free on the TaskTracker. The
      JobCleanup task, TaskCleanup tasks and JobSetup task have the highest
      priority, in that order.
Task Side-Effect Files
In some applications, component tasks need to create and/or write to
          side-files, which differ from the actual job-output files.
In such cases there could be issues with two instances of the same
          Mapper or Reducer running simultaneously (for
          example, speculative tasks) trying to open and/or write to the same
          file (path) on the FileSystem. Hence the
          application-writer will have to pick unique names per task-attempt
          (using the attemptid, say attempt_200709221812_0001_m_000000_0),
          not just per task.
To avoid these issues the Map/Reduce framework, when the
          OutputCommitter is FileOutputCommitter,
          maintains a special
          ${mapred.output.dir}/_temporary/_${taskid} sub-directory
          accessible via ${mapred.work.output.dir}
          for each task-attempt on the FileSystem where the output
          of the task-attempt is stored. On successful completion of the
          task-attempt, the files in the
          ${mapred.output.dir}/_temporary/_${taskid} (only)
          are promoted to ${mapred.output.dir}. Of course,
          the framework discards the sub-directory of unsuccessful task-attempts.
          This process is completely transparent to the application.
The application-writer can take advantage of this feature by
          creating any side-files required in ${mapred.work.output.dir}
          during execution of a task via
          FileOutputFormat.getWorkOutputPath(), and the framework will promote them
          similarly for successful task-attempts, thus eliminating the need to
          pick unique paths per task-attempt.
Note: The value of ${mapred.work.output.dir} during
          execution of a particular task-attempt is actually
          ${mapred.output.dir}/_temporary/_${taskid}, and this value is
          set by the Map/Reduce framework. So, just create any side-files in the
          path returned by
          FileOutputFormat.getWorkOutputPath() from the map/reduce
          task to take advantage of this feature.
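For instance, a task could create a side-file in that directory like this (a sketch; the file name is my own example). The framework then promotes it along with the regular output on successful completion of the attempt:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class SideFiles {
  // Writes a side-file into ${mapred.work.output.dir}; it is moved to
  // ${mapred.output.dir} only if this task-attempt succeeds.
  public static void writeSideFile(JobConf conf, String text) throws IOException {
    Path workDir = FileOutputFormat.getWorkOutputPath(conf);
    Path sideFile = new Path(workDir, "side-data.txt");
    FileSystem fs = sideFile.getFileSystem(conf);
    FSDataOutputStream out = fs.create(sideFile);
    out.writeUTF(text);
    out.close();
  }
}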
The entire discussion holds true for maps of jobs with
         reducer=NONE (i.e. 0 reduces) since output of the map, in that case,
         goes directly to HDFS.
RecordWriter
RecordWriter writes the output <key, value> pairs to an output file.
RecordWriter implementations write the job outputs to the
          FileSystem.
Other Useful Features
Submitting Jobs to Queues
Users submit jobs to queues. Queues, as collections of jobs,
          allow the system to provide specific functionality. For example,
          queues use ACLs to control which users can submit jobs to them.
          Queues are expected to be primarily used by Hadoop Schedulers.
Hadoop comes configured with a single mandatory queue, called
          'default'. Queue names are defined in the
          mapred.queue.names property of the Hadoop site
          configuration. Some job schedulers, such as the
          Capacity Scheduler, support multiple queues.
A job defines the queue it needs to be submitted to through the
          mapred.job.queue.name property, or through the
          setQueueName(String) API. Setting the queue name is optional.
          If a job is submitted without an associated queue name,
          it is submitted to the 'default' queue.
Counters
Counters represent global counters, defined either by
          the Map/Reduce framework or applications. Each Counter can
          be of any Enum type. Counters of a particular
          Enum are bunched into groups of type
          Counters.Group.
Applications can define arbitrary Counters (of type
          Enum) and update them via
         
          Reporter.incrCounter(Enum, long) or
         
          Reporter.incrCounter(String, String, long)
          in the map and/or
          reduce methods. These counters are then globally
          aggregated by the framework.
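A small sketch of an application-defined counter (the enum and the notion of "malformed" records are my own example):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CountingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  // Application-defined counters; the framework aggregates them globally.
  enum RecordQuality { WELL_FORMED, MALFORMED }

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, LongWritable> output, Reporter reporter)
      throws IOException {
    if (value.toString().trim().isEmpty()) {
      reporter.incrCounter(RecordQuality.MALFORMED, 1);
      return;  // skip empty records
    }
    reporter.incrCounter(RecordQuality.WELL_FORMED, 1);
    output.collect(value, new LongWritable(key.get()));
  }
}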
DistributedCache
          DistributedCache distributes application-specific, large, read-only
          files efficiently.
DistributedCache is a facility provided by the
          Map/Reduce framework to cache files (text, archives, jars and so on)
          needed by applications.
Applications specify the files to be cached via urls (hdfs://)
          in the JobConf. The DistributedCache
          assumes that the files specified via hdfs:// urls are already present
          on the FileSystem.
The framework will copy the necessary files to the slave node
          before any tasks for the job are executed on that node. Its
          efficiency stems from the fact that the files are only copied once
          per job and the ability to cache archives which are un-archived on
          the slaves.
DistributedCache tracks the modification timestamps of
          the cached files. Clearly the cache files should not be modified by
          the application or externally while the job is executing.
DistributedCache can be used to distribute simple,
          read-only data/text files and more complex types such as archives and
          jars. Archives (zip, tar, tgz and tar.gz files) are
          un-archived at the slave nodes. Files
          have execution permissions set.
The files/archives can be distributed by setting the property
          mapred.cache.{files|archives}. If more than one
          file/archive has to be distributed, they can be added as comma
          separated paths. The properties can also be set by APIs
         
          DistributedCache.addCacheFile(URI,conf)/
         
          DistributedCache.addCacheArchive(URI,conf) and
         
          DistributedCache.setCacheFiles(URIs,conf)/
         
          DistributedCache.setCacheArchives(URIs,conf)
          where URI is of the form
          hdfs://host:port/absolute-path#link-name.
          In Streaming, the files can be distributed through command line
          option -cacheFile/-cacheArchive.
Optionally users can also direct the DistributedCache
          to symlink the cached file(s) into the current working
          directory of the task via the
         
          DistributedCache.createSymlink(Configuration) api. Or by setting
          the configuration property mapred.create.symlink
          as yes. The DistributedCache will use the
          fragment of the URI as the name of the symlink.
          For example, the URI
          hdfs://namenode:port/lib.so.1#lib.so
          will have the symlink name as lib.so in task's cwd
          for the file lib.so.1 in distributed cache.
The DistributedCache can also be used as a
          rudimentary software distribution mechanism for use in the
          map and/or reduce tasks. It can be used to distribute both
          jars and native libraries. The
         
          DistributedCache.addArchiveToClassPath(Path, Configuration) or
         
          DistributedCache.addFileToClassPath(Path, Configuration) api
          can be used to cache files/jars and also add them to the
          classpath of child-jvm. The same can be done by setting
          the configuration properties
          mapred.job.classpath.{files|archives}. Similarly the
          cached files that are symlinked into the working directory of the
          task can be used to distribute native libraries and load them.
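A sketch of registering cache entries from the driver (the HDFS URIs and file names are placeholders; the files are assumed to already exist on HDFS):

import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class CacheSetup {
  public static void addCacheEntries(JobConf conf) throws Exception {
    // A read-only lookup file, symlinked into the task's cwd as "lookup.txt".
    DistributedCache.addCacheFile(
        new URI("hdfs://namenode:9000/data/lookup.txt#lookup.txt"), conf);
    // An archive that will be un-archived on the slave nodes.
    DistributedCache.addCacheArchive(
        new URI("hdfs://namenode:9000/data/dict.zip"), conf);
    // A jar cached and added to the classpath of the child JVM.
    DistributedCache.addFileToClassPath(new Path("/lib/mylib.jar"), conf);
    // Create symlinks in the task's current working directory.
    DistributedCache.createSymlink(conf);
  }
}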
Tool
The Tool interface supports the handling of generic Hadoop command-line options.
Tool is the standard for any Map/Reduce tool or
          application. The application should delegate the handling of
          standard command-line options to
         
          GenericOptionsParser via         
         
          ToolRunner.run(Tool, String[]) and only handle its custom
          arguments.
            The generic Hadoop command-line options are:
            -conf <configuration file>
            -D <property=value>
            -fs <local|namenode:port>
            -jt <local|jobtracker:port>
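A sketch of a driver implementing Tool, so that GenericOptionsParser consumes the generic options above before run() sees the remaining, application-specific arguments (the class name and argument layout are my own):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJobTool extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    // Generic options (-conf, -D, -fs, -jt, ...) have already been applied to getConf().
    JobConf conf = new JobConf(getConf(), MyJobTool.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new MyJobTool(), args);
    System.exit(exitCode);
  }
}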
            
         
IsolationRunner
          IsolationRunner is a utility to help debug Map/Reduce programs.
To use the IsolationRunner, first set
          keep.failed.tasks.files to true
          (also see keep.tasks.files.pattern).
Next, go to the node on which the failed task ran, go to the
            TaskTracker's local directory, and run the IsolationRunner:
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
IsolationRunner will run the failed task in a single
          jvm, which can be in the debugger, over precisely the same input.
Profiling
Profiling is a utility to get a representative sample (2 or 3)
          of built-in Java profiler output for a sample of maps and reduces.
User can specify whether the system should collect profiler
          information for some of the tasks in the job by setting the
          configuration property mapred.task.profile. The
          value can be set using the api
         
          JobConf.setProfileEnabled(boolean). If the value is set
          true, the task profiling is enabled. The profiler
          information is stored in the user log directory. By default,
          profiling is not enabled for the job.
Once user configures that profiling is needed, she/he can use
          the configuration property
          mapred.task.profile.{maps|reduces} to set the ranges
          of map/reduce tasks to profile. The value can be set using the api
         
          JobConf.setProfileTaskRange(boolean,String).
          By default, the specified range is 0-2.
User can also specify the profiler configuration arguments by
          setting the configuration property
          mapred.task.profile.params. The value can be specified
          using the api
         
          JobConf.setProfileParams(String). If the string contains a
          %s, it will be replaced with the name of the profiling
          output file when the task runs. These parameters are passed to the
          task child JVM on the command line. The default value for
          the profiling parameters is
          -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s
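For example, profiling could be enabled like this (a sketch; the 0-1 range is an arbitrary example, while the hprof string is the default quoted above):

import org.apache.hadoop.mapred.JobConf;

public class ProfilingSetup {
  public static void enableProfiling(JobConf conf) {
    conf.setProfileEnabled(true);              // turn task profiling on
    conf.setProfileTaskRange(true, "0-1");     // profile map tasks 0 and 1
    conf.setProfileTaskRange(false, "0-1");    // profile reduce tasks 0 and 1
    conf.setProfileParams(
        "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s");
  }
}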
         
Debugging
The Map/Reduce framework provides a facility to run user-provided
          scripts for debugging. When a map/reduce task fails, a user can run
          a debug script, to process task logs for example. The script is
          given access to the task's stdout and stderr outputs, syslog and
          jobconf. The output from the debug script's stdout and stderr is
          displayed on the console diagnostics and also as part of the
          job UI.
In the following sections we discuss how to submit a debug script
          with a job. The script file needs to be distributed and submitted to
          the framework.
How to distribute the script file:
          The user needs to use DistributedCache
          to distribute and symlink the script file.
How to submit the script:
A quick way to submit the debug script is to set values for the
          properties mapred.map.task.debug.script and
          mapred.reduce.task.debug.script, for debugging map and
          reduce tasks respectively. These properties can also be set by using the APIs
          JobConf.setMapDebugScript(String) and
          JobConf.setReduceDebugScript(String). In streaming mode, a debug
          script can be submitted with the command-line options
          -mapdebug and -reducedebug, for debugging
          map and reduce tasks respectively.
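A sketch of wiring a debug script into a job: the script is shipped and symlinked via the DistributedCache, then registered (the HDFS path and script name are placeholders):

import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

public class DebugScriptSetup {
  public static void attachDebugScript(JobConf conf) throws Exception {
    // Ship the script and symlink it into the task's working directory as "debug.sh".
    DistributedCache.createSymlink(conf);
    DistributedCache.addCacheFile(
        new URI("hdfs://namenode:9000/scripts/debug.sh#debug.sh"), conf);
    // Run it whenever a map or reduce task fails.
    conf.setMapDebugScript("./debug.sh");
    conf.setReduceDebugScript("./debug.sh");
  }
}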
The arguments to the script are the task's stdout, stderr,
          syslog and jobconf files. The debug command, run on the node where
          the map/reduce task failed, is:
         
$script $stdout $stderr $syslog $jobconf
Pipes programs have the C++ program name as a fifth argument
          for the command. Thus for pipes programs the command is:
$script $stdout $stderr $syslog $jobconf $program
Default Behavior:
For pipes, a default script is run that processes core dumps under
          gdb, prints the stack trace and gives info about running threads.
JobControl
          JobControl is a utility which encapsulates a set of Map/Reduce jobs
          and their dependencies.
Data Compression
Hadoop Map/Reduce provides facilities for the application-writer to
          specify compression for both intermediate map-outputs and the
          job-outputs, i.e. the output of the reduces. It also comes bundled with
          a CompressionCodec implementation for the zlib compression
          algorithm. The gzip file format is also supported.
Hadoop also provides native implementations of the above compression
          codecs, for reasons of both performance (zlib) and non-availability of
          Java libraries. More details on their usage and availability are
          documented in native_libraries.html.
Intermediate Outputs
Applications can control compression of intermediate map-outputs
            via the
            
            JobConf.setCompressMapOutput(boolean) api and the
            CompressionCodec to be used via the
            
            JobConf.setMapOutputCompressorClass(Class) api.
Job Outputs
Applications can control compression of job-outputs via the
            
            FileOutputFormat.setCompressOutput(JobConf, boolean) api and the
            CompressionCodec to be used can be specified via the
            
            FileOutputFormat.setOutputCompressorClass(JobConf, Class) api.
If the job outputs are to be stored in the
            
            SequenceFileOutputFormat, the required
            SequenceFile.CompressionType (i.e. RECORD /
            BLOCK - defaults to RECORD) can be
            specified via the
            
            SequenceFileOutputFormat.setOutputCompressionType(JobConf,
            SequenceFile.CompressionType) api.
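For instance, compressing both the intermediate map outputs and the final job output with gzip could be configured roughly like this (a sketch using the APIs named above):

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

public class CompressionSetup {
  public static void enableCompression(JobConf conf) {
    // Compress the intermediate map outputs.
    conf.setCompressMapOutput(true);
    conf.setMapOutputCompressorClass(GzipCodec.class);

    // Compress the final job outputs.
    FileOutputFormat.setCompressOutput(conf, true);
    FileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);

    // If the job writes SequenceFiles, choose the compression granularity.
    SequenceFileOutputFormat.setOutputCompressionType(conf, SequenceFile.CompressionType.BLOCK);
  }
}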
Skipping Bad Records
Hadoop provides an option where a certain set of bad input
          records can be skipped when processing map inputs. Applications
          can control this feature through the
         
          SkipBadRecords class.
This feature can be used when map tasks crash deterministically
          on certain input. This usually happens due to bugs in the
          map function. Usually, the user would have to fix these bugs.
          This is, however, not possible sometimes. The bug may be in third
          party libraries, for example, for which the source code is not
          available. In such cases, the task never completes successfully even
          after multiple attempts, and the job fails. With this feature, only
          a small portion of data surrounding the
          bad records is lost, which may be acceptable for some applications
          (those performing statistical analysis on very large data, for
          example).
By default this feature is disabled. For enabling it,
          refer to
          SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and
         
          SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).
         
With this feature enabled, the framework gets into 'skipping
          mode' after a certain number of map failures. For more details,
          see
          SkipBadRecords.setAttemptsToStartSkipping(Configuration, int).
          In 'skipping mode', map tasks maintain the range of records being
          processed. To do this, the framework relies on the processed record
          counter. See
          SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS and
         
          SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS.
          This counter enables the framework to know how many records have
          been processed successfully, and hence, what record range caused
          a task to crash. On further attempts, this range of records is
          skipped.
The number of records skipped depends on how frequently the
          processed record counter is incremented by the application.
          It is recommended that this counter be incremented after every
          record is processed. This may not be possible in some applications
          that typically batch their processing. In such cases, the framework
          may skip additional records surrounding the bad record. Users can
          control the number of skipped records through
         
          SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) and
         
          SkipBadRecords.setReducerMaxSkipGroups(Configuration, long).
          The framework tries to narrow the range of skipped records using a
          binary search-like approach. The skipped range is divided into two
          halves and only one half gets executed. On subsequent
          failures, the framework figures out which half contains
          bad records. A task will be re-executed till the
          acceptable skipped value is met or all task attempts are exhausted.
          To increase the number of task attempts, use
         
          JobConf.setMaxMapAttempts(int) and
         
          JobConf.setMaxReduceAttempts(int).
         
Skipped records are written to HDFS in the sequence file
          format, for later analysis. The location can be changed through
         
          SkipBadRecords.setSkipOutputPath(JobConf, Path).
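A sketch of turning skipping mode on for a job (the thresholds and the output path are example values, not recommendations from the article):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

public class SkippingSetup {
  public static void enableSkipping(JobConf conf) {
    // Enter 'skipping mode' after 2 failed attempts of a task.
    SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
    // Acceptable number of records/groups skipped around a bad record.
    SkipBadRecords.setMapperMaxSkipRecords(conf, 1);
    SkipBadRecords.setReducerMaxSkipGroups(conf, 1);
    // Allow extra attempts so the binary-search narrowing has room to work.
    conf.setMaxMapAttempts(8);
    conf.setMaxReduceAttempts(8);
    // Where the skipped records are written (as sequence files) for later analysis.
    SkipBadRecords.setSkipOutputPath(conf, new Path("/skip-output"));
  }
}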
         
               
               
               
               
               
               
               

This post comes from the ChinaUnix blog. The original is at: http://blog.chinaunix.net/u3/99156/showart_2157843.html