#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Collect bus line URL information and save it to the database
"""
1、源地址http://bus.mapbar.com/beijing/poi/5af90Q5CN8BH
2、匹配内容的模型类
3、数据保存子类
4、逻辑控制类
"""

import abc
from Queue import Queue
from threading import Thread, Lock, stack_size
from BeautifulSoup import BeautifulSoup
import urllib2, cookielib
from gzip import GzipFile
from StringIO import StringIO
import re, logging
import MySQLdb
import time
import random

class DataModel:
    """
    数据模型基类,定义基本接口
    """
    
    __metaclass__ = abc.ABCMeta
    
    @abc.abstractmethod
    def analytics(self, data):
        return
    
    
class UrlDataModel(DataModel):
    """
    对url进行分析的数据模型
    """
    def __init__(self, host, city):
        self.host = host
        self.city = city
        
    def analytics(self, data):
        self.data = data
        lineName = ""       #公交线路名
        oppLineName = ""    #反向线路名
        point = []          #公交站点名和url
#         point_url = []      #公交站url
        region = ""         #内容匹配区域
        
        log = logging.getLogger('BusUrl.UrlDataModel')
        
        
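        # Patterns applied to each divId block: the hidden opposite-direction line
        # name, every stop's link and label, the div title used when the label
        # contains a <span>, and the numeric route name.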
        regex_oppLineName = re.compile('<input[\s]+type="hidden"[\s]+id="h_oppLineName[0-9]+"[\s]+value="(.*?)"[^/]*/>')
        regex_point =  re.compile('<a[\s]+class="thisSite[0-9]*"[\s]+href="(.*?)"[\s]+target=".*?"><b>[0-9]+?</b>(.*?)</a')
        regex_point2 = re.compile('<div.+?title="(.*?)"><a')
        regex_lineName =  re.compile("<strong>([0-9]+)</strong>路</a>")
        
        start_index = 0
        soup = BeautifulSoup(self.data)
        
        UrlData_dict = dict()
        
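        # The page holds one block per line/direction with ids divId0, divId1, ...;
        # keep walking them until findAll() returns nothing.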
        while True:
            div = soup.findAll(id="divId"+str(start_index))
            start_index = start_index + 1
            if not div:
                break
            
            div = str(div[0])
            m = regex_lineName.search(div)
            if m is None:
                log.warn("%s regex_lineName search is empty", soup.title.string)
                continue
            lineName = m.group(1)
            
            m = regex_oppLineName.search(div)
            if m is None:
                log.warn("%s regex_oppLineName search is empty", soup.title.string)
                continue
            oppLineName = m.group(1)
#             if oppLineName.index("(") > 0:
#                 oppLineName = oppLineName.split("(")[0]
            
            m = regex_point.findall(div)
            m2 = regex_point2.findall(div)
            if len(m) == 0:
                log.warn("%s regex_point search is empty", soup.title.string)
                continue
            for i in range(0, len(m)):
                v = m[i]
                point = []
                name = v[1]
                url = v[0]
                if v[1].find("span") != -1 and len(m2) >= i:
                    name = m2[i]
                
                if not v[0].startswith("http"):
                    point.insert(0,"http://bus.mapbar.com" + url)
                else:
                    point.insert(0, url)
                point.insert(1, name)
                m[i] = tuple(point)
            point = m
            
            UrlData_obj = UrlData(lineName, oppLineName, point, self.city)
            UrlData_dict[lineName] = UrlData_obj
            
        return UrlData_dict
    
        
class UrlData:
    """
    urldata 对象
    """
    def __init__(self, lineName, oppLineName, points, city):
        self.lineName = lineName
        self.oppLineName = oppLineName
        self.points = points
        self.city = city

class Db:
    """
    Mysql database class
    """
    
    def __init__(self, host, user, passwd, db, port, socket, charset):
        self.log = logging.getLogger('Storage.Db')
        try:
            self.conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=db, port=port, unix_socket=socket, charset=charset)
        except Exception, e:
            self.log.error("Database conn failed, error is %s", e)
            raise
    
    def query(self, sql):
        try:
            self.conn.query(sql)
            return self.conn.store_result()
        except Exception, e:
            self.log.error("Database query failed, error is %s", e)
            raise
        
    def fetchAll(self, sql):
        try:
            self.conn.query(sql)
            result = self.conn.store_result()
            # how=2 returns each row as a dict keyed by "table.column"
            return result.fetch_row(result.num_rows(), 2)
        except Exception, e:
            self.log.error("Database fetchAll failed, error is %s", e)
            raise

        
    def execute(self, sql, val):
        try:
            cur = self.conn.cursor()
            cur.execute(sql, val)
            self.conn.commit()
        except Exception, e:
            self.log.error("Database execute failed, error is %s", e)
            raise
    
    def executemany(self, sql, args):
        try:
            cur = self.conn.cursor()
            rows = cur.executemany(sql, args)
            self.conn.commit()
            return rows
        except Exception, e:
            self.log.error("Database executemany failed, error is %s", e)
            raise
        
    def update(self, sql, args):
        try:
            cur = self.conn.cursor()
            cur.execute(sql, args)
            self.conn.commit()
        except Exception, e:
            self.log.error("Database update failed, error is %s", e)
            raise
     
    def escape(self, string):
        return self.conn.escape_string(string)   
        
class Storage:
    """
    数据存储基类
    """
    
    __metaclass__ = abc.ABCMeta
    
    @abc.abstractmethod
    def save(self, data):
        return
    

class UrlStorage(Storage):
    """
    Stores URL data in MySQL.
    """
    def __init__(self):
        self.log = logging.getLogger('Storage.UrlStorage')
        
        self.db = Db(host='localhost',user='root',passwd='',db='bj_busonline',port=3306,socket='/tmp/mysql.sock',charset="utf8")
        
    def save(self, UrlData):
        sql = "insert into point(id, city, lineName, oppLineName, points) values (NULL, %s, %s, %s, %s)"
        points = "-".join(p[1] for p in UrlData.points)
        self.db.execute(sql, [UrlData.city, UrlData.lineName, UrlData.oppLineName, points])
    
    def saveList(self, UrlData_dict):
        sql = "insert into point(id, city, lineName, oppLineName, points) values (NULL, %s, %s, %s, %s)"
        args = list()
        escape = self.db.escape
        for k, v in UrlData_dict.iteritems():
            tmp_point = ""
            for val in v.points:
                tmp_point += val[1] + "-"
            args.append(tuple([escape(v.city), escape(v.lineName), escape(v.oppLineName), escape(tmp_point[:-1])]))
        
        return self.db.executemany(sql, args)
        
                
    def saveUrl(self, urls):
        sql = "insert into line_url(id, url, isget, url_id) values (NULL, %s, %s, %s)"
        args = list()
        escape = self.db.escape
        for id, url in urls:
            args.append(tuple([escape(url), 0, id]))
        
        self.db.executemany(sql, args)
    
    def loadUrl(self):
        sql = "SELECT * FROM line_url"
        result=self.db.fetchAll(sql)
        self.log.info("loadUrl 'SELECT * FROM line_url' return %d", len(result))
        return result
        
    def loadFromFile(self, filename):
        import pickle
        try:
            f = open(filename, "rb")
            self.data = pickle.load(f)
            f.close()
        except Exception, e:
            self.log.error('UrlStorage Exception with message %s', e)
            raise
        
    def updateUrl(self, url):
        sql = "UPDATE line_url SET isget=1 WHERE url=%s"
        self.db.update(sql, (url,))
        
class ContentEncodingProcessor(urllib2.BaseHandler):
    """A handler to add gzip capabilities to urllib2 requests """

    # decode
    def http_response(self, req, resp):
        old_resp = resp
        # gzip
        if resp.headers.get("content-encoding") == "gzip":
            gz = GzipFile(
                        fileobj=StringIO(resp.read()),
                        mode="r"
                      )
            resp = urllib2.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
            resp.msg = old_resp.msg
        # deflate
        if resp.headers.get("content-encoding") == "deflate":
            gz = StringIO( deflate(resp.read()) )
            resp = urllib2.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
            resp.msg = old_resp.msg
        return resp

# deflate support
import zlib
def deflate(data):
    # zlib only exposes the zlib container format, not raw deflate,
    # so try raw deflate first and fall back to the zlib wrapper
    try:
        return zlib.decompress(data, -zlib.MAX_WBITS)
    except zlib.error:
        return zlib.decompress(data)

class Fetcher(object):
    """
    Fetches pages and hands them to the data model and storage.
    """
    
    def __init__(self, url, DataModel, Storage, ThreadNum, StackSize=32768*16, headers=None):
        self.BaseUrl = url
        self.DataModel = DataModel
        self.Storage = Storage
        self.q_task = Queue()
        self.StackSize = StackSize
        self.ThreadNum = ThreadNum
        
        cookie_support = urllib2.HTTPCookieProcessor(cookielib.CookieJar())
        encoding_support = ContentEncodingProcessor()
        self.opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler, encoding_support)
        
        
        # request headers added to every request (may be empty)
        self.headers = headers or []
            
        
        self.q_task.put(url)
        self.task_done = []
        self.lock = Lock()
        self.urls = dict()
        
        self.log = logging.getLogger("BusUrl.Fetcher")
        
        self.load()
        
        self.thread_pool = []
        stack_size(self.StackSize)  # stack size for the worker threads created below
        for i in range(ThreadNum):
            t = Thread(target=self.threadget)
            t.start()
            self.thread_pool.append(t)
        
    def load(self):
        urls = self.Storage.loadUrl()
        for url in urls:
            self.urls[url['line_url.url_id']] = url['line_url.url']
            if url['line_url.isget'] == 0:
                self.q_task.put(url['line_url.url'])
            
        
    def __del__(self):
        """
        Save the task queue and any in-flight data before teardown.
        """
        time.sleep(0.5)
        self.q_task.join()
        
    def threadget(self):
        while True:
            try:
                self.lock.acquire(True)
                url = self.q_task.get()
                
                req = urllib2.Request(url)
                for k, v in self.headers:
                    req.add_header(k, v)
                
                req.add_header("Referer", url)
                data = self.get(req)
                info_list = self.DataModel.analytics(data)
                self.Storage.saveList(info_list)
                
                self.log.info("Complete url %s fetch", url)
                self.Storage.updateUrl(url)
                
                if url not in self.task_done:
                    self.task_done.append(url)
                    
                """
                执行过的任务和没有执行过的任务区别
                任务的执行状态要随时保存,以便下次从上次未完成的任务开始
                """
                for k, v in info_list.iteritems():
                    tmp = []
                    for val in v.points:
                        if val[0] not in self.urls.values():
                            self.urls[len(self.urls)]= val[0]
                            tmp.append((len(self.urls), val[0]))
                            self.q_task.put(val[0])
                    self.Storage.saveUrl(tmp)
                
            except Exception, e:
                self.log.warn("threadget for url %s failed, except %s", url, e)
            finally:
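                # Crude throttling: sleep 30-40s before releasing the lock; since the
                # lock is held for the whole fetch, the worker threads are effectively
                # serialized and successive requests are well spaced out.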
                time.sleep(random.randint(30, 40))
                self.q_task.task_done()
                self.lock.release()
            
    def get(self, req, repeat=3):
        """
        HTTP GET, retried up to 3 times.
        Problems with the returned headers/status should be reported as errors,
        and error information should be collected in one place.
        """
        try:
            response = self.opener.open(req)
            data = response.read()
        except Exception, what:
            print what, req
            if repeat > 0:
                return self.get(req, repeat - 1)
            else:
                self.log.warn("GET Failed req %s", req)
                return ''
        return data
    
    def wait(self):
        for i in range(self.ThreadNum):
            self.thread_pool[i].join()
        
        
if __name__ == "__main__":
    import platform
    if platform.system() == "Windows":		
        logging.basicConfig(filename = "D:/www/busonline/tools/log.txt", filemode="a", level=logging.NOTSET) 
    else:
        logging.basicConfig(filename = "/media/D/www/busonline/tools/log.txt", filemode="a", level=logging.NOTSET) 
    
    filename = "header.txt"
    headers = []
    
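    # header.txt is expected to hold one request header per line in
    # "Name|Value" form; only the first two "|"-separated fields are used.
    # A typical line might be: User-Agent|Mozilla/5.0 (illustrative value only)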
    f = open(filename, "r")
    for header in f.readlines():
        v = header.strip().split("|", 2)
        headers.append((v[0].strip(), v[1].strip()))

    UrlDataModel_obj = UrlDataModel("http://bus.mapbar.com", "北京")
    UrlStorage_obj = UrlStorage()
    url = "http://bus.mapbar.com/beijing/poi/5af90Q5CN8BH"
    fetcher = Fetcher(url, UrlDataModel_obj, UrlStorage_obj, 4, headers=headers)
    fetcher.wait()
    
    


That's all the code; it is not much, but it basically covers everything from parsing the pages through to writing the results into the database. One known bug is left unhandled: duplicate rows can be saved when storing URLs.
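
One way to deal with that duplicate-URL bug is to check each newly discovered stop URL against the set of URLs already loaded from line_url before queueing or inserting it. Below is a minimal, untested sketch that reuses the Fetcher.urls dict, the q_task queue and UrlStorage.saveUrl from the code above (the id numbering simply mirrors the original scheme); an alternative would be a UNIQUE index on line_url.url so MySQL itself rejects duplicates.

def queue_new_points(fetcher, info_list):
    # Sketch only: queue and store just the stop URLs we have not seen before.
    seen = set(fetcher.urls.values())           # URLs already known from line_url
    for v in info_list.itervalues():
        fresh = []
        for url, name in v.points:              # points are (url, name) tuples
            if url in seen:
                continue                        # skip duplicates instead of re-inserting
            seen.add(url)
            new_id = len(fetcher.urls)
            fetcher.urls[new_id] = url
            fresh.append((new_id, url))
            fetcher.q_task.put(url)
        if fresh:
            fetcher.Storage.saveUrl(fresh)

Keeping a set also avoids the linear "val[0] not in self.urls.values()" scan that the current threadget loop performs for every stop.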

Database table structure

CREATE TABLE `line_url` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `url` varchar(255) NOT NULL,
 `isget` tinyint(1) NOT NULL DEFAULT '0',
 `url_id` varchar(45) NOT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6865 DEFAULT CHARSET=utf8

CREATE TABLE `point` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `city` varchar(45) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
 `lineName` varchar(45) DEFAULT NULL,
 `oppLineName` varchar(45) DEFAULT NULL,
 `points` varchar(10000) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10416 DEFAULT CHARSET=utf8


