忘记密码   免费注册 查看新帖 |

ChinaUnix.net

  平台 论坛 博客 文库 频道自动化运维 虚拟化 储存备份 C/C++ PHP MySQL 嵌入式 Linux系统
最近访问板块 发新帖
查看: 9296 | 回复: 2

[Spark] java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWrita [复制链接]

论坛徽章:
1
戌狗
日期:2013-10-24 17:31:55
发表于 2015-11-21 19:45 |显示全部楼层
以下程序提交后报错(错误列在后边),请指教!

import com.twitter.chill.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
* Created by lyxing.bj on 2015/11/17.
*/
public class Spark_hbase_test implements Serializable {
    static String convertScanToString (Scan scan)throws IOException {
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        return Base64.encodeBytes(proto.toByteArray());
    }

    public void start() throws Exception {

        JavaSparkContext sc = new JavaSparkContext("local", "spark-hbase-test", System.getenv("SPARK_HOME", System.getenv("JARS");
        
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Scan scan = new Scan();
        SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("meta_data",Bytes.toBytes("HDSD00_01_185_D", CompareFilter.CompareOp.GREATER_OR_EQUAL,new BinaryComparator("2015-10-01 00:00:00".getBytes()));
        SingleColumnValueFilter scvf1 = new SingleColumnValueFilter(Bytes.toBytes("meta_data",Bytes.toBytes("HDSD00_01_185_D", CompareFilter.CompareOp.LESS_OR_EQUAL,new BinaryComparator("2015-10-31 00:00:00".getBytes()));
        scvf.setFilterIfMissing(true);
        scvf1.setFilterIfMissing(true);
        List<Filter> filters = new ArrayList<Filter>();
        filters.add(scvf);
        filters.add(scvf1);
        FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);

        scan.setFilter(fl);
        scan.addColumn(Bytes.toBytes("meta_data",Bytes.toBytes("HDSD00_01_185_D");
        scan.addColumn(Bytes.toBytes("meta_data",Bytes.toBytes("HDSD00_01_003");
        try {
            String tablename = "HDSC02_09";
            conf.set(HConstants.ZOOKEEPER_QUORUM, "node1.jkzldev.com,node2.jkzldev.com,node4.jkzldev.com:2181");

            conf.set("zookeeper.znode.parent","/hbase-unsecure");

            conf.set(TableInputFormat.INPUT_TABLE, tablename);
            conf.set(TableInputFormat.SCAN, convertScanToString(scan));

            JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = sc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

            System.out.println("hbaseRDD'S count=" + hbaseRDD.count());
            List<Tuple2<ImmutableBytesWritable,Result>> hbaseRDD_list = hbaseRDD.collect();
            for (Tuple2<ImmutableBytesWritable,Result> tuple : hbaseRDD_list) {
                System.out.println("eeeeee-----" + tuple._2().getValue(Bytes.toBytes("meta_data"),Bytes.toBytes("HDSD00_01_185_D")) );
            }

            //构建RDD transformation
            JavaPairRDD<String,Integer> discharge = hbaseRDD.mapToPair(
                    new PairFunction<Tuple2<ImmutableBytesWritable,Result>,String,Integer>(){

                        public Tuple2<String,Integer> call(
                                Tuple2<ImmutableBytesWritable,Result> immutableBytesWritableResultTuple2
                        )throws Exception{
                            //时间
                            byte[] HDSD00_01_185_D = immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("meta_data"),Bytes.toBytes("HDSD00_01_185_D"));
                            //性别
                            byte[] HDSD00_01_003 = immutableBytesWritableResultTuple2._2.getValue(Bytes.toBytes("meta_data"),Bytes.toBytes("HDSD00_01_003"));

                            if(HDSD00_01_185_D != null && HDSD00_01_003 !=null ){
                                System.out.println("time+office" + Bytes.toString(HDSD00_01_185_D).substring(0,9) + Bytes.toString(HDSD00_01_003));
                                return new Tuple2<String, Integer>(Bytes.toString(HDSD00_01_185_D).substring(0,9) + Bytes.toString(HDSD00_01_003),1);
                            }
                            return null;
                        }
                    }
            );
            System.out.println("discharge's count=" + discharge.count());
            List<Tuple2<String,Integer>> discharge1 = discharge.collect();

            System.out.println("result=" + discharge1.get(1)._1 + discharge1.get(1)._2);
            //构建RDD action
            JavaPairRDD<String,Integer> counts = discharge.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });
            if(counts.count()>0){
                List<Tuple2<String, Integer>> one = counts.take(1);

                System.out.print(one.get(1)._1 + one.get(1)._2);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
//        new Spark_hbase_test().start();
        new Spark_hbase_test().start();
        System.exit(0);
    }
}

错误记录如下:
566a61e0x0, quorum=node1.jkzldev.com:2181,node2.jkzldev.com:2181,node4.jkzldev.com:2181, baseZNode=/hbase-unsecure
15/11/20 15:37:04 INFO ClientCnxn: Opening socket connection to server node1.jkzldev.com/172.17.110.116:2181. Will not attempt to authenticate using SASL (unknown error)
15/11/20 15:37:04 INFO ClientCnxn: Socket connection established to node1.jkzldev.com/172.17.110.116:2181, initiating session
15/11/20 15:37:04 INFO ClientCnxn: Session establishment complete on server node1.jkzldev.com/172.17.110.116:2181, sessionid = 0x151149ae9b600bf, negotiated timeout = 40000
15/11/20 15:37:04 INFO TableInputFormatBase: Input split length: 1 M bytes.
15/11/20 15:37:04 INFO ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x151149ae9b600bf
15/11/20 15:37:04 INFO ZooKeeper: Session: 0x151149ae9b600bf closed
15/11/20 15:37:04 INFO ClientCnxn: EventThread shut down
15/11/20 15:37:05 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
        - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 34 31 38 37 32 36 30 37 2d 39 5f 31 30 32 39 36 39 30 36 5f 30 30 30 36 32 33 34 32 37 5f 31 34 34 34 30 36 30 38 30 30 30 30 30 24 30)
        - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
        - object (class scala.Tuple2, (34 31 38 37 32 36 30 37 2d 39 5f 31 30 32 39 36 39 30 36 5f 30 30 30 36 32 33 34 32 37 5f 31 34 34 34 30 36 30 38 30 30 30 30 30 24 30,keyvalues={41872607-9_10282457_000614540_1443888000000$0/meta_data:HDSD00_01_185_D/1447233947980/Put/vlen=19/seqid=0}))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 546)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:3
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/11/20 15:37:05 ERROR TaskSetManager: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
        - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 34 31 38 37 32 36 30 37 2d 39 5f 31 30 32 39 36 39 30 36 5f 30 30 30 36 32 33 34 32 37 5f 31 34 34 34 30 36 30 38 30 30 30 30 30 24 30)
        - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
        - object (class scala.Tuple2, (34 31 38 37 32 36 30 37 2d 39 5f 31 30 32 39 36 39 30 36 5f 30 30 30 36 32 33 34 32 37 5f 31 34 34 34 30 36 30 38 30 30 30 30 30 24 30,keyvalues={41872607-9_10282457_000614540_1443888000000$0/meta_data:HDSD00_01_185_D/1447233947980/Put/vlen=19/seqid=0}))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 546); not retrying
15/11/20 15:37:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/11/20 15:37:05 INFO TaskSchedulerImpl: Cancelling stage 1
15/11/20 15:37:05 INFO DAGScheduler: Stage 1 (collect at Spark_hbase_test.java:231) failed in 0.227 s
15/11/20 15:37:05 INFO DAGScheduler: Job 1 failed: collect at Spark_hbase_test.java:231, took 0.241115 s

论坛徽章:
15
2015七夕节徽章
日期:2015-08-21 11:06:172017金鸡报晓
日期:2017-01-10 15:19:56极客徽章
日期:2016-12-07 14:07:30shanzhi
日期:2016-06-17 17:59:3115-16赛季CBA联赛之四川
日期:2016-04-13 14:36:562016猴年福章徽章
日期:2016-02-18 15:30:34IT运维版块每日发帖之星
日期:2016-01-28 06:20:0015-16赛季CBA联赛之新疆
日期:2016-01-25 14:01:34IT运维版块每周发帖之星
日期:2016-01-07 23:04:26数据库技术版块每日发帖之星
日期:2016-01-03 06:20:00数据库技术版块每日发帖之星
日期:2015-12-01 06:20:00IT运维版块每日发帖之星
日期:2015-11-10 06:20:00
发表于 2015-11-23 21:30 |显示全部楼层
我估计你baidu一下就能得到答案,从抛出的异常来看,我估计你看看这个程序就知道了
http://my.oschina.net/132722/blog/196350

论坛徽章:
1
戌狗
日期:2013-10-24 17:31:55
发表于 2016-05-16 09:00 |显示全部楼层
回复 2# heguangwu


    谢谢
您需要登录后才可以回帖 登录 | 注册

本版积分规则

【重磅资料】多云网络实战的相关问题汇总!
云网融合的多云网络

本文介绍如何管理私有云数据中心,构建数据中心互联和混合云解决方案。对于OTT 网络架构的深入理解,基本上来源于SIGCOM 的白皮书和一些公开视频。

Overlay SDN 控制器详解

云计算为了适应业务/APP 的快速开发和部署,会把网络分为两层:Overlay 和 Underlay 网络。本文主要讲Overlay网络层面的问题。

超级核心路由器演进

2016 年,网络连接已经采用100G/200G/400G(虽然 400GE 接口技术还未成熟),互联网出口也已经增长到了 T 级别。

获得资料 >>
  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号 北京市公安局海淀分局网监中心备案编号:11010802020122
广播电视节目制作经营许可证(京) 字第1234号 中国互联网协会会员  联系我们:wangnan@it168.com
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP