메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


*출처 : http://java8.tistory.com/39


1. 성공케이스

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
 
public class TestCase1 {
    JavaSparkContext sc = null;
 
    private TestCase1() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
        sc = new JavaSparkContext("local[2]", "First Spark App");
    }
 
    public static void main(String... strings) {
        TestCase1 t = new TestCase1();
        t.proc1();
        t.proc2();
    }
 
    private void proc1() {
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(a -> a + 1);
        System.out.println(rdd3.collect());
    }
 
    private void proc2() {
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        int num2 = 3;
        JavaRDD<integer> rdd3 = rdd2.map(a -> a + num2);
        System.out.println(rdd3.collect());
    }
}
 
</integer></integer></integer></integer>

좋은 케이스 : 에러 없이 잘... 작동한다.

JAVA8의 람다식이다.



2. 실패사례 - 전역변수(멤버필드)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
 
public class TestCase2 {
    private int num1 = 4;
    JavaSparkContext sc = null;
 
    private TestCase2() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
        sc = new JavaSparkContext("local[2]", "First Spark App");
    }
 
    public static void main(String... strings) {
        TestCase2 t = new TestCase2();
        System.out.println("t:"+t);
        t.proc3();
    }
 
    private void proc3() {
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(a -> a + this.num1);                // Exception 발생
        System.out.println(rdd3.collect());
    }
 
}
 
 
</integer></integer>

Exception 발생

람다식에 this.num1 이 사용되었다. this는 TestCase2 자체를 의미하므로, 현재 TestCase2 가 Serializable 을 구현하지 않았으므로 아래와 같은 Exception 이 발생한다.


16/04/08 00:01:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
t:org.mystudy.testcase.TestCase2@247667dd
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.map(RDD.scala:323)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
at org.mystudy.testcase.TestCase2.proc3(TestCase2.java:26)
at org.mystudy.testcase.TestCase2.main(TestCase2.java:21)
Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase2
Serialization stack:
- object not serializable (class: org.mystudy.testcase.TestCase2, value: org.mystudy.testcase.TestCase2@247667dd)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.mystudy.testcase.TestCase2, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial org/mystudy/testcase/TestCase2.lambda$0:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.mystudy.testcase.TestCase2$$Lambda$4/503353142, org.mystudy.testcase.TestCase2$$Lambda$4/503353142@7a1f8def)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 13 more

 
 


2-1 해결책

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
 
public class TestCase2Sol1 {
    private int num1 = 4;
    JavaSparkContext sc = null;
 
    private TestCase2Sol1() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
        sc = new JavaSparkContext("local[2]", "First Spark App");
    }
 
    public static void main(String... strings) {
        TestCase2Sol1 t = new TestCase2Sol1();
        t.proc3();
    }
 
    private void proc3() {
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        int num1 = this.num1;                                       // 해결
        JavaRDD<integer> rdd3 = rdd2.map(a -> a + num1);             // 해결
        System.out.println(rdd3.collect());
    }
}
 
</integer></integer>

[러닝 스파크] 책에서 소개하는 방식으로...

this.num1의 값을 지역변수로 재할당해서 사용하면 된다.



2-2 이렇게도 해결할 수 있을까? 안돼~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package org.mystudy.testcase;
 
import java.io.Serializable;
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
 
public class TestCase2Sol2 implements Serializable {
    private int num1 = 4;
    private JavaSparkContext sc = null;
 
    private TestCase2Sol2() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
        sc = new JavaSparkContext("local[2]", "First Spark App");
    }
 
    public static void main(String... strings) {
        TestCase2Sol2 t = new TestCase2Sol2();
        System.out.println("t:"+t);
        System.out.println("sc:"+t.sc);
        t.proc3();
    }
 
    private void proc3() {
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(a -> a + this.num1);                // 여전히 Exception 발생
        System.out.println(rdd3.collect());
    }
}
 
 
</integer></integer>

implements Serializable 을 했음에도 Exception이 발생한다.

이유인즉은, JavaSparkContext 객체를 위 코드에서 클래스의 전역변수로 사용하고 있는데, 아무리 클래스에 Serializable을 구현해놓아도

멤버필드 즉, JavaSparkContext sc 는 기본적으로 직렬화가 안되는 모양이다;;;

 

16/04/08 00:10:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

t:org.mystudy.testcase.TestCase2Sol2@247667dd

sc:org.apache.spark.api.java.JavaSparkContext@6f099cef

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.map(RDD.scala:323)

at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)

at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)

at org.mystudy.testcase.TestCase2Sol2.proc3(TestCase2Sol2.java:28)

at org.mystudy.testcase.TestCase2Sol2.main(TestCase2Sol2.java:23)

Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext

Serialization stack:

- object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@6f099cef)

- field (class: org.mystudy.testcase.TestCase2Sol2, name: sc, type: class org.apache.spark.api.java.JavaSparkContext)

- object (class org.mystudy.testcase.TestCase2Sol2, org.mystudy.testcase.TestCase2Sol2@247667dd)

- element of array (index: 0)

- array (class [Ljava.lang.Object;, size 1)

- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)

- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.mystudy.testcase.TestCase2Sol2, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial org/mystudy/testcase/TestCase2Sol2.lambda$0:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])

- writeReplace data (class: java.lang.invoke.SerializedLambda)

- object (class org.mystudy.testcase.TestCase2Sol2$$Lambda$4/1353512285, org.mystudy.testcase.TestCase2Sol2$$Lambda$4/1353512285@116a2108)

- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)

- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)

at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

... 13 more





2-3 이렇게 해결할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package org.mystudy.testcase;
 
import java.io.Serializable;
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
 
public class TestCase2Sol3 implements Serializable {
    private int num1 = 4;
 
    private TestCase2Sol3() {
 
    }
 
    public static void main(String... strings) {
        TestCase2Sol3 t = new TestCase2Sol3();
        t.proc3();
    }
 
    private void proc3() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(a -> a + this.num1);            // 해결
        System.out.println(rdd3.collect());
    }
}
 
</integer></integer>

JavaSparkContext 를 지역변수로 사용하였다. 해결됨.



3.실패사례 - 함수사용(멤버메서드)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
 
public class TestCase3 {
 
    private TestCase3() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase3 t = new TestCase3();
        System.out.println("t:"+t);
        t.proc3();
    }
    private int add(int num) {
        return num + 1;
    }
    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(a -> add(a));                       // Exception 발생
        System.out.println(rdd3.collect());
    }
}
 
</integer></integer>

this 를 사용했던 경우와 같은 문제이다. TestCase3 클래스를 Serializable 하지 않아서 생긴 문제이다.

t:org.mystudy.testcase.TestCase3@75a1cd57

16/04/08 00:17:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.map(RDD.scala:323)

at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)

at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)

at org.mystudy.testcase.TestCase3.proc3(TestCase3.java:26)

at org.mystudy.testcase.TestCase3.main(TestCase3.java:18)

Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase3

Serialization stack:

- object not serializable (class: org.mystudy.testcase.TestCase3, value: org.mystudy.testcase.TestCase3@75a1cd57)

- element of array (index: 0)

- array (class [Ljava.lang.Object;, size 1)

- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)

- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.mystudy.testcase.TestCase3, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial org/mystudy/testcase/TestCase3.lambda$0:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])

- writeReplace data (class: java.lang.invoke.SerializedLambda)

- object (class org.mystudy.testcase.TestCase3$$Lambda$4/503353142, org.mystudy.testcase.TestCase3$$Lambda$4/503353142@7a1f8def)

- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)

- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)

at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

... 13 more





3-1 해결

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package org.mystudy.testcase;
 
import java.io.Serializable;
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
 
public class TestCase3Sol1 implements Serializable {
 
    private TestCase3Sol1() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase3Sol1 t = new TestCase3Sol1();
        t.proc3();
    }
    private int add(int num) {
        return num + 1;
    }
    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(a -> add(a));                       // 해결
        System.out.println(rdd3.collect());
    }
}
 
</integer></integer>

Serializable 구현해서 해결하였다.



4.실패사례 - Function 등 인터페이스 문제

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
 
public class TestCase4 {
 
    private TestCase4() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase4 t = new TestCase4();
        System.out.println("t:"+t);
        t.proc3();
    }
 
    private void proc3() {
        class AAA implements Function<Integer, Integer> {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 1;
            }
        }
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(new AAA());                      // Exception
        System.out.println(rdd3.collect());
    }
}

무엇이 문제일까?

내부클래스를 사용했더니... 그 내부클래스를 품고 있는 바깥클래스 즉, TestCase4의 Serializable 여부를 묻고 있다. 

아무리 Function 인터페이스가 Serializable을 구현했다고 해도... 그 Function 을 구현한 AAA 라는 클래스가 바깥클래스의 정체성과 연관이 있나보다.

어쩌면 AAA 라는 내부클래스를 정의할때 org.mystudy.testcase.TestCase4$1AAA@60acd609 이렇게 사용하기에....  TestCase4$1, 결국 TestCase4 가 결정적인 역할을 하는 것 같다.

앞의 예제 this.num1 과 같이... 실제 전달하는 값은 num1 이지만, 결국 스파크에 전달되는 것은 num1을 포함하는 this가 전달되는 것과 마찬가지로..

스파크에 AAA만 전달되는 것 같지만 결국은 AAA를 포함하는 TestCase4 가 전달되는 것은 아닌가 싶다. 그래서 TestCase4  Serializable 여부를 묻고 있는 것이 아닌가???


t:org.mystudy.testcase.TestCase4@5e91993f

16/04/08 00:24:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.map(RDD.scala:323)

at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)

at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)

at org.mystudy.testcase.TestCase4.proc3(TestCase4.java:31)

at org.mystudy.testcase.TestCase4.main(TestCase4.java:19)

Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase4

Serialization stack:

- object not serializable (class: org.mystudy.testcase.TestCase4, value: org.mystudy.testcase.TestCase4@5e91993f)

- field (class: org.mystudy.testcase.TestCase4$1AAA, name: this$0, type: class org.mystudy.testcase.TestCase4)

- object (class org.mystudy.testcase.TestCase4$1AAA, org.mystudy.testcase.TestCase4$1AAA@60acd609)

- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)

- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)

at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

... 13 more






4-1. 혹시나 이렇게 해보았지만...

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
 
public class TestCase4Sol1 {
 
    private TestCase4Sol1() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase4Sol1 t = new TestCase4Sol1();
        System.out.println("t:"+t);
        t.proc3();
    }
    class AAA implements Function<Integer, Integer> {
        @Override
        public Integer call(Integer v1) throws Exception {
            return v1 + 1;
        }
    }
    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<Integer> rdd3 = rdd2.map(new AAA());                      //Exception
        System.out.println(rdd3.collect());
    }
}

혹시나 class를 함수밖으로 빼보았지만.... 동일한 Exception이 발생한다.

t:org.mystudy.testcase.TestCase4Sol1@5e91993f

16/04/08 00:33:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.map(RDD.scala:323)

at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)

at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)

at org.mystudy.testcase.TestCase4Sol1.proc3(TestCase4Sol1.java:30)

at org.mystudy.testcase.TestCase4Sol1.main(TestCase4Sol1.java:19)

Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase4Sol1

Serialization stack:

- object not serializable (class: org.mystudy.testcase.TestCase4Sol1, value: org.mystudy.testcase.TestCase4Sol1@5e91993f)

- field (class: org.mystudy.testcase.TestCase4Sol1$AAA, name: this$0, type: class org.mystudy.testcase.TestCase4Sol1)

- object (class org.mystudy.testcase.TestCase4Sol1$AAA, org.mystudy.testcase.TestCase4Sol1$AAA@598260a6)

- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)

- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)

at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

... 13 more





4-2 외부클래스를 이용해서 해결

1
2
3
4
5
6
7
8
9
10
package org.mystudy.testcase.vo;
 
import org.apache.spark.api.java.function.Function;
 
public class AAA implements Function<Integer, Integer> {
    @Override
    public Integer call(Integer v1) throws Exception {
        return v1 + 1;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.mystudy.testcase.vo.AAA;
 
public class TestCase4Sol2 {
 
    private TestCase4Sol2() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase4Sol2 t = new TestCase4Sol2();
        t.proc3();
    }
 
    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(new AAA());                        //해결
        System.out.println(rdd3.collect());
    }
}
 
</integer></integer>

외부 public 클래스를 이용했더니 해결되었다. AAA 클래스가 다른 클래스의 영향을 받지 않고, 순수하게 Function의 영향만 받아서, 문제가 생기지 않는가 보다.



5. 실패사례 - 익명 내부 클래스

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
 
public class TestCase5 {
 
    private TestCase5() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase5 t = new TestCase5();
        System.out.println("t:"+t);
        t.proc3();
    }
 
    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(new Function<integer, integer="">() {      // Exception
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 1;
            }
 
        });
        System.out.println(rdd3.collect());
    }
}
 
</integer,></integer></integer>

왜 Exception 이 발생하는가? 책대로 하였는데-_-'''

책에서 익명 내부 클래스를 사용하라고 했는데;;;

여전히 TestCase5 를 걸고 넘어지고 있다. 그저... Serializable해주면 된다. 그런데 왜 그래야 하는가? 익명인데-___-:;;

아래의 파란색 표시를 보면, 익명이더라도... 참조가 TestCase5  로 되어있다-_-;;;;

t:org.mystudy.testcase.TestCase5@5e91993f

16/04/08 00:40:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.map(RDD.scala:323)

at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)

at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)

at org.mystudy.testcase.TestCase5.proc3(TestCase5.java:25)

at org.mystudy.testcase.TestCase5.main(TestCase5.java:19)

Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase5

Serialization stack:

- object not serializable (class: org.mystudy.testcase.TestCase5, value: org.mystudy.testcase.TestCase5@5e91993f)

- field (class: org.mystudy.testcase.TestCase5$1, name: this$0, type: class org.mystudy.testcase.TestCase5)

- object (class org.mystudy.testcase.TestCase5$1, org.mystudy.testcase.TestCase5$1@60acd609)

- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)

- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)

at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

... 13 more





5.1 변수로 받아볼까? 안돼~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
 
public class TestCase5Sol1 {
 
    private TestCase5Sol1() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase5Sol1 t = new TestCase5Sol1();
        System.out.println("t:"+t);
        t.proc3();
    }
 
    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        Function<Integer, Integer> f = new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 1;
            }
        };
        System.out.println("f:"+f);
        JavaRDD<Integer> rdd3 = rdd2.map(f);  //Exception
        System.out.println(rdd3.collect());
    }
}

변수로 받아보아도 안된다.

에러메시지를 보면 여전히 바깥클래스가 걸려있다.

t:org.mystudy.testcase.TestCase5Sol1@5e91993f

16/04/08 00:44:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

f:org.mystudy.testcase.TestCase5Sol1$1@363f0ba0

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.map(RDD.scala:323)

at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)

at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)

at org.mystudy.testcase.TestCase5Sol1.proc3(TestCase5Sol1.java:32)

at org.mystudy.testcase.TestCase5Sol1.main(TestCase5Sol1.java:19)

Caused by: java.io.NotSerializableException: org.mystudy.testcase.TestCase5Sol1

Serialization stack:

- object not serializable (class: org.mystudy.testcase.TestCase5Sol1, value: org.mystudy.testcase.TestCase5Sol1@5e91993f)

- field (class: org.mystudy.testcase.TestCase5Sol1$1, name: this$0, type: class org.mystudy.testcase.TestCase5Sol1)

- object (class org.mystudy.testcase.TestCase5Sol1$1, org.mystudy.testcase.TestCase5Sol1$1@363f0ba0)

- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)

- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)

at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

... 13 more






5-2. 역시나 Serializable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package org.mystudy.testcase;
 
import java.io.Serializable;
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
 
public class TestCase5Sol2 implements Serializable {
 
    private TestCase5Sol2() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase5Sol2 t = new TestCase5Sol2();
        t.proc3();
    }
 
    public void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        Function<Integer, Integer> f = new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 1;
            }
        };
        JavaRDD<Integer> rdd3 = rdd2.map(f);          // 해결
        System.out.println(rdd3.collect());
    }
}

그냥.. 쉽게 생각하면, Serializable 해주면 된다-_-;;

클래스에 Serializable 해줄때, 주의할 사항은.. 클래스의 멤버필드가 모두 Serializable 하는데 문제가 없어야 된다.


>>>>


Serializable이 싫다면,,,

JAVA8 의 람다식을 사용하자. 

또는 완전 독립적인 클래스(Serializable이 구현된)를 사용하자.



조금 더 해보자...




6.성공케이스 - 외부 클래스의 함수

1
2
3
4
5
6
7
8
9
10
package org.mystudy.testcase.vo;
 
public class BBB {
    public int add(int num) {
        return num + 1;
    }
    public static int bbb(int num) {
        return num + 1;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.mystudy.testcase.vo.BBB;
 
public class TestCase6 {
 
    private TestCase6() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase6 t = new TestCase6();
        t.proc3();
    }
 
    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(a -> new BBB().add(a));         //성공       
        System.out.println(rdd3.collect());
    }
}
 
</integer></integer>

왜 성공인지 모르겠다.-_- BBB 클래스는 Serializable 하지 않았는데...

일단, 위 2~5 사례는 rdd2.map(함수인스턴스 자체); 형태였는데..

지금의 사례는 rdd2.map(a -> 함수연산?); 이라...조금 다르다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.mystudy.testcase.vo.BBB;
 
public class TestCase7 {
 
    private TestCase7() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase7 t = new TestCase7();
        t.proc3();
    }
 
    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaRDD<integer> rdd3 = rdd2.map(a -> BBB.bbb(a));               //성공           
        System.out.println(rdd3.collect());
    }
}
 
</integer></integer>

static 함수도 잘 된다..왜????



7.실패... 밖으로 나와서 인스턴스를 만들었더니...-__-;;;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package org.mystudy.testcase;
 
import java.util.Arrays;
 
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.mystudy.testcase.vo.BBB;
 
public class TestCase8 {
 
    private TestCase8() {
        PropertyConfigurator.configure("D:\workspace\spark\learning.spark\src\resources\log4j.properties");
    }
 
    public static void main(String... strings) {
        TestCase8 t = new TestCase8();
        System.out.println("t:"+t);
        t.proc3();
    }
 
    private void proc3() {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<integer> rdd2 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        BBB b = new BBB();
        System.out.println("b:"+b);
        JavaRDD<integer> rdd3 = rdd2.map(a -> b.add(a));                 //Exception        
        System.out.println(rdd3.collect());
    }
}
 
 
</integer></integer>

위의 잘되던 케이스에서...

람다 밖에서 BBB 인스턴스를 만들어서 넣어줬더니...

이제와서  BBB의 Serializable을 요구한다-_-;;


BBB 클래스에

public class BBB implements Serializable{ 와 같이 구현하면 에러가 사라진다...

이게 뭔가-_-;;;


t:org.mystudy.testcase.TestCase8@75a1cd57

16/04/08 01:09:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

b:org.mystudy.testcase.vo.BBB@681adc8f

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)

at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.map(RDD.scala:323)

at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:96)

at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)

at org.mystudy.testcase.TestCase8.proc3(TestCase8.java:27)

at org.mystudy.testcase.TestCase8.main(TestCase8.java:19)

Caused by: java.io.NotSerializableException: org.mystudy.testcase.vo.BBB

Serialization stack:

- object not serializable (class: org.mystudy.testcase.vo.BBB, value: org.mystudy.testcase.vo.BBB@681adc8f)

- element of array (index: 0)

- array (class [Ljava.lang.Object;, size 1)

- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)

- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.mystudy.testcase.TestCase8, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/mystudy/testcase/TestCase8.lambda$0:(Lorg/mystudy/testcase/vo/BBB;Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])

- writeReplace data (class: java.lang.invoke.SerializedLambda)

- object (class org.mystudy.testcase.TestCase8$$Lambda$4/1018067851, org.mystudy.testcase.TestCase8$$Lambda$4/1018067851@5e8c34a0)

- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)

- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)

at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

... 13 more



출처: http://java8.tistory.com/39 [버그 리포트]

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로