I've spent the last while busy with my own MapReduce tracing system. Today I finally finished writing and debugging the real-time tracker for MapReduce's task scheduling policy. Honestly, it was hard work. Thanks to my girlfriend for keeping me company, even if for much of that time she was just curled up asleep beside my desk.
When I get a chance I may post some screenshots of the system so that more Hadoop enthusiasts can discuss it together.
To keep my walkthrough of the Hadoop source code going, I'm writing a bit more today:
the RPC mechanism in Hadoop.
RPC has been implemented by many libraries: Sun's NFS is built on an RPC mechanism, the Apache project has XML-RPC, and Java has RMI. They all implement RPC, that is, remote procedure call. Introducing the RPC layer was a great stroke of software design: it makes distributed programs nearly as easy to write as single-node programs.
For example, a remote call looks just like an ordinary method invocation:
Object1 funcName(Object2, Object3, Object4, ...)
In Hadoop, every one of these objects must be able to travel over the network, and not every object meets that requirement.
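To make that concrete, here is a tiny Java sketch of what such a remote protocol might look like. This is purely my own illustration, not actual Hadoop code; the interface name BlockInfoProtocol and its method are made up, and Block is the HDFS class we will look at below.

import java.io.IOException;
import org.apache.hadoop.dfs.Block;

// Hypothetical protocol interface, for illustration only.
// The client calls getBlockLength(block) as if it were a local method; the RPC
// layer serializes the Block argument, ships it to the remote server, runs the
// real implementation there, and sends the result back.
public interface BlockInfoProtocol {
  long getBlockLength(Block block) throws IOException;
}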
So today I'll start with Hadoop's object transfer mechanism.
In Hadoop, every object that can be transferred implements the Writable interface.
Let's look at that interface first.
/**
* Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
/** A simple, efficient, serialization protocol, based on {@link DataInput} and
* {@link DataOutput}.
*
* Implementations typically implement a static read(DataInput)
* method which constructs a new instance, calls {@link
* #readFields(DataInput)}, and returns the instance.
*
* @author Doug Cutting
*/
/**
 * write/read interface for classes that are transferred over the network
 */
public interface Writable {
  /** Writes the fields of this object to out. */
  void write(DataOutput out) throws IOException;

  /**
   * Reads the fields of this object from in. For efficiency,
   * implementations should attempt to re-use storage in the existing object
   * where possible.
   */
  void readFields(DataInput in) throws IOException;
}
The interface is very simple: just two methods.
If a class implements this interface, that is, provides implementations of these two methods, then its instances can be transferred over the network.
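Before looking at a real class from the Hadoop source, here is a minimal hypothetical implementation of my own (CounterWritable is a made-up name, not a Hadoop class); it also follows the static read(DataInput) convention mentioned in the javadoc above:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical example: a Writable that carries a single long counter.
public class CounterWritable implements Writable {
  private long count;

  public void write(DataOutput out) throws IOException {
    out.writeLong(count);          // push the field onto the output stream
  }

  public void readFields(DataInput in) throws IOException {
    this.count = in.readLong();    // restore the field from the input stream
  }

  // The conventional static factory described in the Writable javadoc.
  public static CounterWritable read(DataInput in) throws IOException {
    CounterWritable w = new CounterWritable();
    w.readFields(in);
    return w;
  }
}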
Now let's take a real class from the Hadoop source as an example.
/**
* Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.dfs;
import org.apache.hadoop.io.*;
import java.io.*;
import java.util.*;
/**************************************************
* A Block is a Hadoop FS primitive, identified by a long.
*
* @author Mike Cafarella
**************************************************/
public class Block implements Writable, Comparable {
  static { // register a ctor
    WritableFactories.setFactory(Block.class, new WritableFactory() {
      public Writable newInstance() {
        return new Block();
      }
    });
  }

  static Random r = new Random();

  /**
   */
  public static boolean isBlockFilename(File f) {
    if (f.getName().startsWith("blk_")) {
      return true;
    } else {
      return false;
    }
  }

  long blkid;
  long len;

  /**
   */
  public Block() {
    this.blkid = r.nextLong();
    this.len = 0;
  }

  /**
   */
  public Block(long blkid, long len) {
    this.blkid = blkid;
    this.len = len;
  }

  /**
   * Find the blockid from the given filename
   */
  public Block(File f, long len) {
    String name = f.getName();
    name = name.substring("blk_".length());
    this.blkid = Long.parseLong(name);
    this.len = len;
  }

  /**
   */
  public long getBlockId() {
    return blkid;
  }

  /**
   */
  public String getBlockName() {
    return "blk_" + String.valueOf(blkid);
  }

  /**
   */
  public long getNumBytes() {
    return len;
  }

  public void setNumBytes(long len) {
    this.len = len;
  }

  /**
   */
  public String toString() {
    return getBlockName();
  }

  /////////////////////////////////////
  // Writable
  /////////////////////////////////////
  public void write(DataOutput out) throws IOException {
    out.writeLong(blkid);
    out.writeLong(len);
  }

  public void readFields(DataInput in) throws IOException {
    this.blkid = in.readLong();
    this.len = in.readLong();
  }

  /////////////////////////////////////
  // Comparable
  /////////////////////////////////////
  public int compareTo(Object o) {
    Block b = (Block) o;
    if (getBlockId() < b.getBlockId()) {
      return -1;
    } else if (getBlockId() == b.getBlockId()) {
      return 0;
    } else {
      return 1;
    }
  }

  public boolean equals(Object o) {
    Block b = (Block) o;
    return (this.compareTo(b) == 0);
  }
}
This class clearly provides those two methods, readFields and write.
In Hadoop, a Block is a data block: a file stored in HDFS is split into many such blocks.
It's easy to see what the two methods do: write pushes the object's own fields out through the DataOutput interface, and readFields reads them back in from the DataInput interface to initialize the object's fields.
This is exactly where the design is clever. When we want to transfer an object, we open a Socket, obtain that Socket's DataInputStream and DataOutputStream, and pass them to these two methods. Object transfer is thus implemented in a simple, loosely coupled way.
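Here is a rough sketch of that pattern, again my own illustration rather than code from Hadoop (the class and method names are made up):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import org.apache.hadoop.dfs.Block;

public class BlockTransferSketch {
  // Sender side: wrap the socket's output stream and let the object write itself.
  static void sendBlock(Block blk, Socket s) throws IOException {
    DataOutputStream out = new DataOutputStream(s.getOutputStream());
    blk.write(out);      // Block serializes its own fields (blkid, len)
    out.flush();
  }

  // Receiver side: wrap the socket's input stream and let a fresh object read itself.
  static Block receiveBlock(Socket s) throws IOException {
    DataInputStream in = new DataInputStream(s.getInputStream());
    Block blk = new Block();
    blk.readFields(in);  // Block restores its fields from the stream
    return blk;
  }
}

Because write and readFields only depend on the DataOutput and DataInput interfaces, the very same Block code works whether the bytes travel over a socket, into a file, or into an in-memory buffer, which is exactly the loose coupling described above.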
Writable is a very important foundational type in Hadoop: the whole Hadoop edifice is built on the RPC mechanism, and the Writable interface deserves much of the credit for that.
I don't have much more time, so that's it for now; I have other things to get to. Lately I've been reading a Python source code analysis with great enthusiasm, and like-minded readers are welcome to get in touch.
Next time, when I get a chance, I'll write about the other part of the RPC mechanism: the proxy mechanism.
This post is from the ChinaUnix blog; the original is at: http://blog.chinaunix.net/u3/105041/showart_2139430.html