实例 1:
通过 db_config.json
加载数据库配置; 常规的增删改查进行封装
代码
连库配置: db_config.json
{
"host": "192.168.7.251",
"user": "root",
"password": "123456",
"db": "mars",
"charset": "utf8",
"port": 3306
}
封装工具类: mysqlutils.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author: xu3352<xu3352@gmail.com>
# python3 环境
"""
Python Mysql 工具包
1. 通过 db_config.json 加载数据库配置
2. 常规的增删改查进行封装
注意事项:
1. %s 为 mysql 占位符; 能用 %s 的地方就不要自己拼接 sql 了
2. sql 里有一个占位符可使用 string 或 number; 有多个占位符可使用 tuple|list
3. insertmany 的时候所有字段使用占位符 %s (预编译), 参数使用 tuple|list
4. queryall 结果集只有一列的情况, 会自动转换为简单的列表 参考:simple_list()
5. queryone 结果集只有一行一列的情况, 自动转为结果数据 参考:simple_value()
6. insertone 插入一条数据, 返回数据ID
"""
import os
import json
import traceback
import pymysql.cursors
from utils import loggerutils
logger = loggerutils.logger
def find(name, path):
""" 查找文件路径 """
for root, dirs, files in os.walk(path):
if name in files:
return os.path.join(root, name)
def connect_mysql():
""" 创建链接 """
try:
config = find("db_config.json", os.path.abspath("."))
with open(config, "r") as file:
load_dict = json.load(file)
return pymysql.connect(cursorclass=pymysql.cursors.DictCursor, **load_dict)
except Exception as e:
logger.error(traceback.format_exc())
logger.error("cannot create mysql connect")
def queryone(sql, param=None):
"""
返回结果集的第一条数据
:param sql: sql语句
:param param: string|tuple|list
:return: 字典列表 [{}]
"""
con = connect_mysql()
cur = con.cursor()
row = None
try:
cur.execute(sql, param)
row = cur.fetchone()
except Exception as e:
con.rollback()
logger.error(traceback.format_exc())
logger.error(":{} [param]:{}".format(sql, param))
cur.close()
con.close()
return simple_value(row)
def queryall(sql, param=None):
"""
返回所有查询到的内容 (分页要在sql里写好)
:param sql: sql语句
:param param: tuple|list
:return: 字典列表 [{},{},{}...] or [,,,]
"""
con = connect_mysql()
cur = con.cursor()
rows = None
try:
cur.execute(sql, param)
rows = cur.fetchall()
except Exception as e:
con.rollback()
logger.error(traceback.format_exc())
logger.error(":{} [param]:{}".format(sql, param))
cur.close()
con.close()
return simple_list(rows)
def insertmany(sql, arrays=None):
"""
批量插入数据
:param sql: sql语句
:param arrays: list|tuple [(),(),()...]
:return: 入库数量
"""
con = connect_mysql()
cur = con.cursor()
cnt = 0
try:
cnt = cur.executemany(sql, arrays)
con.commit()
except Exception as e:
con.rollback()
logger.error(traceback.format_exc())
logger.error(":{} [param]:{}".format(sql, arrays))
cur.close()
con.close()
return cnt
def insertone(sql, param=None):
"""
插入一条数据
:param sql: sql语句
:param param: string|tuple
:return: id
"""
con = connect_mysql()
cur = con.cursor()
lastrowid = 0
try:
cur.execute(sql, param)
con.commit()
lastrowid = cur.lastrowid
except Exception as e:
con.rollback()
logger.error(traceback.format_exc())
logger.error(":{} [param]:{}".format(sql, param))
cur.close()
con.close()
return lastrowid
def execute(sql, param=None):
"""
执行sql语句:修改或删除
:param sql: sql语句
:param param: string|list
:return: 影响数量
"""
con = connect_mysql()
cur = con.cursor()
cnt = 0
try:
cnt = cur.execute(sql, param)
con.commit()
except Exception as e:
con.rollback()
logger.error(traceback.format_exc())
logger.error(":{} [param]:{}".format(sql, param))
cur.close()
con.close()
return cnt
def simple_list(rows):
"""
结果集只有一列的情况, 直接使用数据返回
:param rows: [{'id': 1}, {'id': 2}, {'id': 3}]
:return: [1, 2, 3]
"""
if not rows:
return rows
if len(rows[0].keys()) == 1:
simple_list = []
# print(rows[0].keys())
key = list(rows[0].keys())[0]
for row in rows:
simple_list.append(row[key])
return simple_list
return rows
def simple_value(row):
"""
结果集只有一行, 一列的情况, 直接返回数据
:param row: {'count(*)': 3}
:return: 3
"""
if not row:
return None
if len(row.keys()) == 1:
# print(row.keys())
key = list(row.keys())[0]
return row[key]
return row
if __name__ == '__main__':
print("hello everyone!!!")
# print("删表:", execute('drop table test_users'))
sql = '''
CREATE TABLE `test_users` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`email` varchar(255) NOT NULL,
`password` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='测试用的, 可以直接删除';
'''
print("建表:", execute(sql))
# 批量插入
sql_str = "insert into test_users(email, password) values (%s, %s)"
arrays = [
("aaa@126.com", "111111"),
("bbb@126.com", "222222"),
("ccc@126.com", "333333"),
("ddd@126.com", "444444")
]
print("插入数据:", insertmany(sql_str, arrays))
# 查询
print("只取一行:", queryone("select * from test_users limit %s,%s", (0, 1))) #尽量使用limit
print("查询全表:", queryall("select * from test_users"))
# 条件查询
print("一列:", queryall("select email from test_users where id <= %s", 2))
print("多列:", queryall("select * from test_users where email = %s and password = %s", ("bbb@126.com", "222222")))
# 更新|删除
print("更新:", execute("update test_users set email = %s where id = %s", ('new@126.com', 1)))
print("删除:", execute("delete from test_users where id = %s", 4))
# 查询
print("再次查询全表:", queryall("select * from test_users"))
print("数据总数:", queryone("select count(*) from test_users"))
运行结果
hello everyone!!!
删表: 0
建表: 0
插入数据: 4
只取一行: {'id': 1, 'password': '111111', 'email': 'aaa@126.com'}
查询全表: [{'id': 1, 'password': '111111', 'email': 'aaa@126.com'}, {'id': 2, 'password': '222222', 'email': 'bbb@126.com'}, {'id': 3, 'password': '333333', 'email': 'ccc@126.com'}, {'id': 4, 'password': '444444', 'email': 'ddd@126.com'}]
一列: ['aaa@126.com', 'bbb@126.com']
多列: [{'id': 2, 'password': '222222', 'email': 'bbb@126.com'}]
更新: 1
删除: 1
再次查询全表: [{'id': 1, 'password': '111111', 'email': 'new@126.com'}, {'id': 2, 'password': '222222', 'email': 'bbb@126.com'}, {'id': 3, 'password': '333333', 'email': 'ccc@126.com'}]
数据总数: 3
使用方法
db_config.json
是从当前执行的目录进行扫描的, 可以在子目录里
数据库配置好了, 直接运行就能看到效果
删表操作请先确认好了!!! 这里先注释掉
增删改查示例都有, 操作起来也算是比较方便了, 只需要写sql, 传参数, 就基本搞定
默认多行的结果集都是 list[dict] 的, 即使只有一列也是! 所以加了个 simple_list 方法只取数据 list
工具类注意事项:
%s
为 mysql
占位符; 能用 %s
的地方就不要自己拼接 sql
了
sql
里有一个占位符可使用 string
或 number
; 有多个占位符可使用 tuple|list
insertmany
的时候所有字段使用占位符 %s
(预编译), 参数使用 tuple|list
queryall
结果集如果只有一列的情况, 会自动转换为简单的列表 参考:simple_list()
queryone
结果集如果只有一行一列的情况, 自动转为结果数据 参考:simple_value()
insertone
插入一条数据, 返回数据ID
其他
pymysql
由于 sql 都是直接写的, 所以数据库操作非常灵活; 如果做 ORM
的话, 就需要自己手动进行转换; 类似于 java
的 MyBatise
如果你在找 Python
的 ORM
框架的话, SQLAlchemy
应该是个不错的选择; 类似于 Java
的 Hibernate
日志
loggerutils.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author: xu3352<xu3352@gmail.com>
"""
Python 日志工具包
@see https://blog.csdn.net/chosen0ne/article/details/7319306
"""
import logging
import logging.handlers
log_file = 'output.log'
fmt = '%(asctime)s - %(levelname)s - %(filename)s#%(funcName)s():%(lineno)s - %(name)s - %(message)s'
# 实例化handler
handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=1024 * 1024, backupCount=5)
formatter = logging.Formatter(fmt) # 实例化formatter
handler.setFormatter(formatter) # 为handler添加formatter
logger = logging.getLogger("main") # 获取名为tst的logger
logger.addHandler(handler) # 为logger添加handler
logger.setLevel(logging.DEBUG) # 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
if __name__ == '__main__':
logger.debug("first message debug")
logger.info("first message info")
logger.warning("first message warning")
logger.error("first message error")
logger.critical("first message critical")
2018.05.28 更
- 增加
logger
把错误日志记录到指定的文件里
connect_mysql
改造为 kwargs
形式传参(以后可以传入数据源字典)
- 增加
insertone
方法, 返回主键ID
参考:
实例 2:封装的python mysql操作ORM类
这段时间写项目用了python的mysql模块,觉得sqlalchemy太庞大就自己封装了一个简单的基于mysql-python的ORM文件。功能目前刚好够用以后会慢慢完善。这个操作类中解决了python编码和入mysql库编码不统一的问题,同时还解决了mysql的反斜杠’\’会被转义丢弃的问题。
1、安装mysql-python模块
python的mysql操作需要安装mysql-python库,安装方法如下。
sudo pip install MySQL-python
如果安装的时候报错,提示EnvironmentError: mysql_config not found,请安装libmysqld-dev
2、我的mysql-python封装操作模型
首先是mysql连接配置文件config.py:
#Mysql配置
mysql_host='localhost'
mysql_user='root'
mysql_passwd='12345'
mysql_db='test'
mysql_charset='utf8'
然后是封装的操作类,由于代码比较长,请移步github查看。下面我说下简单的用法:
#!/usr/bin/python
#-*- coding:utf-8 -*-
#在config.py里配置数据库连接
from mysql_conn import myMdb
mydb = myMdb()
#新建表结构
mydb.setTable('testtablename',
"(`test_id` int(10) NOT NULL AUTO_INCREMENT,`test_col1` varchar(255) NOT NULL,`test_col2` varchar(255) NOT NULL, PRIMARY KEY (`test_id`), KEY `test_col1` (`test_col1`), KEY `test_col2` (`test_col2`)) ENGINE=MyISAM DEFAULT CHARSET=utf8 AUTO_INCREMENT=1;"
)
#新增数据记录
tag = mydb._insert({'test_col1':'123','test_col2':'455'})._from('testtablename').execute()
print tag
#修改数据记录
tag = mydb._update({'test_col2':'456'})._from('testtablename')._where({'column':'test_id','data':'1','relation':'='}).execute()
print tag
#删除数据记录
tag = mydb._delete()._from('testtablename')._where({'column':'test_id','data':'3','relation':'<'}).execute()
print tag
#查询前100条数据并排序
rs = mydb._select('*')._from('testtablename')._where({'column':'test_col1','data':'123','relation':'='})._where({'column':'test_col2','data':'456','relation':'!='})._limit(100,1)._order({'test_id':'DESC'}).execute()
print rs
#两表联合查询
rs = mydb._select('a.*')._from('testtablename_1','a')._leftjoin('testtablename_2','b','a.test_col1=b.test_col1')._where({'column':'a.test_col1','data':'123','relation':'='}).execute()
print rs
#支持上下文管理方式调用
with myMdb() as mydb:
tag = mydb._insert({'test_col1':'123','test_col2':'455'})._from('testtablename').execute()
print tag
#打印调试最近一次执行的sql语句
print mydb.getsql
如果使用mysql-python官方扩展的时候报错
OperationalError: (2002, "Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' (2)")
其原因是我将mysql的sock文件已经调整到/tmp/mysql.sock,所以必须在mysqldb中指定sock的路径,unix_socket参数指定了sock文件的路径
mdb.connect(host=config.mysql_host, user=config.mysql_user, passwd=config.mysql_passwd, db=config.mysql_db, charset=config.mysql_charset, unix_socket='/tmp/mysql.sock')
实例 3:python操作mysql,基本的curd操作,连贯操作,基于MySQLdb。
使用示例
引入文件
from Db import mysql
实例化一个连接
test_db = {"host": "localhost", "user": "root", "passwd": "123456", "db": "dev", "port": "3306", "charset": "utf8"}
# 默认端口是3306,默认编码是utf-8。可以不指定,更多配置信息参考MySQLdb
mysql = mysql(**test_db)
# OR
# mysql = mysql(host='localhost', user='root', passwd='123456', db='dev', port='3306', charset='utf8')
查询一条记录
所有操作都必须调用table()
print(mysql.table("test").selectOne())
条件查询
print(mysql.table("test").where("name='asd'").selectALl())
选择字段
print(mysql.table("test").selectAll('name'))
更新
mysql.table("test").where("name='asd'").update("point=100")
插入
# data为数据关联的dict
data = {"name": "John", "point": 98}
mysql.table("test").insert(data)
批量插入
# data为数据关联的list
data = [{"name": "Lilei", "point": 69}, {"name": "xiaohong", "point": 58}]
mysql.table("test").insert_batch(data)
删除
mysql.table("test").where('id=1').delete()
limit
print(mysql.table("test").limit(0,10).selectAll())
# 转化为mysql语句为
# SELECT * FROM test WHERE 1=1 LIMIT 0, 10;
order by
print(mysql.table("test").order('point desc').selectAll())
# 转化为mysql语句为
# SELECT * FROM test WHERE 1=1 ORDER BY point DESC;
执行原生mysql
mysql.query("SELECT * FROM test WHERE name = 'asd' ")
Db.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pip install mysqlclient
import traceback
import MySQLdb
import configs
import pdb
class mysql(object):
def __init__(self, **kwargs):
"""
构造函数
:param kwargs:
"""
if not kwargs: kwargs = configs.DBINFO
self.conn_config = kwargs
self.conn = None
self.cur = None
self.preix = self.conn_config.get('prefix')
self.__setParam()
self.connect()
def __del__(self):
"""析构函数"""
self.disconnect()
def connect(self):
"""
连接数据库
:return:
"""
# pdb.set_trace()
if self.conn is None:
self.conn = MySQLdb.connect(host=self.conn_config.get('host'), user=self.conn_config.get('user'),
passwd=self.conn_config.get('passwd'), db=self.conn_config.get('db'),
port=int(self.conn_config.get('port')) if self.conn_config.get('port') else 3306,
charset=self.conn_config.get('charset') if self.conn_config.get('charset') else 'utf8')
self.cur = self.conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
def disconnect(self):
"""
断开连接
:return:
"""
if self.conn is not None:
self.__setParam()
self.conn.close()
self.conn = None
def __err(self, sql=None):
if self.conn is not None:
self.__setParam()
self.conn.rollback()
exit("MySqlError: %s SQL Query: %s" % (traceback.format_exc(), sql))
def __setParam(self):
self._where = ''
self._limit = ''
self._order = ''
self._group = ''
def __getParam(self):
extendQuery = ''
if str(self._where).strip(): extendQuery += ' %s' % self._where
if str(self._limit).strip(): extendQuery += ' %s' % self._limit
if str(self._group).strip(): extendQuery += ' %s' % self._group
if str(self._order).strip(): extendQuery += ' %s' % self._order
extendQuery += ';'
return extendQuery
def table(self, table):
"""
选择table
eg: self.table('asd')
:param table:
:return:
"""
self._table = self.preix + table if self.preix else table
return self
def where(self, *args):
"""
where操作
eg:
:param args:
:return:
"""
if len(args) == 0:
param = ' AND 1=1'
elif len(args) == 1:
param = args[0]
else:
param = ' AND '.join(str(item) for item in args)
self._where = ' AND %s' % param
return self
def limit(self, *args):
"""
limit的用法
:param args:
:return:
"""
self._order = '' if len(args) != 2 else ' LIMIT %s,%s' % (args[0], args[1])
return self
def order(self, *args):
"""
oeder by的用法
:param args:
:return:
"""
self._order = '' if len(args) == 0 else ' ORDER BY %s' % ','.join(str(item) for item in args)
return self
def group(self, *args):
"""
group by的用法
:param args:
:return:
"""
self._group = '' if len(args) == 0 else ' GROUP BY %s' % ','.join(str(item) for item in args)
return self
def selectOne(self, *args):
"""
select
eg: db.table('asd').where('a=2').limit(0,1).
:param args:
:return:
"""
try:
param = '*' if len(args) == 0 else ','.join(str(item) for item in args)
sql = "SELECT %s FROM `%s` WHERE 1=1 %s " % (param, self._table, self.__getParam())
self.lastQuery = sql
self.cur.execute(sql)
data = self.cur.fetchone()
# 重置查询条件,下次调用时不会受上次调用的影响
self.__setParam()
self.conn.commit()
return data
except:
self.__err(sql)
finally:
self.disconnect()
def selectAll(self, *args):
"""
查询全部
:param args:
:return:
"""
try:
param = '*' if len(args) == 0 else ','.join(str(item) for item in args)
sql = "SELECT %s FROM `%s` WHERE 1=1 %s " % (param, self._table, self.__getParam())
self.lastQuery = sql
self.cur.execute(sql)
data = self.cur.fetchall()
# 重置查询条件,下次调用时不会受上次调用的影响
self.__setParam()
self.conn.commit()
return list(data)
except:
self.__err(sql)
finally:
self.disconnect()
def insert(self, data):
"""
insert操作
:param data 数据关联的字典:
:return:
0: 没有插入数据
-1: 插入数据失败
> 0 : 即返回insert_id,插入数据成功
"""
# INSERT INTO table_name (列1, 列2,...) VALUES (值1, 值2,....)
# pdb.set_trace()
try:
keys = ""
values = ""
for key, value in data.items():
keys += ",`" + key + '`' if len(keys) > 0 else '`' + key + '`'
values += ",%s" if len(values) > 0 else "%s"
sql = "INSERT INTO `" + self._table + "` (" + keys + ") VALUES (" + values + ")"
self.cur.execute(sql, tuple(data.values()))
insert_id = self.conn.insert_id()
self.lastQuery = sql % tuple(data.values())
self.conn.commit()
return insert_id
except:
self.__err(sql)
finally:
self.disconnect()
def insert_batch(self, data):
"""
批量插入
:param data:
:return:
"""
try:
keys = ""
values = ""
args = range(len(data))
for key, value in data[0].iteritems():
keys += ",`" + key + '`' if len(keys) > 0 else '`' + key + '`'
values += ",%s" if len(values) > 0 else "%s"
sql = "INSERT INTO `" + self._table + "` (" + keys + ") VALUES (" + values + ")"
i = 0
for row in data:
args[i] = tuple(data[i].values())
i += 1
self.cur.executemany(sql, args)
self.lastQuery = sql % args
rowcount = self.cur.rowcount
self.conn.commit()
return rowcount
except:
self.__err(sql)
finally:
self.disconnect()
def update(self, **kwargs):
"""
更新
:param kwargs:
:return:
0: 没有更新数据
-1: 更新失败
1: 更新成功
"""
# 更新某一行的某一列
# UPDATE Person SET FirstName = 'Fred' WHERE LastName = 'Wilson'
# 更新某一行的若干列
# UPDATE Person SET Address = 'Zhongshan 23', City = 'Nanjing' WHERE LastName = 'Wilson'
try:
if len(kwargs) == 0:
return 0
else:
sqlQuery = list()
for item, key in kwargs.iteritems():
sqlQuery.append('`' + item + '`=' + str(key))
extendQuery = ''
if str(self._where).strip():
extendQuery += self._where
extendQuery += ';'
sql = 'UPDATE `%s` SET %s WHERE 1=1 %s' % (self._table, ','.join(sqlQuery), extendQuery)
self.lastQuery = sql
self.cur.execute(sql)
self.conn.commit()
return 1
except:
self.__err(sql)
finally:
self.disconnect()
def delete(self):
try:
paramQuery = self._where + ';' if str(self._where).strip() else ';'
sql = 'DELETE FROM `%s` WHERE 1=1 %s' % (self._table, paramQuery)
self.lastQuery = sql
self.cur.execute(sql)
self.conn.commit()
except:
self.__err(sql)
finally:
self.disconnect()
def query(self, sql):
"""
执行sql语句
:param sql:
:return:
"""
try:
self.lastQuery = sql
result = self.cur.execute(sql)
self.conn.commit()
return result
except:
self.__err(sql)
finally:
self.disconnect()
def findAll(self, *args):
return self.selectAll(*args)
def findOne(self, *args):
return self.selectOne(*args)
实例 4:
新的mysql数据库操作连接类
在上面的操作类的基础上重新封装了一下,类视图如下
Python:Mysql连库及简单封装使用, python mysql操作类
FIND_BY_SQL = "findBySql" # 根据sql查找
COUNT_BY_SQL = "countBySql" # 自定义sql 统计影响行数
INSERT = "insert" # 插入
UPDATE_BY_ATTR = "updateByAttr" # 更新数据
DELETE_BY_ATTR = "deleteByAttr" # 删除数据
FIND_BY_ATTR = "findByAttr" # 根据条件查询一条记录
FIND_ALL_BY_ATTR = "findAllByAttr" #根据条件查询多条记录
COUNT = "count" # 统计行
EXIST = "exist" # 是否存在该记录
import mysql.connector
import mysql.connector.errors
from common.customConst import Const
class MySQLet:
"""Connection to a MySQL"""
# def __init__(self,user='',password='',database='',charset=None,port=3306):
def __init__(self,**kwargs):
try:
self._conn = mysql.connector.connect(host=kwargs["host"], user=kwargs["user"], password=kwargs["password"],
charset=kwargs["charset"], database=kwargs["database"], port=kwargs["port"])
self.__cursor = None
print("连接数据库")
#set charset charset = ('latin1','latin1_general_ci')
except mysql.connector.errors.ProgrammingError as err:
print('mysql连接错误:' + err.msg)
# def findBySql(self, sql, params={}, limit=0, join='AND'):
def findBySql(self, **kwargs):
"""
自定义sql语句查找
limit = 是否需要返回多少行
params = dict(field=value)
join = 'AND | OR'
"""
cursor = self.__getCursor()
# sql = self.__joinWhere(kwargs["sql"], kwargs["params"], kwargs["join"])
if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
sql = self.__joinWhere(**kwargs)
cursor.execute(sql, tuple(kwargs["params"].values()))
rows = cursor.fetchmany(size=kwargs["limit"]) if kwargs["limit"] > 0 else cursor.fetchall()
result = [dict(zip(cursor.column_names,row)) for row in rows] if rows else None
return len(result)
# def countBySql(self,sql,params = {},join = 'AND'):
def countBySql(self, **kwargs):
"""自定义sql 统计影响行数"""
if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
cursor = self.__getCursor()
# sql = self.__joinWhere(kwargs["sql"], kwargs["params"], kwargs["join"])
sql = self.__joinWhere(**kwargs)
cursor.execute(sql, tuple(kwargs["params"].values()))
result = cursor.fetchall() # fetchone是一条记录, fetchall 所有记录
return len(result) if result else 0
# def insert(self,table,data):
def insert(self, **kwargs):
"""新增一条记录
table: 表名
data: dict 插入的数据
"""
fields = ','.join('`'+k+'`' for k in kwargs["data"].keys())
values = ','.join(("%s", ) * len(kwargs["data"]))
sql = 'INSERT INTO `%s` (%s) VALUES (%s)' % (kwargs["table"], fields, values)
cursor = self.__getCursor()
cursor.execute(sql, tuple(kwargs["data"].values()))
insert_id = cursor.lastrowid
self._conn.commit()
return insert_id
# def updateByAttr(self,table,data,params={},join='AND'):
def updateByAttr(self, **kwargs):
# """更新数据"""
if kwargs.get("params", 0) == 0:
kwargs["params"] = {}
if kwargs.get("join", 0) == 0:
kwargs["join"] = "AND"
fields = ','.join('`' + k + '`=%s' for k in kwargs["data"].keys())
values = list(kwargs["data"].values())
values.extend(list(kwargs["params"].values()))
sql = "UPDATE `%s` SET %s " % (kwargs["table"], fields)
kwargs["sql"] = sql
sql = self.__joinWhere(**kwargs)
cursor = self.__getCursor()
cursor.execute(sql, tuple(values))
self._conn.commit()
return cursor.rowcount
# def updateByPk(self,table,data,id,pk='id'):
def updateByPk(self, **kwargs):
"""根据主键更新,默认是id为主键"""
return self.updateByAttr(**kwargs)
# def deleteByAttr(self,table,params={},join='AND'):
def deleteByAttr(self, **kwargs):
"""删除数据"""
if kwargs.get("params", 0) == 0:
kwargs["params"] = {}
if kwargs.get("join", 0) == 0:
kwargs["join"] = "AND"
# fields = ','.join('`'+k+'`=%s' for k in kwargs["params"].keys())
sql = "DELETE FROM `%s` " % kwargs["table"]
kwargs["sql"] = sql
# sql = self.__joinWhere(sql, kwargs["params"], kwargs["join"])
sql = self.__joinWhere(**kwargs)
cursor = self.__getCursor()
cursor.execute(sql, tuple(kwargs["params"].values()))
self._conn.commit()
return cursor.rowcount
# def deleteByPk(self,table,id,pk='id'):
def deleteByPk(self, **kwargs):
"""根据主键删除,默认是id为主键"""
return self.deleteByAttr(**kwargs)
# def findByAttr(self,table,criteria = {}):
def findByAttr(self, **kwargs):
"""根據條件查找一條記錄"""
return self.__query(**kwargs)
# def findByPk(self,table,id,pk='id'):
def findByPk(self, **kwargs):
return self.findByAttr(**kwargs)
# def findAllByAttr(self,table,criteria={}, whole=true):
def findAllByAttr(self, **kwargs):
"""根據條件查找記錄"""
return self.__query(**kwargs)
# def count(self,table,params={},join='AND'):
def count(self, **kwargs):
"""根据条件统计行数"""
if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
sql = 'SELECT COUNT(*) FROM `%s`' % kwargs["table"]
# sql = self.__joinWhere(sql, kwargs["params"], kwargs["join"])
kwargs["sql"] = sql
sql = self.__joinWhere(**kwargs)
cursor = self.__getCursor()
cursor.execute(sql, tuple(kwargs["params"].values()))
result = cursor.fetchone()
return result[0] if result else 0
# def exist(self,table,params={},join='AND'):
def exist(self, **kwargs):
"""判断是否存在"""
return self.count(**kwargs) > 0
def close(self):
"""关闭游标和数据库连接"""
if self.__cursor is not None:
self.__cursor.close()
self._conn.close()
def __getCursor(self):
"""获取游标"""
if self.__cursor is None:
self.__cursor = self._conn.cursor()
return self.__cursor
# def __joinWhere(self,sql,params,join):
def __joinWhere(self, **kwargs):
"""转换params为where连接语句"""
if kwargs["params"]:
keys,_keys = self.__tParams(**kwargs)
where = ' AND '.join(k+'='+_k for k,_k in zip(keys,_keys)) if kwargs["join"] == 'AND' else ' OR '.join(k+'='+_k for k,_k in zip(keys,_keys))
kwargs["sql"]+=' WHERE ' + where
return kwargs["sql"]
# def __tParams(self,params):
def __tParams(self, **kwargs):
keys = ['`'+k+'`' for k in kwargs["params"].keys()]
_keys = ['%s' for k in kwargs["params"].keys()]
return keys,_keys
# def __query(self,table,criteria,whole=False):
def __query(self, **kwargs):
if kwargs.get("whole", False) == False or kwargs["whole"] is not True:
kwargs["whole"] = False
kwargs["criteria"]['limit'] = 1
# sql = self.__contact_sql(kwargs["table"], kwargs["criteria"])
sql = self.__contact_sql(**kwargs)
cursor = self.__getCursor()
cursor.execute(sql)
rows = cursor.fetchall() if kwargs["whole"] else cursor.fetchone()
result = [dict(zip(cursor.column_names, row)) for row in rows] if kwargs["whole"] else dict(zip(cursor.column_names, rows)) if rows else None
return result
# def __contact_sql(self,table,criteria):
def __contact_sql(self, **kwargs):
sql = 'SELECT '
if kwargs["criteria"] and type(kwargs["criteria"]) is dict:
#select fields
if 'select' in kwargs["criteria"]:
fields = kwargs["criteria"]['select'].split(',')
sql+= ','.join('`'+field+'`' for field in fields)
else:
sql+=' * '
#table
sql+=' FROM `%s`'% kwargs["table"]
#where
if 'where' in kwargs["criteria"]:
sql+=' WHERE '+ kwargs["criteria"]['where']
#group by
if 'group' in kwargs["criteria"]:
sql+=' GROUP BY '+ kwargs["criteria"]['group']
#having
if 'having' in kwargs["criteria"]:
sql+=' HAVING '+ kwargs["criteria"]['having']
#order by
if 'order' in kwargs["criteria"]:
sql+=' ORDER BY '+ kwargs["criteria"]['order']
#limit
if 'limit' in kwargs["criteria"]:
sql+=' LIMIT '+ str(kwargs["criteria"]['limit'])
#offset
if 'offset' in kwargs["criteria"]:
sql+=' OFFSET '+ str(kwargs["criteria"]['offset'])
else:
sql+=' * FROM `%s`'% kwargs["table"]
return sql
def findKeySql(self, key ,**kwargs):
sqlOperate = {
Const.COUNT: lambda: self.count(**kwargs),
Const.COUNT_BY_SQL: lambda: self.countBySql(**kwargs),
Const.DELETE_BY_ATTR: lambda: self.deleteByAttr(**kwargs),
Const.EXIST: lambda: self.exist(**kwargs),
Const.FIND_ALL_BY_ATTR: lambda: self.findAllByAttr(**kwargs),
Const.INSERT: lambda: self.insert(**kwargs),
Const.FIND_BY_ATTR: lambda: self.findByAttr(**kwargs),
Const.UPDATE_BY_ATTR: lambda: self.updateByAttr(**kwargs),
Const.FIND_BY_SQL: lambda: self.findBySql(**kwargs)
}
return sqlOperate[key]()
if __name__ == "__main__":
mysqlet = MySQLet(host="127.0.0.1", user="root", password="", charset="utf8", database="userinfo", port=3306)
# 根据字段统计count, join>>AND,OR,可以不传,默认为AND
print(mysqlet.findKeySql(Const.COUNT, table="info", params={"id": "11", "name": "666"}, join="OR"))
# 自定义sql语句统计count
print(mysqlet.findKeySql(Const.COUNT_BY_SQL, sql="select * from info", params={"name": "666"}, join="AND"))
#插入数据
print(mysqlet.findKeySql(Const.INSERT, table="info", data={"name":"333", "pwd": "111"}))
#根据字段删除,不传params参数,就是删除全部
print(mysqlet.findKeySql(Const.DELETE_BY_ATTR, table="info", params={"id": 20}))
# 查找是否存在该记录,不传params参数,就是查找全部.join同上
print(mysqlet.findKeySql(Const.EXIST, table="info", params={"id": 180},join='AND'))
#根据字段查找多条记录,whole不传就查一条记录,criteria里面可以传where,group by,having,order by,limt,offset
print(mysqlet.findKeySql(Const.FIND_ALL_BY_ATTR, table="info", criteria= {"where": "name=333"}, whole=True))
# 根据字段查一条记录,和上面的查多条记录参数基本一样,少了个whole参数
print(mysqlet.findKeySql(Const.FIND_BY_ATTR, table="info", criteria= {"where": "name=333"}))
# 根据字段更新数据库中的记录,join可以传AND,OR,不传默认取AND
print(mysqlet.findKeySql(Const.UPDATE_BY_ATTR, table="info",data={"name": "-09"}, params={"id": 18, "name": "333"}, join='AND'))
# 根据自定义sql语句查询记录,limit:0表示所有记录,join:AND|OR.不传取AND
print(mysqlet.findKeySql(Const.FIND_BY_SQL, sql="select * from info", params={"name": "333", "id": 18}, limit=0))
本文:Python:Mysql连库及简单封装使用, python mysql操作类
