python脚本sqlite3模块的应用
#!/usr/bin/python# -*- coding:utf-8 -*-import sqlite3import osclass SQLTest: '''sqlite数据库接口''' def __init__(self,path='',verbose=False): self.verbose = verbose se
·
#!/usr/bin/python
# -*- coding:utf-8 -*-
import sqlite3
import os
class SQLTest:
'''sqlite数据库接口'''
def __init__(self,path='',verbose=False):
self.verbose = verbose
self.path = path
if os.path.isfile(path):
self.conn = sqlite3.connect(path)
if self.verbose:
print('硬盘上面:[{}].format(path)')
else:
self.conn = sqlite3.connect(':memory:')
if self.verbose:
print('内存上面:[:memory:]')
def setverbose(self,b):
self.verbose = b
def createtable(self,sql):
'''创建数据库表'''
if sql is not None and sql != '':
cu = self.conn.cursor()
if self.verbose:
print('执行sql:[{}]'.format(sql))
cu.execute(sql)
self.conn.commit()
if self.verbose:
print('创建数据库表成功!')
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def querytable(self):
sql = 'SELECT name FROM sqlite_master WHERE type="table" ORDER BY name'
cu = self.conn.cursor()
cu.execute(sql)
return cu.fetchall()
def renametable(self,table,newtable):
if table is not None and talbe !='':
sql = 'ALTER TABLE %s RENAME TO "%s" ' % (table,newtable)
cu = self.conn.cursor()
cu.execute(sql)
self.conn.commit()
print('delete table sucess!')
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def insert(self,sql,data):
if sql is not None and sql != '':
if data is not None:
cu = self.conn.cursor()
for d in data:
cu.execute(sql,d)
self.conn.commit()
if self.verbose:
print('插入数据库表成功!')
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def fetchall(self,sql):
if sql is not None and sql != '':
cu = self.conn.cursor()
if self.verbose:
print('执行sql:[{}]'.format(sql))
cu.execute(sql)
return cu.fetchall()
else:
print('The [{}] is empty or equal None!'.format(sql))
def fetchone(self,sql,data):
if sql is not None and sql != '':
if data is not None:
cu = self.conn.cursor()
cu.execute(sql,(data,))
return cu.fetchall()
else:
print('The [{}] is None!'.format(data))
else:
print('The [{}] is empty or equal None!'.format(sql))
def updata(self,sql,data):
if sql is not None and sql != '':
if data is not None:
cu = self.conn.cursor()
for d in data:
cu.execute(sql,d)
self.conn.commit()
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def rowcount(self,table):
sql = 'select count (*) from "%s"' % table
cu = self.conn.cursor()
r = cu.execute(sql)
return (r.fetchone()[0])
def delete(self,sql,data):
if sql is not None and sql != '':
if data is not None:
cu = self.conn.cursor()
for d in data:
cu.execute(sql,d)
self.conn.commit()
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def droptable(self,table):
if table is not None and table != '':
sql = 'DROP TABLE IF EXISTS ' + table
cu = self.conn.cursor()
cu.execute(sql)
self.conn.commit()
print('delete table sucess!')
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def close_all(self,cu):
try:
if cu is not None:
cu.close()
finally:
if cu is not None:
cu.close()
# function
def drop_table_test(sql,table):
'''删除数据库表测试'''
print('删除数据库表测试 ...')
db.droptable(table)
def create_table_test(sql):
'''创建数据库表测试'''
print('创建数据库表测试 ...')
create_table_sql = '''CREATE TABLE IF NOT EXISTS 'table1'(
'id' integer(32) NOT NULL,
'name' nvarchar(128) NOT NULL,
PRIMARY KEY('id')
)'''
sql.createtable(create_table_sql)
def insert_test(sql):
'''插入数据测试 ...'''
print('插入数据测试 ...')
insert_sql = 'INSERT INTO table1 values(?,?)'
data = [(1,'aaa'),(2,'bbb')]
sql.insert(insert_sql,data)
def fetchall_test(sql):
'''查询所有数据'''
print('查询所有数据 ...')
fetchall_sql = 'SELECT * FROM table1'
r = sql.fetchall(fetchall_sql)
for e in range(len(r)):
print(r[e])
def fetchone_test(sql):
'''查询一条数据'''
print('查询一条数据 ...')
fetchall_sql = 'SELECT * FROM table1 WHERE id = ?'
data = 1
r = sql.fetchone(fetchall_sql,data)
for e in range(len(r)):
print(r[e])
def query_test(sql):
'''有几个表'''
print('有几个表 ...')
print(sql.query_table(db))
def update_test(sql):
'''更新数据'''
print('更新数据 ...')
update_sql = 'UPDATE table1 SET name = ? WHERE id = ?'
data = [('TestA',1),('TestB',2)]
sql.updata(update_sql,data)
def delete_test(db):
'''删除数据'''
print('删除数据 ...')
delete_sql = 'DELETE FROM table1 WHERE name = ? and id = ?'
data = [('TestA',1)]
sql.delete(delete_sql,data)
# self test
if __name__ == '__main__':
sql = SQLTest(verbose=True)
#创建数据库表
create_table_test(sql)
#插入数据
insert_test(sql)
#查询多条数据
fetchall_test(sql)
#查询一条数据
fetchone_test(sql)
#更新数据
update_test(sql)
#查询多条数据
fetchall_test(sql)
#删除一条数据
delete_test(sql)
print(sql.rowcount('table1'))
#查询多条数据
fetchall_test(sql)
# -*- coding:utf-8 -*-
import sqlite3
import os
class SQLTest:
'''sqlite数据库接口'''
def __init__(self,path='',verbose=False):
self.verbose = verbose
self.path = path
if os.path.isfile(path):
self.conn = sqlite3.connect(path)
if self.verbose:
print('硬盘上面:[{}].format(path)')
else:
self.conn = sqlite3.connect(':memory:')
if self.verbose:
print('内存上面:[:memory:]')
def setverbose(self,b):
self.verbose = b
def createtable(self,sql):
'''创建数据库表'''
if sql is not None and sql != '':
cu = self.conn.cursor()
if self.verbose:
print('执行sql:[{}]'.format(sql))
cu.execute(sql)
self.conn.commit()
if self.verbose:
print('创建数据库表成功!')
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def querytable(self):
sql = 'SELECT name FROM sqlite_master WHERE type="table" ORDER BY name'
cu = self.conn.cursor()
cu.execute(sql)
return cu.fetchall()
def renametable(self,table,newtable):
if table is not None and talbe !='':
sql = 'ALTER TABLE %s RENAME TO "%s" ' % (table,newtable)
cu = self.conn.cursor()
cu.execute(sql)
self.conn.commit()
print('delete table sucess!')
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def insert(self,sql,data):
if sql is not None and sql != '':
if data is not None:
cu = self.conn.cursor()
for d in data:
cu.execute(sql,d)
self.conn.commit()
if self.verbose:
print('插入数据库表成功!')
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def fetchall(self,sql):
if sql is not None and sql != '':
cu = self.conn.cursor()
if self.verbose:
print('执行sql:[{}]'.format(sql))
cu.execute(sql)
return cu.fetchall()
else:
print('The [{}] is empty or equal None!'.format(sql))
def fetchone(self,sql,data):
if sql is not None and sql != '':
if data is not None:
cu = self.conn.cursor()
cu.execute(sql,(data,))
return cu.fetchall()
else:
print('The [{}] is None!'.format(data))
else:
print('The [{}] is empty or equal None!'.format(sql))
def updata(self,sql,data):
if sql is not None and sql != '':
if data is not None:
cu = self.conn.cursor()
for d in data:
cu.execute(sql,d)
self.conn.commit()
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def rowcount(self,table):
sql = 'select count (*) from "%s"' % table
cu = self.conn.cursor()
r = cu.execute(sql)
return (r.fetchone()[0])
def delete(self,sql,data):
if sql is not None and sql != '':
if data is not None:
cu = self.conn.cursor()
for d in data:
cu.execute(sql,d)
self.conn.commit()
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def droptable(self,table):
if table is not None and table != '':
sql = 'DROP TABLE IF EXISTS ' + table
cu = self.conn.cursor()
cu.execute(sql)
self.conn.commit()
print('delete table sucess!')
self.close_all(cu)
else:
print('The [{}] is empty or equal None!'.format(sql))
def close_all(self,cu):
try:
if cu is not None:
cu.close()
finally:
if cu is not None:
cu.close()
# function
def drop_table_test(sql,table):
'''删除数据库表测试'''
print('删除数据库表测试 ...')
db.droptable(table)
def create_table_test(sql):
'''创建数据库表测试'''
print('创建数据库表测试 ...')
create_table_sql = '''CREATE TABLE IF NOT EXISTS 'table1'(
'id' integer(32) NOT NULL,
'name' nvarchar(128) NOT NULL,
PRIMARY KEY('id')
)'''
sql.createtable(create_table_sql)
def insert_test(sql):
'''插入数据测试 ...'''
print('插入数据测试 ...')
insert_sql = 'INSERT INTO table1 values(?,?)'
data = [(1,'aaa'),(2,'bbb')]
sql.insert(insert_sql,data)
def fetchall_test(sql):
'''查询所有数据'''
print('查询所有数据 ...')
fetchall_sql = 'SELECT * FROM table1'
r = sql.fetchall(fetchall_sql)
for e in range(len(r)):
print(r[e])
def fetchone_test(sql):
'''查询一条数据'''
print('查询一条数据 ...')
fetchall_sql = 'SELECT * FROM table1 WHERE id = ?'
data = 1
r = sql.fetchone(fetchall_sql,data)
for e in range(len(r)):
print(r[e])
def query_test(sql):
'''有几个表'''
print('有几个表 ...')
print(sql.query_table(db))
def update_test(sql):
'''更新数据'''
print('更新数据 ...')
update_sql = 'UPDATE table1 SET name = ? WHERE id = ?'
data = [('TestA',1),('TestB',2)]
sql.updata(update_sql,data)
def delete_test(db):
'''删除数据'''
print('删除数据 ...')
delete_sql = 'DELETE FROM table1 WHERE name = ? and id = ?'
data = [('TestA',1)]
sql.delete(delete_sql,data)
# self test
if __name__ == '__main__':
sql = SQLTest(verbose=True)
#创建数据库表
create_table_test(sql)
#插入数据
insert_test(sql)
#查询多条数据
fetchall_test(sql)
#查询一条数据
fetchone_test(sql)
#更新数据
update_test(sql)
#查询多条数据
fetchall_test(sql)
#删除一条数据
delete_test(sql)
print(sql.rowcount('table1'))
#查询多条数据
fetchall_test(sql)
#==================================================================================
测试结果
更多推荐
已为社区贡献4条内容
所有评论(0)