加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

Python光大证券中文云系统——爬取新浪财经新闻

(2017-06-08 16:30:52)
分类: 指标自动化

【任务目标】

调通光大证券中文云系统


【任务进度】

依据Github光大证券中文云系统开源的说明文档,应该是分爬虫模块、检索模块、统计模块、关键词频模块和关键词网络模块,是一个整体非常庞大的系统。现在的进度是,深入研究了爬虫模块。爬虫模块主要作用在于将股票论坛、个股新闻、研究报告三个网站的网页 数据通过网页解析的方式将文本内容爬下来,用于之后模块的文本挖掘。爬虫模块将爬到的文本数据以【日期 + 股票代码】为单位存至相应的TXT 文本文件当中,同时将文本文件所在的位置以及其他相关信息写入数据库


对于每个数据源,都有一个独立的程序进行网页爬虫,他们分别是:
 GetGuba_pylucene.py     股票论坛网页爬虫  
 GetMbReport_pylucene.py  研究报告网页爬虫

 GetSinaNews_pylucene.py 个股新闻网页爬虫

重点目前研究了个股新闻网页爬虫。由于该系统已有一定的年代,采用python2.7版本编写,已经不能再Python 3以上的版本中运行,因为语法有一定程度的变动,且该模块调用了很多包。所以前期花了很长的时间在配置环境上。其次,由于个股新闻网页爬虫与数据库直接相连,而原本光大证券的数据库肯定不可能给出,也无法再度连接。不得不去揣摩每张表的样貌,有哪些指标,从而在自己的数据库里去建表模拟,否则根本也无法运行。最后,要看懂代码,并对相关部分进行修改,调试,方便运行。

目前的进展是,我附上的GetSinaNews_pylucene.py这个代码已经可以直接运行,连接读取数据库也成功。但是无法出现Technical说明文档中,存储text文件的效果。经查证,似乎是调用的方式不对,GetSinaNews_pylucene.py似乎是后台文件被调用,要直接运行前端的某个文件,然而前端该文件似乎还调用了整个项目其他py代码,很复杂,因此搁置,未能调通。

具体的代码阅读理解,在注释中说明。

#coding=GBK  
import Queue       #一个队列的包,与thread配合使用,多线程运行,保证速度  
import threading   #多线程  
#from pymmseg import mmseg    ##load seperateword tool  
#mmseg.dict_load_defaults()   ##load seperateword default dictionary  
#mmseg.dict_load_words('stockname.dic') ##load special stockname and stock numberdictionary    
import urllib2     #爬虫抓取网页内容常用的一个包,抓取下来后,利用包中自带的一系列函数,对内容进行类似文档一样的处理和调用  
import time  
import re  
from bs4 import BeautifulSoup  #beatifulsoup一个爬虫解析网页的包  
from urlparse import urljoin  
import pyodbc                  #连接远程sql server数据库的包  
#import MySQLdb  
import string  
import sys  
reload(sys)  
sys.setdefaultencoding('gbk')  #对中文字符进行的编码处理  
  
  
  
  
  
originalURL="http://money.finance.sina.com.cn/corp/view/vCB_AllNewsStock.php?symbol=sh600418&Page=1"   #原始网页  
   
  
queue = Queue.Queue()  
out_queue = Queue.Queue()  
  
hosts=[]  
  
      
conn1=pyodbc.connect("DRIVER={SQL Server};SERVER=;DATABASE=;UID=;PWD=")  #连接数据库  
cur=conn1.cursor()  #游标  
cur.execute("select s_info_code,market from ggg") #where class like 'A%' and closedate is Null order by code  
#读取所有股票编号及所在市场  
result=cur.fetchall()  
stockCodes=[]  
for i in range(len(result)):  
    if result[i][1]=='SZ'.encode('GBK'):  
       stockCodes.append('sz'+result[i][0])  
    if result[i][1]=='SH'.encode('GBK'):  
       stockCodes.append('sh'+result[i][0])  
#将股票代码,市场重新存入stockCodes中,有一个转码,不再是GBK了  
  
for stock_code in stockCodes:  
    oriURL=originalURL[:-15]+stock_code+originalURL[-7:]  
    print oriURL  
    #time.sleep(1)  
    hosts.append([oriURL,stock_code[2:]])  
      
#替换原始网址中的symbol=的值,从而形成新浪财经各个编码股票的网址 存入host格式是:网址+股票编码    
  
class ThreadUrl(threading.Thread):  
    def __init__(self, queue, out_queue):  
        threading.Thread.__init__(self)  
        self.queue = queue  
        self.out_queue = out_queue  
          
#利用urillib这个包,爬取网页的所有内容  
    def run(self):  
        while True:  
           try:  
               #grabs host from queue  
               hostchunk = self.queue.get()  
           host=hostchunk[0]  
               queuestock_code=hostchunk[1]  
               #grabs urls of hosts and then grabs chunk of webpage  
               #print self.getName()+'Now Grabbing'+host  
              
               url = urllib2.urlopen(host)  
             #  print self.getName()+'END___Grabbing'+host  
               chunk = url.read()  
  
               #将读取的网页内容存入chunk  
               #再将chunk和股票编码存入队列  
               #place chunk into out queue  
               self.out_queue.put([host,chunk,queuestock_code])  
  
  
               #signals to queue job is done  
               #self.queue.task_done()  
               #特例出现报错的情况  
           except Exception,e:  
               #writeindex=file('GrabErro.txt','a+')  
               print 'There is Problem@@@@@@@@@@@@@@@@@@@GRABErro'  
              # writeindex.write(str(e)+'\n')  
              # writeindex.close()  
           if 'HTTP Error' in str(e):  
           time.sleep(10)  
           filer=file('stock_code.txt','r')  
                   stock_codes=filer.readlines()  
           for stock_code in stock_codes:  
                  oriURL=host[:71]+stock_code[:-1]+host[79:]  
                  hosts.append([oriURL,stock_code[:-1]])  
               continue  
  
class DatamineThread(threading.Thread):  
    def __init__(self,queue,out_queue):  
        threading.Thread.__init__(self)  
        self.out_queue = out_queue  
        self.queue = queue  
  
#没有完全读懂这个函数,大体是结合新浪财经网页的具体特点,将用urllib2爬下来的内容运用Beautiful Soup进行解析。  
      
    def run(self):  
        crawler1=crawler(self.getName())  
        while True:   
           try:  
               chunkUrl = self.out_queue.get()  
          # print 'QUEUE_____SIZE:::*********************'+str(self.out_queue.qsize())  
               chunk=chunkUrl[1]  
               page=chunkUrl[0]  
           tempstock_code=chunkUrl[2]  
           #   print 'addtoindex::'+page  
               #parse the chunk  
           if 'Page' in page:  
                  print self.getName()+'Beginning Souping'+page  
               soup=BeautifulSoup(chunk,fromEncoding='GBK')  
               crawler1.addtoindex(page,tempstock_code)  
######################ÐÂÀËÖÐÓÐÐ©Ò³ÃæÊÇÌø×ªµ½ÁíÒ»¸öÒ³ÃæµÄ,ÏÂÃæÓнøÐÐÏàÓ¦µÄ´¦Àí###  
               if 'AllNews'not in page:  
           InCaseTransLink=soup.find('meta',{"http-equiv":"Refresh"})  
           if InCaseTransLink==None:  
                   
              crawler1.insertTextInfo(soup,page,tempstock_code)  
           else:  
              transLink=InCaseTransLink['content'][6:]  
              if not crawler1.isindexed(transLink,tempstock_code):  
                 self.queue.put([transLink,tempstock_code])  
           if 'AllNews' in page:  
           soup1=soup.find('table',{"class":"table2"})  
                   if soup1!=None:  
            links=soup1.findAll('a',href=True)  
            indexFlag=0  
            EarlyindexFlag=0  
                    for link in links :  
                       link=link["href"]  
               if 'Early' in page:   
                   if not crawler1.isInserted(link,page[78:84]):  
                  if not crawler1.isindexed(link,page[78:84]):  
                   #if EarlyindexFlag==0:  
                      self.queue.put([link,page[78:84]])  
                   else:  
                   EarlyindexFlag=1  
                       else:  
                   if not crawler1.isInserted(link,page[73:79]):  
                  if not crawler1.isindexed(link,page[73:79]):  
                  # if indexFlag==0:  
                      self.queue.put([link,page[73:79]])  
                   else:  
                    indexFlag=1  
  
               #print self.getName()+'Ending Souping'+page  
           ##ÒòΪÔÚÐÂÀ˸ö¹ÉÔçÆÚ×ÊѶÖÐ,ÏÂÒ»Ò³µÄÁ´½ÓÖ¸Ïò´íÎó,Òò´ËдÁËÏÂÃæµÄ´úÂë  
           if 'Early' in page:  
           lastlink=links[-1]["href"]  
           if 'News' in lastlink :  
            if 'Early' not in lastlink:  
                   newlastlink=lastlink[:59]+'Early'+lastlink[59:]  
               if not crawler1.isindexed(newlastlink,page[78:84]):  
                   self.queue.put([newlastlink,page[78:84]])  
               crawler1.dbcommit()       
           except Exception,e:  
               writeindex2=file('DatamineErro.txt','a+')  
           print 'There is Problem@@@@@@@@@@@@@@@@@@@DATAMINEErro'  
               writeindex2.write(str(e)+'\n')  
               writeindex2.close()  
               continue      
  
  
###################################################################################  
class crawler:  
   
  def __init__(self, name):  
   ##  self.conn=sqlite.connect(dbname)  
     self.conn=pyodbc.connect("DRIVER={SQL Server};SERVER=;DATABASE=;UID=;PWD=")  
    # self.conn=MySQLdb.connect(host='localhost',user='root',passwd='233218')  
    # self.conn.select_db(dbname)  
     self.cursor=self.conn.cursor()  
    # self.cursor.execute('ALTER DATABASE DEFAULT CHARACTER SET GBK')  
    # self.cursor.execute('set names GBK')  
     self.name=name  
       
#自行定义封装的一个完成爬虫爬下来的内容和云端数据库存储,查询等操作的一个类  
  
  
  def __del__(self):  
     self.cursor.close() #数据库基本操作,关闭游标  
  
  def dbcommit(self):  
     self.conn.commit()  #数据库基本操作,提交  
  
#查询云端数据库某张表是否存储了该网址和对应的股票代码,存储了则返回rowid,没有则添加。  
  def getentryid(self ,table,field1,value1,field2,value2,createnew=True):  
     self.cursor.execute("select rowid from %s where %s='%s' and %s='%s'"%(table,field1,value1,field2,value2))  
     res=self.cursor.fetchone()  
 #    if table=='urllist':  
 #        print self.name+table+'*****the rowid******'+str(res)  
     if res==None:  
        #I have changed the cursor.py  line 127  charset   
###################sql serverµÄд·¨##################################       
         self.cursor.execute("insert into %s  values('%s','%s')"%(table,value1,value2))  
         self.conn.commit()  
         self.cursor.execute("select max(rowid) from %s"%(table))  
         LastRowID=self.cursor.fetchone()[0]  
######################################################################  
  
         return LastRowID  
     else:  
         return res[0]  
    
#如果在表sinaNewsUrllist中没有存储,则添加  
  def addtoindex(self,url,stockcode):  
     if (self.isindexed(url,stockcode)==False):  
         urlid=self.getentryid('sinaNewsUrllist','url',url,'stockcode',stockcode)  
   
#解析取所有网页内容中text部分  
  def gettextonly(self,soup):  
     v=soup.string  
     if v==None:  
        c=soup.contents  
        resulttext=''  
        for t in c :  
            subtext=self.gettextonly(t)  
            resulttext+=subtext  
        return resulttext  
     else:  
        return v.strip()   
  
#取文章的内容  
  def getContentsText(self,soup,url):  
     if url[26:30]=='look':  
        content=soup.find('div',{"class":"huifu"})    
        resultContenttext=self.gettextonly(content)  
        return resultContenttext  
  
#取文章的标题  
  def getTitleText(self,soup,url):  
     if url[26:30]=='look':  
        content1=soup.find('div',{"class":"biaoti"})      
        resultTitletext=self.gettextonly(content1)  
        return resultTitletext  
        
  def separatewords(self,text):  
     utf8text=text.encode('utf-8')     #because mmseg can only deal with utf-8  
     algor=mmseg.Algorithm(utf8text)  
     resulttext=[]  
     for tok in algor:  
    resulttext.append(tok.text.decode('utf-8').encode('GBK'))        
     return resulttext  
  
#查询是否有在数据库表sinaNewsUrllist中存储  
  def isindexed(self,url,stockcode):  
     self.cursor.execute("select rowid from sinaNewsUrllist where url='%s' and stockcode='%s'" % (url,stockcode))  
     u=self.cursor.fetchone()  
     if u!=None:  
         return True  
     return False  
  
#检查是否在表sinaStockNews中已经添加成功  
  def isInserted(self,url,stockcode):  
     self.cursor.execute("select * from sinaStockNews where url='%s' and stockcode='%s'" % (url,stockcode))  
     u=self.cursor.fetchone()  
     if u!=None:  
         return True  
     return False  
  
#将爬到的内容以一行:url,股票代码,发表时间,文章标题,内容的形式存储到表sinaStockNews中  
  def insertTextInfo(self,soup,url,stock_code):  
     # print '@@@@@@@@@@@@@@@@@@@@@@@@'  
      tempsoup=soup.find('div',{"class":"blkContainerSblk"})  
      title=tempsoup.find('h1').string.encode('GBK')  
      timestring=soup.find('span',{"id":"pub_date"}).string  
      time=timestring[0:4]+timestring[5:7]+timestring[8:10]  
      print time        
      if tempsoup != None:  
         texts=tempsoup.findAll('p')  
     if len(texts)>0:  
            resultText=''  
            for text in texts:  
        ##ÓÐЩÊÇûÓÐÓõĴúÂëµÄƬ¶Î,¹ýÂ˵ô#  
           tmptext=self.gettextonly(text)  
           if 'document.getElementByIdx_x("artibodyTitle").innerHTML'not in tmptext:  
               resultText=resultText+tmptext+'\n'  
      resultText.replace("'"," ")  
      self.cursor.execute("insert into sinaStockNews values('%s','%s','%s','%s','%s')"%(url,stock_code,str(time),title,resultText.decode('GBK').encode('GBK')))  
             # writeindex.write(resultText.decode('GBK').encode('utf-8'))  
                   
  
  
#没有太看明白这个函数,和爬新闻内容没有太大关系,涉及到后面的字段分析。  
  def insertWordInfo(self,soup,url):  
     id=string.atoi(url[31:37]+url[38:-5])  
     self.cursor.execute("select * from wordlocation where textid='%d'"%(id))  
     u=self.cursor.fetchone()  
     if u==None:  
         contentsText=str(self.getContentsText(soup,url))  
         titleText=str(self.getTitleText(soup,url))  
         contentsWords=self.separatewords(contentsText)  
         titleWords=self.separatewords(titleText)  
         for i in range(len(contentsWords)):  
             word=contentsWords[i]  
             self.cursor.execute("insert into wordlocation(textid,word,location) values('%d','%s','contents')"%(id,word))  
         for i in range(len(titleWords)):  
             word=titleWords[i]  
             self.cursor.execute("insert into wordlocation(textid,word,location) values('%d','%s','title')"%(id,word))  
     self.conn.commit()  
  def createindextables(self):  
    self.cursor.execute('create database gubaSearch character set gbk')  
    self.cursor.execute('create table urllist(rowid int identity(1,1) ,url varchar(100))default charset=gbk')  
    self.cursor.execute('create table wordlocation(textid bigint,word varchar(100) ,location varchar(100))default charset=gbk')  
    self.cursor.execute('create index urlidx on urllist(url)')  
    self.cursor.execute('create index textidx on wordlocation(textid)')  
    self.cursor.execute("create table text(id bigint Primary key,url varchar(100),pub_date date,topic_id int,reply_num int,access_num int ,subject varchar(100))default charset=gbk")  
    self.cursor.execute('create index idx on text(id)')  
    self.cursor.execute('create index pubdatex on text(pub_date)')  
  
  
    self.conn.commit()  
      
  
  
  
def main():  
    #spawn a pool of threads.  
    #开了15个多进程  
    for i in range(15):  
        t = ThreadUrl(queue, out_queue)  
        t.setDaemon(True)   #以后台的方式运行  
        t.start()  
  
  
    #populate queue with data  
    #放入队列  
    for host in hosts:  
        queue.put(host)  
  
  
    for i in range(40):  
        dt = DatamineThread(queue,out_queue)  
        dt.setDaemon(True)  
        dt.start()  
        print 'bbbb'  
  
  
    #wait on the queue until everything has been processed  
    queue.join()  
    out_queue.join()  
  
  
main()  

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有