
Spark SQL Usage Examples

Original | Data Analysis | Author: jack22220613 | Posted: 2015-03-05 11:37:49
Tried these out by following other technical posts; recording the results here as a reference note.

1. sqlContext
-- Sample data: people.txt
[root@cdh0 ~]# vi people.txt
mechel,29
andy,30
jusdin,19
mechel,30
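
The session below reads this file from HDFS at hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt (the path that appears in the commands and logs). As an optional sanity check, not part of the original session, you could preview the raw lines in spark-shell first, assuming the file has already been uploaded to that path:

// Optional sanity check (not in the original session): preview the raw input lines
// before building any schema on top of them.
val raw = sc.textFile("hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt")
raw.collect().foreach(println)   // expect the four lines shown above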

1) Infer the schema by reflection on a case class (case class approach)
scala> val sqlContext= new org.apache.spark.sql.SQLContext(sc)  
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@45508523

scala>

scala> import sqlContext.createSchemaRDD
import sqlContext.createSchemaRDD

scala>

scala> case class Person(name:String,age:Int)
defined class Person

scala> val rddpeople=sc.textFile("hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))  
15/03/05 10:17:23 INFO MemoryStore: ensureFreeSpace(258986) called with curMem=0, maxMem=278302556
15/03/05 10:17:23 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 252.9 KB, free 265.2 MB)
15/03/05 10:17:23 INFO MemoryStore: ensureFreeSpace(21187) called with curMem=258986, maxMem=278302556
15/03/05 10:17:23 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.7 KB, free 265.1 MB)
15/03/05 10:17:23 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:48676 (size: 20.7 KB, free: 265.4 MB)
15/03/05 10:17:23 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/03/05 10:17:23 INFO SparkContext: Created broadcast 0 from textFile at <console>:17
rddpeople: org.apache.spark.rdd.RDD[Person] = MappedRDD[3] at map at <console>:17

scala>

scala> rddpeople.registerTempTable("rddTable")  

scala>

scala>

scala> sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
15/03/05 10:20:27 ERROR GPLNativeCodeLoader: Could not load native gpl library
java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
        at java.lang.Runtime.loadLibrary0(Runtime.java:849)
        at java.lang.System.loadLibrary(System.java:1088)
        at com.hadoop.compression.lzo.GPLNativeCodeLoader.<clinit>(GPLNativeCodeLoader.java:32)
        ... (remainder of the stack trace omitted)
15/03/05 10:20:27 ERROR LzoCodec: Cannot load native-lzo without native-hadoop
-- Note: the two errors above only mean the hadoop-lzo native library (libgplcompression) is not on java.library.path; they are not fatal, and the job below still runs to completion.
15/03/05 10:20:27 INFO FileInputFormat: Total input paths to process : 1
15/03/05 10:20:27 INFO SparkContext: Starting job: collect at :16
15/03/05 10:20:27 INFO DAGScheduler: Got job 0 (collect at :16) with 2 output partitions (allowLocal=false)
15/03/05 10:20:27 INFO DAGScheduler: Final stage: Stage 0(collect at :16)
15/03/05 10:20:27 INFO DAGScheduler: Parents of final stage: List()
15/03/05 10:20:27 INFO DAGScheduler: Missing parents: List()
15/03/05 10:20:27 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[7] at map at :16), which has no missing parents
15/03/05 10:20:27 INFO MemoryStore: ensureFreeSpace(6448) called with curMem=280173, maxMem=278302556
15/03/05 10:20:27 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.3 KB, free 265.1 MB)
15/03/05 10:20:27 INFO MemoryStore: ensureFreeSpace(3414) called with curMem=286621, maxMem=278302556
15/03/05 10:20:27 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.3 KB, free 265.1 MB)
15/03/05 10:20:27 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:48676 (size: 3.3 KB, free: 265.4 MB)
15/03/05 10:20:27 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/05 10:20:27 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/03/05 10:20:27 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[7] at map at :16)
15/03/05 10:20:27 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/05 10:20:27 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1331 bytes)
15/03/05 10:20:27 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 1331 bytes)
15/03/05 10:20:27 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/03/05 10:20:27 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/05 10:20:27 INFO HadoopRDD: Input split: hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt:0+19
15/03/05 10:20:27 INFO HadoopRDD: Input split: hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt:19+19
15/03/05 10:20:27 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/03/05 10:20:27 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/03/05 10:20:27 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/03/05 10:20:27 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/03/05 10:20:27 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/03/05 10:20:28 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1719 bytes result sent to driver
15/03/05 10:20:28 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1734 bytes result sent to driver
15/03/05 10:20:28 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 244 ms on localhost (1/2)
15/03/05 10:20:28 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 233 ms on localhost (2/2)
15/03/05 10:20:28 INFO DAGScheduler: Stage 0 (collect at :16) finished in 0.265 s
15/03/05 10:20:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/03/05 10:20:28 INFO DAGScheduler: Job 0 finished: collect at :16, took 0.375604 s
Name: jusdin

scala> var query=sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19")
query: org.apache.spark.sql.SchemaRDD =
SchemaRDD[10] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Project [name#0]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  PhysicalRDD [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36
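
Defining query only builds the plan; nothing executes until an action is called. A small follow-up (not in the original session) that would run it and print the same result as before:

// Hypothetical follow-up: trigger execution of the lazily defined query.
query.map(t => "Name: " + t(0)).collect().foreach(println)   // Name: jusdin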

scala>

scala> query.queryExecution.sparkPlan
res2: org.apache.spark.sql.execution.SparkPlan =
Project [name#0]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  PhysicalRDD [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36


scala>

scala>

scala> query.queryExecution.optimizedPlan  
res3: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [name#0]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36


scala> query.printSchema
root
 |-- name: string (nullable = true)


scala> query.queryExecution
res5: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project ['name]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation None, rddTable, None

== Analyzed Logical Plan ==
Project [name#0]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36

== Optimized Logical Plan ==
Project [name#0]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36

== Physical Plan ==
Project [name#0]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  PhysicalRDD [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36

Code Generation: false
== RDD ==

scala>
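
Putting the case class approach together, here is a minimal end-to-end sketch, collected from the transcript above with the log output stripped:

// Reflection-based schema inference (Spark 1.x SchemaRDD API).
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD          // implicit conversion RDD[Person] -> SchemaRDD

case class Person(name: String, age: Int)

val rddpeople = sc.textFile("hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))

rddpeople.registerTempTable("rddTable")

sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19")
  .map(t => "Name: " + t(0))
  .collect()
  .foreach(println)                        // prints: Name: jusdin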

2) Define the schema programmatically and apply it to the RDD (applySchema approach)


scala> import org.apache.spark.sql._  
import org.apache.spark.sql._

scala> val schemaString = "name age"  
schemaString: String = name age

scala>

scala> val schema =  
     |   StructType(  
     |     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))  
schema: org.apache.spark.sql.catalyst.types.StructType = StructType(ArraySeq(StructField(name,StringType,true), StructField(age,StringType,true)))

scala>

scala> val rowRDD = sc.textFile("hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim))  
15/03/05 11:01:35 INFO MemoryStore: ensureFreeSpace(259058) called with curMem=290035, maxMem=278302556
15/03/05 11:01:35 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 253.0 KB, free 264.9 MB)
15/03/05 11:01:35 INFO MemoryStore: ensureFreeSpace(21187) called with curMem=549093, maxMem=278302556
15/03/05 11:01:35 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 20.7 KB, free 264.9 MB)
15/03/05 11:01:35 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:48676 (size: 20.7 KB, free: 265.4 MB)
15/03/05 11:01:35 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/03/05 11:01:35 INFO SparkContext: Created broadcast 2 from textFile at <console>:18
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.expressions.Row] = MappedRDD[14] at map at <console>:18

scala>

scala> val rddpeople2 = sqlContext.applySchema(rowRDD, schema)  
rddpeople2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[15] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [name#2,age#3], MappedRDD[14] at map at <console>:18

scala> rddpeople2.registerTempTable("rddTable2")

scala> sqlContext.sql("SELECT name FROM rddTable2 WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
15/03/05 11:02:42 INFO FileInputFormat: Total input paths to process : 1
15/03/05 11:02:42 INFO SparkContext: Starting job: collect at :19
15/03/05 11:02:42 INFO DAGScheduler: Got job 1 (collect at :19) with 2 output partitions (allowLocal=false)
15/03/05 11:02:42 INFO DAGScheduler: Final stage: Stage 1(collect at :19)
15/03/05 11:02:42 INFO DAGScheduler: Parents of final stage: List()
15/03/05 11:02:42 INFO DAGScheduler: Missing parents: List()
15/03/05 11:02:42 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[17] at map at :19), which has no missing parents
15/03/05 11:02:42 INFO MemoryStore: ensureFreeSpace(6528) called with curMem=570280, maxMem=278302556
15/03/05 11:02:42 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 6.4 KB, free 264.9 MB)
15/03/05 11:02:42 INFO MemoryStore: ensureFreeSpace(3463) called with curMem=576808, maxMem=278302556
15/03/05 11:02:42 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.4 KB, free 264.9 MB)
15/03/05 11:02:42 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:48676 (size: 3.4 KB, free: 265.4 MB)
15/03/05 11:02:42 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
15/03/05 11:02:42 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:838
15/03/05 11:02:42 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[17] at map at :19)
15/03/05 11:02:42 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/05 11:02:42 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, ANY, 1331 bytes)
15/03/05 11:02:42 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, ANY, 1331 bytes)
15/03/05 11:02:42 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
15/03/05 11:02:42 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
15/03/05 11:02:42 INFO HadoopRDD: Input split: hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt:0+19
15/03/05 11:02:42 INFO HadoopRDD: Input split: hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt:19+19
15/03/05 11:02:42 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1774 bytes result sent to driver
15/03/05 11:02:42 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1789 bytes result sent to driver
15/03/05 11:02:42 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 46 ms on localhost (1/2)
15/03/05 11:02:42 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 49 ms on localhost (2/2)
15/03/05 11:02:42 INFO DAGScheduler: Stage 1 (collect at :19) finished in 0.050 s
15/03/05 11:02:42 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/03/05 11:02:42 INFO DAGScheduler: Job 1 finished: collect at :19, took 0.073002 s
Name: jusdin

scala>

scala>
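
Again, the applySchema steps collected into one block (both columns are read as strings here, matching the StructType defined above):

// Programmatic schema (applySchema approach), all fields typed as StringType.
import org.apache.spark.sql._

val schemaString = "name age"
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

val rowRDD = sc.textFile("hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt")
  .map(_.split(","))
  .map(p => Row(p(0), p(1).trim))

val rddpeople2 = sqlContext.applySchema(rowRDD, schema)
rddpeople2.registerTempTable("rddTable2")

sqlContext.sql("SELECT name FROM rddTable2 WHERE age >= 13 AND age <= 19")
  .map(t => "Name: " + t(0)).collect().foreach(println)   // prints: Name: jusdin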

3) Parquet

scala>

scala> rddpeople.saveAsParquetFile("hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet")
15/03/05 11:06:39 INFO BlockManager: Removing broadcast 3
15/03/05 11:06:39 INFO BlockManager: Removing block broadcast_3_piece0
15/03/05 11:06:39 INFO MemoryStore: Block broadcast_3_piece0 of size 3463 dropped from memory (free 277725748)
15/03/05 11:06:39 INFO BlockManagerInfo: Removed broadcast_3_piece0 on localhost:48676 in memory (size: 3.4 KB, free: 265.4 MB)
15/03/05 11:06:39 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
15/03/05 11:06:39 INFO BlockManager: Removing block broadcast_3
15/03/05 11:06:39 INFO MemoryStore: Block broadcast_3 of size 6528 dropped from memory (free 277732276)
15/03/05 11:06:39 INFO ContextCleaner: Cleaned broadcast 3
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
15/03/05 11:06:39 INFO SparkContext: Starting job: runJob at ParquetTableOperations.scala:325
15/03/05 11:06:39 INFO DAGScheduler: Got job 2 (runJob at ParquetTableOperations.scala:325) with 2 output partitions (allowLocal=false)
15/03/05 11:06:39 INFO DAGScheduler: Final stage: Stage 2(runJob at ParquetTableOperations.scala:325)
15/03/05 11:06:39 INFO DAGScheduler: Parents of final stage: List()
15/03/05 11:06:39 INFO DAGScheduler: Missing parents: List()
15/03/05 11:06:39 INFO DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[20] at mapPartitions at ExistingRDD.scala:36), which has no missing parents
15/03/05 11:06:40 INFO MemoryStore: ensureFreeSpace(69600) called with curMem=570280, maxMem=278302556
15/03/05 11:06:40 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 68.0 KB, free 264.8 MB)
15/03/05 11:06:40 INFO MemoryStore: ensureFreeSpace(25303) called with curMem=639880, maxMem=278302556
15/03/05 11:06:40 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 24.7 KB, free 264.8 MB)
15/03/05 11:06:40 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:48676 (size: 24.7 KB, free: 265.3 MB)
15/03/05 11:06:40 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0
15/03/05 11:06:40 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:838
15/03/05 11:06:40 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (MapPartitionsRDD[20] at mapPartitions at ExistingRDD.scala:36)
15/03/05 11:06:40 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
15/03/05 11:06:40 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, localhost, ANY, 1331 bytes)
15/03/05 11:06:40 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, localhost, ANY, 1331 bytes)
15/03/05 11:06:40 INFO Executor: Running task 0.0 in stage 2.0 (TID 4)
15/03/05 11:06:40 INFO Executor: Running task 1.0 in stage 2.0 (TID 5)
15/03/05 11:06:40 INFO HadoopRDD: Input split: hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt:0+19
15/03/05 11:06:40 INFO HadoopRDD: Input split: hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.txt:19+19
15/03/05 11:06:40 INFO CodecConfig: Compression: GZIP
15/03/05 11:06:40 INFO CodecConfig: Compression: GZIP
15/03/05 11:06:40 INFO ParquetOutputFormat: Parquet block size to 134217728
15/03/05 11:06:40 INFO ParquetOutputFormat: Parquet block size to 134217728
15/03/05 11:06:40 INFO ParquetOutputFormat: Parquet page size to 1048576
15/03/05 11:06:40 INFO ParquetOutputFormat: Parquet page size to 1048576
15/03/05 11:06:40 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576
15/03/05 11:06:40 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576
15/03/05 11:06:40 INFO ParquetOutputFormat: Dictionary is on
15/03/05 11:06:40 INFO ParquetOutputFormat: Dictionary is on
15/03/05 11:06:40 INFO ParquetOutputFormat: Validation is off
15/03/05 11:06:40 INFO ParquetOutputFormat: Validation is off
15/03/05 11:06:40 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
15/03/05 11:06:40 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
15/03/05 11:06:40 INFO ZlibFactory: Successfully loaded & initialized native-zlib library
15/03/05 11:06:40 INFO CodecPool: Got brand-new compressor [.gz]
15/03/05 11:06:40 INFO CodecPool: Got brand-new compressor [.gz]
15/03/05 11:06:40 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 30,303,907
15/03/05 11:06:40 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 30,303,865
15/03/05 11:06:40 INFO ColumnChunkPageWriteStore: written 71B for [name] BINARY: 1 values, 16B raw, 34B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]
15/03/05 11:06:40 INFO ColumnChunkPageWriteStore: written 84B for [name] BINARY: 3 values, 34B raw, 49B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]
15/03/05 11:06:40 INFO ColumnChunkPageWriteStore: written 57B for [age] INT32: 1 values, 4B raw, 24B comp, 1 pages, encodings: [PLAIN, BIT_PACKED]
15/03/05 11:06:40 INFO ColumnChunkPageWriteStore: written 62B for [age] INT32: 3 values, 12B raw, 29B comp, 1 pages, encodings: [PLAIN, BIT_PACKED]
15/03/05 11:06:40 INFO FileOutputCommitter: Saved output of task 'attempt_201503051106_0022_r_000001_5' to hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet/_temporary/0/task_201503051106_0022_r_000001
15/03/05 11:06:40 INFO FileOutputCommitter: Saved output of task 'attempt_201503051106_0022_r_000000_4' to hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet/_temporary/0/task_201503051106_0022_r_000000
15/03/05 11:06:40 INFO Executor: Finished task 1.0 in stage 2.0 (TID 5). 1811 bytes result sent to driver
15/03/05 11:06:40 INFO Executor: Finished task 0.0 in stage 2.0 (TID 4). 1811 bytes result sent to driver
15/03/05 11:06:40 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 5) in 611 ms on localhost (1/2)
15/03/05 11:06:40 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 614 ms on localhost (2/2)
15/03/05 11:06:40 INFO DAGScheduler: Stage 2 (runJob at ParquetTableOperations.scala:325) finished in 0.616 s
15/03/05 11:06:40 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/03/05 11:06:40 INFO DAGScheduler: Job 2 finished: runJob at ParquetTableOperations.scala:325, took 0.745369 s
15/03/05 11:06:40 INFO ParquetFileReader: Initiating action with parallelism: 5

scala>


[root@cdh0 ~]# hadoop dfs -ls /user/hive/warehouse/db1.db/spark_person/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Found 2 items
drwxrwxrwt   - hue  hive          0 2015-03-05 09:30 /user/hive/warehouse/db1.db/spark_person/.hive-staging_hive_2015-03-05_09-30-52_667_1673020012782548-1
-rw-r--r--   3 root hive         38 2015-03-05 09:30 /user/hive/warehouse/db1.db/spark_person/people.txt
[root@cdh0 ~]#
[root@cdh0 ~]# hadoop dfs -ls /user/hive/warehouse/db1.db/spark_person/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Found 3 items
drwxrwxrwt   - hue  hive          0 2015-03-05 09:30 /user/hive/warehouse/db1.db/spark_person/.hive-staging_hive_2015-03-05_09-30-52_667_1673020012782548-1
drwxrwxrwt   - root hive          0 2015-03-05 11:06 /user/hive/warehouse/db1.db/spark_person/people.parquet
-rw-r--r--   3 root hive         38 2015-03-05 09:30 /user/hive/warehouse/db1.db/spark_person/people.txt
[root@cdh0 ~]#
[root@cdh0 ~]# hadoop dfs -ls /user/hive/warehouse/db1.db/spark_person/people.parquet
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Found 4 items
-rw-r--r--   3 root hive          0 2015-03-05 11:06 /user/hive/warehouse/db1.db/spark_person/people.parquet/_SUCCESS
-rw-r--r--   3 root hive        576 2015-03-05 11:06 /user/hive/warehouse/db1.db/spark_person/people.parquet/_metadata
-rw-r--r--   3 root hive        538 2015-03-05 11:06 /user/hive/warehouse/db1.db/spark_person/people.parquet/part-r-1.parquet
-rw-r--r--   3 root hive        521 2015-03-05 11:06 /user/hive/warehouse/db1.db/spark_person/people.parquet/part-r-2.parquet
[root@cdh0 ~]#


scala> val parquetpeople = sqlContext.parquetFile("hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet")  
parquetpeople: org.apache.spark.sql.SchemaRDD =
SchemaRDD[23] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
ParquetTableScan [name#12,age#13], (ParquetRelation hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@45508523, []), []

scala> parquetpeople.registerTempTable("parquetTable")

scala> sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
15/03/05 11:20:25 INFO MemoryStore: ensureFreeSpace(264119) called with curMem=665183, maxMem=278302556
15/03/05 11:20:25 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 257.9 KB, free 264.5 MB)
15/03/05 11:20:25 INFO MemoryStore: ensureFreeSpace(21748) called with curMem=929302, maxMem=278302556
15/03/05 11:20:25 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 21.2 KB, free 264.5 MB)
15/03/05 11:20:25 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:48676 (size: 21.2 KB, free: 265.3 MB)
15/03/05 11:20:25 INFO BlockManagerMaster: Updated info of block broadcast_5_piece0
15/03/05 11:20:25 INFO SparkContext: Created broadcast 5 from NewHadoopRDD at ParquetTableOperations.scala:120
15/03/05 11:20:25 INFO FileInputFormat: Total input paths to process : 2
15/03/05 11:20:25 INFO ParquetInputFormat: Total input paths to process : 2
15/03/05 11:20:25 INFO ParquetFileReader: Initiating action with parallelism: 5
15/03/05 11:20:25 INFO ParquetFileReader: reading summary file: hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet/_metadata
15/03/05 11:20:25 INFO deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
15/03/05 11:20:25 INFO deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
15/03/05 11:20:25 INFO ParquetInputFormat: There were no row groups that could be dropped due to filter predicates
15/03/05 11:20:25 INFO SparkContext: Starting job: collect at :19
15/03/05 11:20:25 INFO DAGScheduler: Got job 3 (collect at :19) with 2 output partitions (allowLocal=false)
15/03/05 11:20:25 INFO DAGScheduler: Final stage: Stage 3(collect at :19)
15/03/05 11:20:25 INFO DAGScheduler: Parents of final stage: List()
15/03/05 11:20:25 INFO DAGScheduler: Missing parents: List()
15/03/05 11:20:25 INFO DAGScheduler: Submitting Stage 3 (MappedRDD[25] at map at :19), which has no missing parents
15/03/05 11:20:25 INFO MemoryStore: ensureFreeSpace(6208) called with curMem=951050, maxMem=278302556
15/03/05 11:20:25 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 6.1 KB, free 264.5 MB)
15/03/05 11:20:25 INFO MemoryStore: ensureFreeSpace(3402) called with curMem=957258, maxMem=278302556
15/03/05 11:20:25 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 3.3 KB, free 264.5 MB)
15/03/05 11:20:25 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:48676 (size: 3.3 KB, free: 265.3 MB)
15/03/05 11:20:25 INFO BlockManagerMaster: Updated info of block broadcast_6_piece0
15/03/05 11:20:25 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:838
15/03/05 11:20:25 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3 (MappedRDD[25] at map at :19)
15/03/05 11:20:25 INFO TaskSchedulerImpl: Adding task set 3.0 with 2 tasks
15/03/05 11:20:25 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 6, localhost, ANY, 2274 bytes)
15/03/05 11:20:25 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 7, localhost, ANY, 2274 bytes)
15/03/05 11:20:25 INFO Executor: Running task 0.0 in stage 3.0 (TID 6)
15/03/05 11:20:25 INFO Executor: Running task 1.0 in stage 3.0 (TID 7)
15/03/05 11:20:25 INFO NewHadoopRDD: Input split: ParquetInputSplit{part: hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet/part-r-1.parquet start: 0 length: 146 hosts: [] blocks: 1 requestedSchema: same as file fileSchema: message root {
  optional binary name (UTF8);
  required int32 age;
}
 extraMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":false,"metadata":{}}]}} readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":false,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":false,"metadata":{}}]}}}
15/03/05 11:20:25 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
15/03/05 11:20:25 INFO NewHadoopRDD: Input split: ParquetInputSplit{part: hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet/part-r-2.parquet start: 0 length: 128 hosts: [] blocks: 1 requestedSchema: same as file fileSchema: message root {
  optional binary name (UTF8);
  required int32 age;
}
 extraMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":false,"metadata":{}}]}} readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":false,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":false,"metadata":{}}]}}}
15/03/05 11:20:25 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
15/03/05 11:20:25 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 3 records.
15/03/05 11:20:25 INFO InternalParquetRecordReader: at row 0. reading next block
15/03/05 11:20:25 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 1 records.
15/03/05 11:20:25 INFO InternalParquetRecordReader: at row 0. reading next block
15/03/05 11:20:25 INFO CodecPool: Got brand-new decompressor [.gz]
15/03/05 11:20:25 INFO CodecPool: Got brand-new decompressor [.gz]
15/03/05 11:20:25 INFO InternalParquetRecordReader: block read in memory in 19 ms. row count = 1
15/03/05 11:20:25 INFO InternalParquetRecordReader: block read in memory in 21 ms. row count = 3
15/03/05 11:20:25 INFO Executor: Finished task 0.0 in stage 3.0 (TID 6). 1789 bytes result sent to driver
15/03/05 11:20:25 INFO Executor: Finished task 1.0 in stage 3.0 (TID 7). 1802 bytes result sent to driver
15/03/05 11:20:25 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 7) in 81 ms on localhost (1/2)
15/03/05 11:20:25 INFO DAGScheduler: Stage 3 (collect at :19) finished in 0.085 s
15/03/05 11:20:25 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 6) in 84 ms on localhost (2/2)
15/03/05 11:20:25 INFO DAGScheduler: Job 3 finished: collect at :19, took 0.112587 s
15/03/05 11:20:25 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
Name: mechel
Name: mechel
Name: andy
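
The Parquet round trip, condensed from the transcript above and continuing from the case class example (saveAsParquetFile writes the directory listed earlier; parquetFile reads it back with the schema preserved):

// Write the people RDD out as Parquet, then read it back and query it.
// rddpeople (RDD[Person]) is implicitly converted to a SchemaRDD via createSchemaRDD.
rddpeople.saveAsParquetFile("hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet")

val parquetpeople = sqlContext.parquetFile("hdfs://cdh0:8020/user/hive/warehouse/db1.db/spark_person/people.parquet")
parquetpeople.registerTempTable("parquetTable")

sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25")
  .map(t => "Name: " + t(0)).collect().foreach(println)
// prints: Name: mechel / Name: mechel / Name: andy (order may vary by partition)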

From "ITPUB Blog", link: http://blog.itpub.net/10037372/viewspace-1449008/. If you repost, please credit the source; otherwise legal responsibility may be pursued.
