ITPub博客

首页 > 大数据 > Spark > spark系列一:transaction操作实战

spark系列一:transformation操作实战

原创 Spark 作者:jauar513 时间:2019-04-10 19:06:06 0 删除 编辑
1、算子实战(Java版本):
package cn.spark.study.core;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class mapdemo {
 public static void main(String[] args) {
 //map();
 //filter();
 //flatmap();
 //groupbykey();
 //reducebykey();
 // sortbykey();
 // join();
  cogroup();
 }
    public static void map(){
     SparkConf conf = new SparkConf()
    .setAppName("collectionparallelize")
    .setMaster("local");
  JavaSparkContext sc = new JavaSparkContext(conf);
  
  List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
  
  JavaRDD<Integer> numbersrdd = sc.parallelize(numbers);
  JavaRDD<Integer> trnumbersrdd = numbersrdd.map(new Function<Integer, Integer>(){
   private static final long serialVersionUID = 1L;
   @Override
   public Integer call(Integer v1) throws Exception {
    // TODO Auto-generated method stub
    return v1 * 2;
   }
   
  });
  trnumbersrdd.foreach(new VoidFunction<Integer>(){
  
   private static final long serialVersionUID = 1L;
   @Override
   public void call(Integer t) throws Exception {
    // TODO Auto-generated method stub
    System.out.println(t);
   }
   
  });
  sc.close();
    }
  
 public static void filter(){
      SparkConf conf = new SparkConf()
     .setAppName("collectionparallelize")
     .setMaster("local");
   JavaSparkContext sc = new JavaSparkContext(conf);
   
   List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
   
   JavaRDD<Integer> numbersrdd = sc.parallelize(numbers);
   JavaRDD<Integer> trannumbersrdd = numbersrdd.filter(new Function<Integer,Boolean>(){
  
    private static final long serialVersionUID = 1L;
    @Override
    public Boolean call(Integer v1) throws Exception {
     // TODO Auto-generated method stub
     return v1 % 2 == 0;
    }
    
   });
   trannumbersrdd.foreach(new VoidFunction<Integer>(){
   
    private static final long serialVersionUID = 1L;
    @Override
    public void call(Integer t) throws Exception {
     // TODO Auto-generated method stub
     System.out.println(t);
    }
    
   });
 
    }
 public static void flatmap(){
     SparkConf conf = new SparkConf()
    .setAppName("collectionparallelize")
    .setMaster("local");
  JavaSparkContext sc = new JavaSparkContext(conf);
  JavaRDD<String> lines = sc.textFile("C://Users//hlz//Desktop/test.txt");
  JavaRDD<String> wordsrdd = lines.flatMap(new FlatMapFunction<String,String>(){
   private static final long serialVersionUID = 1L;
   @Override
   public Iterable<String> call(String line) throws Exception {
    // TODO Auto-generated method stub
    return Arrays.asList(line.split(" "));
   }
   
  });
  wordsrdd.foreach(new VoidFunction<String>(){
   
   private static final long serialVersionUID = 1L;
   @Override
   public void call(String t) throws Exception {
    // TODO Auto-generated method stub
    System.out.println(t);
   }
   
  });
  sc.close();
}
 public static void groupbykey(){
     SparkConf conf = new SparkConf()
    .setAppName("collectionparallelize")
    .setMaster("local");
  JavaSparkContext sc = new JavaSparkContext(conf);
  
  @SuppressWarnings({ "unchecked", "rawtypes" })
  List<Tuple2<String,Integer>> classscore = Arrays.asList(new Tuple2<String,Integer>("class1",80),
    new Tuple2<String,Integer>("class2",75),
    new Tuple2<String,Integer>("class1",50),
    new Tuple2<String,Integer>("class2",75));
  JavaPairRDD<String, Integer> lines = sc.parallelizePairs(classscore);
  
  JavaPairRDD<String,Iterable<Integer>> wordsrdd = lines.groupByKey();
  wordsrdd.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>(){
   private static final long serialVersionUID = 1L;
   @Override
   public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
    // TODO Auto-generated method stub
    System.out.println("class:" + t._1);
    Iterator<Integer> it = t._2.iterator();
    while(it.hasNext())
    System.out.println(it.next());
    
    
    
   }
   });
  sc.close();
  }
 public static void reducebykey(){
     SparkConf conf = new SparkConf()
    .setAppName("collectionparallelize")
    .setMaster("local");
  JavaSparkContext sc = new JavaSparkContext(conf);
  
  @SuppressWarnings({ "unchecked", "rawtypes" })
  List<Tuple2<String,Integer>> classscore = Arrays.asList(new Tuple2<String,Integer>("class1",80),
    new Tuple2<String,Integer>("class2",75),
    new Tuple2<String,Integer>("class1",50),
    new Tuple2<String,Integer>("class2",75));
  JavaPairRDD<String, Integer> lines = sc.parallelizePairs(classscore);
  JavaPairRDD<String, Integer> linescount = lines.reduceByKey(new Function2<Integer,Integer,Integer>(){
   
   private static final long serialVersionUID = 1L;
   @Override
   public Integer call(Integer v1, Integer v2) throws Exception {
    // TODO Auto-generated method stub
    return v1 + v2;
   }
   });
  linescount.foreach(new VoidFunction<Tuple2<String,Integer>>(){
   
   private static final long serialVersionUID = 1L;
   @Override
   public void call(Tuple2<String, Integer> t) throws Exception {
    // TODO Auto-generated method stub
    System.out.println(t._1 + "total" + t._2 + "score");
   }
   
  });
  sc.close();
  }
 public static void sortbykey(){
     SparkConf conf = new SparkConf()
    .setAppName("collectionparallelize")
    .setMaster("local");
  JavaSparkContext sc = new JavaSparkContext(conf);
  
  @SuppressWarnings({ "unchecked", "rawtypes" })
  List<Tuple2<Integer,String>> classscore = Arrays.asList(new Tuple2<Integer,String>(80,"li"),
    new Tuple2<Integer,String>(75,"xiao"),
    new Tuple2<Integer,String>(50,"zhang"),
    new Tuple2<Integer,String>(75,"haha"));
  JavaPairRDD<Integer, String> lines = sc.parallelizePairs(classscore);
  JavaPairRDD<Integer, String> linestotal =  lines.sortByKey(false);
  linestotal.foreach(new VoidFunction<Tuple2<Integer,String>>(){
   
   private static final long serialVersionUID = 1L;
   @Override
   public void call(Tuple2<Integer, String> t) throws Exception {
    // TODO Auto-generated method stub
    System.out.println(t._1 + ":" + t._2);
   }
   });
  sc.close();
  }
 public static void join(){
     SparkConf conf = new SparkConf()
    .setAppName("collectionparallelize")
    .setMaster("local");
  JavaSparkContext sc = new JavaSparkContext(conf);
  
  @SuppressWarnings({ "unchecked", "rawtypes" })
  List<Tuple2<Integer,String>> student = Arrays.asList(new Tuple2<Integer,String>(80,"li"),
    new Tuple2<Integer,String>(65,"xiao"),
    new Tuple2<Integer,String>(50,"zhang"),
    new Tuple2<Integer,String>(75,"haha"));
  @SuppressWarnings("unchecked")
  List<Tuple2<Integer,Integer>> score = Arrays.asList(new Tuple2<Integer,Integer>(80,75),
    new Tuple2<Integer,Integer>(75,55),
    new Tuple2<Integer,Integer>(50,85),
    new Tuple2<Integer,Integer>(75,90));
  @SuppressWarnings("unused")
  JavaPairRDD<Integer, String> studentrdd = sc.parallelizePairs(student); 
  @SuppressWarnings("unused")
  JavaPairRDD<Integer, Integer> scorerdd = sc.parallelizePairs(score);
  JavaPairRDD<Integer, Tuple2<String,Integer>> joinrdd =  studentrdd.join(scorerdd);
  joinrdd.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>(){
   
   private static final long serialVersionUID = 1L;
   @Override
   public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
    // TODO Auto-generated method stub
    System.out.println("studentid:" + t._1);
    System.out.println("studentname:" + t._2._1);
    System.out.println("studentscore:" + t._2._2);
    System.out.println("==============");
   }
   
  });
  sc.close();
}
 public static void cogroup(){
     SparkConf conf = new SparkConf()
    .setAppName("collectionparallelize")
    .setMaster("local");
  JavaSparkContext sc = new JavaSparkContext(conf);
  
  @SuppressWarnings({ "unchecked", "rawtypes" })
  List<Tuple2<Integer,String>> student = Arrays.asList(new Tuple2<Integer,String>(80,"li"),
    new Tuple2<Integer,String>(65,"xiao"),
    new Tuple2<Integer,String>(50,"zhang"),
    new Tuple2<Integer,String>(75,"haha"));
  @SuppressWarnings("unchecked")
  List<Tuple2<Integer,Integer>> score = Arrays.asList(new Tuple2<Integer,Integer>(80,75),
    new Tuple2<Integer,Integer>(75,55),
    new Tuple2<Integer,Integer>(50,85),
    new Tuple2<Integer,Integer>(75,90));
  @SuppressWarnings("unused")
  JavaPairRDD<Integer, String> studentrdd = sc.parallelizePairs(student); 
  @SuppressWarnings("unused")
  JavaPairRDD<Integer, Integer> scorerdd = sc.parallelizePairs(score);
  JavaPairRDD<Integer, Tuple2<Iterable<String>,Iterable<Integer>>> cogrouprdd =  studentrdd.cogroup(scorerdd);
  cogrouprdd.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>(){
  
   private static final long serialVersionUID = 1L;
   @Override
   public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
    // TODO Auto-generated method stub
    
    System.out.println("studentid:" + t._1);
    System.out.println("studentname:" + t._2._1);
    System.out.println("studentscore:" + t._2._2);
    System.out.println("==============");
   }
   
  });
  sc.close();
}
}
2、scala版本:
package com.spark.study.core
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * Demonstrations of common Spark transformation operators using the Scala RDD API.
 *
 * Each demo method creates its own local `SparkContext`, applies one transformation
 * (map, filter, flatMap, groupByKey, reduceByKey, sortByKey, join or cogroup) to a small
 * data set, prints the result via `foreach`, and always stops the context afterwards.
 * Select the demo to run by uncommenting its call in `main`.
 */
object trandemo {

  /** Entry point: runs whichever demo call is not commented out. */
  def main(args: Array[String]): Unit = {
    //map()
    //filter()
    // flatmap()
    //groupbykey()
    //reducebykey()
    //sortbykey()
    //join()
    cogroup()
  }

  /**
   * Creates a context with the "local" master, passes it to `body`, and stops it when
   * `body` finishes, whether normally or with an exception.
   *
   * @param body the demo logic that needs a live `SparkContext`
   */
  private def withLocalContext(body: SparkContext => Unit): Unit = {
    val conf = new SparkConf()
      .setAppName("collectionparallelize")
      .setMaster("local")
    val sc = new SparkContext(conf)
    try body(sc)
    finally sc.stop()
  }

  /**
   * map: multiplies every number from 1 to 10 by two and prints the results.
   *
   * In the original code this method was accidentally nested inside `main` because of a
   * misplaced closing brace; it is now a regular member of the object.
   */
  def map(): Unit = withLocalContext { sc =>
    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbersRdd = sc.parallelize(numbers, 5)
    val doubled = numbersRdd.map(number => number * 2)
    doubled.foreach(number => println(number))
  }

  /** filter: keeps only the even numbers from 1 to 10 and prints them. */
  def filter(): Unit = withLocalContext { sc =>
    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbersRdd = sc.parallelize(numbers, 5)
    val evens = numbersRdd.filter(number => number % 2 == 0)
    evens.foreach(number => println(number))
  }

  /** flatMap: splits every line of a text file on HDFS by spaces and prints each word. */
  def flatmap(): Unit = withLocalContext { sc =>
    val lines = sc.textFile("hdfs://master:9000/test.txt", 1)
    val words = lines.flatMap(line => line.split(" "))
    words.foreach(word => println(word))
  }

  /** groupByKey: groups scores by class name and prints each class followed by its scores. */
  def groupbykey(): Unit = withLocalContext { sc =>
    val classScores = Array(("class1", 59), ("class2", 58), ("class1", 89), ("class2", 53))
    val grouped = sc.parallelize(classScores, 3).groupByKey()
    grouped.foreach { group =>
      println(group._1)
      group._2.foreach(score => println(score))
    }
  }

  /** reduceByKey: sums the scores of each class and prints the total per class. */
  def reducebykey(): Unit = withLocalContext { sc =>
    val classScores = Array(("class1", 59), ("class2", 58), ("class1", 89), ("class2", 53))
    val totals = sc.parallelize(classScores, 3).reduceByKey(_ + _)
    totals.foreach(total => println(total._1 + "total" + total._2))
  }

  /** sortByKey: sorts (score, name) pairs by score in descending order and prints them. */
  def sortbykey(): Unit = withLocalContext { sc =>
    val scoreNames = Array((59, "li"), (58, "hong"), (89, "zhang"), (53, "he"))
    val sorted = sc.parallelize(scoreNames, 3).sortByKey(ascending = false)
    sorted.foreach(entry => println(entry._1 + ":" + entry._2))
  }

  /**
   * join: joins (id, name) pairs with (id, score) pairs on the id and prints one record
   * per matching name/score combination.
   */
  def join(): Unit = withLocalContext { sc =>
    val students = Array((80, "li"), (65, "xiao"), (50, "zhang"), (75, "haha"))
    val scores = Array((80, 75), (75, 55), (50, 85), (75, 90))

    val studentsRdd = sc.parallelize(students)
    val scoresRdd = sc.parallelize(scores)

    studentsRdd.join(scoresRdd).foreach { row =>
      println("studentid:" + row._1)
      println("studentname:" + row._2._1)
      // The original printed the score under the label "studentname:".
      println("studentscore:" + row._2._2)
      println("======================")
    }
  }

  /**
   * cogroup: groups (id, name) pairs and (id, score) pairs by id and prints, for each id,
   * the collection of names and the collection of scores.
   */
  def cogroup(): Unit = withLocalContext { sc =>
    val students = Array((80, "li"), (65, "xiao"), (50, "zhang"), (75, "haha"))
    val scores = Array((80, 75), (75, 55), (50, 85), (75, 90))

    val studentsRdd = sc.parallelize(students)
    val scoresRdd = sc.parallelize(scores)

    studentsRdd.cogroup(scoresRdd).foreach { row =>
      println("studentid:" + row._1)
      println("studentname:" + row._2._1)
      // The original printed the scores under the label "studentname:".
      println("studentscore:" + row._2._2)
      println("======================")
    }
  }
}


来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/30541278/viewspace-2153516/,如需转载,请注明出处,否则将追究法律责任。

全部评论

注册时间:2015-10-25

  • 博文量
    24
  • 访问量
    19095