python pymysql基础用法及连接池

ZhangJian 2020-02-29 n次浏览 Python 编辑

使用连接池要用到DBUtils 不存在则要安装 安装pymysql:

pip install pymysql

注意事项: 写操作完成后要提交commit() 不能提交前return

数据库连接:

# 连接配置信息 
config = { 
    'host':'127.0.0.1', 
    'port':3306, 
    'user':'root', 
    'password':'root', 
    'db':'db', 
    'charset':'utf8mb4',         
    'cursorclass':pymysql.cursors.DictCursor, 
     # 游标设置为字典类型默认值为pymysql.cursors.Cursor。
} 
connection = pymysql.connect(**config)

针对result = cursor.fetchall() :
1、用DictCursor:[{'s_task_model_id': 1}] # 创建游标,查询获得的数据以 字典(dict) 形式返回
2、用Cursor时:((1,),) #  返回二维元组 tuple类型
注意:存在中文的时候,连接需要添加charset='utf8',否则中文显示乱码

Connect()的参数列表:
host – 数据库服务器所在的主机。
user – 登录用户名。
password – 登录用户密码。
database – 连接的数据库。
port – 数据库开放的端口。(默认: 3306)
bind_address – 当客户端有多个网络接口时,请指定连接到主机的接口,参数可以是主机名或IP地址。
unix_socket – 使用unix套接字而不是tcp/ip。
charset – 连接字符集。
sql_mode – 默认SQL模式。
read_default_file – 指定my.cnf文件路径,以便从[client]部分读取参数。
conv – 要使用的转换字典,而不是默认值。
use_unicode – 是否默认为unicode字符串,对于Py3k,此选项默认为true。
client_flag – 发送到MySQL的自定义标志。
cursorclass – 使用自定义的游标类。
init_command – 建立连接时要运行的初始SQL语句。
connect_timeout – 建立连接超时时间。(默认: 10,最小: 1,最大: 31536000)
read_default_group – 从配置文件中读取组。
compress – 不支持
named_pipe – 不支持
autocommit – 设置自动提交模式,不设置意味着使用数据库默认。(默认值: False)
local_infile – 是否启用“LOAD LOCAL INFILE”命令的使用。(默认值: False)
max_allowed_packet – 发送到服务器的数据包的最大大小 (以字节为单位,默认值: 16MB),仅用于限制小于默认值 (16KB) 的 “LOAD LOCAL INFILE” 数据包的大小。
defer_connect – 不要显式连接建设,等待连接调用。(默认值: False)
auth_plugin_map – A dict of plugin names to a class that processes that plugin. The class will take the Connection object as the argument to the constructor. The class needs an authenticate method taking an authentication packet as an argument. For the dialog plugin, a prompt(echo, prompt) method can be used (if no authenticate method) for returning a string from the user. (experimental)
db – 连接数据库别名(兼容MySQLdb)
passwd – 密码输入别名(兼容MySQLdb)
binary_prefix – 在bytes和bytearray上添加_binary前缀(默认: False)

connection对象:

方法  描述
begin() 开启事务
commit()    提交事务
cursor(cursor=None) 创建一个游标用来执行语句
ping(reconnect=True)    检查连接是否存活,会重新发起连接
rollback()  回滚事务
close() 关闭连接
select_db(db)   选择数据库
show_warnings() 查看warning信息

cursor对象:

方法  描述
close() 关闭游标。
execute(query, args=None)   执行单条语句,传入需要执行的语句,是string类型;同时可以给查询传入参数,参数可以是tuple、list或dict。执行完成后,会返回执行语句的影响行数,如果有的话。
executemany(query, args)    执行多条INSERT语句,传入需要执行的语句;同时可以给查询传入参数,参数是一个mappings序列。执行完成后,会返回执行语句的影响行数,如果有的话。
fetchone()  获取下一行数据。
fetchall()  获取所有数据。
fetchmany(size=None)    获取几行数据。
read_next() 获取下一行数据。
callproc()  用来调用存储过程。
mogrify()   参数化查询,防止SQL注入。
scroll(num,mode)    移动游标位置。

例子:

#  fetchone、fetchmany、fetchmany
#  cursorclass=pymysql.cursors.SSDictCursor 
#  SSCursor游标类,意在解决数据量大的问题。

cursor = connect.cursor()
try:
    row = cursor.execute(sql) # 返回受影响的行数row
    # cursor.executemany() 针对一个查询运行多个数据 数据的格式[('v1','v2'),('v3','v4')]
    result = cursor.fetchall() # 获取查询的所有记录 结果集是一个对象
    result = cursor.fetchmany(n) # 获取前n条记录
    connect.commit()
except:
    # 如果发生错误则回滚
    connect.rollback()
    # 抛出异常原因
    raise 
finally:
    connect.close()
# 获取自增id
new_id = cursor.lastrowid      
print(new_id)

注入问题:

excute执行SQL语句的时候,必须使用参数化的方式,否则必然产生SQL注入漏洞

1、通常执行
SQL = "SELECT * FROM scrapy_user_summary WHERE s_username='%s' and s_password_hash='%s'" % (username, password)
# 对这条sql注入 username= ' or 1 -- 
# SELECT * FROM scrapy_user_summary WHERE s_username='' or 1 -- ' and s_password_hash=''
# --为注释,即无视后续语句
2、用参数化方式,无需在%s两端加引号
内部执行参数化生成的SQL语句,对特殊字符进行了加\转义,避免注入语句生成。

使用连接池:

面对大量的web请求和插入与查询请求,mysql连接会不稳定,针对错误’Lost connection to MySQL server during query ([Errno 104] Connection reset by peer)’
优势:在程序创建连接的时候从一个空闲的连接中获取,不需要重新初始化连接,提升获取连接的速度
DBUtils是Python的一个用于实现数据库连接池的模块:

mincached,最少的空闲连接数,如果空闲连接数小于这个数,pool会创建一个新的连接。
maxcached,最大的空闲连接数,如果空闲连接数大于这个数,pool会关闭空闲连接。
maxconnections,最大的连接数,进程中最大可创建的线程数。
blocking, 当连接数达到最大连接数时,再次请求时,如果这个值是True,请求连接的程序会一直等待,直到当前连接数小于最大连接数;如果这个值为False,会报错。
masxshared,当连接数达到这个数时,新请求的连接会分享已经分配出去的连接。
---------------
# PooledDB :提供线程间可共享的数据库连接,把连接放回连接池而不是真正的关闭,可重用。
import pymysql
from DBUtils.PooledDB import PooledDB

pool = PooledDB(pymysql, 5, host="127.0.0.1", user='root',
                passwd='root', db='scrapy_test', port=3306, charset="utf8")
conn = pool.connection()
cur = conn.cursor()
SQL = "SELECT * FROM scrapy_config"
r = cur.execute(SQL)
r = cur.fetchall()
print(r)
cur.close()
conn.close()
----------------
# PersistentDB :提供线程专用的数据库连接,线程终止时才真正关闭链接
"""
为每个线程创建一个连接,通过thread.local实现。
"""
from DBUtils.PersistentDB import PersistentDB
import pymysql
import threading

POOL = PersistentDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='',
    database='flask_demo',
    charset='utf8'
)


def func():
    conn = POOL.connection()  # conn = SteadyDBConnection()
    cursor = conn.cursor()
    cursor.execute('select * from USER WHERE id=1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()  # 不是真的关闭,而是假的关闭。 conn = pymysql.connect()   conn.close()

    conn = POOL.connection()  # 还是刚才使用的连接
    cursor = conn.cursor()
    cursor.execute('select * from USER WHERE id=2')
    result = cursor.fetchall()
    cursor.close()
    conn.close()


for i in range(10):
    t = threading.Thread(target=func)
    t.start()

错误处理:

关于异常见Here

异常  描述
Warning 当有严重警告时触发,例如插入数据是被截断等等。必须是 StandardError 的子类。
Error   警告以外所有其他错误类。必须是 StandardError 的子类。
InterfaceError  当有数据库接口模块本身的错误(而不是数据库的错误)发生时触发。 必须是Error的子类。
DatabaseError   和数据库有关的错误发生时触发。 必须是Error的子类。
DataError   当有数据处理时的错误发生时触发,例如:除零错误,数据超范围等等。 必须是DatabaseError的子类。
OperationalError    指非用户控制的,而是操作数据库时发生的错误。例如:连接意外断开、 数据库名未找到、事务处理失败、内存分配错误等等操作数据库是发生的错误。 必须是DatabaseError的子类。
IntegrityError  完整性相关的错误,例如外键检查失败等。必须是DatabaseError子类。
InternalError   数据库的内部错误,例如游标(cursor)失效了、事务同步失败等等。 必须是DatabaseError子类。
ProgrammingError    程序错误,例如数据表(table)没找到或已存在、SQL语句语法错误、 参数数量错误等等。必须是DatabaseError的子类。
NotSupportedError   不支持错误,指使用了数据库不支持的函数或API等。例如在连接对象上 使用.rollback()函数,然而数据库并不支持事务或者事务已关闭。 必须是DatabaseError的子类。

参考: https://blog.csdn.net/weixin_40976261/article/details/89057633