免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 2840 | 回复: 0

[Hadoop&HBase] Hadoop中的RPC机制(1) [复制链接]

论坛徽章:
0
发表于 2010-01-04 21:38 |显示全部楼层

这一段时间在忙自己的MapReduce追踪系统。今天总算把MapReduce的任务调度策略的实时追踪系统写完并且调试通过了。说实话,确实比较幸苦。感谢这段时间女朋友的陪伴,虽然比较多的时间只是趴在我桌子旁边睡觉。

有时间可以考虑把系统截图发上来。让更多hadoop爱好者一起讨论。

为了完成对Hadoop源码的剖析,今天我继续写一点东西。

Hadoop中的RPC机制。

RPC已经被很多库实现了,感觉在Sun的NFS中的RPC机制就有,还有apache组织的xmlrpc,还有java的rmi,很多都是实现这个RPC,即远程过程调用。引入RPC这一层是软件设计的伟大创举,使得分布式程序跟单节点程序一样易于编写。

一个例子就是:
对象1 funcName(对象2,对象3,对象4...)
在Hadoop中,这些对象都必须能在网络上传输。不是每一个对象都满足这个条件。

所以今天先讲Hadoop中的对象传输机制。

在Hadoop中,能被传输的对象都实现了Writable接口。
我们先看下这个接口。



/**
* Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
/** A simple, efficient, serialization protocol, based on {@link DataInput} and
* {@link DataOutput}.
*
* Implementations typically implement a static read(DataInput)
* method which constructs a new instance, calls {@link
* #readFields(DataInput)}, and returns the instance.
*
* @author Doug Cutting
*/
/**
* write/read for network transfer class
*/
public interface Writable {
    /** Writes the fields of this object to out. */
    void write(DataOutput out) throws IOException;
    /**
     * Reads the fields of this object from in. For efficiency,
     * implementations should attempt to re-use storage in the existing object
     * where possible.
     */
    void readFields(DataInput in) throws IOException;
}
这个接口非常简单,两个方法。
如果一个类实现了这个接口,也就是说这个类有这两个方法的实现。那么它就可以在网络上进行传输。

我们拿一个类来做例子。

/**
* Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.dfs;
import org.apache.hadoop.io.*;
import java.io.*;
import java.util.*;
/**************************************************
* A Block is a Hadoop FS primitive, identified by a long.
*
* @author Mike Cafarella
**************************************************/
public class Block implements Writable, Comparable {
    static { // register a ctor
        WritableFactories.setFactory(Block.class, new WritableFactory() {
            public Writable newInstance() {
                return new Block();
            }
        });
    }
    static Random r = new Random();
    /**
     */
    public static boolean isBlockFilename(File f) {
        if (f.getName().startsWith("blk_")) {
            return true;
        } else {
            return false;
        }
    }
    long blkid;
    long len;
    /**
     */
    public Block() {
        this.blkid = r.nextLong();
        this.len = 0;
    }
    /**
     */
    public Block(long blkid, long len) {
        this.blkid = blkid;
        this.len = len;
    }
    /**
     * Find the blockid from the given filename
     */
    public Block(File f, long len) {
        String name = f.getName();
        name = name.substring("blk_".length());
        this.blkid = Long.parseLong(name);
        this.len = len;
    }
    /**
     */
    public long getBlockId() {
        return blkid;
    }
    /**
     */
    public String getBlockName() {
        return "blk_" + String.valueOf(blkid);
    }
    /**
     */
    public long getNumBytes() {
        return len;
    }
    public void setNumBytes(long len) {
        this.len = len;
    }
    /**
     */
    public String toString() {
        return getBlockName();
    }
    // ///////////////////////////////////
    // Writable
    // ///////////////////////////////////
    public void write(DataOutput out) throws IOException {
        out.writeLong(blkid);
        out.writeLong(len);
    }
    public void readFields(DataInput in) throws IOException {
        this.blkid = in.readLong();
        this.len = in.readLong();
    }
    // ///////////////////////////////////
    // Comparable
    // ///////////////////////////////////
    public int compareTo(Object o) {
        Block b = (Block) o;
        if (getBlockId()  b.getBlockId()) {
            return -1;
        } else if (getBlockId() == b.getBlockId()) {
            return 0;
        } else {
            return 1;
        }
    }
    public boolean equals(Object o) {
        Block b = (Block) o;
        return (this.compareTo(b) == 0);
    }
}

这个类中很明显有两个方法,readFields和write
Block在hadoop中是一个数据块的意思。因为一个文件在HDFS中是要被切分成很多块的。
很容易看出来,write就是把自己的字段从DataOutput这个接口发出去,readFields就是从DataInput这个接口里读进来初始化自己的字段。
这个设计的巧妙之处就在于这里。我们在传输对象的时候,建立一个Socket,那么我们可以获取这个Socket的DataInputStream和DataOutputStream.然后传给这两个函数。这样就很轻松低耦合的实现了对象传输。

Writable这个类在hadoop中是一个非常重要的基础类,整个Hadoop的大厦都建立在RPC机制上。而Writable接口功不可没。

没什么时间就写这么多,该做点别的事情了,最近看python源码剖析看得非常激情澎湃。有志同道合的网友欢迎来交流。

下次有时间再来写RPC机制的另外一部分。代理机制。


本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u3/105041/showart_2139430.html
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP