Python: MySQL 数据库连接, PyMySQL 驱动, pymysql 操作MySQL数据库, 防止SQL注入

什么是 PyMySQL?

PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2中则使用mysqldb。

PyMySQL 遵循 Python 数据库 API v2.0 规范,并包含了 pure-Python MySQL 客户端库。

 

项目地址:PyMySQL

安装要求:

  • Python — 以下满足任一:
  • MySQL Server –以下满足任一:

 

PyMySQL 安装

在使用 PyMySQL 之前,我们需要确保 PyMySQL 已安装。

PyMySQL 下载地址:https://github.com/PyMySQL/PyMySQL。

如果还未安装,我们可以使用以下命令安装最新版的 PyMySQL:

$ pip3 install PyMySQL

 

如果你的系统不支持 pip 命令,可以使用以下方式安装:

1、使用 git 命令下载安装包安装(你也可以手动下载):

$ git clone https://github.com/PyMySQL/PyMySQL
$ cd PyMySQL/
$ python3 setup.py install

 

2、如果需要制定版本号,可以使用 curl 命令来安装:

$ # X.X 为 PyMySQL 的版本号
$ curl -L https://github.com/PyMySQL/PyMySQL/tarball/pymysql-X.X | tar xz
$ cd PyMySQL*
$ python3 setup.py install
$ # 现在你可以删除 PyMySQL* 目录

注意:请确保您有root权限来安装上述模块。

 

安装的过程中可能会出现”ImportError: No module named setuptools”的错误提示,意思是你没有安装setuptools,你可以访问https://pypi.python.org/pypi/setuptools 找到各个系统的安装方法。

Linux 系统安装实例:

$ wget https://bootstrap.pypa.io/ez_setup.py
$ python3 ez_setup.py

 

数据库连接

连接数据库前,请先确认以下事项:

  • 您已经创建了数据库 TESTDB.
  • 在TESTDB数据库中您已经创建了表 EMPLOYEE
  • EMPLOYEE表字段为 FIRST_NAME, LAST_NAME, AGE, SEX 和 INCOME。
  • 连接数据库TESTDB使用的用户名为 “testuser” ,密码为 “test123”,你可以可以自己设定或者直接使用root用户名及其密码,Mysql数据库用户授权请使用Grant命令。
  • 在你的机子上已经安装了 Python MySQLdb 模块。
  • 如果您对sql语句不熟悉,可以访问我们的 SQL基础教程

 

实例:

以下实例链接 Mysql 的ikeepstudying 数据库:

#!/usr/bin/python3
 
import pymysql
 
# 打开数据库连接
db = pymysql.connect("localhost","root","root","ikeepstudying" )
 
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
 
# 使用 execute()  方法执行 SQL 查询 
cursor.execute("SELECT VERSION()")
 
# 使用 fetchone() 方法获取单条数据.
data = cursor.fetchone()
 
print ("Database version : %s " % data)
 
# 关闭数据库连接
db.close()

 

执行以上脚本输出结果如下:

Database version : 5.5.20-log

 

或者:

import pymysql.cursors

# Connect to the database
connection = pymysql.connect(host='127.0.0.1',
                             port=3306,
                             user='root',
                             password='root',
                             db='ikeepstudying',
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor)

 

也可以使用字典进行连接参数的管理,我觉得这样子更优雅一些:

 

事务处理

  • 开启事务 connection.begin()
  • 提交修改 connection.commit()
  • 回滚事务 connection.rollback()

事务机制可以确保数据一致性。

事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。

  • 原子性(atomicity)。一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做。
  • 一致性(consistency)。事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。
  • 隔离性(isolation)。一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
  • 持久性(durability)。持续性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。

Python DB API 2.0 的事务提供了两个方法 commit 或 rollback。

# SQL删除记录语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
try:
   # 执行SQL语句
   cursor.execute(sql)
   # 向数据库提交
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()

对于支持事务的数据库, 在Python数据库编程中,当游标建立之时,就自动开始了一个隐形的数据库事务。

commit()方法游标的所有更新操作,rollback()方法回滚当前游标的所有操作。每一个方法都开始了一个新的事务。

 

创建数据库表

如果数据库连接存在我们可以使用execute()方法来为数据库创建表,如下所示创建表EMPLOYEE:

#!/usr/bin/python3
 
import pymysql
 
# 打开数据库连接
db = pymysql.connect("localhost","root","root","ikeepstudying" )
 
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
 
# 使用 execute() 方法执行 SQL,如果表存在则删除
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")
 
# 使用预处理语句创建表
sql = """CREATE TABLE EMPLOYEE (
         FIRST_NAME  CHAR(20) NOT NULL,
         LAST_NAME  CHAR(20),
         AGE INT,  
         SEX CHAR(1),
         INCOME FLOAT )"""
 
cursor.execute(sql)
 
# 关闭数据库连接
db.close()

 

数据库插入操作

以下实例使用执行 SQL INSERT 语句向表 EMPLOYEE 插入记录:

#!/usr/bin/python3
 
import pymysql
 
# 打开数据库连接
db = pymysql.connect("localhost","root","root","ikeepstudying" )
 
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
 
# SQL 插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
         LAST_NAME, AGE, SEX, INCOME)
         VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
   # 执行sql语句
   cursor.execute(sql)
   # 提交到数据库执行
   db.commit()
except:
   # 如果发生错误则回滚
   db.rollback()
 
# 关闭数据库连接
db.close()

 

以上例子也可以写成如下形式:

#!/usr/bin/python3
 
import pymysql
 
# 打开数据库连接
db = pymysql.connect("localhost","root","root","ikeepstudying" )
 
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
 
# SQL 插入语句
sql = "INSERT INTO EMPLOYEE(FIRST_NAME, \
       LAST_NAME, AGE, SEX, INCOME) \
       VALUES (%s, %s,  %s,  %s,  %s )" % \
       ('Mac', 'Mohan', 20, 'M', 2000)
try:
   # 执行sql语句
   cursor.execute(sql)
   # 执行sql语句
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()
 
# 关闭数据库连接
db.close()

以下代码使用变量向SQL语句中传递参数:

..................................
user_id = "admin"
password = "admin2019"

con.execute('insert into Login values( %s,  %s)' % \
             (user_id, password))
..................................

 

另一个插入实例:

from datetime import date, datetime, timedelta 
import pymysql.cursors

#连接配置信息
config = {
          'host':'127.0.0.1',
          'port':3306,
          'user':'root',
          'password':'root',
          'db':'ikeepstudying',
          'charset':'utf8mb4',
          'cursorclass':pymysql.cursors.DictCursor,
          }
# 创建连接
connection = pymysql.connect(**config)

# 获取明天的时间
tomorrow = datetime.now().date() + timedelta(days=1)

# 执行sql语句
try:
    with connection.cursor() as cursor:
        # 执行sql语句,插入记录
        sql = 'INSERT INTO employees (first_name, last_name, hire_date, gender, birth_date) VALUES (%s, %s, %s, %s, %s)'
        cursor.execute(sql, ('justcode', 'ikeepstudying', tomorrow, 'M', date(1989, 6, 14)));
    # 没有设置默认自动提交,需要主动提交,以保存所执行的语句
    connection.commit()

finally:
    connection.close();

 

获取自增 ID

cursor.lastrowid

 

执行 SQL ( 单条 + 批量 )

 

  1.  cursor.execute(sql, args) 执行单条 SQL
    # 获取游标
    cursor = connection.cursor()
        
    # 创建数据表
    effect_row = cursor.execute('''
    CREATE TABLE `users` (
      `name` varchar(32) NOT NULL,
      `age` int(10) unsigned NOT NULL DEFAULT '0',
      PRIMARY KEY (`name`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    ''')
        
    # 插入数据(元组或列表)
    effect_row = cursor.execute('INSERT INTO `users` (`name`, `age`) VALUES (%s, %s)', ('mary', 18))
        
    # 插入数据(字典)
    info = {'name': 'fake', 'age': 15}
    effect_row = cursor.execute('INSERT INTO `users` (`name`, `age`) VALUES (%(name)s, %(age)s)', info)
        
    connection.commit()

     

  2. executemany(sql, args) 批量执行 SQL
    # 获取游标
    cursor = connection.cursor()
        
    # 批量插入
    effect_row = cursor.executemany(
        'INSERT INTO `users` (`name`, `age`) VALUES (%s, %s) ON DUPLICATE KEY UPDATE age=VALUES(age)', [
            ('hello', 13),
            ('fake', 28),
        ])
        
    connection.commit()

     

注意:INSERT、UPDATE、DELETE 等修改数据的语句需手动执行connection.commit()完成对数据修改的提交。

 

数据库查询操作

Python查询Mysql使用 fetchone() 方法获取单条数据, 使用fetchall() 方法获取多条数据。

  • fetchone(): 该方法获取下一个查询结果集。结果集是一个对象
  • fetchall(): 接收全部的返回结果行.
  • fetchmany(3): 获取前N条数据
  • rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数。

实例:

查询EMPLOYEE表中salary(工资)字段大于1000的所有数据:

#!/usr/bin/python3
 
import pymysql
 
# 打开数据库连接
db = pymysql.connect("localhost","root","root","ikeepstudying" )
 
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
 
# SQL 查询语句
sql = "SELECT * FROM EMPLOYEE \
       WHERE INCOME > %s" % (1000)
try:
   # 执行SQL语句
   cursor.execute(sql)
   # 获取所有记录列表
   results = cursor.fetchall()
   for row in results:
      fname = row[0]
      lname = row[1]
      age = row[2]
      sex = row[3]
      income = row[4]
       # 打印结果
      print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
             (fname, lname, age, sex, income ))
except:
   print ("Error: unable to fetch data")
 
# 关闭数据库连接
db.close()

以上脚本执行结果如下:

fname=Mac, lname=Mohan, age=20, sex=M, income=2000

 

或者使用:

import datetime
import pymysql.cursors

#连接配置信息
config = {
          'host':'127.0.0.1',
          'port':3306,
          'user':'root',
          'password':'root',
          'db':'ikeepstudying',
          'charset':'utf8mb4',
          'cursorclass':pymysql.cursors.DictCursor,
          }
# 创建连接
connection = pymysql.connect(**config)

# 获取雇佣日期
hire_start = datetime.date(1999, 1, 1)
hire_end = datetime.date(2016, 12, 31)

# 执行sql语句
try:
    with connection.cursor() as cursor:
        # 执行sql语句,进行查询
        sql = 'SELECT first_name, last_name, hire_date FROM employees WHERE hire_date BETWEEN %s AND %s'
        cursor.execute(sql, (hire_start, hire_end))
        # 获取查询结果
        result = cursor.fetchone()
        print(result)
    # 没有设置默认自动提交,需要主动提交,以保存所执行的语句
    connection.commit()

finally:
    connection.close();

 

这里的查询支取了一条查询结果,查询结果以字典的形式返回。

从结果集中获取指定数目的记录,可以使用fetchmany方法:

result = cursor.fetchmany(2)

不过不建议这样使用,最好在sql语句中设置查询的记录总数。

获取全部结果集可以使用fetchall方法:

result = cursor.fetchall()

因为只有两条记录,所以上面提到的这两种查询方式查到的结果是一样的:

游标控制

所有的数据查询操作均基于游标,我们可以通过cursor.scroll(num, mode)控制游标的位置。

cursor.scroll(1, mode='relative') # 相对当前位置移动
cursor.scroll(2, mode='absolute') # 相对绝对位置移动

 

设置游标类型

查询时,默认返回的数据类型为元组,可以自定义设置返回类型。支持5种游标类型:

  • Cursor: 默认,元组类型
  • DictCursor: 字典类型
  • DictCursorMixin: 支持自定义的游标类型,需先自定义才可使用
  • SSCursor: 无缓冲元组类型
  • SSDictCursor: 无缓冲字典类型

无缓冲游标类型,适用于数据量很大,一次性返回太慢,或者服务端带宽较小时。源码注释:

Unbuffered Cursor, mainly useful for queries that return a lot of data, or for connections to remote servers over a slow network.

Instead of copying every row of data into a buffer, this will fetch rows as needed. The upside of this is the client uses much less memory, and rows are returned much faster when traveling over a slow network or if the result set is very big.

There are limitations, though. The MySQL protocol doesn’t support returning the total number of rows, so the only way to tell how many rows there are is to iterate over every row returned. Also, it currently isn’t possible to scroll backwards, as only the current row is held in memory.

 

创建连接时,通过 cursorclass 参数指定类型:

connection = pymysql.connect(host='localhost',
                             user='root',
                             password='root',
                             db='demo',
                             charset='utf8',
                             cursorclass=pymysql.cursors.DictCursor)

 

也可以在创建游标时指定类型:

cursor = connection.cursor(cursor=pymysql.cursors.DictCursor)

 

数据库更新操作

更新操作用于更新数据表的的数据,以下实例将 TESTDB 表中 SEX 为 ‘M’ 的 AGE 字段递增 1:

#!/usr/bin/python3
 
import pymysql
 
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
 
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
 
# SQL 更新语句
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')
try:
   # 执行SQL语句
   cursor.execute(sql)
   # 提交到数据库执行
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()
 
# 关闭数据库连接
db.close()

 

删除操作

删除操作用于删除数据表中的数据,以下实例演示了删除数据表 EMPLOYEE 中 AGE 大于 20 的所有数据:

#!/usr/bin/python3
 
import pymysql
 
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
 
# 使用cursor()方法获取操作游标 
cursor = db.cursor()
 
# SQL 删除语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
try:
   # 执行SQL语句
   cursor.execute(sql)
   # 提交修改
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()
 
# 关闭连接
db.close()

 

防 SQL 注入

  • 转义特殊字符 connection.escape_string(str)
  • 参数化语句 支持传入参数进行自动转义、格式化 SQL 语句,以避免 SQL 注入等安全问题。
# 插入数据(元组或列表)
effect_row = cursor.execute('INSERT INTO `users` (`name`, `age`) VALUES (%s, %s)', ('mary', 18))
# 插入数据(字典)
info = {'name': 'fake', 'age': 15}
effect_row = cursor.execute('INSERT INTO `users` (`name`, `age`) VALUES (%(name)s, %(age)s)', info)
# 批量插入
effect_row = cursor.executemany(
    'INSERT INTO `users` (`name`, `age`) VALUES (%s, %s) ON DUPLICATE KEY UPDATE age=VALUES(age)', [
        ('hello', 13),
        ('fake', 28),
    ])

 

实例 1:避免注入,使用pymysql提供的参数化语句

正常参数化查询

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
 
import pymysql
 
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root', db='ikeepstudying')
cursor = conn.cursor()
user="u1"
passwd="u1pass"
#执行参数化查询
row_count=cursor.execute("select user,pass from users where user=%s and pass=%s",(user,passwd))
row_1 = cursor.fetchone()
print row_count,row_1
 
conn.commit()
cursor.close()
conn.close()

 

构造注入,参数化查询注入失败。

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
 
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root', db='ikeepstudying')
cursor = conn.cursor()
 
user="u1' or '1'-- "
passwd="u1pass"
#执行参数化查询
row_count=cursor.execute("select user,pass from users where user=%s and pass=%s",(user,passwd))
#内部执行参数化生成的SQL语句,对特殊字符进行了加\转义,避免注入语句生成。
# sql=cursor.mogrify("select user,pass from users where user=%s and pass=%s",(user,passwd))
# print(sql)
#select user,pass from users where user='u1\' or \'1\'-- ' and pass='u1pass'被转义的语句。
 
row_1 = cursor.fetchone()
print(row_count,row_1)
 
conn.commit()
cursor.close()
conn.close()

 

结论:excute执行SQL语句的时候,必须使用参数化的方式,否则必然产生SQL注入漏洞。

 

实例 2: 使用MYSQL存储过程自动提供防注入,动态传入SQL到存储过程执行语句。

delimiter \\
DROP PROCEDURE IF EXISTS proc_sql \\
CREATE PROCEDURE proc_sql (
  in nid1 INT,
  in nid2 INT,
  in callsql VARCHAR(255)
  )
BEGIN
  set @nid1 = nid1;
  set @nid2 = nid2;
  set @callsql = callsql;
    PREPARE myprod FROM @callsql;
--   PREPARE prod FROM 'select * from tb2 where nid>? and nid<?';  传入的值为字符串,?为占位符
--   用@p1,和@p2填充占位符
    EXECUTE myprod USING @nid1,@nid2;
  DEALLOCATE prepare myprod;
 
END\\
delimiter ;
set @nid1=12;
set @nid2=15;
set @callsql = 'select * from tb7 where nid>? and nid<?';
CALL proc_sql(@nid1,@nid2,@callsql)

 

 

pymsql中调用

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
 
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root', db='ikeepstudying')
cursor = conn.cursor()
mysql="select * from tb7 where nid>? and nid<?"
cursor.callproc('proc_sql', args=(11, 15, mysql))
 
rows = cursor.fetchall()
print rows #((12, 'u1', 'u1pass', 11111), (13, 'u2', 'u2pass', 22222), (14, 'u3', 'u3pass', 11113))
conn.commit()
cursor.close()
conn.close()

 

错误处理

DB API中定义了一些数据库操作的错误及异常,下表列出了这些错误和异常:

异常 描述
Warning 当有严重警告时触发,例如插入数据是被截断等等。必须是 StandardError 的子类。
Error 警告以外所有其他错误类。必须是 StandardError 的子类。
InterfaceError 当有数据库接口模块本身的错误(而不是数据库的错误)发生时触发。 必须是Error的子类。
DatabaseError 和数据库有关的错误发生时触发。 必须是Error的子类。
DataError 当有数据处理时的错误发生时触发,例如:除零错误,数据超范围等等。 必须是DatabaseError的子类。
OperationalError 指非用户控制的,而是操作数据库时发生的错误。例如:连接意外断开、 数据库名未找到、事务处理失败、内存分配错误等等操作数据库是发生的错误。 必须是DatabaseError的子类。
IntegrityError 完整性相关的错误,例如外键检查失败等。必须是DatabaseError子类。
InternalError 数据库的内部错误,例如游标(cursor)失效了、事务同步失败等等。 必须是DatabaseError子类。
ProgrammingError 程序错误,例如数据表(table)没找到或已存在、SQL语句语法错误、 参数数量错误等等。必须是DatabaseError的子类。
NotSupportedError 不支持错误,指使用了数据库不支持的函数或API等。例如在连接对象上 使用.rollback()函数,然而数据库并不支持事务或者事务已关闭。 必须是DatabaseError的子类。

 

在django中使用

在django中使用是我找这个库的最初目的。目前同时支持python3.4、django1.8的数据库backend并不好找。PyMySQL是我目前找到的最好用的。

设置DATABASES和官方推荐使用的MySQLdb的设置没什么区别:

DATABASES = {
   'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'ikeepstudying',
        'USER': 'root',
        'PASSWORD': 'root',
        'HOST': '127.0.0.1',
        'PORT': '3306',
    }
}

关键是这里:我们还需要在站点的__init__.py文件中添加如下的内容:

 

完整一条:

#!/usr/bin/env pytho
# -*- coding:utf-8 -*-
import pymysql
  
# 创建连接
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root', db='ikeepstudying', charset='utf8')
# 创建游标
cursor = conn.cursor()
  
# 执行SQL,并返回收影响行数
effect_row = cursor.execute("select * from tb7")
  
# 执行SQL,并返回受影响行数
#effect_row = cursor.execute("update tb7 set pass = '123' where nid = %s", (11,))
  
# 执行SQL,并返回受影响行数,执行多次
#effect_row = cursor.executemany("insert into tb7(user,pass,licnese)values(%s,%s,%s)", [("u1","u1pass","11111"),("u2","u2pass","22222")])
  
  
# 提交,不然无法保存新建或者修改的数据
conn.commit()
  
# 关闭游标
cursor.close()
# 关闭连接
conn.close()

注意:存在中文的时候,连接需要添加charset=’utf8’,否则中文显示乱码。

 

每次都连接关闭很麻烦,使用上下文管理,简化连接过程

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
 
import pymysql
import contextlib
#定义上下文管理器,连接后自动关闭连接
@contextlib.contextmanager
def mysql(host='127.0.0.1', port=3306, user='root', passwd='root', db='ikeepstudying',charset='utf8'):
  conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset)
  cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
  try:
    yield cursor
  finally:
    conn.commit()
    cursor.close()
    conn.close()
 
# 执行sql
with mysql() as cursor:
  print(cursor)
  row_count = cursor.execute("select * from users")
  row_1 = cursor.fetchone()
  print row_count, row_1

 

 

或者:

import pymysql
config={
    "host":"127.0.0.1",
    "user":"root",
    "password":"LBLB1212@@",
    "database":"dbforpymysql"
}
db = pymysql.connect(**config)
with db.cursor(cursor=pymysql.cursors.DictCursor) as cursor:  #获取数据库连接的对象
    sql = "SELECT * FROM userinfo"   
    cursor.execute(sql)
    res = cursor.fetchone()
    print(res)
    cursor.scroll(2,mode='relative')
    res = cursor.fetchone()
    print(res)
    cursor.close()
db.close()

#运行结果
{'id': 1, 'username': 'frank', 'passwd': '123'}
{'id': 5, 'username': 'bob', 'passwd': '123'}

 

本文:Python: MySQL 数据库连接, PyMySQL 驱动, pymysql 操作MySQL数据库, 防止SQL注入

发表评论