From e0878377f140ef0f1ff50bbe511792ff48fd3b6e Mon Sep 17 00:00:00 2001
From: luoxiang <2806718453@qq.com>
Date: Tue, 14 May 2019 23:15:10 +0800
Subject: [PATCH 1/2] modify

---
 code/spark/spark-core/README.md                    | 27 ++++++++++
 code/spark/spark-core/pom.xml                      | 49 +++++++++++++++++++
 .../java/rdd/java/TransformationTest.java          | 36 ++++++++++++++
 .../src/main/java/rdd/scala/Test.scala             | 14 ++++++
 .../java/rdd/scala/TransformationTest.scala        | 23 +++++++++
 notes/Spark-Transformation和Action.md              |  0
 6 files changed, 149 insertions(+)
 create mode 100644 code/spark/spark-core/README.md
 create mode 100644 code/spark/spark-core/pom.xml
 create mode 100644 code/spark/spark-core/src/main/java/rdd/java/TransformationTest.java
 create mode 100644 code/spark/spark-core/src/main/java/rdd/scala/Test.scala
 create mode 100644 code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala
 create mode 100644 notes/Spark-Transformation和Action.md

diff --git a/code/spark/spark-core/README.md b/code/spark/spark-core/README.md
new file mode 100644
index 0000000..e1cd8da
--- /dev/null
+++ b/code/spark/spark-core/README.md
@@ -0,0 +1,27 @@
+val list = List(3,6,9,10,12,21)
+val listRDD = sc.parallelize(list)
+val intsRDD = listRDD.map(_*10)
+intsRDD.foreach(println)
+
+sc.parallelize(list).map(_*10).foreach(println)
+
+
+sc.parallelize(list).filter(_>=10).foreach(println)
+
+val list = List(List(1, 2), List(3), List(), List(4, 5))
+sc.parallelize(list).flatMap(_.toList).map(_*10).foreach(println)
+
+
+val list = List(1,2,3,4,5)
+sc.parallelize(list).reduce((x,y) => x+y)
+sc.parallelize(list).reduce(_+_)
+
+
+val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
+sc.parallelize(list).reduceByKey(_+_).foreach(println)
+
+
+
+val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
+sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)
+
diff --git a/code/spark/spark-core/pom.xml b/code/spark/spark-core/pom.xml
new file mode 100644
index 0000000..d805edd
--- /dev/null
+++ b/code/spark/spark-core/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.heibaiying</groupId>
+    <artifactId>spark-core</artifactId>
+    <version>1.0</version>
+
+    <properties>
+        <scala.version>2.12.8</scala.version>
+    </properties>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.12</artifactId>
+            <version>3.0.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/code/spark/spark-core/src/main/java/rdd/java/TransformationTest.java b/code/spark/spark-core/src/main/java/rdd/java/TransformationTest.java
new file mode 100644
index 0000000..80a2671
--- /dev/null
+++ b/code/spark/spark-core/src/main/java/rdd/java/TransformationTest.java
@@ -0,0 +1,36 @@
+package rdd.java;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+public class TransformationTest {
+
+    private static JavaSparkContext sc = null;
+
+
+    @Before
+    public void prepare() {
+        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TransformationTest");
+        sc = new JavaSparkContext(conf);
+    }
+
+    @Test
+    public void map() {
+        List<Integer> list = Arrays.asList(3, 6, 9, 10, 12, 21);
+        sc.parallelize(list).map(x -> x * 10).foreach(System.out::println);
+    }
+
+
+    @After
+    public void destroy() {
+        sc.close();
+    }
+
+}
diff --git a/code/spark/spark-core/src/main/java/rdd/scala/Test.scala b/code/spark/spark-core/src/main/java/rdd/scala/Test.scala
new file mode 100644
index 0000000..a8e3af9
--- /dev/null
+++ b/code/spark/spark-core/src/main/java/rdd/scala/Test.scala
@@ -0,0 +1,14 @@
+package rdd.scala
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+object Test extends App {
+
+
+  val conf = new SparkConf().setAppName("TransformationTest123").setMaster("local[2]")
+  val sc = new SparkContext(conf)
+
+  val list = List(3, 6, 9, 10, 12, 21)
+  sc.parallelize(list).map(_ * 10).foreach(println)
+
+}
diff --git a/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala b/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala
new file mode 100644
index 0000000..5ed96aa
--- /dev/null
+++ b/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala
@@ -0,0 +1,23 @@
+package rdd.scala
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.{Before, Test}
+
+class TransformationTest extends {
+
+  var sc: SparkContext = _
+
+  @Before
+  def prepare(): Unit = {
+    val conf = new SparkConf().setAppName("TransformationTest").setMaster("local[2]")
+    sc = new SparkContext(conf)
+  }
+
+  @Test
+  def map(): Unit = {
+    val list = List(3, 6, 9, 10, 12, 21)
+    sc.parallelize(list).map(_ * 10).foreach(println)
+  }
+
+
+}
\ No newline at end of file
diff --git a/notes/Spark-Transformation和Action.md b/notes/Spark-Transformation和Action.md
new file mode 100644
index 0000000..e69de29

From 458f57759a201017bd963957dea7a0c25790e050 Mon Sep 17 00:00:00 2001
From: luoxiang <2806718453@qq.com>
Date: Wed, 15 May 2019 07:37:02 +0800
Subject: [PATCH 2/2] modify

---
 code/spark/spark-core/pom.xml                      |  6 ++++++
 .../main/java/rdd/java/TransformationTest.java     | 12 ++++++++++--
 .../src/main/java/rdd/scala/Test.scala             | 14 --------------
 .../java/rdd/scala/TransformationTest.scala        | 17 +++++++++--------
 4 files changed, 25 insertions(+), 24 deletions(-)
 delete mode 100644 code/spark/spark-core/src/main/java/rdd/scala/Test.scala

diff --git a/code/spark/spark-core/pom.xml b/code/spark/spark-core/pom.xml
index d805edd..139270c 100644
--- a/code/spark/spark-core/pom.xml
+++ b/code/spark/spark-core/pom.xml
@@ -43,6 +43,12 @@
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.12</version>
         </dependency>
+        <dependency>
+            <groupId>com.thoughtworks.paranamer</groupId>
+            <artifactId>paranamer</artifactId>
+            <version>2.8</version>
+        </dependency>
+
     </dependencies>
 
diff --git a/code/spark/spark-core/src/main/java/rdd/java/TransformationTest.java b/code/spark/spark-core/src/main/java/rdd/java/TransformationTest.java
index 80a2671..19a21bf 100644
--- a/code/spark/spark-core/src/main/java/rdd/java/TransformationTest.java
+++ b/code/spark/spark-core/src/main/java/rdd/java/TransformationTest.java
@@ -12,19 +12,27 @@ import java.util.List;
 
 public class TransformationTest {
 
     private static JavaSparkContext sc = null;
 
 
     @Before
     public void prepare() {
-        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TransformationTest");
+        SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local[2]");
         sc = new JavaSparkContext(conf);
     }
 
     @Test
     public void map() {
         List<Integer> list = Arrays.asList(3, 6, 9, 10, 12, 21);
-        sc.parallelize(list).map(x -> x * 10).foreach(System.out::println);
+        /*
+         * Do not use the method-reference form System.out::println here; it throws:
+         * org.apache.spark.SparkException: Task not serializable
+         * Caused by: java.io.NotSerializableException: java.io.PrintStream
+         * When a Spark operator such as map or foreach references members of an
+         * enclosing class, every member of that class must be serializable;
+         * if any member is not, the exception above is thrown.
+         */
+        sc.parallelize(list).map(x -> x * 10).foreach(x -> System.out.println(x));
    }
 
 
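The comment in the hunk above describes the Task not serializable pitfall. Below is a minimal, self-contained sketch of the same behaviour, assuming nothing beyond the spark-core_2.12 2.4.0 dependency declared in the pom of this series; the class name SerializationSketch and its main method are invented for illustration and are not part of the patch.

package rdd.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class SerializationSketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SerializationSketch").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> list = Arrays.asList(3, 6, 9, 10, 12, 21);

        // Fails: the bound method reference captures the PrintStream instance System.out,
        // which is not Serializable, so closure serialization throws
        // org.apache.spark.SparkException: Task not serializable
        //   Caused by: java.io.NotSerializableException: java.io.PrintStream
        // sc.parallelize(list).map(x -> x * 10).foreach(System.out::println);

        // Works: the lambda captures nothing and only looks up System.out when it
        // actually runs on an executor, so nothing non-serializable is shipped.
        sc.parallelize(list).map(x -> x * 10).foreach(x -> System.out.println(x));

        sc.close();
    }
}

Collecting to the driver first, e.g. sc.parallelize(list).map(x -> x * 10).collect().forEach(System.out::println), also avoids the problem, because the method reference then never has to leave the driver.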
diff --git a/code/spark/spark-core/src/main/java/rdd/scala/Test.scala b/code/spark/spark-core/src/main/java/rdd/scala/Test.scala
deleted file mode 100644
index a8e3af9..0000000
--- a/code/spark/spark-core/src/main/java/rdd/scala/Test.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package rdd.scala
-
-import org.apache.spark.{SparkConf, SparkContext}
-
-object Test extends App {
-
-
-  val conf = new SparkConf().setAppName("TransformationTest123").setMaster("local[2]")
-  val sc = new SparkContext(conf)
-
-  val list = List(3, 6, 9, 10, 12, 21)
-  sc.parallelize(list).map(_ * 10).foreach(println)
-
-}
diff --git a/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala b/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala
index 5ed96aa..8fcd2a1 100644
--- a/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala
+++ b/code/spark/spark-core/src/main/java/rdd/scala/TransformationTest.scala
@@ -1,17 +1,13 @@
 package rdd.scala
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.junit.{Before, Test}
+import org.junit.{After, Test}
 
-class TransformationTest extends {
+class TransformationTest {
 
-  var sc: SparkContext = _
+  val conf: SparkConf = new SparkConf().setAppName("TransformationTest").setMaster("local[2]")
+  val sc = new SparkContext(conf)
 
-  @Before
-  def prepare(): Unit = {
-    val conf = new SparkConf().setAppName("TransformationTest").setMaster("local[2]")
-    sc = new SparkContext(conf)
-  }
 
   @Test
   def map(): Unit = {
@@ -19,5 +15,10 @@ class TransformationTest extends {
     sc.parallelize(list).map(_ * 10).foreach(println)
   }
 
+  @After
+  def destroy(): Unit = {
+    sc.stop()
+  }
+
 
 }
\ No newline at end of file