使用 Python 多线程:运行时可自定义线程数,程序会自动计算每个线程需要处理的数据量,连接 MySQL 读取数据,处理成所需字段后写入 Elasticsearch。
运行效果图:
以 10 个线程运行时,终端会实时刷新下面这一行,方便查看每个线程当前处理到的 id:
[root@localhost shw]# python put_album.py
{"0": "2105success ", "1": "196723success ", "2": "392557null", "3": "587819null", "4": "782519null", "5": "977482null", "6": "1172186null", "7": "1366897null", "8": "1561614null", "9": "1754368success "}
代码如下:
#!/usr/bin/python
#coding:utf-8
import requests,re,os,time,urllib,urllib2,random
import json,MySQLdb,sys,math,threading
# Python 2-only idiom: reload the sys module, then set the interpreter-wide
# default string encoding to UTF-8.
reload(sys)
sys.setdefaultencoding('utf-8')
# Connection settings for the target Elasticsearch index.
# NOTE(review): credentials are hard-coded; consider reading them from the
# environment or a config file instead.
ES_CFG = {
    "host": "127.0.0.1",
    "port": "9200",
    "user": "elastic",
    "password": "123456",
    "index_name": "es_album",
    "doc": "zhuanji",
}
# Derived endpoints; 'album_url' is the URL that documents are POSTed to.
ES_OBJ = {}
# %-formatting converts each part on its own, so the URL is also built
# correctly when the port is configured as an int rather than a string.
ES_OBJ['album_url'] = "http://%s:%s/%s/%s" % (
    ES_CFG['host'],
    ES_CFG['port'],
    ES_CFG['index_name'],
    ES_CFG['doc'],
)
# Latest progress message per worker, keyed by the worker's index.
MSG = {}
# Database connection factory
def db_obj():
    """Open and return a new MySQLdb connection to the source database."""
    params = dict(
        host='127.0.0.1',
        user='root',
        passwd='123456',
        db='dbname',
        port=3308,
        charset="utf8",
    )
    return MySQLdb.connect(**params)
def api_post(url, data, timeout=30):
    """POST *data* to *url* as a JSON document and return the response.

    The request is authenticated with the user/password stored in ES_CFG.

    Args:
        url: Target endpoint.
        data: JSON-serializable object used as the request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the calling thread
            indefinitely.

    Returns:
        The response object returned by ``requests.post``; its connection has
        already been released.

    Raises:
        Whatever ``requests.post`` raises on connection errors or timeouts.
    """
    payload = json.dumps(data)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko',
        "Content-Type": "application/json",
    }
    res = requests.post(
        url,
        data=payload,
        auth=(ES_CFG['user'], ES_CFG['password']),
        headers=headers,
        timeout=timeout,
    )
    # The body has already been read (no streaming), so release the
    # connection right away.
    res.close()
    return res
def strip_tags(html):
    """Remove markup tags and newlines from *html* and trim the result.

    Args:
        html: Text that may contain ``<...>`` tags, or None.

    Returns:
        The text with every ``<...>`` span and every newline removed and
        leading/trailing whitespace stripped. None yields ''.

    Raises:
        SystemExit: with status 1 when *html* is not a string; the error and
            the offending value are printed first.
    """
    if html is None:
        return ''
    try:
        text = re.sub(r'<[^>]+>', '', html)
        text = re.sub(r'\n+', '', text)
        return text.strip()
    except TypeError as e:
        print(e)
        print(html)
        # Exit with a non-zero status: this is a failure, not a clean exit.
        sys.exit(1)
# Characters dropped by pretreat(): newline, tab, double and single quotes,
# space, NO-BREAK SPACE (U+00A0) and IDEOGRAPHIC SPACE (U+3000). The pattern
# is a unicode literal so that \u3000 denotes the character itself on
# Python 2 as well, not the literal text "u3000".
_PRETREAT_JUNK_RE = re.compile(u"[\n\t\"' \xa0\u3000]")


def pretreat(html):
    """Return *html* reduced to plain text with quotes and whitespace removed.

    Tags are removed with strip_tags(); then every newline, tab, quote
    character, space, no-break space and ideographic space is deleted.

    Args:
        html: Raw text (possibly containing markup), or None.

    Returns:
        The cleaned text; '' when *html* is None.
    """
    str_txt = strip_tags(html)
    return _PRETREAT_JUNK_RE.sub('', str_txt)
def create_index(start,end,msg_id):
    """Copy album rows with ids in [start, end) from MySQL into Elasticsearch.

    For every id the row is fetched, its text fields are cleaned with
    pretreat() and the document is POSTed to ES_OBJ['album_url']. The latest
    outcome is stored in MSG[msg_id] as "<id>success ", "<id>fail" or
    "<id>null" (no row with that id).

    Args:
        start: First id to process (inclusive).
        end: Id to stop before (exclusive).
        msg_id: Key in MSG under which this worker reports its progress.
    """
    url = ES_OBJ['album_url']
    # Parameterized query: the driver substitutes the id.
    sql = "select id,name,keywords,desption from shwcms_album where id = %s"
    db = db_obj()
    try:
        # One cursor is reused for the whole range instead of opening a new,
        # never-closed cursor per id.
        cur = db.cursor()
        try:
            for i in range(start, end):
                # Small pause between rows to throttle load on both servers.
                time.sleep(0.01)
                cur.execute(sql, (i,))
                result = cur.fetchone()
                if result is None:
                    MSG[msg_id] = str(i)+"null"
                    continue
                data = {
                    'itemid': result[0],
                    'album_name': pretreat(result[1]),
                    'keywords': pretreat(result[2]),
                    'desption': pretreat(result[3]),
                }
                try:
                    res = api_post(url, data)
                except requests.RequestException:
                    # A network error on one document must not kill the
                    # worker; record the failure and move on.
                    res = None
                # A response object is always returned on completion, so the
                # HTTP status decides whether indexing succeeded.
                if res is not None and res.ok:
                    MSG[msg_id] = (str(i)+'success ')
                else:
                    MSG[msg_id] = str(i)+"fail"
        finally:
            cur.close()
    finally:
        # Always release the connection, even when a row fails mid-range.
        db.close()
max_threading = 10  # number of worker threads
max_id = 1947072    # largest id to index (inclusive)
start_id = 1        # first id to index
# Float division so that ceil() really rounds up; with integer division the
# last few ids would fall outside every worker's range.
size = int(math.ceil((max_id - start_id + 1) / float(max_threading)))
#print size
workers = []
for i in range(0, max_threading):
    MSG[i] = ""
    # Each worker gets the half-open range [range_start, range_end); the last
    # one is clamped so it stops right after max_id.
    range_start = start_id + i * size
    range_end = min(range_start + size, max_id + 1)
    worker = threading.Thread(target=create_index, args=(range_start, range_end, i))
    worker.start()
    workers.append(worker)
# Redraw the progress line once per second until every worker has finished.
while any(worker.is_alive() for worker in workers):
    sys.stdout.write("\r %s" % json.dumps(MSG))
    sys.stdout.flush()
    time.sleep(1)
# Final state, followed by a newline so the shell prompt starts on its own line.
sys.stdout.write("\r %s\n" % json.dumps(MSG))
sys.stdout.flush()
标签: 多线程, elasticsearch
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:https://www.isres.com/default/412.html