* Source: http://java8.tistory.com/39
1. Success case
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TestCase1 {

    JavaSparkContext sc = null;

    private TestCase1() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
        sc = new JavaSparkContext("local[2]", "First Spark App");
    }

    public static void main(String... strings) {
        TestCase1 t = new TestCase1();
        t.proc1();
        t.proc2();
    }

    private void proc1() {
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(a -> a + 1);
        System.out.println(rdd3.collect());
    }

    private void proc2() {
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        int num2 = 3;
        JavaRDD<Integer> rdd3 = rdd2.map(a -> a + num2);
        System.out.println(rdd3.collect());
    }
}
Good case: it runs fine, with no errors.
These are Java 8 lambda expressions.
2. Failure case - instance variable (member field)
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TestCase2 {

    private int num1 = 4;
    JavaSparkContext sc = null;

    private TestCase2() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
        sc = new JavaSparkContext("local[2]", "First Spark App");
    }

    public static void main(String... strings) {
        TestCase2 t = new TestCase2();
        System.out.println("t:" + t);
        t.proc3();
    }

    private void proc3() {
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(a -> a + this.num1);   // Exception occurs here
        System.out.println(rdd3.collect());
    }
}
An exception is thrown.
The lambda uses this.num1. Since this refers to the TestCase2 instance itself, and TestCase2 does not implement Serializable, a Task not serializable exception occurs.
2-1 Solution
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TestCase2Sol1 {

    private int num1 = 4;
    JavaSparkContext sc = null;

    private TestCase2Sol1() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
        sc = new JavaSparkContext("local[2]", "First Spark App");
    }

    public static void main(String... strings) {
        TestCase2Sol1 t = new TestCase2Sol1();
        t.proc3();
    }

    private void proc3() {
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        int num1 = this.num1;                              // fix: copy into a local variable
        JavaRDD<Integer> rdd3 = rdd2.map(a -> a + num1);   // fix: capture the local, not this
        System.out.println(rdd3.collect());
    }
}
This follows the approach introduced in the [Learning Spark] book:
copy the value of this.num1 into a local variable and use the local variable in the lambda.
2-2 Could it also be solved this way? No.
package org.mystudy.testcase;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TestCase2Sol2 implements Serializable {

    private int num1 = 4;
    private JavaSparkContext sc = null;

    private TestCase2Sol2() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
        sc = new JavaSparkContext("local[2]", "First Spark App");
    }

    public static void main(String... strings) {
        TestCase2Sol2 t = new TestCase2Sol2();
        System.out.println("t:" + t);
        System.out.println("sc:" + t.sc);
        t.proc3();
    }

    private void proc3() {
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(a -> a + this.num1);   // still throws an exception
        System.out.println(rdd3.collect());
    }
}
The exception still occurs even though the class implements Serializable.
The reason: the code keeps the JavaSparkContext as a member field of the class, and no matter how the class itself is declared Serializable,
that member field, JavaSparkContext sc, is not serializable, so serializing the captured this fails.
16/04/08 00:10:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
t:org.mystudy.testcase.TestCase2Sol2@247667dd
sc:org.apache.spark.api.java.JavaSparkContext@6f099cef
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at org.mystudy.testcase.TestCase2Sol2.proc3(TestCase2Sol2.java:28)
at org.mystudy.testcase.TestCase2Sol2.main(TestCase2Sol2.java:23)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@6f099cef)
- field (class: org.mystudy.testcase.TestCase2Sol2, name: sc, type: class org.apache.spark.api.java.JavaSparkContext)
- object (class org.mystudy.testcase.TestCase2Sol2, org.mystudy.testcase.TestCase2Sol2@247667dd)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.mystudy.testcase.TestCase2Sol2, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial org/mystudy/testcase/TestCase2Sol2.lambda$0:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.mystudy.testcase.TestCase2Sol2$$Lambda$4/1353512285, org.mystudy.testcase.TestCase2Sol2$$Lambda$4/1353512285@116a2108)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 13 more
2-3 This is how it can be solved.
package org.mystudy.testcase;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TestCase2Sol3 implements Serializable {

    private int num1 = 4;

    private TestCase2Sol3() {
    }

    public static void main(String... strings) {
        TestCase2Sol3 t = new TestCase2Sol3();
        t.proc3();
    }

    private void proc3() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(a -> a + this.num1);   // fixed
        System.out.println(rdd3.collect());
    }
}
The JavaSparkContext is now a local variable inside the method, so it is no longer part of the serialized instance. Solved.
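Another option, not covered in the original post, is to keep sc as a member field but mark it transient so it is skipped when the captured this is serialized. A minimal sketch of that idea (the class name TestCase2Sol4 is hypothetical; it assumes the lambda only needs num1 on the executors):

package org.mystudy.testcase;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical variant: the class stays Serializable, and the non-serializable
// JavaSparkContext is excluded from serialization via transient.
public class TestCase2Sol4 implements Serializable {

    private int num1 = 4;
    private transient JavaSparkContext sc;   // transient: skipped when the closure is serialized

    private TestCase2Sol4() {
        sc = new JavaSparkContext("local[2]", "First Spark App");
    }

    public static void main(String... strings) {
        new TestCase2Sol4().proc3();
    }

    private void proc3() {
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(a -> a + this.num1);   // this is serialized, but sc is left out
        System.out.println(rdd3.collect());
    }
}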
3. Failure case - calling a member method
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TestCase3 {

    private TestCase3() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase3 t = new TestCase3();
        System.out.println("t:" + t);
        t.proc3();
    }

    private int add(int num) {
        return num + 1;
    }

    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(a -> add(a));   // Exception occurs here
        System.out.println(rdd3.collect());
    }
}
This is the same problem as using this: calling the member method add implicitly captures this, and the TestCase3 class is not Serializable.
t:org.mystudy.testcase.TestCase3@75a1cd57
16/04/08 00:17:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at org.mystudy.testcase.TestCase3.proc3(TestCase3.java:26)
at org.mystudy.testcase.TestCase3.main(TestCase3.java:18)
Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase3
Serialization stack:
- object not serializable (class: org.mystudy.testcase.TestCase3, value: org.mystudy.testcase.TestCase3@75a1cd57)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.mystudy.testcase.TestCase3, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial org/mystudy/testcase/TestCase3.lambda$0:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.mystudy.testcase.TestCase3$$Lambda$4/503353142, org.mystudy.testcase.TestCase3$$Lambda$4/503353142@7a1f8def)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 13 more
3-1 Solution
package org.mystudy.testcase;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TestCase3Sol1 implements Serializable {

    private TestCase3Sol1() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase3Sol1 t = new TestCase3Sol1();
        t.proc3();
    }

    private int add(int num) {
        return num + 1;
    }

    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(a -> add(a));   // fixed
        System.out.println(rdd3.collect());
    }
}
Solved by implementing Serializable.
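Another way around it, consistent with the static-method case later in this post, is to make add static so that the lambda (or a method reference) captures nothing and the enclosing class does not need to be Serializable. A minimal sketch, using a hypothetical TestCase3Sol2 class:

package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical variant: because add() is static, the method reference captures
// no instance, so TestCase3Sol2 does not have to implement Serializable.
public class TestCase3Sol2 {

    private static int add(int num) {
        return num + 1;
    }

    public static void main(String... strings) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(TestCase3Sol2::add);   // no instance captured
        System.out.println(rdd3.collect());
    }
}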
4. Failure case - Function (and similar) interface implementations
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TestCase4 {

    private TestCase4() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase4 t = new TestCase4();
        System.out.println("t:" + t);
        t.proc3();
    }

    private void proc3() {
        class AAA implements Function<Integer, Integer> {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 1;
            }
        }

        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(new AAA());   // Exception occurs here
        System.out.println(rdd3.collect());
    }
}
What is the problem here?
A local inner class was used, and Spark now complains about whether the outer class that contains it, TestCase4, is Serializable.
Even though the Function interface itself extends Serializable, the AAA class that implements it is still bound to the enclosing class: the instance prints as org.mystudy.testcase.TestCase4$1AAA@60acd609, i.e. TestCase4$1AAA, so TestCase4 ends up playing the decisive role.
Just as in the earlier this.num1 example, where the value we meant to pass was num1 but what actually got shipped to Spark was the this that holds num1,
here it looks as if only AAA is passed, yet the AAA instance carries a hidden this$0 reference to the enclosing TestCase4 instance (visible in the serialization stack below), so TestCase4 is dragged along with it. That is why Spark is asking whether TestCase4 is Serializable.
t:org.mystudy.testcase.TestCase4@5e91993f
16/04/08 00:24:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at org.mystudy.testcase.TestCase4.proc3(TestCase4.java:31)
at org.mystudy.testcase.TestCase4.main(TestCase4.java:19)
Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase4
Serialization stack:
- object not serializable (class: org.mystudy.testcase.TestCase4, value: org.mystudy.testcase.TestCase4@5e91993f)
- field (class: org.mystudy.testcase.TestCase4$1AAA, name: this$0, type: class org.mystudy.testcase.TestCase4)
- object (class org.mystudy.testcase.TestCase4$1AAA, org.mystudy.testcase.TestCase4$1AAA@60acd609)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 13 more
4-1. Tried this just in case, but...
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TestCase4Sol1 {

    private TestCase4Sol1() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase4Sol1 t = new TestCase4Sol1();
        System.out.println("t:" + t);
        t.proc3();
    }

    class AAA implements Function<Integer, Integer> {
        @Override
        public Integer call(Integer v1) throws Exception {
            return v1 + 1;
        }
    }

    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(new AAA());   // Exception occurs here
        System.out.println(rdd3.collect());
    }
}
Moving the class out of the method (still as a non-static member class) makes no difference: the same exception occurs.
t:org.mystudy.testcase.TestCase4Sol1@5e91993f
16/04/08 00:33:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at org.mystudy.testcase.TestCase4Sol1.proc3(TestCase4Sol1.java:30)
at org.mystudy.testcase.TestCase4Sol1.main(TestCase4Sol1.java:19)
Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase4Sol1
Serialization stack:
- object not serializable (class: org.mystudy.testcase.TestCase4Sol1, value: org.mystudy.testcase.TestCase4Sol1@5e91993f)
- field (class: org.mystudy.testcase.TestCase4Sol1$AAA, name: this$0, type: class org.mystudy.testcase.TestCase4Sol1)
- object (class org.mystudy.testcase.TestCase4Sol1$AAA, org.mystudy.testcase.TestCase4Sol1$AAA@598260a6)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 13 more
4-2 Solved with a separate top-level class
package org.mystudy.testcase.vo;

import org.apache.spark.api.java.function.Function;

public class AAA implements Function<Integer, Integer> {

    @Override
    public Integer call(Integer v1) throws Exception {
        return v1 + 1;
    }
}
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.mystudy.testcase.vo.AAA;

public class TestCase4Sol2 {

    private TestCase4Sol2() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase4Sol2 t = new TestCase4Sol2();
        t.proc3();
    }

    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(new AAA());   // fixed
        System.out.println(rdd3.collect());
    }
}
Using a separate public top-level class solves it. AAA no longer depends on any enclosing class; it only implements Function (which extends Serializable), so nothing non-serializable gets pulled into the task.
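A related option, consistent with the this$0 explanation above though not shown in the original post, is to keep AAA nested inside the test class but declare it static, so it carries no reference to the enclosing instance. A minimal sketch (the class name TestCase4Sol3 is hypothetical):

package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

// Hypothetical variant: a static nested class has no hidden this$0 field, so only
// the AAA instance itself is serialized (Function already extends Serializable).
public class TestCase4Sol3 {

    static class AAA implements Function<Integer, Integer> {
        @Override
        public Integer call(Integer v1) throws Exception {
            return v1 + 1;
        }
    }

    public static void main(String... strings) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(new AAA());   // no enclosing instance captured
        System.out.println(rdd3.collect());
    }
}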
5. Failure case - anonymous inner class
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TestCase5 {

    private TestCase5() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase5 t = new TestCase5();
        System.out.println("t:" + t);
        t.proc3();
    }

    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(new Function<Integer, Integer>() {   // Exception occurs here
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 1;
            }
        });
        System.out.println(rdd3.collect());
    }
}
Why does the exception occur? This follows the book exactly.
The book said to use an anonymous inner class.
Spark still trips over TestCase5. Simply making it Serializable does fix it, but why should that be necessary for an anonymous class?
The serialization stack below shows why: even the anonymous class keeps a this$0 reference back to TestCase5.
t:org.mystudy.testcase.TestCase5@5e91993f
16/04/08 00:40:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at org.mystudy.testcase.TestCase5.proc3(TestCase5.java:25)
at org.mystudy.testcase.TestCase5.main(TestCase5.java:19)
Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase5
Serialization stack:
- object not serializable (class: org.mystudy.testcase.TestCase5, value: org.mystudy.testcase.TestCase5@5e91993f)
- field (class: org.mystudy.testcase.TestCase5$1, name: this$0, type: class org.mystudy.testcase.TestCase5)
- object (class org.mystudy.testcase.TestCase5$1, org.mystudy.testcase.TestCase5$1@60acd609)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 13 more
5-1 What about assigning it to a variable first? No.
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TestCase5Sol1 {

    private TestCase5Sol1() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase5Sol1 t = new TestCase5Sol1();
        System.out.println("t:" + t);
        t.proc3();
    }

    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));

        Function<Integer, Integer> f = new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 1;
            }
        };
        System.out.println("f:" + f);

        JavaRDD<Integer> rdd3 = rdd2.map(f);   // Exception occurs here
        System.out.println(rdd3.collect());
    }
}
Assigning the anonymous instance to a variable first does not help either.
The error message still points at the enclosing class.
t:org.mystudy.testcase.TestCase5Sol1@5e91993f
16/04/08 00:44:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
f:org.mystudy.testcase.TestCase5Sol1$1@363f0ba0
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at org.mystudy.testcase.TestCase5Sol1.proc3(TestCase5Sol1.java:32)
at org.mystudy.testcase.TestCase5Sol1.main(TestCase5Sol1.java:19)
Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase5Sol1
Serialization stack:
- object not serializable (class: org.mystudy.testcase.TestCase5Sol1, value: org.mystudy.testcase.TestCase5Sol1@5e91993f)
- field (class: org.mystudy.testcase.TestCase5Sol1$1, name: this$0, type: class org.mystudy.testcase.TestCase5Sol1)
- object (class org.mystudy.testcase.TestCase5Sol1$1, org.mystudy.testcase.TestCase5Sol1$1@363f0ba0)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 13 more
5-2 Serializable again
package org.mystudy.testcase;

import java.io.Serializable;
import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TestCase5Sol2 implements Serializable {

    private TestCase5Sol2() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase5Sol2 t = new TestCase5Sol2();
        t.proc3();
    }

    public void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));

        Function<Integer, Integer> f = new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 1;
            }
        };

        JavaRDD<Integer> rdd3 = rdd2.map(f);   // fixed
        System.out.println(rdd3.collect());
    }
}
Put simply, making the class Serializable fixes it.
One thing to watch when declaring a class Serializable: every member field of the class must itself be serializable (or be kept out of serialization).
>>>>
If you would rather avoid Serializable:
use Java 8 lambda expressions, as in the sketch below,
or use a completely independent top-level class that implements Serializable.
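As a minimal sketch of the lambda route (the class name TestCase5Sol3 is hypothetical): unlike an anonymous inner class, a lambda that does not touch this or any member field captures no enclosing instance, so the class does not need to be Serializable.

package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical variant of TestCase5: the lambda replaces the anonymous Function
// and captures nothing, so TestCase5Sol3 need not implement Serializable.
public class TestCase5Sol3 {

    public static void main(String... strings) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(v1 -> v1 + 1);   // works without implements Serializable
        System.out.println(rdd3.collect());
    }
}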
Let's dig a little further...
6. Success case - methods of an external class
package org.mystudy.testcase.vo;

public class BBB {

    public int add(int num) {
        return num + 1;
    }

    public static int bbb(int num) {
        return num + 1;
    }
}
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.mystudy.testcase.vo.BBB;

public class TestCase6 {

    private TestCase6() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase6 t = new TestCase6();
        t.proc3();
    }

    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(a -> new BBB().add(a));   // works
        System.out.println(rdd3.collect());
    }
}
It is not obvious at first why this succeeds, since BBB does not implement Serializable.
Cases 2 through 5 above passed a function instance itself to rdd2.map(...), whereas here a lambda is passed and the BBB object is created inside the lambda body.
Because the lambda captures no BBB instance (and no this), only the lambda itself is serialized; BBB merely has to be on the executors' classpath.
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.mystudy.testcase.vo.BBB;

public class TestCase7 {

    private TestCase7() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase7 t = new TestCase7();
        t.proc3();
    }

    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(a -> BBB.bbb(a));   // works
        System.out.println(rdd3.collect());
    }
}
A static method works fine as well. Why? For the same reason: the lambda a -> BBB.bbb(a) captures no object at all, so there is nothing extra to serialize.
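The same call can also be written as a method reference; a minimal sketch, assuming the BBB class above (the class name TestCase7Sol1 is hypothetical):

package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.mystudy.testcase.vo.BBB;

// Hypothetical variant of TestCase7: a method reference to the static BBB.bbb
// also captures no object, so nothing has to be Serializable.
public class TestCase7Sol1 {

    public static void main(String... strings) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(BBB::bbb);   // works
        System.out.println(rdd3.collect());
    }
}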
7. Failure - creating the instance outside the lambda
package org.mystudy.testcase;

import java.util.Arrays;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.mystudy.testcase.vo.BBB;

public class TestCase8 {

    private TestCase8() {
        PropertyConfigurator.configure("D:\\workspace\\spark\\learning.spark\\src\\resources\\log4j.properties");
    }

    public static void main(String... strings) {
        TestCase8 t = new TestCase8();
        System.out.println("t:" + t);
        t.proc3();
    }

    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));

        BBB b = new BBB();
        System.out.println("b:" + b);

        JavaRDD<Integer> rdd3 = rdd2.map(a -> b.add(a));   // Exception occurs here
        System.out.println(rdd3.collect());
    }
}
Starting from the case that worked above,
the BBB instance is now created outside the lambda and captured by it,
and suddenly Spark demands that BBB be Serializable.
The captured b becomes part of the lambda's serialized state (capturedArgs in the trace below), so declaring the class as
public class BBB implements Serializable { ... }
makes the error disappear, as shown in the sketch after the stack trace below.
t:org.mystudy.testcase.TestCase8@75a1cd57
16/04/08 01:09:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
b:org.mystudy.testcase.vo.BBB@681adc8f
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at org.mystudy.testcase.TestCase8.proc3(TestCase8.java:27)
at org.mystudy.testcase.TestCase8.main(TestCase8.java:19)
Caused by: java.io.NotSerializableException: org.mystudy.testcase.vo.BBB
Serialization stack:
- object not serializable (class: org.mystudy.testcase.vo.BBB, value: org.mystudy.testcase.vo.BBB@681adc8f)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.mystudy.testcase.TestCase8, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/mystudy/testcase/TestCase8.lambda$0:(Lorg/mystudy/testcase/vo/BBB;Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.mystudy.testcase.TestCase8$$Lambda$4/1018067851, org.mystudy.testcase.TestCase8$$Lambda$4/1018067851@5e8c34a0)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 13 more
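A minimal sketch of that fix, with the rest of BBB unchanged:

package org.mystudy.testcase.vo;

import java.io.Serializable;

// Fix for case 7: the BBB instance captured by the lambda is serialized along
// with it, so BBB itself must implement Serializable.
public class BBB implements Serializable {

    public int add(int num) {
        return num + 1;
    }

    public static int bbb(int num) {
        return num + 1;
    }
}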
Source: http://java8.tistory.com/39 [Bug Report]
