参照 CI 模式编写的、用于 Python 2 访问 MySQL 数据库的类
#!/usr/local/Python-2.7/bin/python# -*- coding: utf-8 -*-import MySQLdbimport stringimport reimport datetimeimport timeimport tracebackclass database:adsys_test_db = {"
·
本代码基于 Python 2.7 编写，尚不完善，可自行修复后使用。
#!/usr/local/Python-2.7/bin/python
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
######################
#数据库操作类
######################
#数据库操作类
######################
import MySQLdb
import string
import re
import datetime
import time
import traceback
import string
import re
import datetime
import time
import traceback
class database:
    """Thin convenience wrapper around a MySQLdb connection.

    Every statement-executing method (``exec_sql``, ``insert``,
    ``insert_batch``, ``update``, ``delete``) opens a connection on demand,
    commits on success, rolls back and raises ``MySQLdb.Error`` on failure,
    and always closes the connection afterwards.

    The ``*_string`` methods only build SQL text and never touch the
    database.  They do simple quote/backslash escaping; prefer the
    parameterized methods for untrusted input.

    The code is written to run on Python 2.7 and also avoids constructs
    that are invalid on Python 3.
    """

    # Sample connection settings; "port" is a string and is converted with
    # int() when connecting.
    adsys_test_db = {"host":"","user":"root","passwd":"****","db":"","port":"3306","charset":"utf8"}

    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    # Leading whitespace is tolerated so that indented SQL is still classified.
    _SELECT_RE = re.compile(r"^\s*SELECT", re.IGNORECASE)
    _INSERT_RE = re.compile(r"^\s*INSERT\s+INTO[\s\S]+VALUES", re.IGNORECASE)
    # A condition key ending in one of these already carries its operator.
    _WHERE_OPERATORS = (">", "<", "=")

    def __init__(self, conn_config):
        """Store the connection settings; no connection is opened yet.

        conn_config -- dict with the keys "host", "user", "passwd", "db",
                       "port" and "charset" (see ``adsys_test_db``).
        """
        self.conn_config = conn_config
        self.conn = None
        self.cur = None
        self.where_sql = ""
        self.where_params = None

    def __del__(self):
        """Close any connection that is still open and clear pending state."""
        # getattr: __del__ may run on an instance whose __init__ never ran.
        conn = getattr(self, "conn", None)
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # A destructor must not raise; the connection is being
                # discarded anyway.
                pass
        self.conn = None
        self.cur = None
        self._reset_where()

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    def _reset_where(self):
        """Forget the conditions accumulated by where()."""
        # Reset to "" (not None): where()/update()/delete() test the string.
        self.where_sql = ""
        self.where_params = None

    def _close(self):
        """Close the current connection, if any, and drop its cursor."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
            self.cur = None

    def _fail(self, sql):
        """Roll back and raise MySQLdb.Error; call only from an except block."""
        if self.conn is not None:
            self.conn.rollback()
        msg = "MySqlError: %s SQL Query: %s" % (traceback.format_exc(), sql)
        raise MySQLdb.Error(msg)

    def _insert_sql(self, table, columns):
        """Build a parameterized INSERT statement for the given columns."""
        placeholders = ",".join(["%s"] * len(columns))
        return "INSERT INTO " + table + " (" + ",".join(columns) + ") VALUES (" + placeholders + ")"

    def _temporal_literal(self, value):
        """Return a quoted timestamp for datetime/struct_time, else None."""
        if isinstance(value, datetime.datetime):
            return "'" + value.strftime(self._DATETIME_FORMAT) + "'"
        if isinstance(value, time.struct_time):
            return "'" + time.strftime(self._DATETIME_FORMAT, value) + "'"
        return None

    def _quoted_literal(self, value):
        """Render value as a quoted, escaped SQL literal (None -> NULL)."""
        if value is None:
            return "NULL"
        temporal = self._temporal_literal(value)
        if temporal is not None:
            return temporal
        text = "%s" % (value,)
        # Escape backslashes first so the escapes added for quotes survive.
        return "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"

    def _verbatim_literal(self, value):
        """Render value as-is, quoting only datetime/struct_time values."""
        temporal = self._temporal_literal(value)
        if temporal is not None:
            return temporal
        return "%s" % (value,)

    # ------------------------------------------------------------------
    # connection / raw execution
    # ------------------------------------------------------------------
    def connection(self):
        """Open the connection and a DictCursor if none is open."""
        if self.conn is None:
            cfg = self.conn_config
            self.conn = MySQLdb.connect(
                host=cfg["host"],
                user=cfg["user"],
                passwd=cfg["passwd"],
                db=cfg["db"],
                port=int(cfg["port"]),
                charset=cfg["charset"],
            )
            self.cur = self.conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)

    def exec_sql(self, sql, args=None):
        """Execute one SQL statement and commit.

        sql  -- statement text, optionally with %s placeholders
        args -- parameters for the placeholders, or None

        Returns all fetched rows for a SELECT, the connection's insert id
        for an ``INSERT INTO ... VALUES`` statement, and None otherwise.
        Raises MySQLdb.Error (after a rollback) if anything fails.
        """
        try:
            is_select = self._SELECT_RE.search(sql) is not None
            is_insert = self._INSERT_RE.search(sql) is not None
            self.connection()
            # execute() treats args=None as "no parameters".
            self.cur.execute(sql, args)
            if is_select:
                data = self.cur.fetchall()
                self.conn.commit()
                return data
            if is_insert:
                insert_id = self.conn.insert_id()
                self.conn.commit()
                return insert_id
            self.conn.commit()
        except Exception:
            self._fail(sql)
        finally:
            self._close()

    # ------------------------------------------------------------------
    # insert
    # ------------------------------------------------------------------
    def insert(self, table, data):
        """Insert one row using a parameterized statement.

        table -- table name
        data  -- dict mapping column name to value

        Returns the connection's insert id.
        Raises MySQLdb.Error (after a rollback) if anything fails.
        """
        sql = ""
        try:
            columns = list(data)
            sql = self._insert_sql(table, columns)
            self.connection()
            self.cur.execute(sql, tuple(data[column] for column in columns))
            insert_id = self.conn.insert_id()
            self.conn.commit()
            return insert_id
        except Exception:
            self._fail(sql)
        finally:
            self._close()
            self._reset_where()

    def insert_batch(self, table, data):
        """Insert several rows with one executemany() call.

        table -- table name
        data  -- sequence of dicts; the column list is taken from the
                 first dict and every row must provide those columns

        Returns the connection's insert id, or None when data is empty
        (nothing is executed in that case).
        Raises MySQLdb.Error (after a rollback) if anything fails.
        """
        sql = ""
        try:
            if not data:
                return None
            columns = list(data[0])
            sql = self._insert_sql(table, columns)
            # Index every row by the shared column list so values always
            # line up with the column names in the statement.
            rows = [tuple(row[column] for column in columns) for row in data]
            self.connection()
            self.cur.executemany(sql, rows)
            insert_id = self.conn.insert_id()
            self.conn.commit()
            return insert_id
        except Exception:
            self._fail(sql)
        finally:
            self._close()
            self._reset_where()

    def insert_string(self, table, data):
        """Build (but do not run) an INSERT statement with inlined values.

        table -- table name
        data  -- dict mapping column name to value

        Values are rendered as quoted literals with quotes and backslashes
        escaped; datetime/struct_time values are formatted as
        'YYYY-MM-DD HH:MM:SS' and None becomes NULL.
        """
        columns = []
        literals = []
        for key, value in data.items():
            columns.append(key)
            literals.append(self._quoted_literal(value))
        return "INSERT INTO " + table + " (" + ",".join(columns) + ") VALUES (" + ",".join(literals) + ")"

    def insert_on_duplicate_key_update_string(self, table, data, update_data=None):
        """Build an ``INSERT ... ON DUPLICATE KEY UPDATE ...`` statement.

        table       -- table name
        data        -- dict mapping column name to value for the INSERT
        update_data -- either a ready-made SQL fragment (str) appended
                       verbatim, or a dict of column -> value; defaults
                       to ``data``

        Returns the SQL text; nothing is executed.
        """
        sql = self.insert_string(table, data)
        if update_data is None:
            update_data = data
        if isinstance(update_data, str):
            return sql + " ON DUPLICATE KEY UPDATE " + update_data
        assignments = [
            "%s = %s" % (column, self._quoted_literal(value))
            for column, value in update_data.items()
        ]
        return sql + " ON DUPLICATE KEY UPDATE " + ", ".join(assignments)

    # ------------------------------------------------------------------
    # where / update / delete
    # ------------------------------------------------------------------
    def where(self, data):
        """Accumulate WHERE conditions for the next update()/delete().

        data -- dict mapping a condition key to its value.  A key ending
                in '>', '<' or '=' (e.g. "age >", "n >=") is used with
                that operator; any other key is compared with '='.

        All whitespace is removed from each key before use.  Conditions
        are joined with AND and appended to any conditions already
        pending; the matching values are appended to ``where_params``.
        """
        clauses = []
        params = []
        for key, value in data.items():
            key = re.sub(r"\s+", "", key)
            if key.endswith(self._WHERE_OPERATORS):
                clauses.append(key + " %s")
            else:
                clauses.append(key + " = %s")
            params.append(value)
        if not clauses:
            return
        pending = self.where_sql or ""
        prefix = " AND " if pending else " WHERE "
        self.where_sql = pending + prefix + " AND ".join(clauses)
        # Keep parameters in step with the placeholders added above.
        self.where_params = list(self.where_params or []) + params

    def update(self, table, data, where=None):
        """Update rows using a parameterized statement.

        table -- table name
        data  -- dict mapping column name to new value
        where -- optional dict of conditions (see where()); conditions
                 set earlier through where() are applied as well

        With no conditions at all the statement has no WHERE clause.
        Raises MySQLdb.Error (after a rollback) if anything fails.
        """
        sql = ""
        try:
            columns = list(data)
            params = [data[column] for column in columns]
            sql = "UPDATE " + table + " SET " + ", ".join(column + " = %s" for column in columns)
            if where is not None:
                self.where(where)
            if self.where_sql:
                sql += self.where_sql
                if self.where_params is not None:
                    params.extend(self.where_params)
            self.connection()
            self.cur.execute(sql, tuple(params))
            self.conn.commit()
        except Exception:
            self._fail(sql)
        finally:
            self._close()
            self._reset_where()

    def update_string(self, table, data, where):
        """Build (but do not run) an UPDATE statement with inlined values.

        table -- table name
        data  -- dict mapping column name to new value
        where -- dict mapping column name to required value (may be
                 empty or None for no WHERE clause)

        NOTE: apart from datetime/struct_time values, which are rendered
        as quoted 'YYYY-MM-DD HH:MM:SS' literals, values are interpolated
        verbatim without quoting or escaping, so string values must
        already be valid SQL.
        """
        assignments = [
            key + " = " + self._verbatim_literal(value)
            for key, value in data.items()
        ]
        sql = "UPDATE " + table + " SET " + ", ".join(assignments)
        conditions = [
            key + " = " + self._verbatim_literal(value)
            for key, value in (where or {}).items()
        ]
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
        return sql

    def delete(self, table, where=None):
        """Delete rows using a parameterized statement.

        table -- table name
        where -- optional dict of conditions (see where()); conditions
                 set earlier through where() are applied as well

        WARNING: with no conditions at all the statement has no WHERE
        clause and therefore targets every row of the table.
        Raises MySQLdb.Error (after a rollback) if anything fails.
        """
        sql = ""
        try:
            params = None
            sql = "DELETE FROM " + table
            if where is not None:
                self.where(where)
            if self.where_sql:
                sql += self.where_sql
                if self.where_params is not None:
                    params = tuple(self.where_params)
            self.connection()
            # params stays None when there is nothing to bind.
            self.cur.execute(sql, params)
            self.conn.commit()
        except Exception:
            self._fail(sql)
        finally:
            self._close()
            self._reset_where()
更多推荐
已为社区贡献1条内容
所有评论(0)