接触python时间也不是很长的,最近有个项目需要分析数据,于是选用python为编程语言,除了语言特性外主要还是看重python对于sqlite3数据库良好的支持能力了,因为需要灵活处理大量的中间数据。
刚开始一些模块我还乐此不疲的写sql语句,后来渐渐厌倦了,回想到以前捣鼓c#的时候利用反射初步构建了个sql查询构造器,直到发现linq,于是放弃了这个计划,当然微软后来又推出了entity framework,这些都是后话了,而且现在我对微软的东西兴趣不是很大的,好了,扯多了,下面继续正文。
对了,再扯一句,优秀的博客程序drupal也使用了类似的查询构造器进行数据库查询,避免直接写sql语句,另外这样做的一点点好处就是,可以一定程度的屏蔽平台相关性,对于数据库迁移还是有帮助的。
不过我今天介绍的数据库辅助类查询构造器是个很简单的东东,甚至只限于sqlite数据库,如果有童鞋感兴趣可以完善下,我目前只要操作sqlite顺手就可以了,对于比较大的数据库应用就直接上orm吧。
先看代码:
复制代码 代码如下:
import sqlite3
# ***************************************************
# *
# * description: python操作sqlite3数据库辅助类(查询构造器)
# * author: wangye
# *
# ***************************************************
def _wrap_value(value):
return repr(value)
def _wrap_values(values):
return list(map(_wrap_value, values))
def _wrap_fields(fields):
for key,value in fields.items():
fields[key] = _wrap_value(value)
return fields
def _concat_keys(keys):
return [ + ],[.join(keys) + ]
def _concat_values(values):
return ,.join(values)
def _concat_fields(fields, operator = (none, ,)):
if operator:
unit_operator, group_operator = operator
# fields = _wrap_fields(fields)
compiled = []
for key,value in fields.items():
compiled.append([ + key + ])
if unit_operator:
compiled.append(unit_operator)
compiled.append(value)
compiled.append(group_operator)
compiled.pop() # pop last group_operator
return .join(compiled)
class datacondition(object):
本类用于操作sql构造器辅助类的条件语句部分
例如:
datacondition((=, and), id = 26)
datacondition((=, and), true, id = 26)
def __init__(self, operator = (=, and), ingroup = true, **kwargs):
构造方法
参数:
operator 操作符,分为(表达式操作符, 条件运算符)
ingroup 是否分组,如果分组,将以括号包含
kwargs 键值元组,包含数据库表的列名以及值
注意这里的等于号不等于实际生成sql语句符号
实际符号是由operator[0]控制的
例如:
datacondition((=, and), id = 26)
(id=26)
datacondition((>, or), id = 26, age = 35)
(id>26 or age>35)
datacondition((like, or), false, name = john, company = google)
name like 'john' or company like google
self.ingroup = ingroup
self.fields = kwargs
self.operator = operator
def __unicode__(self):
self.fields = _wrap_fields(self.fields)
result = _concat_fields(self.fields, self.operator)
if self.ingroup:
return ( + result + )
return result
def __str__(self):
return self.__unicode__()
def tostring(self):
return self.__unicode__()
class datahelper(object):
sqlite3 数据查询辅助类
def __init__(self, filename):
构造方法
参数: filename 为sqlite3 数据库文件名
self.file_name = filename
def open(self):
打开数据库并设置游标
self.connection = sqlite3.connect(self.file_name)
self.cursor = self.connection.cursor()
return self
def close(self):
关闭数据库,注意若不显式调用此方法,
在类被回收时也会尝试调用
if hasattr(self, connection) and self.connection:
self.connection.close()
def __del__(self):
析构方法,做一些清理工作
self.close()
def commit(self):
提交事务
select语句不需要此操作,默认的execute方法的
commit_at_once设为true会隐式调用此方法,
否则就需要显示调用本方法。
self.connection.commit()
def execute(self, sql = none, commit_at_once = true):
执行sql语句
参数:
sql 要执行的sql语句,若为none,则调用构造器生成的sql语句。
commit_at_once 是否立即提交事务,如果不立即提交,
对于非查询操作,则需要调用commit显式提交。
if not sql:
sql = self.sql
self.cursor.execute(sql)
if commit_at_once:
self.commit()
def fetchone(self, sql = none):
取一条记录
self.execute(sql, false)
return self.cursor.fetchone()
def fetchall(self, sql = none):
取所有记录
self.execute(sql, false)
return self.cursor.fetchall()
def __concat_keys(self, keys):
return _concat_keys(keys)
def __concat_values(self, values):
return _concat_values(values)
def table(self, *args):
设置查询的表,多个表名用逗号分隔
self.tables = args
self.tables_snippet = self.__concat_keys(self.tables)
return self
def __wrap_value(self, value):
return _wrap_value(value)
def __wrap_values(self, values):
return _wrap_values(values)
def __wrap_fields(self, fields):
return _wrap_fields(fields)
def __where(self):
# self.condition_snippet
if hasattr(self, condition_snippet):
self.where_snippet = where + self.condition_snippet
def __select(self):
template = select %(keys)s from %(tables)s
body_snippet_fields = {
tables : self.tables_snippet,
keys : self.__concat_keys(self.body_keys),
}
self.sql = template % body_snippet_fields
def __insert(self):
template = insert into %(tables)s (%(keys)s) values (%(values)s)
body_snippet_fields = {
tables : self.tables_snippet,
keys : self.__concat_keys(list(self.body_fields.keys())),
values : self.__concat_values(list(self.body_fields.values()))
}
self.sql = template % body_snippet_fields
def __update(self):
template = update %(tables)s set %(fields)s
body_snippet_fields = {
tables : self.tables_snippet,
fields : _concat_fields(self.body_fields, (=,,))
}
self.sql = template % body_snippet_fields
def __delete(self):
template = delete from %(tables)s
body_snippet_fields = {
tables : self.tables_snippet
}
self.sql = template % body_snippet_fields
def __build(self):
{
select: self.__select,
insert: self.__insert,
update: self.__update,
delete: self.__delete
}[self.current_token]()
def __unicode__(self):
return self.sql
def __str__(self):
return self.__unicode__()
def select(self, *args):
self.current_token = select
self.body_keys = args
self.__build()
return self
def insert(self, **kwargs):
self.current_token = insert
self.body_fields = self.__wrap_fields(kwargs)
self.__build()
return self
def update(self, **kwargs):
self.current_token = update
self.body_fields = self.__wrap_fields(kwargs)
self.__build()
return self
def delete(self, *conditions):
self.current_token = delete
self.__build()
#if *conditions:
self.where(*conditions)
return self
def where(self, *conditions):
conditions = list(map(str, conditions))
self.condition_snippet = and .join(conditions)
self.__where()
if hasattr(self, where_snippet):
self.sql += self.where_snippet
return self
下面举几个例子供大家参考吧:
复制代码 代码如下:
db = datahelper(/home/wangye/sample.db3)
db.open() # 打开数据库
db.execute(
create table [staffs] (
[staff_id] integer primary key autoincrement,
[staff_name] text not null,
[staff_cardnum] text not null,
[staff_reserved] integer not null
)
) # 直接执行sql语句,注意这里commit_at_once默认为true
db.table(staffs).insert(staff_name=john, staff_cardnum=1001, staff_reserved=0)
# 插入一条记录
rs = db.table(staffs).select(staff_id, staff_name).fetchall()
# 直接取出所有staff_id和staff_name
rs = db.table(staffs).select(staff_name).where(datacondition((=, and), id = 1)).fetchone()
# 取一条staff_id为1的staff_name
rs = db.table(staffs).select(staff_name).where(datacondition((# 取一条id小于100并且staff_reserved为1的staff_name记录
db.close() # 关闭数据库
目前还没有让其支持星号(*)操作符,另外在多表同名列操作方面处理得也不是很好,这个只用于日常简单的脚本操作,最好不要用于生产环境,因为可能有未知问题。