Scala:
package com.xp.cn

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by xupan on 2017/12/15.
 */
object WordCount {
  def main(args: Array[String]) {
    // SparkContext: the entry point for Spark execution
    val sc: SparkContext = new SparkContext(
      new SparkConf()
        .setAppName("WordCount")
        .setMaster("local")
    )

    // textFile: the path to read input data from
    // textFile(path: String, minPartitions: Int = defaultMinPartitions)
    // minPartitions: if not specified, the default is tied to the HDFS block size
    val textRdd: RDD[String] = sc.textFile("data/wordcount/wordcount.txt")

    // Count the words: split lines, pair each word with 1,
    // sum the counts per word, then sort by count in descending order
    val wordCount: RDD[(String, Int)] = textRdd
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)

    // Print the top 10 results
    wordCount.take(10).foreach(println)

    // Save to HDFS
    wordCount.saveAsTextFile("hdfs://xupan001:8020/user/root/spark/output/wordcount")

    // Release resources
    sc.stop()
  }
}
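To try the pipeline without the input file or an HDFS cluster, the same logic can be run on an in-memory collection. Here is a minimal sketch, assuming a few hypothetical hard-coded sample lines in place of data/wordcount/wordcount.txt (the WordCountLocal name and the sample data are illustrative, not from the original):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCountLocal {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("WordCountLocal").setMaster("local")
    )

    // Hypothetical sample data standing in for the input file
    val lines: RDD[String] = sc.parallelize(Seq("spark hadoop spark", "hadoop hdfs"))

    // Same pipeline as above: split, pair with 1, sum per word, sort by count descending
    val wordCount: RDD[(String, Int)] = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    // Prints the (word, count) pairs in descending-count order,
    // e.g. (spark,2), (hadoop,2), (hdfs,1)
    wordCount.collect().foreach(println)

    sc.stop()
  }
}

Because the data here is tiny, collect() is safe; on real datasets prefer take(n), as in the version above, to avoid pulling the whole RDD to the driver.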
Java:
package com.xp.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by xupan on 2017/12/15.
 */
public class WordCountJava {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("WordCountJava")
                        .setMaster("local")
        );

        JavaRDD<String> lineJavaRDD = sc.textFile("data/wordcount/wordcount.txt");

        // Split each line into words
        JavaRDD<String> wordJavaRDD = lineJavaRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        // Pair each word with an initial count of 1
        JavaPairRDD<String, Integer> wordPairRDD = wordJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Aggregate the counts per word
        JavaPairRDD<String, Integer> reducePairRDD = wordPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Swap (word, count) to (count, word) so we can sort by count
        JavaPairRDD<Integer, String> reversedRDD = reducePairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<Integer, String>(t._2(), t._1());
            }
        });

        // Sort by count, descending
        JavaPairRDD<Integer, String> sortRDD = reversedRDD.sortByKey(false);

        // Swap back to (word, count)
        JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return t.swap();
            }
        });

        // Print the results
        resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pairs) throws Exception {
                System.out.println(pairs._1() + " : " + pairs._2());
            }
        });

        // Release resources
        sc.close();
    }
}
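The anonymous inner classes make each step explicit but verbose; the Java 8 lambda version below expresses the same five-step pipeline far more compactly.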
Java 1.8:
package com.xp.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by xupan on 2017/12/15.
 */
public class WordJavaLamb {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("WordCountJava")
                        .setMaster("local")
        );

        JavaRDD<String> javaRDD = sc.textFile("data/wordcount/wordcount.txt");

        // Split each line into words
        JavaRDD<String> lineRDD = javaRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        JavaPairRDD<String, Integer> result = lineRDD
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((m, n) -> m + n)   // aggregate counts per word
                .mapToPair(tp -> tp.swap())     // swap to (count, word)
                .sortByKey(false)               // sort by count, descending
                .mapToPair(tp -> tp.swap());    // swap back to (word, count)

        // Print the results
        result.foreach(wordcount -> System.out.println(wordcount._1() + ":" + wordcount._2()));

        // Release resources
        sc.close();
    }
}
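One further tightening available in Java 8: the reduce lambda (m, n) -> m + n can be replaced by the method reference Integer::sum, and tp -> tp.swap() by Tuple2::swap, without changing the pipeline's behavior.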