2019年1月13日13:49
2017年11月04日 21:57:31 woodcol 阅读数:1824 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/fengmm521/article/details/78446412 最近一直在弄一个python的服务器端,使用了mysql数据库,因为想要访问数据库快一些,所以打算使用多线程建一个连接线程池,当有mysql数据库请求时,只要使用列队将请求数据发送给mysql连接池中的空闲线程来请求数据就好了。请求到数据后发送给客户端的用户数据缓存中。由用户逻辑处理线程处理结束后再返回结果给客户端。 当然了,实际上这个工具还可以用于其他想使用多线程处理的地方,比如像btc程序化交易系统。可以在交易平台申请很多个交易帐号,建立交易连接时每一个帐号一个线程,同时使用tor网络与交易平台建立连接,这样我就可以使用一个客户端同时使用多个IP地址管理多个btc交易帐号同时参与同一个交易平台交易。可以达到不引起btc交易平台和其他交易者察觉的方式控制币市价格了。据我所知,有的庄家就是这么干的。
下边是我写的一个mysql多线程池测试代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#mysql连接线程池,初始化时设置3个连接池,连接池保持使用率小于80%,如果连接池使用率大于等于80%时则自动增加mysql连接量直到使用率小于80%,连接池最大50个
import threading
import time
import mysqlobj
from Queue import Queue
_CONNECTTOOL = None
def requestComplete(n):
_CONNECTTOOL.requestComplete(n)
class _connectThread(threading.Thread):
def __init__(self, t_name,requstqueue,responeQueue,signal,ptime = 3):
threading.Thread.__init__(self, name=t_name)
self.signal = signal #线程条件锁
self.mysqlobj = mysqlobj.mysqlobj()
self.threadFunc = None #mysql调用返回值
self.requstqueue = requstqueue #mysql请求列队
self.responeQueue = responeQueue #mysql返回列队
self.timer = 0 #超时定时器
self.account = '' #作数据库mysql请求的帐号
self.askType = 1 #1.线程空闲,可以接受请求,2.正在发送数据库请求,等待返回结果,3.数据库结果已返回或请求超时,正在处理请求结果
self.connectobj = None #当前请求连接数据对象
self.conncetTimer = ptime #默认请求3秒无返回,则认为数据库请求超时
def _setNewConnect(self,newobj):
self.askType = 2
self.timer = time.time() #开始发送mysql请求时间
self.connectobj = newobj #接收mysqly请求数据对象
self._sendRequest()
def _sendRequest(self):
if self.connectobj.backfunc != None:
print 'thname=%s\n'%(self.getName())
time.sleep(4)
#self.connectobj.cmdstr = self.mysqlobj.execute(self.connectobj.cmdstr)
self.askType = 3
self.responeQueue.put(self.connectobj)
self.connectobj.comfunc(int(self.getName()))
self.askType = 1
def run(self):
while(True):
if self.askType == 1 and (not self.requstqueue.empty()): #线程是否空闲,请求队列中是否有请求对象
objtmp = self.requstqueue.get()
self._setNewConnect(objtmp)
else:#没有数据请求,线程进入等待唤醒模式
print 'thread wait:%s\n'%(self.getName())
self.signal.clear()
self.signal.wait() #请求结束等待下一次请求来唤醒
class _askObj():
def __init__(self,account,cmdstr,backfunc,ptype = 'sreach'):#inset,del,update,sreach,分别为增加数据,删除数据,修改数据,查找数据,backfunc为查找到的数据返回
self.account = account
self.cmdstr = cmdstr
self.ptype = ptype #数据库请求类型
self.backfunc = backfunc #数据库请求返回回调函数
self.outtimer = time.time() #数据库查找时间,用作查找超时处理,现在定义默认查找5秒未返回就超时,
self.threadname = ''
self.comfunc = requestComplete
class mysqlConnectThreads():
def __init__(self,maxcon = 50,mincon = 3,addpencent = 80):
self.maxcon = maxcon
self.mincon = mincon
self.singal = threading.Event()
self.addpencent = addpencent
self.concount = 0 #当前已连接mysql数量
self.conthreads = {}
self.singals = {}
self.runthread = []
self.waitthread = []
self.threadCount = 0
self.conpencent = 0.0 #当前线程池使用率
self.requestQueue = Queue() #mysql请求列队
self.responeQueue = Queue() #mysql返回列队
self._createConncets()
#初始化mysql连接池
def _createConncets(self):
global _CONNECTTOOL
_CONNECTTOOL = self
self.singals['1'] = threading.Event()
self.conthreads['1'] = _connectThread('1',self.requestQueue, self.responeQueue,self.singals['1'])
self.conthreads['1'].setDaemon(True)
self.conthreads['1'].start()
self.singals['2'] = threading.Event()
self.conthreads['2'] = _connectThread('2',self.requestQueue, self.responeQueue,self.singals['2'])
self.conthreads['2'].setDaemon(True)
self.conthreads['2'].start()
self.singals['3'] = threading.Event()
self.conthreads['3'] = _connectThread('3',self.requestQueue, self.responeQueue,self.singals['3'])
self.conthreads['3'].setDaemon(True)
self.conthreads['3'].start()
self.threadCount = 3
self.waitthread = [1,2,3]
#使用线程池中线程发送mysql命令
def mysqlexecute(self,account,cmdstr,backfunc,ptype = 'sreach'):
reqtmp = _askObj(account,cmdstr,backfunc,ptype)
self.requestQueue.put(reqtmp)
qs = self.requestQueue.qsize()
self.conpencent = (float)(qs/len(self.conthreads))
print 'qs=%d\n'%(qs)
if self.conpencent >= 0.8 and self.threadCount < self.maxcon:
self.threadCount +=1
self.singals[str(self.threadCount)] = threading.Event()
self.conthreads[str(self.threadCount)] = _connectThread(str(self.threadCount),self.requestQueue,self.responeQueue,self.singals[str(self.threadCount)])
self.conthreads[str(self.threadCount)].setDaemon(True)
self.conthreads[str(self.threadCount)].start()
print 'thread count:%d\n'%(self.threadCount)
if len(self.waitthread) > 0:
n = self.waitthread.pop()
self.runthread.append(n)
self.singals[str(n)].set()
def requestComplete(self,n):
if not self.responeQueue.empty():
reqtmp = self.responeQueue.get()
reqtmp.backfunc(reqtmp.account,reqtmp.cmdstr)
self.waitthread.append(n)
def asktest(backaccount,backdat):
print 'asktest:%s,%s'%(backaccount,backdat)
#classtest
if __name__ == '__main__':
con = mysqlConnectThreads()
ac = 1
while(True):
ac +=1
con.mysqlexecute(str(ac), 'ask'+str(ac), asktest)
time.sleep(1)
然后是mysql访问数据库的代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import MySQLdb
from subprocess import Popen,PIPE
# host='localhost',
# port = 3306,
# user='root',
# passwd='7668150Tt00',
# db ='sanguogame',
# CREATE TABLE `sanguogame`.`new_table` (
# `id` INT NOT NULL AUTO_INCREMENT,
# `name` VARCHAR(45) NOT NULL,
# `class` VARCHAR(45) NOT NULL,
# `age` VARCHAR(45) NOT NULL,
# PRIMARY KEY (`id`, `name`));
class mysqlobj():
def __init__(self,addr = 'localhost',port = 3306,usname = 'root',uspw = '123456Qq',defDB = 'test'):
self.mysqladdr = addr #mysql地址
self.mysqlport = port #mysql端口
self.mysqlusername = usname #mysql登陆用户名
self.mysqlpassword = uspw #mysql登陆密码
self.mysqlDefaleDB = defDB #mysql默认登陆数据库
self.connectManger = None #mysql连接管理器
self.mysqlcursor = None #mysql消息收发器
self._connectMysql() #连接mysql数据库
def _connectMysql(self):
self.connectManger = MySQLdb.connect(
host = self.mysqladdr,
port = self.mysqlport,
user = self.mysqlusername,
passwd = self.mysqlpassword,
db = self.mysqlDefaleDB,
charset="utf8"
)
self.mysqlcursor = self.connectManger.cursor()
#调用mysql命令
def execute(self,cmdstr):
if self.mysqlcursor:
return self.mysqlcursor.execute(cmdstr)
else:
return -999#mysql 未连接
def inPutDataWithSqlFile(self,sqlfilepath):
process = Popen('/usr/local/mysql/bin/mysql -h%s -P%s -u%s -p%s %s' %(self.mysqladdr, self.mysqlport, self.mysqlusername, self.mysqlpassword, self.mysqlDefaleDB), stdout=PIPE, stdin=PIPE, shell=True)
output = process.communicate('source '+sqlfilepath)
print output
多线程间等待与启动交互事件参考: http://www.jb51.net/article/63512.htm