下面我将详细介绍如何使用Python3多线程(连接池)操作MySQL插入数据。
准备工作
首先,我们需要安装Python3以及对应的MySQL库。可以使用以下命令进行安装:
pip install pymysql
创建数据库连接池
使用连接池可以最大化利用已经建立的连接,提高程序的性能和并发能力。下面是创建连接池的示例代码:
import pymysql
from DBUtils.PooledDB import PooledDB
pool = PooledDB(
creator=pymysql,
mincached=1,
maxcached=10,
host='localhost',
port=3306,
user='root',
password='password',
database='mydb',
charset='utf8'
)
在这个示例中,我们使用了DBUtils.PooledDB
库来创建连接池。mincached
参数表示连接池中最少保持的连接数,maxcached
参数表示连接池中最大的连接数。使用该库创建的pool
对象可以用来获取数据库连接。
连接池实现多线程插入数据
下面是连接池实现多线程插入数据的示例代码:
from concurrent.futures import ThreadPoolExecutor
def insert(data):
query = "INSERT INTO mytable (name, age) VALUES (%s, %s)"
conn = pool.connection()
cursor = conn.cursor()
cursor.execute(query, data)
conn.commit()
cursor.close()
conn.close()
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(insert, ('Alice', 25)) for _ in range(50)]
在这个示例中,我们使用了Python的concurrent.futures
库来实现多线程操作MySQL。ThreadPoolExecutor
对象可以用来创建一个线程池,其中max_workers
表示线程池中线程的最大数量。
executor.submit()
方法用于提交一个任务,该方法会立即返回一个Future
对象。在这个示例中,我们提交了50个插入数据的任务,并指定了参数值('Alice', 25)
。
在insert
函数中,我们使用连接池来获取数据库连接,执行插入操作并关闭数据库连接。使用连接池可以显著提高程序的性能和并发能力。
示例说明
以下是两个使用连接池多线程插入数据的示例:
示例1:批量插入数据
from concurrent.futures import ThreadPoolExecutor
def insert_many(data_list):
query = "INSERT INTO mytable (name, age) VALUES (%s, %s)"
conn = pool.connection()
cursor = conn.cursor()
cursor.executemany(query, data_list)
conn.commit()
cursor.close()
conn.close()
data = [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('David', 40)]
with ThreadPoolExecutor(max_workers=1) as executor:
futures = [executor.submit(insert_many, (data[i:i+2])) for i in range(0, len(data), 2)]
在这个示例中,我们使用executemany()
方法批量插入数据。在insert_many
函数中,我们传入了一个包含多个数据的列表data_list
,使用executemany()
方法批量插入数据。
在主程序中,我们使用一个线程处理相邻的两个数据项,通过循环遍历将数据分组并提交到线程池处理。
示例2:数据插入失败重试
from concurrent.futures import ThreadPoolExecutor, as_completed
def insert_with_retry(data):
query = "INSERT INTO mytable (name, age) VALUES (%s, %s)"
conn = pool.connection()
cursor = conn.cursor()
for i in range(3):
try:
cursor.execute(query, data)
conn.commit()
break
except Exception as e:
print(f"Insert failed, retrying ({i+1}/3)")
cursor.close()
conn.close()
data = [('Alice', 25), ('Bob', 30), ('Charlie', 'invalid'), ('David', 40)]
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(insert_with_retry, d) for d in data]
for future in as_completed(futures):
try:
future.result()
except Exception as e:
print(f"Insert failed: {e}")
在这个示例中,我们在insert_with_retry
函数中增加了数据插入失败重试的逻辑。在主程序中,我们提交了包含多个数据的任务列表,并使用as_completed()
函数迭代多个Future
对象并尝试获取其结果。
由于数据中包含了一个无效的年龄值,在插入操作中将会抛出异常。在这种情况下,我们将尝试最多三次执行插入操作。
以上就是Python3多线程(连接池)操作MySQL插入数据的完整攻略。