未分类

概述连接查询又称多表查询,查询的字段来自于多个表,推荐使用SQL99语法内连接(SQL92语法不推荐使用)就是1992年推出的语法笛卡尔乘积在连接查询不加以限制时,就会产生笛卡尔乘积现象。笛卡尔乘积:A表字段3个值,B表字段4个值,最终结果3X4=12个值。selectb.name,b2.boyNamefrombeautyb,boysb2#----------------------#name|boyName|----------|-------|柳岩|张无忌|柳岩|鹿晗|柳岩|黄晓明|柳岩|段誉|Angelababy|张无忌Angelababy|鹿晗|Angelababy|黄晓明Angelababy|段誉|热巴|张无忌|热巴|鹿晗|热巴|黄晓明|热巴|段誉|等值连接对字段进行一定的限制selectb.name,b2.boyNamefrombeautyb,boysb2whereb.boyfriend_id=b2.id#--------------------------#name|boyName|----------|-------|热巴|鹿晗|Angelababy|黄晓明|非等值连接#统计不同工资的等级selecte.salaryas工资,jg.grade_levelas等级frommyemployees.employeese,myemployees.job_gradesjgwheree.salarybetweenjg.lowest_salandjg.highest_salorderby等级,工资#------------------------------------------------------#工资|等级|-----|--|2900|A|2900|A|3000|B|3000|B|3100|B|4800|B|4800|B|5800|B|6000|C|6000|C|自连接两个需要作连接条件的字段在一张表上。就是把同一张表重名为不同的名字,就可以当做两张表查询了selecte.employee_id,e.first_name,m.first_name,m.manager_idfrommyemployees.employeese,myemployees.employeesmwheree.employee_id=m.manager_id连接查询(SQL99)语法格式select字段from表1[连接格式]join表2on连接条件[连接格式]join表3on连接条件[where限定条件][groupby分组][having查询后限定条件][orderby排序]内连接内连接:查询的是两表的交集等值连接on后的连接条件是相等#每个职位的员工个数selectj.job_title职位名称,count(*)数量frommyemployees.jobsjjoinmyemployees.employeeseonj.job_id=e.job_id#相等groupby职位名称非等值连接on后的连接条件是不等的#职工的工资等级selecte.first_name名称,e.salary工资,j.grade_level等级frommyemployees.employeesejoinmyemployees.job_gradesjone.salarybetweenj.lowest_salandj.highest_sal============================名称|工资|等级|-----------|-----|--|Steven|24000|E|Neena|17000|E|Lex|17000|E|Alexander|9000|C|Bruce|6000|C|David|4800|B|Valli|4800|B|自连接连接的两张表数据来自于同一张,只要别名不同就好selecte.employee_id领导编号,e.first_name领导名称,e1.first_name职工名称,e1.manager_id职工编号frommyemployees.employeesejoinmyemployees.employeese1one.employee_id=e1.manager_id;=================================领导编号|领导名称|职工名称|职工编号|----|---------|-----------|----|100|Steven|Neena|100|100|Steven|Lex|100|102|Lex|Alexander|102|103|Alexander|Bruce|103|103|Alexander|David|103|103|Alexander|Valli|103|外连接外连接其实是:内连接的结果+主表中有而从表里没有的可以使用where、having添加条件去掉null值左外连接左外连接:from后的主表#查询女生里没有男朋友的selectgby.name,gbs.boyNamefromgirls.beautygbyleftjoingirls.boysgbsongby.boyfriend_id=gbs.id============================name|boyName|----------|-------|周芷若|张无忌|小昭|张无忌|赵敏|张无忌|热巴|鹿晗|Angelababy|黄晓明|王语嫣|段誉|柳岩||苍老师||周冬雨||岳灵珊||双儿||夏雪||右外连接结果与上面一样,但是换成了右外连接,主从表的位置也换了,现在就是join后是主表#查询女生里没有男朋友的selectgby.name,gbs.boyNamefromgirls.boysgbsrightjoingirls.beautygbyongby.boyfriend_id=gbs.id============================name|boyName|----------|-------|周芷若|张无忌|小昭|张无忌|赵敏|张无忌|热巴|鹿晗|Angelababy|黄晓明|王语嫣|段誉|柳岩||苍老师||周冬雨||岳灵珊||双儿||夏雪||交叉连接(笛卡尔乘积)注意没有on关键字selectgby.*,gbs.*fromgirls.beautygbycrossjoingirls.boysgbs=========================id|name|sex|borndate|phone|photo|boyfriend_id|id|boyName|userCP|--|----------|---|---------------------|-----------|-----|------------|--|-------|------|3|Angelababy|女|1989-02-0300:00:00.0|18209876567||3|1|张无忌|100|3|Angelababy|女|1989-02-0300:00:00.0|18209876567||3|2|鹿晗|800|3|Angelababy|女|1989-02-0300:00:00.0|18209876567||3|3|黄晓明|50|3|Angelababy|女|1989-02-0300:00:00.0|18209876567||3|4|段誉|300|4|热巴|女|1993-02-0300:00:00.0|18209876579||2|1|张无忌|100|4|热巴|女|1993-02-0300:00:00.0|18209876579||2|2|鹿晗|800|4|热巴|女|1993-02-0300:00:00.0|18209876579||2|3|黄晓明|50|4|热巴|女|1993-02-0300:00:00.0|18209876579||2|4|段誉|300|

2020-3-1 926 0
2020-3-1 978 0
未分类

概述大体分为两类单行函数:执行之后,返回结果里字段的值不会减少,一般用于对字段进行操作。比如:length()、ifnull()、concat()聚合函数:又称分组函数,执行之后,字段里的值会减少,一般用于统计。比如:单行函数字符函数length()参数的字节长度selectlength("aaa");拼接concat()selectconcat("aaa","bbb");大小写upper|lowerselectupper("aa");selectlower("AA");截取索引从1开始#第三个参数是截取长度selectsubstring("abc",2,3);instr返回子串第一次出现的位置,没有返回0selectinstr("abcd","b")asoutput;trim()去掉前后空格,可以指定去掉的字符串selecttrim("aaa");selecttrim("a"from"aba")asoutput;--------output|------|b|lpad()左填充到指定长度,可指定填充字符selectlpad("吼吼吼",10,"*")asoutpat;rpad()右填充到指定长度,可指定填充字符selectrpad("吼吼吼",10,"*")asoutpat;replace()替换指定字符串,是全部替换selectreplace("aa吼吼吼","aa","bbb")asoutpat;数学函数四舍五入selectround(4.56);selectround(4.567,2);#保留两位小数向上取整,>=这个数selectceil(4.00);向下取整selectfloor(4.56);取余数,跟%一样selectmod(10,3);保留几位小数,不会进行四舍五入selecttruncate(1.5665,2);日期函数现在时间selectnow()as现在时间;------------------------现在时间|---------------------|2020-02-2816:40:44.0|现在日期selectcurdate()as现在日期;----------------------------现在日期|----------|2020-02-28|现在时间selectcurtime()as现在时间;----------------------------现在时间|--------|16:45:56|只截取日期格式里的某一部分,如年、月、日…其他就比如:year()、month()、day()、hour()、minute()、second()selectyear(now())as现在的年;-------------------------------现在的年|----|2020|提取字符串中的时间selectstr_to_date('2019--01-2016:01:45','%Y--%m-%d')as提取日期;-------------------------提取日期|----------|2019-01-20|时间转换时间字符串selectdate_format("1999-1-12","%Y年%m月%d日")as日期字符串;------------------------------------日期字符串|-----------|1999年01月12日|其他函数版本号selectversion();当前数据库selectdatabase();当前用户selectuser();流程控制函数if流程控制,实现ifelse效果selectif(10>5,"大","小")as结果;---------------------------------结果|--|大|case相当于Scala的模式匹配第一种匹配#格式selectcase字段或者表达式when待匹配值then值或表达式when待匹配值then值或表达式...else未匹配到的处理end结束标志----------------------------selectcase10%3when1then"奇数"when0then"偶数"else"未知"endas"类型"-----------------类型|--|奇数|第二中匹配,主要区别是case不用写值,其他都一样。#注意先匹配的先后顺序selectName,Population,casewhenPopulation>120000then"一级"whenPopulation>110000then"二级"else"其他"endas"等级"fromworld.city---------------Name|Population|等级|---------------------------------|----------|--|Serang|122400|一级|Probolinggo|120770|一级|Cilegon|117000|二级|Cianjur|114300|二级|Ciparay|111500|二级|Lhokseumawe|109600|其他|聚合函数简单的函数max():最大值min():最小值avg():平均值count():统计数量sum():求和以上函数都忽略null值,比如:count()结果不会统计null值,所以结果并不是字段中有多少个值。都可以与distinct去重配合使用,常与count()搭配更多。count(distinctPopulation)单独使用count():count(*):这样返回的就是真正的行数,全部字段一排里只要有一个字段不为null就会加1。count(1):会生成一行虚拟的值(值全为1),统计这一行。和分组函数一同查询的字段的要求是groupby操作后的字段。selectmax(Population)最大值,min(Population)最小值,avg(Population)平均值,count(Population)数量,sum(Population)和fromworld.city---------------最大值|最小值|平均值|数量|和|--------|---|-----------|----|----------|10500000|42|350468.2236|4079|1429559884|

2020-2-28 1065 0
未分类

概述Streaming的状态更新分为两种无状态更新:每个时间间隔的数据相对独立,不会进行累计,就像之前的单词统计一样,不使用个方法就行。有状态更新:会把现在统计的结果与之前的结果进行合并累计,使用有状态更新必须设置检查点。updateStateByKeyS:ClassTag=>Option[S]):DStream[(K,S)]这个方法有很多种参数,注重这个updateFunc函数参数。参数updateFunc:(Seq[V],Option[S])=>Option[S]Seq[V]:一个集合,集合里是当前计算结果相同Key的Value。Option[S]:该对象能获取之前时间段的相同Key的Value。实时单词累计统计//Spark通用配置valconf=newSparkConf().setMaster("local[*]").setAppName("up")//Streaming配置valcontext=newStreamingContext(conf,Seconds(1))//需要使用状态更新必须设置检查点机制context.checkpoint("checkdir")//监控端口valdataDStream:ReceiverInputDStream[String]=context.socketTextStream("192.168.176.61",5656)//处理数据,形成K-V对valmapDstream:DStream[(String,Int)]=dataDStream.flatMap(_.split("")).map((_,1))//新老数据合并更新valstateDStream:DStream[(String,Int)]=mapDstream.updateStateByKey{case(seq,option)=>{//获取之前的Key,如果没有取到,则返回0,说明是新出现的Keyvalstate:Int=option.getOrElse(0)//新旧Key的合并valkey:Int=state+seq.sum//返回Option对象Option(key)}}//打印结果stateDStream.print()//启动,持久化运行context.start()context.awaitTermination()

未分类

添加依赖注意SparkCore的版本和Scala的版本这里有个坑,下面依赖是Maven仓库官方的,如果运行报错,就把<scope>provided</scope>这个删掉<!--https://mvnrepository.com/artifact/org.apache.spark/spark-streaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.1.1</version><scope>provided</scope></dependency>大概使用流程SparkStreaming并不是真真意义上的实时处理,它有一个时间周期的概念,隔多长时间处理一次数据。这个间隔越小,就越接近实时。创建StreamingContext对象这里的时间周期,有几个官方提供的简单的方式,Milliseconds(毫秒)、Seconds(秒)、Minutes(分钟)…//Spark运行配置valconf=newSparkConf().setMaster("local[*]").setAppName("streaming")//创建SparkStreaming上下文对象和采集周期valcontext=newStreamingContext(conf,Seconds(3))设置监控的主机和端口SparkStreaming还能监控路径里文件的变化,但是一般不使用,有时候还会不生效,因为这方面Flume比它更强。设置了监控主机和端口后,那么就能得到DStream数据集了,底层也是RDD,完全能使用RDD的方法,并且还有新的方法。vallineDStream:ReceiverInputDStream[String]=context.socketTextStream("192.168.176.61",8888)保持Streaming的运行//启动Streaming,开始接收数据和处理流程context.start()//等待线程终止,保持SparkStreaming的持续运行,等待处理结果context.awaitTermination()一个小例子单词实时统计这个例子是无状态化转换的,每个周期的数据是独立的,没有连续统计。整体代码//Spark运行配置valconf=newSparkConf().setMaster("local[*]").setAppName("streaming")//创建SparkStreaming上下文对象,跟采集周期valcontext=newStreamingContext(conf,Seconds(3))//设置监控的主机及端口,返回DStream数据集vallineDStream:ReceiverInputDStream[String]=context.socketTextStream("192.168.176.61",8888)//对DStream进行处理valres:DStream[(String,Int)]=lineDStream.flatMap(_.split("")).map((_,1)).reduceByKey(_+_)//打印结果res.print()//启动Streaming,开始接收数据和处理流程context.start()//等待线程终止,保持SparkStreaming的持续运行,等待处理结果context.awaitTermination()使用natcat向指定端口发送数据Windows:只需要下载、解压、配置环境变量,开启命令nc-l-p端口号Linux:安装命令yuminstallnc,开启命令nc-lk端口号

2020-2-24 808 0
未分类

概述SparkSQL对文件的读取和保存都有通用的方式和不同格式文件读取、保存的简化方式。创建连接对象valsession=SparkSession.builder.master("local[*]").appName("read").getOrCreate()读取文件默认的读取文件的格式是.parquet文件,在Spark安装包/examples/src/main/resources下面有例子session.read.load("路径")通用格式通用个格式默认读取的是.parquet,我们可以修改这个默认值session.read.format("json").load("路径")简化格式简化格式只有一些常见的格式session.read.json("路径")-------------------------//支持格式csvformatjdbcjsonloadoptionoptionsorcparquetschematabletexttextFile读取MySQL的数据需要加入MySQL的依赖方式一session.read.format("jdbc")//数据库连接.option("url","jdbc:mysql://127.0.0.1:3306/world")//数据库表.option("dbtable","city")//数据库用户名.option("user","root")//数据库表密码.option("password","123456").load()方式二//创建一个Properties对象valproperties=newProperties()properties.put("user","root")properties.put("password","123456")session.read.jdbc("jdbc:mysql://127.0.0.1/world","city",properties)保存文件默认的保存文件的格式是.parquet文件//df是DataFrame或者DataSet对象df.write.save("路径")通用格式能指定保存的格式和模式格式:比如Json、csv…模式:有4种模式“error”:如果文件存在,则报错“append”:追加“overwrite”:覆写“ignore”:数据存在,则忽略df.write.mode("保存模式").format("文件格式").save("路径")简化格式直接保存为相应格式的文件df.write.json("路径")-------------------------//支持格式bucketByformatjdbcmodeoptionsparquetsavesortBycsvinsertIntojsonoptionorcpartitionBysaveAsTabletext保存数据到MySQL方式一df.write.mode("append").format("jdbc").option("url","jdbc:mysql://127.0.0.1/spark").option("dbtable","user").option("user","root").option("password","123456").save()方式二valproperties=newProperties()properties.put("user","root")properties.put("password","123456")df.write.mode("append").jdbc("jdbc:mysql://127.0.0.1/spark","user",properties)

2020-2-21 967 0
2020-2-21 948 0