python使用多线程快速把大量mysql数据导入elasticsearch

使用python多线程:运行时可自定义线程数,程序会自动计算每个线程需要处理的数据量,各线程分别连接mysql读取数据,处理成需要的字段后写入elasticsearch。
运行效果图:
dc2.gif

dxc1228.gif

以10个线程运行时,终端会实时刷新下面这一行输出,方便查看每个线程当前处理到的id:

[root@localhost shw]# python put_album.py 
 {"0": "2105success ", "1": "196723success ", "2": "392557null", "3": "587819null", "4": "782519null", "5": "977482null", "6": "1172186null", "7": "1366897null", "8": "1561614null", "9": "1754368success "} "}

代码如下:

#!/usr/bin/python
#coding:utf-8
import requests,re,os,time,urllib,urllib2,random
import json,MySQLdb,sys,math,threading
reload(sys)
sys.setdefaultencoding('utf-8')

# Elasticsearch connection settings: server address, basic-auth credentials,
# target index name and document type.
ES_CFG = {
    "host": "127.0.0.1",
    "port": "9200",
    "user": "elastic",
    "password": "123456",
    "index_name": "es_album",
    "doc": "zhuanji",
}

# Derived endpoints. 'album_url' is the URL documents are POSTed to:
# http://<host>:<port>/<index_name>/<doc>
# str.format() stringifies each field on its own, so the port may be given
# either as a string or as an integer.
ES_OBJ = {}
ES_OBJ['album_url'] = "http://{host}:{port}/{index_name}/{doc}".format(**ES_CFG)

# Latest status text per worker, keyed by worker number; written by the worker
# threads and printed by the main thread.
MSG = {}
# Database connection factory
def db_obj():
    """Open and return a new MySQL connection.

    Connects to 127.0.0.1:3308 as ``root`` on database ``dbname`` with the
    ``utf8`` connection charset. Every call creates a separate connection;
    the caller is responsible for closing it.
    """
    connection = MySQLdb.connect(
        '127.0.0.1', 'root', '123456', 'dbname',
        port=3308, charset="utf8"
    )
    return connection

def api_post(url,data):
    """POST ``data`` to ``url`` as a JSON body and return the response.

    The request is sent with HTTP basic auth taken from ``ES_CFG`` ('user' and
    'password'), a browser-like User-Agent and a JSON content type.

    :param url: target URL.
    :param data: JSON-serializable object sent as the request body.
    :return: the ``requests`` response object, already closed.
    """
    payload = json.dumps(data)
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko',
        "Content-Type": "application/json",
    }
    credentials = (ES_CFG['user'], ES_CFG['password'])
    response = requests.post(
        url,
        data=payload,
        auth=credentials,
        headers=request_headers
    )
    # Close the response before handing it back to the caller.
    response.close()
    return response

# Compiled once at import time instead of on every call.
_TAG_RE = re.compile(r'<([^>]+)>', re.S)
_NEWLINES_RE = re.compile(r'[\n]+', re.S)


def strip_tags(html):
    """Remove markup tags and newlines from ``html`` and trim the result.

    :param html: text to clean, or ``None``.
    :return: ``''`` when ``html`` is ``None``; otherwise the text with every
        ``<...>`` tag and every newline removed, stripped of leading and
        trailing whitespace.
    :raises SystemExit: with exit status 1 when the substitution fails
        (for example when ``html`` is not a string). The error and the
        offending value are printed first.
    """
    if html is None:
        return ''
    try:
        text = _TAG_RE.sub('', html)
        text = _NEWLINES_RE.sub('', text)
        return text.strip()
    except Exception as e:
        # Print the error and the offending value before stopping, so the bad
        # row can be identified.
        print(e)
        print(html)
        # sys.exit() rather than the interactive-only site helper exit(), and a
        # non-zero status because this is a failure path.
        sys.exit(1)

# Characters removed from every field: newline, tab, double and single quotes,
# plain space, no-break space (U+00A0) and ideographic space (U+3000).
# The pattern is a unicode literal so that \xa0 and \u3000 are real escapes on
# both Python 2 and Python 3; in a Python 2 byte-string literal "\u3000" is
# not an escape and never matched the ideographic space.
_NOISE_CHARS_RE = re.compile(u"[\n\t\"' \xa0\u3000]")


def pretreat(html):
    """Normalize a text field before it is indexed.

    Strips markup via ``strip_tags`` and then removes newlines, tabs, quotes
    and all space characters listed in ``_NOISE_CHARS_RE`` in a single pass.

    :param html: raw field value, or ``None``.
    :return: the cleaned text (``''`` when ``html`` is ``None``).
    """
    # Strip tags first
    str_txt = strip_tags(html)
    return _NOISE_CHARS_RE.sub('', str_txt)

def create_index(start,end,msg_id):
    """Copy the album rows with ids in ``[start, end)`` from MySQL to Elasticsearch.

    Worker entry point. For every id in the range the row is read from
    ``shwcms_album``, its text fields are cleaned with ``pretreat`` and the
    document is POSTed to ``ES_OBJ['album_url']``. After each id the worker
    records its progress in ``MSG[msg_id]`` as ``"<id>success "``,
    ``"<id>fail"`` or ``"<id>null"`` (no row with that id).

    :param start: first id to process (inclusive).
    :param end: id at which to stop (exclusive).
    :param msg_id: key in ``MSG`` under which this worker reports progress.
    """
    url = ES_OBJ['album_url']
    # Parameterized query: the driver substitutes the id for %s.
    sql = "select id,name,keywords,desption from shwcms_album where id = %s"
    db = db_obj()
    try:
        # One cursor for the whole range instead of a new, never-closed cursor
        # per id.
        cur = db.cursor()
        try:
            for i in range(start, end):
                # Short pause between rows to throttle the load.
                time.sleep(0.01)
                cur.execute(sql, (i,))
                result = cur.fetchone()
                if result is None:
                    MSG[msg_id] = str(i)+"null"
                    continue
                data = {}
                data['itemid'] = result[0]
                data['album_name'] = pretreat(result[1])
                data['keywords'] = pretreat(result[2])
                data['desption'] = pretreat(result[3])
                res = api_post(url, data)
                # api_post always returns a response object, so only the HTTP
                # status can tell success from failure.
                if res is not None and res.ok:
                    MSG[msg_id] = str(i)+'success '
                else:
                    MSG[msg_id] = str(i)+"fail"
        finally:
            cur.close()
    finally:
        # Release the connection even if a row raises.
        db.close()

max_threading = 10 # number of worker threads
max_id = 1947072 # highest id to process (inclusive)
start_id = 1 # lowest id to process (inclusive)
# Ids per worker, rounded up. float() forces true division on Python 2, where
# int / int truncates and the last few ids would otherwise be skipped.
size = int(math.ceil((max_id-start_id+1)/float(max_threading)))
#print size

workers = []
for i in range(0,max_threading):
    MSG[i] = ""
    # Half-open id range [chunk_start, chunk_end) for worker i, offset by
    # start_id and clamped so the last worker stops after max_id.
    chunk_start = start_id + i*size
    chunk_end = min(chunk_start + size, max_id + 1)
    worker = threading.Thread(target=create_index,args=(chunk_start,chunk_end,i))
    worker.start()
    workers.append(worker)

# Redraw the per-worker status on one terminal line once a second until every
# worker has finished; the status is written once more after the last worker
# exits so the final state is shown.
while True:
    sys.stdout.write("\r %s" %json.dumps(MSG))
    sys.stdout.flush()
    if not any(w.is_alive() for w in workers):
        break
    time.sleep(1)
sys.stdout.write("\n")

标签: 多线程, elasticsearch

非特殊说明,本博所有文章均为博主原创。

最新文章

发表评论