JONCHAN-CN
3/30/2019 - 10:02 AM

使用python多线程连接mysql

使用python多线程连接mysql

2019年1月13日13:49

2017年11月04日 21:57:31 woodcol 阅读数:1824 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/fengmm521/article/details/78446412 最近一直在弄一个python的服务器端,使用了mysql数据库,因为想要访问数据库快一些,所以打算使用多线程建一个连接线程池,当有mysql数据库请求时,只要使用列队将请求数据发送给mysql连接池中的空闲线程来请求数据就好了。请求到数据后发送给客户端的用户数据缓存中。由用户逻辑处理线程处理结束后再返回结果给客户端。 当然了,实际上这个工具还可以用于其他想使用多线程处理的地方,比如像btc程序化交易系统。可以在交易平台申请很多个交易帐号,建立交易连接时每一个帐号一个线程,同时使用tor网络与交易平台建立连接,这样我就可以使用一个客户端同时使用多个IP地址管理多个btc交易帐号同时参与同一个交易平台交易。可以达到不引起btc交易平台和其他交易者察觉的方式控制币市价格了。据我所知,有的庄家就是这么干的。

下边是我写的一个mysql多线程池测试代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#mysql连接线程池,初始化时设置3个连接池,连接池保持使用率小于80%,如果连接池使用率大于等于80%时则自动增加mysql连接量直到使用率小于80%,连接池最大50个

import threading  
import time
import mysqlobj
from Queue import Queue  

_CONNECTTOOL = None

def requestComplete(n):
    _CONNECTTOOL.requestComplete(n)
    
class _connectThread(threading.Thread):  
    def __init__(self, t_name,requstqueue,responeQueue,signal,ptime = 3):  
        threading.Thread.__init__(self, name=t_name) 
        self.signal = signal #线程条件锁
        self.mysqlobj = mysqlobj.mysqlobj()
        self.threadFunc = None              #mysql调用返回值
        self.requstqueue = requstqueue      #mysql请求列队
        self.responeQueue = responeQueue    #mysql返回列队
        self.timer = 0                      #超时定时器
        self.account = ''                   #作数据库mysql请求的帐号
        self.askType =  1                   #1.线程空闲,可以接受请求,2.正在发送数据库请求,等待返回结果,3.数据库结果已返回或请求超时,正在处理请求结果
        self.connectobj = None              #当前请求连接数据对象
        self.conncetTimer = ptime           #默认请求3秒无返回,则认为数据库请求超时
    def _setNewConnect(self,newobj):
        self.askType = 2
        self.timer = time.time()        #开始发送mysql请求时间
        self.connectobj = newobj        #接收mysqly请求数据对象
        self._sendRequest()
    def _sendRequest(self):
        if self.connectobj.backfunc != None:
            print 'thname=%s\n'%(self.getName())
            time.sleep(4)
            #self.connectobj.cmdstr =  self.mysqlobj.execute(self.connectobj.cmdstr)
            self.askType = 3
            self.responeQueue.put(self.connectobj)
            self.connectobj.comfunc(int(self.getName()))
            self.askType = 1
    def run(self):
        while(True):
            if self.askType == 1 and (not self.requstqueue.empty()):  #线程是否空闲,请求队列中是否有请求对象
                objtmp = self.requstqueue.get()
                self._setNewConnect(objtmp)
            else:#没有数据请求,线程进入等待唤醒模式
                print 'thread wait:%s\n'%(self.getName())
                self.signal.clear()
                self.signal.wait()   #请求结束等待下一次请求来唤醒
        
class _askObj():
    def __init__(self,account,cmdstr,backfunc,ptype = 'sreach'):#inset,del,update,sreach,分别为增加数据,删除数据,修改数据,查找数据,backfunc为查找到的数据返回
        self.account = account
        self.cmdstr = cmdstr
        self.ptype = ptype                  #数据库请求类型
        self.backfunc = backfunc            #数据库请求返回回调函数
        self.outtimer = time.time()         #数据库查找时间,用作查找超时处理,现在定义默认查找5秒未返回就超时,
        self.threadname = ''
        self.comfunc = requestComplete

class mysqlConnectThreads():
    def __init__(self,maxcon = 50,mincon = 3,addpencent = 80):
        self.maxcon = maxcon
        self.mincon = mincon
        self.singal = threading.Event()
        self.addpencent = addpencent
        self.concount = 0               #当前已连接mysql数量
        self.conthreads = {}
        self.singals = {}
        self.runthread = []
        self.waitthread = []
        self.threadCount = 0
        self.conpencent = 0.0           #当前线程池使用率
        self.requestQueue = Queue()     #mysql请求列队
        self.responeQueue = Queue()     #mysql返回列队
        self._createConncets()
    #初始化mysql连接池
    def _createConncets(self):
        global _CONNECTTOOL
        _CONNECTTOOL = self
        self.singals['1'] = threading.Event()
        self.conthreads['1'] = _connectThread('1',self.requestQueue, self.responeQueue,self.singals['1']) 
        self.conthreads['1'].setDaemon(True)
        self.conthreads['1'].start()
        self.singals['2'] = threading.Event()
        self.conthreads['2'] = _connectThread('2',self.requestQueue, self.responeQueue,self.singals['2']) 
        self.conthreads['2'].setDaemon(True)
        self.conthreads['2'].start()
        self.singals['3'] = threading.Event()
        self.conthreads['3'] = _connectThread('3',self.requestQueue, self.responeQueue,self.singals['3']) 
        self.conthreads['3'].setDaemon(True)
        self.conthreads['3'].start()
        self.threadCount = 3
        self.waitthread = [1,2,3]
    #使用线程池中线程发送mysql命令
    def mysqlexecute(self,account,cmdstr,backfunc,ptype = 'sreach'):
        reqtmp = _askObj(account,cmdstr,backfunc,ptype)
        self.requestQueue.put(reqtmp)
        qs = self.requestQueue.qsize()
        self.conpencent = (float)(qs/len(self.conthreads))
        print 'qs=%d\n'%(qs)
        if self.conpencent >= 0.8 and self.threadCount < self.maxcon:
            self.threadCount +=1
            self.singals[str(self.threadCount)] = threading.Event()
            self.conthreads[str(self.threadCount)] = _connectThread(str(self.threadCount),self.requestQueue,self.responeQueue,self.singals[str(self.threadCount)])
            self.conthreads[str(self.threadCount)].setDaemon(True)
            self.conthreads[str(self.threadCount)].start()
        print 'thread count:%d\n'%(self.threadCount)
        if len(self.waitthread) > 0:
            n = self.waitthread.pop()
            self.runthread.append(n)
            self.singals[str(n)].set()
    def requestComplete(self,n):
        if not self.responeQueue.empty():
            reqtmp = self.responeQueue.get()
            reqtmp.backfunc(reqtmp.account,reqtmp.cmdstr)
        self.waitthread.append(n)
def asktest(backaccount,backdat):
    print 'asktest:%s,%s'%(backaccount,backdat)
#classtest
if __name__ == '__main__':
    con = mysqlConnectThreads()
    ac = 1
    while(True):
        ac +=1
        con.mysqlexecute(str(ac), 'ask'+str(ac), asktest)
        time.sleep(1)

然后是mysql访问数据库的代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import MySQLdb
from subprocess import Popen,PIPE  

#         host='localhost',
#         port = 3306,
#         user='root',
#         passwd='7668150Tt00',
#         db ='sanguogame',

# CREATE TABLE `sanguogame`.`new_table` (
#   `id` INT NOT NULL AUTO_INCREMENT,
#   `name` VARCHAR(45) NOT NULL,
#   `class` VARCHAR(45) NOT NULL,
#   `age` VARCHAR(45) NOT NULL,
#     PRIMARY KEY (`id`, `name`));


class mysqlobj():
    def __init__(self,addr = 'localhost',port = 3306,usname = 'root',uspw = '123456Qq',defDB = 'test'):
        self.mysqladdr = addr           #mysql地址
        self.mysqlport = port           #mysql端口
        self.mysqlusername = usname     #mysql登陆用户名
        self.mysqlpassword = uspw       #mysql登陆密码
        self.mysqlDefaleDB = defDB      #mysql默认登陆数据库
        self.connectManger = None       #mysql连接管理器
        self.mysqlcursor = None         #mysql消息收发器
        self._connectMysql()            #连接mysql数据库
    def _connectMysql(self):
        self.connectManger = MySQLdb.connect(
                                             host = self.mysqladdr,
                                             port = self.mysqlport,
                                             user = self.mysqlusername,
                                             passwd = self.mysqlpassword,
                                             db = self.mysqlDefaleDB,
                                             charset="utf8"
                                             )
        self.mysqlcursor = self.connectManger.cursor()
    #调用mysql命令
    def execute(self,cmdstr):
        if self.mysqlcursor:
            return self.mysqlcursor.execute(cmdstr)
        else:
            return -999#mysql 未连接
    def inPutDataWithSqlFile(self,sqlfilepath):
        process = Popen('/usr/local/mysql/bin/mysql -h%s -P%s -u%s -p%s %s'  %(self.mysqladdr, self.mysqlport, self.mysqlusername, self.mysqlpassword, self.mysqlDefaleDB), stdout=PIPE, stdin=PIPE, shell=True)  
        output = process.communicate('source '+sqlfilepath)
        print output

多线程间等待与启动交互事件参考: http://www.jb51.net/article/63512.htm

来自 https://blog.csdn.net/fengmm521/article/details/78446412