概述SparkSQL对文件的读取和保存都有通用的方式和不同格式文件读取、保存的简化方式。创建连接对象valsession=SparkSession.builder.master("local[*]").appName("read").getOrCreate()读取文件默认的读取文件的格式是.parquet文件,在Spark安装包/examples/src/main/resources下面有例子session.read.load("路径")通用格式通用个格式默认读取的是.parquet,我们可以修改这个默认值session.read.format("json").load("路径")简化格式简化格式只有一些常见的格式session.read.json("路径")-------------------------//支持格式csvformatjdbcjsonloadoptionoptionsorcparquetschematabletexttextFile读取MySQL的数据需要加入MySQL的依赖方式一session.read.format("jdbc")//数据库连接.option("url","jdbc:mysql://127.0.0.1:3306/world")//数据库表.option("dbtable","city")//数据库用户名.option("user","root")//数据库表密码.option("password","123456").load()方式二//创建一个Properties对象valproperties=newProperties()properties.put("user","root")properties.put("password","123456")session.read.jdbc("jdbc:mysql://127.0.0.1/world","city",properties)保存文件默认的保存文件的格式是.parquet文件//df是DataFrame或者DataSet对象df.write.save("路径")通用格式能指定保存的格式和模式格式:比如Json、csv…模式:有4种模式“error”:如果文件存在,则报错“append”:追加“overwrite”:覆写“ignore”:数据存在,则忽略df.write.mode("保存模式").format("文件格式").save("路径")简化格式直接保存为相应格式的文件df.write.json("路径")-------------------------//支持格式bucketByformatjdbcmodeoptionsparquetsavesortBycsvinsertIntojsonoptionorcpartitionBysaveAsTabletext保存数据到MySQL方式一df.write.mode("append").format("jdbc").option("url","jdbc:mysql://127.0.0.1/spark").option("dbtable","user").option("user","root").option("password","123456").save()方式二valproperties=newProperties()properties.put("user","root")properties.put("password","123456")df.write.mode("append").jdbc("jdbc:mysql://127.0.0.1/spark","user",properties)
概述SparkSQL提供了两种编程方式。通过SQL语句进行查询通过调用不同的方式查询使用创建session对象valsession=SparkSession.builder.master("local[*]").appName("use").getOrCreate()查询数据{"id":1,"name":"Bigdataboy","age":"18"}{"id":2,"name":"Bob","age":"16"}{"id":3,"name":"Black","age":"18"}使用SQL语句使用SQL语句查询,必须要映射一张表名createTempView(“表名”),这其实调用的是Spark内置的Hive进行查询。这样使用的是SparkSQL内嵌的Hive来进行查询valjsonDF=session.read.json("indata/data.json")//映射一张表名jsonDF.createTempView("user")//使用SQL语句valsqlRes:DataFrame=session.sql("selectname,agefromuser")sqlRes.show()-------------+----------+---+|name|age|+----------+---+|Bigdataboy|18||Bob|16||Black|18|+----------+---+使用方法valjsonDF=session.read.json("indata/data.json")valdfRes:DataFrame=jsonDF.select("name","age")dfRes.show()------------+----------+---+|name|age|+----------+---+|Bigdataboy|18||Bob|16||Black|18|+----------+---+
概述Spark最新的数据集,在DataFrame的基础上,通过样例类来映射数据的结构信息,在字段名称上多加了字段类型等。是强类型的数据集合。创建DataSet一般不会使用直接创建,都是通过RDD或者DataFrame转换过去。直接创建创建样例类caseclassUser(id:BigInt,name:String,age:String)创建DataFrame通过样例类进行映射转换//创建DSvalDS:Dataset[User]=List(User(1,"Mary","13")).toDS()DS.show()---------+---+----+---+|id|name|age|+---+----+---+|1|Mary|13|+---+----+---+通过DataFrame转换创建样例类注意类型的精度范围caseclassUser(id:BigInt,name:String,age:String)转换valjsonDF:DataFrame=session.read.json("indata/data.json")//DF创建DS,首先需要样例类valDFtoDS:Dataset[User]=jsonDF.as[User]DFtoDS.show()-------------+---+---+----------+|age|id|name|+---+---+----------+|18|1|Bigdataboy||16|2|Bob||18|3|Black|+---+---+----------+
概述SparkSQL的核心数据集,在RDD的基础上映射相应的字段名称,更像二维的数据表。创建SparkSession对象valss:SparkSession=SparkSession.builder.master("local[*]").appName("a").getOrCreate()创建DataFrame有三种创建方式一、通过文件方式支持的文件类型scala>spark.read.csvformatjdbcjsonloadoptionoptionsorcparquetschematabletexttextFile创建scala>valcsvdata=spark.read.csv("file:///root/sparkdata/a.csv")csvdata:org.apache.spark.sql.DataFrame=[_c0:string,_c1:string...1morefield]scala>csvdata.show()+---+-------+---+|_c0|_c1|_c2|+---+-------+---+|1|Bob|12||2|Black|12||3|Bigdata|13|+---+-------+---+二、RDD转换注意:如果需要RDD与DF或者DS之间操作,那么都需要引入隐式转换,特别注意引入的位置//构建SparkSQL会话valss:SparkSession=SparkSession.builder.master("local[*]").appName("a").getOrCreate()//注意ss是SparkSession的对象importss.implicits._在SparkSession创建Sparkcontext对象并创建RDD//创建SparkContext对象,创建RDDvalsc:SparkContext=ss.sparkContextvallistRDD:RDD[(Int,String)]=sc.makeRDD(List((1,"Bob"),(2,"Bigdata"),(3,"Black")))转换toDF(colNames:String*)参数是映射的字段名称vallistDF:DataFrame=listRDD.toDF("id","name")listDF.show()-------------+---+-------+|id|name|+---+-------+|1|Bob||2|Bigdata||3|Black|+---+-------+
配置环境在pom.xml文件里加入SparkSQL依赖(注意版本与SparkCore一致)还有所支持的Scala的版本<!--https://mvnrepository.com/artifact/org.apache.spark/spark-sql--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.1.1</version></dependency>配置SparkSQL会话这个配置会话跟SparkCore里的上下文连接对象(SparkContext())一个意思。第一种单独配置config//Spark运行配置valconf=newSparkConf().setMaster("local[*]").setAppName("a")//构建SparkSQL会话valss:SparkSession=SparkSession.builder.config(conf).getOrCreate()第二种//构建SparkSQL会话valss:SparkSession=SparkSession.builder.master("local[*]").appName("a").getOrCreate()小例子文件内容{"id":1,"name":"Bigdataboy","age":"18"}{"id":2,"name":"Bob","age":"16"}{"id":3,"name":"Black","age":"18"}例子代码//构建SparkSQL会话valss:SparkSession=SparkSession.builder.master("local[*]").appName("a").getOrCreate()//读取Json文件valjsonData:DataFrame=ss.read.json("indata/data.json")//展示DataFrame数据jsonData.show()--------------+---+---+----------+|age|id|name|+---+---+----------+|18|1|Bigdataboy||16|2|Bob||18|3|Black|+---+---+----------+
什么是SparkSQLSparkSQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。SparkSQL的特点易整合统一的数据访问方式兼容Hive标准的数据连接什么是DataFrame与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待,DataFrame也是懒执行的。性能上比RDD要高,主要原因:优化的执行计划:查询计划通过Sparkcatalystoptimiser进行优化。什么是DataSet是DataframeAPI的一个扩展,是Spark最新的数据抽象。用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。Dataframe是Dataset的特列,DataFrame=Dataset[Row],所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。