2019-9-14 269 0
2019-9-14 211 0
哔哔大数据

概述spark-submit可以提交任务到spark集群执行,也可以提交到hadoop的yarn集群执行。最简单提交命令本地模式提交spark-submit\--class类名jar包参数不同搭建的提交方式主要模板spark-submit\--class主类\--master提交地址\--deploy-mode提交方式\其他参数\运行主程序程序参数自生调度器管理俗称的集群模式,一个master,多个worker,这时是Spark自身的资源调度器管理。spark-submit\--class主类\--masterspark://master:6066\--deploy-modecluster\其他参数\运行主程序程序参数提交到Yarn管理spark-submit\--class主类\--masterspark://master:6066\--deploy-modeyarn\其他参数\运行主程序程序参数参数详解--class:执行主类,Python不需要--master:提交的地址:spark://master:6066、yarn、local--deploy-mode:默认cliebt,cluster--name:程序名--driver-memory:driver内存,默认为1G--driver-cores:driverCPU数,默认1核。--executor-memory:每个executor的内存,默认是1G--executor-core:每个executor的核数。在yarn或者standalone下使用--total-executor-cores:所有executor总共的核数。仅仅在mesos或者standalone下使用--num-executors:启动的executor数量。默认为2。在yarn下使用Spark集群例子如果有些内存核心不指定,就不能很好的利用集群的算力spark-submit\--masterspark://node1:6066\--deploy-modecluster\--driver-memory14g\--driver-cores8\--executor-memory14g\--executor-cores4\--total-executor-cores24\--classtwo/root/jar/bigdataboy.jarobs://bigdata/bigdataboyYarn提交例子spark-submit\--masteryarn\--deploy-modecluster\--driver-cores5\--driver-memory10g\--executor-memory25g\--executor-cores4\--class类名Jar包参数全部参数--master提交地址spark://host:port,mesos://host:port,yarn,k8s://https://host:port,默认:local[*]--deploy-mode提交模式默认:client(单机),cluster(集群)--class程序主类Java&Scala程序--name程序名字application的名字--jarsJARS程序使用的Jar包路径,用逗号分割--packagesmaven的Jar包名称需要与--repositories一起使用--repositories需要与--package一使用(--packagesmysql:mysql-connector-java:5.1.27--repositorieshttp://maven.aliyun.com/nexus/content/groups/public/)--exclude-packages为了避免冲突而指定不包含的package--py-filesPython程序的路径支持者.zip,.egg,or.py压缩文件或者文件--filesFILESComma-separatedlistoffilestobeplacedintheworkingdirectoryofeachexecutor.FilepathsofthesefilesinexecutorscanbeaccessedviaSparkFiles.get(fileName).--confPROP=VALUE额外的配置--properties-fileFILE加载配置路径,默认conf/spark-defaults.conf.--driver-memoryMEMMemoryfordriver(e.g.1000M,2G)(Default:1024M).--driver-java-options传递给driver的额外Java选项。--driver-library-path传递给driver的额外库路径项。--driver-class-pathdriver的类路径,用--jars添加的jar包会自动包含在类路径里--executor-memoryMEMMemoryperexecutor(e.g.1000M,2G)(Default:1G).--help,-h显示此帮助消息并退出。--verbose,-v打印额外的调试输出。--version,打印当前Spark的版本仅支持Cluster部署模式:--driver-coresNUMdriver使用的核心数,仅在集群模式下使用(Default:1).仅支持standalone、Mesos的集群部署模式:--supervise如果给定,则在失败时重新启动驱动程序。--killSUBMISSION_ID如果给定,则杀死指定的驱动程序。--statusSUBMISSION_ID如果给定,请求指定的驱动程序的状态。仅支持standaloneandMesos部署模式:--total-executor-coresNUM所有executors的核心总数。仅支持standaloneandYARN部署模式:--executor-coresNUM每个executors的核心数。(Default:1inYARNmode,或者standalone模式worker的所有核数。)仅支持YARN部署模式:--queueQUEUE_NAMEYarn提交的对列名称(Default:"default").--num-executorsNUMexecutors启动的数量(Default:2).--archivesARCHIVESCommaseparatedlistofarchivestobeextractedintotheworkingdirectoryofeachexecutor.--principalPRINCIPALPrincipaltobeusedtologintoKDC,whilerunningonsecureHDFS.--keytabKEYTABThefullpathtothefilethatcontainsthekeytabfortheprincipalspecifiedabove.ThiskeytabwillbecopiedtothenoderunningtheApplicationMasterviatheSecureDistributedCache,forrenewingtheloginticketsandthedelegationtokensperiodically.

Python极客

爬虫目标通过抖音的分享链接,获取抖音的原视频(无水印)使用模块:requests、re爬虫结构主要为了最后获取视频时所要的参数classDYSpider():def__init__(self,share_url):#get获取3个参数item_idsmidu_codeself.share_url=share_urlself.item_ids=""self.mid=""self.u_code=""#get获取参数dytk后面大括号需要item_idsself.dytk_url="https://www.iesdouyin.com/share/video/{}/"self.dytk=""#获取信息接口getself.infor_url="https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/"self.headers={"user-agent":"Mozilla/5.0(iPhone;CPUiPhoneOS11_0likeMacOSX)AppleWebKit/604.1.38(KHTML,likeGecko)Version/11.0Mobile/15A372Safari/604.1",}defget_imu(self):passdefget_dytk(self):passdefget_infor(self):passdefrun(self):passget_imu()获取item_idsmidu_code三个参数defget_imu(self):r=requests.get(url=self.share_url)dytk_data=r.urlself.item_ids=re.search(r'video/(.+?)/',dytk_data).group(1)self.mid=re.search(r'mid=(.+?)&',dytk_data).group(1)self.u_code=re.search(r'u_code=(.+?)&',dytk_data).group(1)get_dytk()为了获取dytk参数defget_dytk(self):url=self.dytk_url.format(self.item_ids)r=requests.get(url=url,headers=self.headers,params={"region":"CN","mid":self.mid,"u_code":self.u_code,"titleType":"title","utm_source":"copy_link","utm_campaign":"client_share","utm_medium":"android","app":"aweme",})self.dytk=re.search(r'dytk:"(.+?)"}\);',r.text)get_infor()没有对返回的json进行处理defget_infor(self):r=requests.get(url=self.infor_url,headers=self.headers,params={"item_ids":self.item_ids,"dytk":self.dytk,})print(r.json())运行方法defrun(self):self.get_imu()self.get_dytk()self.get_infor()使用dys=DYSpider("https://v.douyin.com/Wf6Rsa/")dys.run()

Python极客

概述pymysql是一个纯Python实现的MySQL客户端操作库。Python–以下之一:CPython>=2.7或>=3.5MySQLServer–以下之一:MySQL>=5.5MariaDB>=5.5通用使用步骤importpymysql#连接MySQLconn=pymysql.connect(host="127.0.0.1",port=3306,user="root",password="123456",database="bigdataboy",charset='utf8')#创建光标cur=conn.cursor()#SQL语句sql=""#执行SQL语句r=cur.execute(sql)#这一步提交只有在修改,增加,删除时需要,查询时不需要。conn.commit()#commit作用是:更新状态到数据库#关闭连接conn.close()创表importpymysqlconn=pymysql.connect(host="127.0.0.1",port=3306,user="root",password="123456",database="bigdataboy",charset='utf8')cur=conn.cursor()sql="""createtablebigdata(idintprimarykeyauto_increment,namevarchar(255)notnull,phoneint)charset="utf8";"""r=cur.execute(sql)#创建成功返回0,失败返回1conn.commit()conn.close()插入数据插入一条数据importpymysqlconn=pymysql.connect(host="127.0.0.1",port=3306,user="root",password="123456",database="bigdataboy",charset='utf8')cur=conn.cursor()sql="insertintobigdatavalues(null,'bigdataboy',123456789)"r=cur.execute(sql)#插入成功返回1,影响的行数conn.commit()conn.close()插入多条数据使用如下executemany()方法可以预防SQL注入攻击。importpymysqlconn=pymysql.connect(host="127.0.0.1",port=3306,user="root",password="123456",database="bigdataboy",charset='utf8')cur=conn.cursor()#需要插入数据用%s代替sql="insertintobigdatavalues(null,%s,%s)"data=[("Bob",123456),("Block",123456),("Bigdataboy",123456789)]#参数1是SQL语句,参数2是数据列表(需要循环)。r=cur.executemany(sql,data)#返回添加的行数conn.commit()conn.close()查询数据查询结果有4中返回格式:Cursor:默认,元组类型DictCursor:字典类型SSCursor:无缓冲元组类型SSDictCursor:无缓冲字典类型无缓冲游标类型,适用于数据量很大,一次性返回太慢,或者服务端带宽较小默认元组返回结果cur=conn.cursor()sql="select*frombigdata;"r=cur.execute(sql)#返回查询到的数据条数print(cur.fetchall())#提取所有查询到的结果conn.close()-----------------------------((1,'bigdataboy',123456789),(2,'bigdataboy',123456789),(3,'bigdata',1234567),(4,'Bob',123456))字段返回类型importpymysqlconn=pymysql.connect(host="127.0.0.1",port=3306,user="root",password="123456",database="bigdataboy",charset='utf8')#指定DictCursor类cur=conn.cursor(cursor=pymysql.cursors.DictCursor)sql="select*frombigdata;"r=cur.execute(sql)print(cur.fetchall())#提取所有查询到的结果conn.close()-------------------------------[{'id':1,'name':'bigdataboy','phone':123456789},{'id':2,'name':'bigdataboy','phone':123456789},{'id':3,'name':'bigdata','phone':1234567}]返回结果的提取方法方法作用fetchall()取出全部的数据,可以返回一个结果集fetchmany(size)取出一定数量的数据fetchone()取出一条数据

2020-3-17 38 0
Python极客

概述在项目中,我们可能遇到有定时任务的需求。定时执行任务:例如每天早上8:00定时推送早报。间隔执行任务:比如:爬虫间隔多少时间去爬取一次。模块简介它是一个轻量级的Python定时任务调度框架。有四种组件,分别是:调度器(scheduler),作业存储(jobstore),触发器(trigger),执行器(executor),这里只介绍触发器(trigger)。同时,它还支持异步执行、后台执行调度任务。触发器APScheduler有三种内建的触发器。date触发器:表示特定时间点触发,只执行一次。interval触发器:固定时间间隔触发。cron触发器:在特定时间周期触发,最强大的触发器,能实现每天固定时间执行功能。date触发器参数:参数说明run_date(datetime或str)作业的运行日期或时间timezone(datetime.tzinfo或str)指定时区fromapscheduler.schedulers.blockingimportBlockingSchedulerfromdatetimeimportdatetimedeffunc():print(datetime.now().strftime("%Y-%m-%d%H:%M:%S"))scheduler=BlockingScheduler()#2020-03-1617:50:00指定时间运行一次scheduler.add_job(func,next_run_time=datetime(2020,3,16,17,50))scheduler.start()interval触发器以下都是可选参数(如果不填,就是当前开始间隔一秒执行一次但不会执行太久,会自动停止):参数说明weeks(int)间隔几周weeks(int)间隔几天hours(int)间隔几小时minutes(int)间隔几天weeks(int)间隔几分钟seconds(int)间隔多少秒start_date(datetime或str)开始日期end_date(datetime或str)结束日期timezone(datetime.tzinfo或str)时区fromapscheduler.schedulers.blockingimportBlockingSchedulerfromdatetimeimportdatetimedeffunc():print(datetime.now().strftime("%Y-%m-%d%H:%M:%S"))scheduler=BlockingScheduler()#每6秒执行一次scheduler.add_job(func,trigger="interval",seconds=6)scheduler.start()cron触发器在特定时间周期性地触发,和Linuxcrontab格式兼容。它是功能最强大的触发器。参数:header1header2year(int或str)年,4位数字month(int或str)月(范围1-12)day(int或str)日(范围1-31week(int或str)周(范围1-53)day_of_week(int或str)周内第几天或者星期几(范围0-6或者mon,tue,wed,thu,fri,sat,sun)hour(int或str)时(范围0-23)minute(int或str)分(范围0-59)second(int或str)秒(范围0-59)start_date(datetime或str)最早开始日期(包含)end_date(datetime或str)最晚结束时间(包含)timezone(datetime.tzinfo或str)指定时区fromapscheduler.schedulers.blockingimportBlockingSchedulerfromdatetimeimportdatetimedeffunc():print(datetime.now().strftime("%Y-%m-%d%H:%M:%S"))scheduler=BlockingScheduler()#在1到3月和7到9月的8到12和16到20的30分执行scheduler.add_job(func,trigger='cron',month='1-3,7-9',hour="8-12,16-20",minute='30')scheduler.start()

Python极客

爬虫目标通过微博话题获取发帖的信息与用户基本信息,数据可用于该话题的用户分析。爬取字段:账号id号、昵称、关注数、粉丝数、微博数、个性签名、账号类型、等级、简介、使用设备所用到模块:requests、lxml、re说明:无登录爬取微博用户数据,实现自动获取Cookie和Cookie失效自动更新,还有其他技巧,自己看。爬虫结构classWeiBoSpider():def__init__(self):#起始地址self.url="https://s.weibo.com/weibo?q=%23%E8%94%A1%E5%BE%90%E5%9D%A4%E7%82%B9%E8%AF%84%23&Refer=top"#cookieself.cookie=self.get_cookie()#获取所有数据defget_data(self):pass#通过id获取用户信息defget_user_data(self,user_id):pass#获取&更新Cookiedefget_cookie(self):passif__name__=='__main__':wbs=WeiBoSpider()wbs.get_data()获取所有数据defget_data(self):r=requests.get(url=self.url)html=etree.HTML(r.text)#接信息网页userdatas_list=html.xpath('//div[@id="pl_feedlist_index"]/div/div[@action-type="feed_list_item"]')#获取数据users_list=list()foruserdatainuserdatas_list:#昵称nick_name=userdata.xpath('.//div[@class="info"]/div[2]/a/@nick-name')#个人页user_id=re.search(r'/(\d*?)\?',userdata.xpath('.//div[@class="info"]/div[2]/a[1]/@href')[0]).group(1)#账号类型user_type=userdata.xpath('.//div[@class="info"]/div[2]/a[2]/@title')#使用设备equipment=userdata.xpath('.//p[@class="from"]/a[2]/text()')#组合数据data={"nick_name":nick_name,"user_id":user_id,"user_type":user_type,"equipment":equipment,#调用函数获取用户信息"user_data":self.get_user_data(user_id),}print(data)users_list.append(data)returnusers_list通过ID用户信息获取用户信息时,就对Cookie有着严格的要求,但是我们实现了自动化获取Cookie。defget_user_data(self,user_id="2803301701"):user_url="https://weibo.com/u/{}".format(user_id)headers={"Content-Type":"text/html;charset=utf-8","Host":"weibo.com",}r=requests.get(url=user_url,headers=headers,cookies=self.cookie)#判断cookie的有效性ifr.text=="":r=requests.get(url=user_url,headers=headers,cookies=self.get_cookie())#个人签名try:user_sign=re.search(r"简介:(.*?)<\\/span>",r.text).group(1).replace("\\t","").strip()exceptAttributeError:user_sign=""#关注concern=re.search(r'">(\d*?)<\\/strong><spanclass=\\"S_txt2\\">关注',r.text).group(1)#粉丝fans=re.search(r'">(\d*?)<\\/strong><spanclass=\\"S_txt2\\">粉丝<\\/span>',r.text).group(1)#微博数量weibo_count=re.search(r'">(\d*?)<\\/strong><spanclass=\\"S_txt2\\">微博',r.text).group(1)return{"user_sign":user_sign,"concern":concern,"fans":fans,"weibo_count":weibo_count}Cookie获取&更新在爬取微博用户数据遇到困难的朋友可以参考以下。defget_cookie(self):s=requests.session()#获取第一个Cookies.get("https://passport.weibo.com/visitor/visitor")#获取tid参数tid_=s.post(url="https://passport.weibo.com/visitor/genvisitor",data={"cb":"gen_callback","fp":{}})tid=re.search(r'{"tid":"(.+?)"',tid_.text).group(1)#返回需要的所有Cookier=s.get(url="https://passport.weibo.com/visitor/visitor",params={"a":"incarnate","t":tid,"cb":"cross_domain",})cookies=r.cookiestry:cookies["SRT"]exceptKeyError:print("获取cookie失败,真正重新获取")returnself.get_cookie()else:self.cookie=cookiesprint(self.cookie)returncookies运行结果暂没有翻页功能,看后面补充吧!!!

Python极客

爬虫目标爬取房天下指定地区的所在小区、小区链接、小区地址、户型、总价、单价等使用的模块:requests、lxml、re、json爬虫结构以下结构可以很好的进行多线程或者协程的扩展。参数类classTool():#需要爬取的城市city="成都"#爬取页数page=2#城市列表获取所以城市的URL的方法在文末,这里只截取了一部分city_url={'成都':'https://cd.esf.fang.com','郴州':'https://chenzhou.esf.fang.com'}解析网页获取信息#获取房子的信息defget_fang_infor(url):#信息fang_list=list()#获取网页信息r=requests.get(url=url)r.encoding=r.apparent_encoding#这里的错误处理是在爬取第二页之后,URL需要获取一个参数try:href=re.search(r'//location.href="(.*?)";',r.text).group(1)exceptAttributeError:passelse:r=requests.get(url=href)r.encoding=r.apparent_encoding#解析网页html=etree.HTML(r.text)dl_html=html.xpath('//div[@class="shop_listshop_list_4"]/dl')fordlindl_html:#小区名称house=dl.xpath('.//p[@class="add_shop"]/a/@title')ifhouse==list():continue#小区链接house_url=url+dl.xpath('.//p[@class="add_shop"]/a/@href')[0]#小区地址house_add=dl.xpath('.//p[@class="add_shop"]/span/text()')#户型等去除前后符号house_types=[a.strip()foraindl.xpath('.//p[@class="tel_shop"]/text()')]#总价all_price=dl.xpath('.//dd[@class="price_right"]/span[1]/b/text()')[0]+dl.xpath('.//dd[@class="price_right"]/span/text()')[0]#单价price=dl.xpath('.//dd[@class="price_right"]/span[2]/text()')#组合数据infor={#小区名称"house":house[0],#小区地址"house_add":house_add[0],#小区链接"house_url":house_url,#户型"house_type":house_types[0],#大小"house_size":house_types[1][:-2],#楼层"floor":house_types[2],#方向"direction":house_types[3],#修建日期"xj_date":house_types[4],#总价"all_price":all_price,#单价"price":price[0][:-3]}fang_list.append(infor)returnfang_list获取所有链接返回一个所有链接的list()#获取页数连接defget_pages():url=Tool.city_url[Tool.city]pages_list=[url]fornuminrange(2,Tool.page+1):pages_list.append(url+"/house/i3{}/".format(num))returnpages_list主程序入口if__name__=='__main__':pages_list=get_pages()print(pages_list)#打印链接列表forpageinpages_list:print(page)#当前爬取的链接fang_list=get_fang_infor(page)print(fang_list)获取所有城市的URLdefget_city_url():url="https://esf.fang.com/esfcities.aspx"r=requests.get(url=url,timeout=5)r.encoding=r.apparent_encoding#匹配城市&urlcity_url_lists=json.loads(re.search(r"varcityJson=(.*?),];",r.text).group(1)+"]")#重新组合格式city_url_dic=dict()#{,...}foraincity_url_lists:city_url_dic={**city_url_dic,**{a.get("name"):"https:"+a.get("url")}}print(city_url_dic)