python 2.7版本的,不是很完善。可以自己修复下使用。
#!/usr/local/Python-2.7/bin/python
# -*- coding: utf-8 -*-
######################
#数据库操作类
######################
import MySQLdb
import string
import re
import datetime
import time
import traceback
class database:
    adsys_test_db = {"host":"","user":"root","passwd":"****","db":"","port":"3306","charset":"utf8"}
    def __init__(self,conn_config):
        self.conn_config = conn_config
        self.conn = None
        self.cur = None
        self.where_sql = ""
        self.where_params = None
    def __del__(self):
        if self.conn != None:
            self.conn.close
            self.conn = None
        if self.where_sql != None and self.where_sql != "":
            self.where_sql = None
            self.where_params = None
   
    def connection(self):
        if self.conn == None:
            self.conn = MySQLdb.connect(host=self.conn_config["host"],user=self.conn_config["user"],passwd=self.conn_config["passwd"],db=self.conn_config["db"],port=string.atoi(self.conn_config["port"]),charset=self.conn_config["charset"])
            self.cur = self.conn.cursor(cursorclass = MySQLdb.cursors.DictCursor)
    #执行SQL并根据SQL访问数据
    #如SQL是select语句则返回全部查询数据
    #如SQL是insert语句则返回插入的ID
    #其它的不返回
    def exec_sql(self,sql,args=None):
        try:
            self.connection()
            if re.search("^SELECT",sql,re.IGNORECASE) != None:
                self.cur.execute(sql)
                data = self.cur.fetchall()
                self.conn.commit()
                return data
            if re.search("^INSERT\s+INTO[\s\S]+VALUES",sql,re.IGNORECASE) != None:
                self.cur.execute(sql)
                insert_id = self.conn.insert_id()
                self.conn.commit()
                return insert_id
            if args != None:
                self.cur.execute(sql,args)
            else:
                self.cur.execute(sql)
            self.conn.commit()
        except:
            if self.conn != None:
                self.conn.rollback()
            msg = "MySqlError: %s  SQL Query: %s" % (traceback.format_exc(),sql)
            raise MySQLdb.Error,msg
        finally:
            if self.conn != None:
                self.conn.close()
                self.conn = None
    #插入[insert]
    #input table 表名
    #input data 数据的关联字典
    #out insert_id
    def insert(self,table,data):
        sql = ""
        try:
            keys = ""
            values = ""
            sqlparams = []
            for key,value in data.iteritems():
                keys += "," + key if len(keys) > 0 else key
                values += ",%s" if len(values) > 0 else "%s"
                sqlparams.append(value)
            sql = "INSERT INTO " + table + " (" + keys + ") VALUES (" + values + ")"
            self.connection()
            self.cur.execute(sql,tuple(sqlparams))
            insert_id = self.conn.insert_id()
            self.conn.commit()
            return insert_id
        except:
            if self.conn != None:
                self.conn.rollback()
            msg = "MySqlError: %s  SQL Query: %s" % (traceback.format_exc(),sql)
            raise MySQLdb.Error,msg
        finally:
            if self.conn != None:
                self.conn.close()
                self.conn = None
            if self.where_sql != None and self.where_sql != "":
                self.where_sql = None
                self.where_params = None
    #批量插入[insert_batch]
    #input table 表名
    #input data 数据的关联元组字典
    #out insert_id
    def insert_batch(self,table,data):
        sql = ""
        try:
            keys = ""
            values = ""
            args = range(len(data))
            for key,value in data[0].iteritems():
                keys += "," + key if len(keys) > 0 else key
                values += ",%s" if len(values) > 0 else "%s"
            sql = "INSERT INTO " + table + " (" + keys + ") VALUES (" + values + ")"
            i = 0
            for row in data:
                sqlparams = []
                for key,value in data[i].iteritems():
                    sqlparams.append(value)
                args[i] = tuple(sqlparams)
                i+=1
            self.connection()
            self.cur.executemany(sql,args)
            insert_id = self.conn.insert_id()
            self.conn.commit()
            return insert_id
        except:
            if self.conn != None:
                self.conn.rollback()
            msg = "MySqlError: %s  SQL Query: %s" % (traceback.format_exc(),sql)
            raise MySQLdb.Error,msg
        finally:
            if self.conn != None:
                self.conn.close()
                self.conn = None
            if self.where_sql != None and self.where_sql != "":
                self.where_sql = None
                self.where_params = None
    #获取insert sql string[insert_string]
    #input table 表名
    #input data 数据的关联字典
    def insert_string(self,table,data):
        keys = ''
        values = ''
        for key,value in data.iteritems():
            keys += "," + key if len(keys) > 0 else key
            temp_value = ""
            if type(value) == datetime.datetime:
                temp_value = value.strftime("%Y-%m-%d %H:%M:%S")
                values += "," + temp_value if len(values) > 0 else temp_value
                continue
            if type(value) == time.struct_time:
                temp_value = time.strftime("%Y-%m-%d %H:%M:%S",value)
                values += "," + temp_value if len(values) > 0 else temp_value
                continue
            values += ",'%s'" % ('%s' % value).replace("'","\\'") if len(values) > 0 else "'%s'" % ('%s' % value).replace("'","\\'")
        sql = "INSERT INTO " + table + " (" + keys + ") VALUES (" + values + ")"
        return sql
    #获取insert sql string ...  on duplicate key update ...  [insert_string]
    #input table 表名
    #input data 数据的关联字典
    #input update_data 可以是字符串也可以是字典
    #out sql string
    def insert_on_duplicate_key_update_string(self,table,data,update_data = None):
        sql = self.insert_string(table,data)
        update_str = ''
        if update_data == None:
            update_data = data
        update_type = type(update_data)
        if isinstance(update_data,str):
            sql += " ON DUPLICATE KEY UPDATE " + update_data
        else:
            for k,v in update_data.iteritems():
                update_str += "%s = '%s'" % (k,('%s' % v).replace("'","\\'")) if len(update_str) == 0 else ", %s = '%s'" % (k,('%s' % v).replace("'","\\'"))
            sql += " ON DUPLICATE KEY UPDATE " + update_str
        return sql
    #where条件[where]
    #input data 条件的关联字典
    def where(self,data):
        for key,value in data.iteritems():
            operator = ['>','<','=']
            key =  re.sub("\s+",'',key,re.IGNORECASE | re.UNICODE | re.VERBOSE)
            if key[-1] in operator:
                self.where_sql = " AND " + key + " %s" if len(self.where_sql) > 0 else " WHERE " + key + " %s"
            else:
                self.where_sql = " AND " + key + " = %s" if len(self.where_sql) > 0 else " WHERE " + key + " = %s"
        self.where_params = data.values()
    #更新[update]
    #input table 表名
    #input data 数据的关联字典
    #input where 条件的关联字典
    def update(self,table,data,where=None):
        sql = ""
        try:
            set_str = ""
            params = data.values()
            for key,value in data.iteritems():
                set_str += ", " + key + " = %s" if len(set_str) > 0 else key + " = %s"
            sql = "UPDATE " + table + " SET " + set_str
            if where != None:
                self.where(where)
            if len(self.where_sql) > 0:
                sql += self.where_sql
            if self.where_params != None:
                params.extend(self.where_params)
            self.connection()
            self.cur.execute(sql,tuple(params))
            self.conn.commit()
        except:
            if self.conn != None:
                self.conn.rollback()
            msg = "MySqlError: %s  SQL Query: %s" % (traceback.format_exc(),sql)
            raise MySQLdb.Error,msg
        finally:
            if self.conn != None:
                self.conn.close()
                self.conn = None
            if self.where_sql != None and self.where_sql != "":
                self.where_sql = None
                self.where_params = None
    #获取update sql string[update_string]
    #input table 表名
    #input data 数据的关联字典
    #input where 条件的关联字典
    #out sql string
    def update_string(self,table,data,where):
        set_str = ""
        for key,value in data.iteritems():
            temp_value = ""
            if type(value) == datetime.datetime:
                temp_value = value.strftime("%Y-%m-%d %H:%M:%S")
                set_str += ", " + key + " = " + temp_value if len(set_str) > 0 else key + " = " + temp_value
                continue
            if type(value) == time.struct_time:
                temp_value = time.strftime("%Y-%m-%d %H:%M:%S",value)
                set_str += ", " + key + " = " + temp_value if len(set_str) > 0 else key + " = " + temp_value
                continue
            set_str += ", " + key + " = %s" % value if len(set_str) > 0 else key + " = %s" % value
        sql = "UPDATE " + table + " SET " + set_str
        where_str = ""
        for key,value in where.iteritems():
            where_str += " AND " + key + " = %s" % value if len(where_str) > 0 else " WHERE " + key + " = %s" % value
        sql += where_str
        return sql
    #删除[delete]
    #input table 表名
    #input where 条件的关联字典
    def delete(self,table,where=None):
        sql = ""
        try:
            params = None
            sql = "DELETE FROM " + table
            if where != None:
                self.where(where)
            if len(self.where_sql) > 0:
                sql += self.where_sql
            if self.where_params != None:
                params = self.where_params
            self.connection()
            self.cur.execute(sql,tuple(params))
            self.conn.commit()
        except:
            if self.conn != None:
                self.conn.rollback()
            msg = "MySqlError: %s  SQL Query: %s" % (traceback.format_exc(),sql)
            raise MySQLdb.Error,msg
        finally:
            if self.conn != None:
                self.conn.close()
                self.conn = None
            if self.where_sql != None and self.where_sql != "":
                self.where_sql = None
                self.where_params = None
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐