Python:Mysql连库及简单封装使用, python mysql操作类

 

 

实例 1:
通过 db_config.json 加载数据库配置; 常规的增删改查进行封装

 

代码

连库配置: db_config.json

{
  "host": "192.168.7.251",
  "user": "root",
  "password": "123456",
  "db": "mars",
  "charset": "utf8",
  "port": 3306
}

封装工具类: mysqlutils.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author: xu3352<xu3352@gmail.com>
# python3 环境
"""
Python Mysql 工具包
1. 通过 db_config.json 加载数据库配置
2. 常规的增删改查进行封装

注意事项:
1. %s 为 mysql 占位符; 能用 %s 的地方就不要自己拼接 sql 了
2. sql 里有一个占位符可使用 string 或 number; 有多个占位符可使用 tuple|list
3. insertmany 的时候所有字段使用占位符 %s (预编译), 参数使用 tuple|list
4. queryall 结果集只有一列的情况, 会自动转换为简单的列表 参考:simple_list()
5. queryone 结果集只有一行一列的情况, 自动转为结果数据 参考:simple_value()
6. insertone 插入一条数据, 返回数据ID
"""
import os
import json
import traceback

import pymysql.cursors

from utils import loggerutils

logger = loggerutils.logger


def find(name, path):
    """ 查找文件路径 """
    for root, dirs, files in os.walk(path):
        if name in files:
            return os.path.join(root, name)


def connect_mysql():
    """ 创建链接 """
    try:
        config = find("db_config.json", os.path.abspath("."))
        with open(config, "r") as file:
            load_dict = json.load(file)
        return pymysql.connect(cursorclass=pymysql.cursors.DictCursor, **load_dict)
    except Exception as e:
        logger.error(traceback.format_exc())
        logger.error("cannot create mysql connect")


def queryone(sql, param=None):
    """
    返回结果集的第一条数据
    :param sql: sql语句
    :param param: string|tuple|list
    :return: 字典列表 [{}]
    """
    con = connect_mysql()
    cur = con.cursor()

    row = None
    try:
        cur.execute(sql, param)
        row = cur.fetchone()
    except Exception as e:
        con.rollback()
        logger.error(traceback.format_exc())
        logger.error("[sql]:{} [param]:{}".format(sql, param))

    cur.close()
    con.close()
    return simple_value(row)


def queryall(sql, param=None):
    """
    返回所有查询到的内容 (分页要在sql里写好)
    :param sql: sql语句
    :param param: tuple|list
    :return: 字典列表 [{},{},{}...] or [,,,]
    """
    con = connect_mysql()
    cur = con.cursor()

    rows = None
    try:
        cur.execute(sql, param)
        rows = cur.fetchall()
    except Exception as e:
        con.rollback()
        logger.error(traceback.format_exc())
        logger.error("[sql]:{} [param]:{}".format(sql, param))

    cur.close()
    con.close()
    return simple_list(rows)


def insertmany(sql, arrays=None):
    """
    批量插入数据
    :param sql: sql语句
    :param arrays: list|tuple [(),(),()...]
    :return: 入库数量
    """
    con = connect_mysql()
    cur = con.cursor()

    cnt = 0
    try:
        cnt = cur.executemany(sql, arrays)
        con.commit()
    except Exception as e:
        con.rollback()
        logger.error(traceback.format_exc())
        logger.error("[sql]:{} [param]:{}".format(sql, arrays))

    cur.close()
    con.close()
    return cnt


def insertone(sql, param=None):
    """
    插入一条数据
    :param sql: sql语句
    :param param: string|tuple
    :return: id
    """
    con = connect_mysql()
    cur = con.cursor()

    lastrowid = 0
    try:
        cur.execute(sql, param)
        con.commit()
        lastrowid = cur.lastrowid
    except Exception as e:
        con.rollback()
        logger.error(traceback.format_exc())
        logger.error("[sql]:{} [param]:{}".format(sql, param))

    cur.close()
    con.close()
    return lastrowid


def execute(sql, param=None):
    """
    执行sql语句:修改或删除
    :param sql: sql语句
    :param param: string|list
    :return: 影响数量
    """
    con = connect_mysql()
    cur = con.cursor()

    cnt = 0
    try:
        cnt = cur.execute(sql, param)
        con.commit()
    except Exception as e:
        con.rollback()
        logger.error(traceback.format_exc())
        logger.error("[sql]:{} [param]:{}".format(sql, param))

    cur.close()
    con.close()
    return cnt


def simple_list(rows):
    """
    结果集只有一列的情况, 直接使用数据返回
    :param rows: [{'id': 1}, {'id': 2}, {'id': 3}]
    :return: [1, 2, 3]
    """
    if not rows:
        return rows

    if len(rows[0].keys()) == 1:
        simple_list = []
        # print(rows[0].keys())
        key = list(rows[0].keys())[0]
        for row in rows:
            simple_list.append(row[key])
        return simple_list

    return rows


def simple_value(row):
    """
    结果集只有一行, 一列的情况, 直接返回数据
    :param row: {'count(*)': 3}
    :return: 3
    """
    if not row:
        return None

    if len(row.keys()) == 1:
        # print(row.keys())
        key = list(row.keys())[0]
        return row[key]

    return row


if __name__ == '__main__':
    print("hello everyone!!!")

    # print("删表:", execute('drop table test_users'))

    sql = '''
            CREATE TABLE `test_users` (
              `id` int(11) NOT NULL AUTO_INCREMENT,
              `email` varchar(255) NOT NULL,
              `password` varchar(255) NOT NULL,
              PRIMARY KEY (`id`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='测试用的, 可以直接删除';
            '''
    print("建表:", execute(sql))

    # 批量插入
    sql_str = "insert into test_users(email, password) values (%s, %s)"
    arrays = [
        ("aaa@126.com", "111111"),
        ("bbb@126.com", "222222"),
        ("ccc@126.com", "333333"),
        ("ddd@126.com", "444444")
    ]
    print("插入数据:", insertmany(sql_str, arrays))

    # 查询
    print("只取一行:", queryone("select * from test_users limit %s,%s", (0, 1)))    #尽量使用limit
    print("查询全表:", queryall("select * from test_users"))

    # 条件查询
    print("一列:", queryall("select email from test_users where id <= %s", 2))
    print("多列:", queryall("select * from test_users where email = %s and password = %s", ("bbb@126.com", "222222")))

    # 更新|删除
    print("更新:", execute("update test_users set email = %s where id = %s", ('new@126.com', 1)))
    print("删除:", execute("delete from test_users where id = %s", 4))

    # 查询
    print("再次查询全表:", queryall("select * from test_users"))
    print("数据总数:", queryone("select count(*) from test_users"))

 

 

运行结果

hello everyone!!!
删表: 0
建表: 0
插入数据: 4
只取一行: {'id': 1, 'password': '111111', 'email': 'aaa@126.com'}
查询全表: [{'id': 1, 'password': '111111', 'email': 'aaa@126.com'}, {'id': 2, 'password': '222222', 'email': 'bbb@126.com'}, {'id': 3, 'password': '333333', 'email': 'ccc@126.com'}, {'id': 4, 'password': '444444', 'email': 'ddd@126.com'}]
一列: ['aaa@126.com', 'bbb@126.com']
多列: [{'id': 2, 'password': '222222', 'email': 'bbb@126.com'}]
更新: 1
删除: 1
再次查询全表: [{'id': 1, 'password': '111111', 'email': 'new@126.com'}, {'id': 2, 'password': '222222', 'email': 'bbb@126.com'}, {'id': 3, 'password': '333333', 'email': 'ccc@126.com'}]
数据总数: 3

 

使用方法

db_config.json 是从当前执行的目录进行扫描的, 可以在子目录里

数据库配置好了, 直接运行就能看到效果

删表操作请先确认好了!!! 这里先注释掉

增删改查示例都有, 操作起来也算是比较方便了, 只需要写sql, 传参数, 就基本搞定

默认多行的结果集都是 list[dict] 的, 即使只有一列也是! 所以加了个 simple_list 方法只取数据 list

 

工具类注意事项:

  1. %s 为 mysql 占位符; 能用 %s 的地方就不要自己拼接 sql 了
  2. sql 里有一个占位符可使用 string 或 number; 有多个占位符可使用 tuple|list
  3. insertmany 的时候所有字段使用占位符 %s (预编译), 参数使用 tuple|list
  4. queryall 结果集如果只有一列的情况, 会自动转换为简单的列表 参考:simple_list()
  5. queryone 结果集如果只有一行一列的情况, 自动转为结果数据 参考:simple_value()
  6. insertone 插入一条数据, 返回数据ID

 

其他

pymysql 由于 sql 都是直接写的, 所以数据库操作非常灵活; 如果做 ORM 的话, 就需要自己手动进行转换; 类似于 java 的 MyBatise

如果你在找 Python 的 ORM 框架的话, SQLAlchemy 应该是个不错的选择; 类似于 Java 的 Hibernate

 

日志

loggerutils.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# author: xu3352<xu3352@gmail.com>
"""
Python 日志工具包
@see https://blog.csdn.net/chosen0ne/article/details/7319306
"""
import logging
import logging.handlers

log_file = 'output.log'
fmt = '%(asctime)s - %(levelname)s - %(filename)s#%(funcName)s():%(lineno)s - %(name)s - %(message)s'

# 实例化handler
handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=1024 * 1024, backupCount=5)
formatter = logging.Formatter(fmt)  # 实例化formatter
handler.setFormatter(formatter)     # 为handler添加formatter

logger = logging.getLogger("main")  # 获取名为tst的logger
logger.addHandler(handler)          # 为logger添加handler
logger.setLevel(logging.DEBUG)      # 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'


if __name__ == '__main__':
    logger.debug("first message debug")
    logger.info("first message info")
    logger.warning("first message warning")
    logger.error("first message error")
    logger.critical("first message critical")

 

2018.05.28 更

  • 增加 logger 把错误日志记录到指定的文件里
  • connect_mysql 改造为 kwargs 形式传参(以后可以传入数据源字典)
  • 增加 insertone 方法, 返回主键ID

参考:

 

 

实例 2:封装的python mysql操作ORM类

 

这段时间写项目用了python的mysql模块,觉得sqlalchemy太庞大就自己封装了一个简单的基于mysql-python的ORM文件。功能目前刚好够用以后会慢慢完善。这个操作类中解决了python编码和入mysql库编码不统一的问题,同时还解决了mysql的反斜杠’\’会被转义丢弃的问题。

 

1、安装mysql-python模块

python的mysql操作需要安装mysql-python库,安装方法如下。

sudo pip install MySQL-python

如果安装的时候报错,提示EnvironmentError: mysql_config not found,请安装libmysqld-dev

 

2、我的mysql-python封装操作模型

首先是mysql连接配置文件config.py:

#Mysql配置
mysql_host='localhost'
mysql_user='root'
mysql_passwd='12345'
mysql_db='test'
mysql_charset='utf8'

然后是封装的操作类,由于代码比较长,请移步github查看。下面我说下简单的用法:

#!/usr/bin/python
#-*- coding:utf-8 -*-
#在config.py里配置数据库连接
from mysql_conn import myMdb

mydb = myMdb()

#新建表结构
mydb.setTable('testtablename',
            "(`test_id` int(10) NOT NULL AUTO_INCREMENT,`test_col1` varchar(255) NOT NULL,`test_col2` varchar(255) NOT NULL, PRIMARY KEY (`test_id`), KEY `test_col1` (`test_col1`), KEY `test_col2` (`test_col2`)) ENGINE=MyISAM DEFAULT CHARSET=utf8 AUTO_INCREMENT=1;"
            )

#新增数据记录
tag = mydb._insert({'test_col1':'123','test_col2':'455'})._from('testtablename').execute()
print tag

#修改数据记录
tag = mydb._update({'test_col2':'456'})._from('testtablename')._where({'column':'test_id','data':'1','relation':'='}).execute()
print tag

#删除数据记录
tag = mydb._delete()._from('testtablename')._where({'column':'test_id','data':'3','relation':'<'}).execute()
print tag

#查询前100条数据并排序
rs = mydb._select('*')._from('testtablename')._where({'column':'test_col1','data':'123','relation':'='})._where({'column':'test_col2','data':'456','relation':'!='})._limit(100,1)._order({'test_id':'DESC'}).execute()
print rs

#两表联合查询
rs = mydb._select('a.*')._from('testtablename_1','a')._leftjoin('testtablename_2','b','a.test_col1=b.test_col1')._where({'column':'a.test_col1','data':'123','relation':'='}).execute()
print rs

#支持上下文管理方式调用
with myMdb() as mydb:
    tag = mydb._insert({'test_col1':'123','test_col2':'455'})._from('testtablename').execute()
print tag

#打印调试最近一次执行的sql语句
print mydb.getsql

如果使用mysql-python官方扩展的时候报错

OperationalError: (2002, "Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' (2)")

其原因是我将mysql的sock文件已经调整到/tmp/mysql.sock,所以必须在mysqldb中指定sock的路径,unix_socket参数指定了sock文件的路径

mdb.connect(host=config.mysql_host, user=config.mysql_user, passwd=config.mysql_passwd, db=config.mysql_db, charset=config.mysql_charset, unix_socket='/tmp/mysql.sock')

 

 

实例 3:python操作mysql,基本的curd操作,连贯操作,基于MySQLdb。

使用示例

 

引入文件

from Db import mysql

 

实例化一个连接

test_db = {"host": "localhost", "user": "root", "passwd": "123456", "db": "dev", "port": "3306", "charset": "utf8"}
# 默认端口是3306,默认编码是utf-8。可以不指定,更多配置信息参考MySQLdb
mysql = mysql(**test_db)

# OR
# mysql = mysql(host='localhost', user='root', passwd='123456', db='dev', port='3306', charset='utf8')

 

查询一条记录

所有操作都必须调用table()

print(mysql.table("test").selectOne())

 

条件查询

print(mysql.table("test").where("name='asd'").selectALl())

 

选择字段

print(mysql.table("test").selectAll('name'))

 

更新

mysql.table("test").where("name='asd'").update("point=100")

 

插入

# data为数据关联的dict
data = {"name": "John", "point": 98}
mysql.table("test").insert(data)

 

批量插入

# data为数据关联的list
data = [{"name": "Lilei", "point": 69}, {"name": "xiaohong", "point": 58}]
mysql.table("test").insert_batch(data)

 

删除

mysql.table("test").where('id=1').delete()

 

limit

print(mysql.table("test").limit(0,10).selectAll())
# 转化为mysql语句为
# SELECT * FROM test WHERE 1=1 LIMIT 0, 10;

 

order by

print(mysql.table("test").order('point desc').selectAll())
# 转化为mysql语句为
# SELECT * FROM test WHERE 1=1 ORDER BY point DESC;

 

执行原生mysql

mysql.query("SELECT * FROM test WHERE name = 'asd' ")

 

Db.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pip install mysqlclient

import traceback
import MySQLdb
import configs
import pdb

class mysql(object):

    def __init__(self, **kwargs):
        """
        构造函数
        :param kwargs:
        """
        if not kwargs: kwargs = configs.DBINFO

        self.conn_config = kwargs
        self.conn = None
        self.cur = None
        self.preix = self.conn_config.get('prefix')
        self.__setParam()
        self.connect()

    def __del__(self):
        """析构函数"""
        self.disconnect()

    def connect(self):
        """
        连接数据库
        :return:
        """
        # pdb.set_trace()
        if self.conn is None:
            self.conn = MySQLdb.connect(host=self.conn_config.get('host'), user=self.conn_config.get('user'),
                                        passwd=self.conn_config.get('passwd'), db=self.conn_config.get('db'),
                                        port=int(self.conn_config.get('port')) if self.conn_config.get('port') else 3306,
                                        charset=self.conn_config.get('charset') if self.conn_config.get('charset') else 'utf8')
            self.cur = self.conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)

    def disconnect(self):
        """
        断开连接
        :return:
        """
        if self.conn is not None:
            self.__setParam()
            self.conn.close()
            self.conn = None

    def __err(self, sql=None):
        if self.conn is not None:
            self.__setParam()
            self.conn.rollback()
        exit("MySqlError: %s SQL Query: %s" % (traceback.format_exc(), sql))

    def __setParam(self):
        self._where = ''
        self._limit = ''
        self._order = ''
        self._group = ''

    def __getParam(self):
        extendQuery = ''
        if str(self._where).strip(): extendQuery += ' %s' % self._where
        if str(self._limit).strip(): extendQuery += ' %s' % self._limit
        if str(self._group).strip(): extendQuery += ' %s' % self._group
        if str(self._order).strip(): extendQuery += ' %s' % self._order
        extendQuery += ';'
        return extendQuery

    def table(self, table):
        """
        选择table
        eg: self.table('asd')
        :param table:
        :return:
        """
        self._table = self.preix + table if self.preix else table
        return self

    def where(self, *args):
        """
        where操作
        eg:
        :param args:
        :return:
        """
        if len(args) == 0:
            param = ' AND 1=1'
        elif len(args) == 1:
            param = args[0]
        else:
            param = ' AND '.join(str(item) for item in args)
        self._where = ' AND %s' % param
        return self

    def limit(self, *args):
        """
        limit的用法
        :param args:
        :return:
        """
        self._order = '' if len(args) != 2 else ' LIMIT %s,%s' % (args[0], args[1])
        return self

    def order(self, *args):
        """
        oeder by的用法
        :param args:
        :return:
        """
        self._order = '' if len(args) == 0 else ' ORDER BY %s' % ','.join(str(item) for item in args)
        return self

    def group(self, *args):
        """
        group by的用法
        :param args:
        :return:
        """

        self._group = '' if len(args) == 0 else ' GROUP BY %s' % ','.join(str(item) for item in args)
        return self

    def selectOne(self, *args):
        """
        select
        eg: db.table('asd').where('a=2').limit(0,1).
        :param args:
        :return:
        """
        try:
            param = '*' if len(args) == 0 else ','.join(str(item) for item in args)
            sql = "SELECT %s FROM `%s` WHERE 1=1 %s " % (param, self._table, self.__getParam())
            self.lastQuery = sql
            self.cur.execute(sql)
            data = self.cur.fetchone()
            # 重置查询条件,下次调用时不会受上次调用的影响
            self.__setParam()
            self.conn.commit()
            return data
        except:
            self.__err(sql)
        finally:
            self.disconnect()

    def selectAll(self, *args):
        """
        查询全部
        :param args:
        :return:
        """
        try:
            param = '*' if len(args) == 0 else ','.join(str(item) for item in args)
            sql = "SELECT %s FROM `%s` WHERE 1=1 %s " % (param, self._table, self.__getParam())
            self.lastQuery = sql
            self.cur.execute(sql)
            data = self.cur.fetchall()
            # 重置查询条件,下次调用时不会受上次调用的影响
            self.__setParam()
            self.conn.commit()
            return list(data)
        except:
            self.__err(sql)
        finally:
            self.disconnect()

    def insert(self, data):
        """
        insert操作
        :param data 数据关联的字典:
        :return:
        0: 没有插入数据
        -1: 插入数据失败
        > 0 : 即返回insert_id,插入数据成功
        """
        # INSERT INTO table_name (列1, 列2,...) VALUES (值1, 值2,....)

        # pdb.set_trace()
        try:
            keys = ""
            values = ""
            for key, value in data.items():
                keys += ",`" + key + '`' if len(keys) > 0 else '`' + key + '`'
                values += ",%s" if len(values) > 0 else "%s"
            sql = "INSERT INTO `" + self._table + "` (" + keys + ") VALUES (" + values + ")"
            self.cur.execute(sql, tuple(data.values()))
            insert_id = self.conn.insert_id()
            self.lastQuery = sql % tuple(data.values())
            self.conn.commit()
            return insert_id
        except:
            self.__err(sql)
        finally:
            self.disconnect()

    def insert_batch(self, data):
        """
        批量插入
        :param data:
        :return:
        """
        try:
            keys = ""
            values = ""
            args = range(len(data))
            for key, value in data[0].iteritems():
                keys += ",`" + key + '`' if len(keys) > 0 else '`' + key + '`'
                values += ",%s" if len(values) > 0 else "%s"
            sql = "INSERT INTO `" + self._table + "` (" + keys + ") VALUES (" + values + ")"
            i = 0
            for row in data:
                args[i] = tuple(data[i].values())
                i += 1
            self.cur.executemany(sql, args)
            self.lastQuery = sql % args
            rowcount = self.cur.rowcount
            self.conn.commit()
            return rowcount
        except:
            self.__err(sql)
        finally:
            self.disconnect()

    def update(self, **kwargs):
        """
        更新
        :param kwargs:
        :return:
        0: 没有更新数据
        -1: 更新失败
        1: 更新成功
        """
        # 更新某一行的某一列
        # UPDATE Person SET FirstName = 'Fred' WHERE LastName = 'Wilson'
        # 更新某一行的若干列
        # UPDATE Person SET Address = 'Zhongshan 23', City = 'Nanjing' WHERE LastName = 'Wilson'
        try:
            if len(kwargs) == 0:
                return 0
            else:
                sqlQuery = list()
                for item, key in kwargs.iteritems():
                    sqlQuery.append('`' + item + '`=' + str(key))
            extendQuery = ''
            if str(self._where).strip():
                extendQuery += self._where
            extendQuery += ';'
            sql = 'UPDATE `%s` SET %s WHERE 1=1 %s' % (self._table, ','.join(sqlQuery), extendQuery)
            self.lastQuery = sql
            self.cur.execute(sql)
            self.conn.commit()
            return 1
        except:
            self.__err(sql)
        finally:
            self.disconnect()

    def delete(self):
        try:
            paramQuery = self._where + ';' if str(self._where).strip() else ';'
            sql = 'DELETE FROM `%s` WHERE 1=1 %s' % (self._table, paramQuery)
            self.lastQuery = sql
            self.cur.execute(sql)
            self.conn.commit()
        except:
            self.__err(sql)
        finally:
            self.disconnect()

    def query(self, sql):
        """
        执行sql语句
        :param sql:
        :return:
        """
        try:
            self.lastQuery = sql
            result = self.cur.execute(sql)
            self.conn.commit()
            return result
        except:
            self.__err(sql)
        finally:
            self.disconnect()

    def findAll(self, *args):
        return self.selectAll(*args)

    def findOne(self, *args):
        return self.selectOne(*args)

 

 

实例 4:

新的mysql数据库操作连接类
在上面的操作类的基础上重新封装了一下,类视图如下

Python:Mysql连库及简单封装使用, python mysql操作类
Python:Mysql连库及简单封装使用, python mysql操作类
FIND_BY_SQL = "findBySql" # 根据sql查找
  COUNT_BY_SQL = "countBySql" # 自定义sql 统计影响行数
  INSERT = "insert" # 插入
  UPDATE_BY_ATTR = "updateByAttr" # 更新数据
  DELETE_BY_ATTR = "deleteByAttr" # 删除数据
  FIND_BY_ATTR = "findByAttr" # 根据条件查询一条记录
  FIND_ALL_BY_ATTR = "findAllByAttr"  #根据条件查询多条记录
  COUNT = "count" # 统计行
  EXIST = "exist" # 是否存在该记录


import mysql.connector
import mysql.connector.errors
from common.customConst import Const
class MySQLet:
    """Connection to a MySQL"""
    # def __init__(self,user='',password='',database='',charset=None,port=3306):
    def __init__(self,**kwargs):
        try:
            self._conn = mysql.connector.connect(host=kwargs["host"], user=kwargs["user"], password=kwargs["password"],
                                                 charset=kwargs["charset"], database=kwargs["database"], port=kwargs["port"])
            self.__cursor = None
            print("连接数据库")
            #set charset charset = ('latin1','latin1_general_ci')
        except mysql.connector.errors.ProgrammingError as err:
            print('mysql连接错误:' + err.msg)

    # def findBySql(self, sql, params={}, limit=0, join='AND'):
    def findBySql(self, **kwargs):
        """
        自定义sql语句查找
        limit = 是否需要返回多少行
        params = dict(field=value)
        join = 'AND | OR'
        """
        cursor = self.__getCursor()
        # sql = self.__joinWhere(kwargs["sql"], kwargs["params"], kwargs["join"])
        if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
        sql = self.__joinWhere(**kwargs)
        cursor.execute(sql, tuple(kwargs["params"].values()))
        rows = cursor.fetchmany(size=kwargs["limit"]) if kwargs["limit"] > 0 else cursor.fetchall()
        result = [dict(zip(cursor.column_names,row)) for row in rows] if rows else None
        return len(result)

    # def countBySql(self,sql,params = {},join = 'AND'):
    def countBySql(self, **kwargs):
        """自定义sql 统计影响行数"""
        if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
        cursor = self.__getCursor()
        # sql = self.__joinWhere(kwargs["sql"], kwargs["params"], kwargs["join"])
        sql = self.__joinWhere(**kwargs)
        cursor.execute(sql, tuple(kwargs["params"].values()))
        result = cursor.fetchall() # fetchone是一条记录, fetchall 所有记录
        return len(result) if result else 0

    # def insert(self,table,data):
    def insert(self, **kwargs):
        """新增一条记录
          table: 表名
          data: dict 插入的数据
        """
        fields = ','.join('`'+k+'`' for k in kwargs["data"].keys())
        values = ','.join(("%s", ) * len(kwargs["data"]))
        sql = 'INSERT INTO `%s` (%s) VALUES (%s)' % (kwargs["table"], fields, values)
        cursor = self.__getCursor()
        cursor.execute(sql, tuple(kwargs["data"].values()))
        insert_id = cursor.lastrowid
        self._conn.commit()
        return insert_id

    # def updateByAttr(self,table,data,params={},join='AND'):
    def updateByAttr(self, **kwargs):
    #     """更新数据"""
        if kwargs.get("params", 0) == 0:
            kwargs["params"] = {}
        if kwargs.get("join", 0) == 0:
            kwargs["join"] = "AND"
        fields = ','.join('`' + k + '`=%s' for k in kwargs["data"].keys())
        values = list(kwargs["data"].values())


        values.extend(list(kwargs["params"].values()))
        sql = "UPDATE `%s` SET %s " % (kwargs["table"], fields)
        kwargs["sql"] = sql
        sql = self.__joinWhere(**kwargs)
        cursor = self.__getCursor()
        cursor.execute(sql, tuple(values))
        self._conn.commit()
        return cursor.rowcount


    # def updateByPk(self,table,data,id,pk='id'):
    def updateByPk(self, **kwargs):
        """根据主键更新,默认是id为主键"""
        return self.updateByAttr(**kwargs)

    # def deleteByAttr(self,table,params={},join='AND'):
    def deleteByAttr(self, **kwargs):
        """删除数据"""
        if kwargs.get("params", 0) == 0:
            kwargs["params"] = {}
        if kwargs.get("join", 0) == 0:
            kwargs["join"] = "AND"
        # fields = ','.join('`'+k+'`=%s' for k in kwargs["params"].keys())
        sql = "DELETE FROM `%s` " % kwargs["table"]
        kwargs["sql"] = sql
        # sql = self.__joinWhere(sql, kwargs["params"], kwargs["join"])
        sql = self.__joinWhere(**kwargs)
        cursor = self.__getCursor()
        cursor.execute(sql, tuple(kwargs["params"].values()))
        self._conn.commit()
        return cursor.rowcount

    # def deleteByPk(self,table,id,pk='id'):
    def deleteByPk(self, **kwargs):
        """根据主键删除,默认是id为主键"""
        return self.deleteByAttr(**kwargs)

    # def findByAttr(self,table,criteria = {}):
    def findByAttr(self, **kwargs):
        """根據條件查找一條記錄"""
        return self.__query(**kwargs)

    # def findByPk(self,table,id,pk='id'):
    def findByPk(self, **kwargs):
        return self.findByAttr(**kwargs)

    # def findAllByAttr(self,table,criteria={}, whole=true):
    def findAllByAttr(self, **kwargs):
        """根據條件查找記錄"""
        return self.__query(**kwargs)

    # def count(self,table,params={},join='AND'):
    def count(self, **kwargs):
        """根据条件统计行数"""
        if kwargs.get("join", 0) == 0: kwargs["join"] = "AND"
        sql = 'SELECT COUNT(*) FROM `%s`' % kwargs["table"]
        # sql = self.__joinWhere(sql, kwargs["params"], kwargs["join"])
        kwargs["sql"] = sql
        sql = self.__joinWhere(**kwargs)
        cursor = self.__getCursor()
        cursor.execute(sql, tuple(kwargs["params"].values()))
        result = cursor.fetchone()
        return result[0] if result else 0

    # def exist(self,table,params={},join='AND'):
    def exist(self, **kwargs):
        """判断是否存在"""
        return self.count(**kwargs) > 0

    def close(self):
        """关闭游标和数据库连接"""
        if self.__cursor is not None:
            self.__cursor.close()
        self._conn.close()

    def __getCursor(self):
        """获取游标"""
        if self.__cursor is None:
            self.__cursor = self._conn.cursor()
        return self.__cursor

    # def __joinWhere(self,sql,params,join):
    def __joinWhere(self, **kwargs):
        """转换params为where连接语句"""
        if kwargs["params"]:
            keys,_keys = self.__tParams(**kwargs)
            where = ' AND '.join(k+'='+_k for k,_k in zip(keys,_keys)) if kwargs["join"] == 'AND' else ' OR '.join(k+'='+_k for k,_k in zip(keys,_keys))
            kwargs["sql"]+=' WHERE ' + where
        return kwargs["sql"]

    # def __tParams(self,params):
    def __tParams(self, **kwargs):
        keys = ['`'+k+'`' for k in kwargs["params"].keys()]
        _keys = ['%s' for k in kwargs["params"].keys()]
        return keys,_keys

    # def __query(self,table,criteria,whole=False):
    def __query(self, **kwargs):
        if kwargs.get("whole", False) == False or kwargs["whole"] is not True:
            kwargs["whole"] = False
            kwargs["criteria"]['limit'] = 1
        # sql = self.__contact_sql(kwargs["table"], kwargs["criteria"])
        sql = self.__contact_sql(**kwargs)
        cursor = self.__getCursor()
        cursor.execute(sql)
        rows = cursor.fetchall() if kwargs["whole"] else cursor.fetchone()
        result = [dict(zip(cursor.column_names, row)) for row in rows] if kwargs["whole"] else dict(zip(cursor.column_names, rows)) if rows else None
        return result

    # def __contact_sql(self,table,criteria):
    def __contact_sql(self, **kwargs):
        sql = 'SELECT '
        if kwargs["criteria"] and type(kwargs["criteria"]) is dict:
            #select fields
            if 'select' in kwargs["criteria"]:
                fields = kwargs["criteria"]['select'].split(',')
                sql+= ','.join('`'+field+'`' for field in fields)
            else:
                sql+=' * '
            #table
            sql+=' FROM `%s`'% kwargs["table"]
            #where
            if 'where' in kwargs["criteria"]:
                sql+=' WHERE '+ kwargs["criteria"]['where']
            #group by
            if 'group' in kwargs["criteria"]:
                sql+=' GROUP BY '+ kwargs["criteria"]['group']
            #having
            if 'having' in kwargs["criteria"]:
                sql+=' HAVING '+ kwargs["criteria"]['having']
            #order by
            if 'order' in kwargs["criteria"]:
                sql+=' ORDER BY '+ kwargs["criteria"]['order']
            #limit
            if 'limit' in kwargs["criteria"]:
                sql+=' LIMIT '+ str(kwargs["criteria"]['limit'])
            #offset
            if 'offset' in kwargs["criteria"]:
                sql+=' OFFSET '+ str(kwargs["criteria"]['offset'])
        else:
            sql+=' * FROM `%s`'% kwargs["table"]
        return sql
    def findKeySql(self, key ,**kwargs):
        sqlOperate = {
        Const.COUNT: lambda: self.count(**kwargs),
        Const.COUNT_BY_SQL: lambda: self.countBySql(**kwargs),
        Const.DELETE_BY_ATTR: lambda: self.deleteByAttr(**kwargs),
        Const.EXIST: lambda: self.exist(**kwargs),
        Const.FIND_ALL_BY_ATTR: lambda: self.findAllByAttr(**kwargs),
        Const.INSERT: lambda: self.insert(**kwargs),
        Const.FIND_BY_ATTR: lambda: self.findByAttr(**kwargs),
        Const.UPDATE_BY_ATTR: lambda: self.updateByAttr(**kwargs),
        Const.FIND_BY_SQL: lambda: self.findBySql(**kwargs)

        }
        return sqlOperate[key]()


if __name__ == "__main__":
    mysqlet = MySQLet(host="127.0.0.1", user="root", password="", charset="utf8", database="userinfo", port=3306)
    # 根据字段统计count, join>>AND,OR,可以不传,默认为AND
    print(mysqlet.findKeySql(Const.COUNT, table="info", params={"id": "11", "name": "666"}, join="OR"))
    # 自定义sql语句统计count
    print(mysqlet.findKeySql(Const.COUNT_BY_SQL, sql="select * from info", params={"name": "666"}, join="AND"))
    #插入数据
    print(mysqlet.findKeySql(Const.INSERT, table="info", data={"name":"333", "pwd": "111"}))
    #根据字段删除,不传params参数,就是删除全部
    print(mysqlet.findKeySql(Const.DELETE_BY_ATTR, table="info", params={"id": 20}))
    # 查找是否存在该记录,不传params参数,就是查找全部.join同上
    print(mysqlet.findKeySql(Const.EXIST, table="info", params={"id": 180},join='AND'))
    #根据字段查找多条记录,whole不传就查一条记录,criteria里面可以传where,group by,having,order by,limt,offset
    print(mysqlet.findKeySql(Const.FIND_ALL_BY_ATTR, table="info", criteria= {"where": "name=333"}, whole=True))
    # 根据字段查一条记录,和上面的查多条记录参数基本一样,少了个whole参数
    print(mysqlet.findKeySql(Const.FIND_BY_ATTR, table="info", criteria= {"where": "name=333"}))
    # 根据字段更新数据库中的记录,join可以传AND,OR,不传默认取AND
    print(mysqlet.findKeySql(Const.UPDATE_BY_ATTR, table="info",data={"name": "-09"}, params={"id": 18, "name": "333"}, join='AND'))
    # 根据自定义sql语句查询记录,limit:0表示所有记录,join:AND|OR.不传取AND
    print(mysqlet.findKeySql(Const.FIND_BY_SQL, sql="select * from info", params={"name": "333", "id": 18}, limit=0))

 

本文:Python:Mysql连库及简单封装使用, python mysql操作类

One Comment

Leave a Reply