Note: the content below is outdated; please refer to the latest blog post.
Using Python multithreading to quickly import large amounts of MySQL data into Elasticsearch
1. Using the Python Elasticsearch client
#!/usr/bin/python
#coding:utf-8
import urllib2
import MySQLdb
import threading
import sys
import re
import os
from elasticsearch import Elasticsearch
reload(sys)
sys.setdefaultencoding('utf-8')
def db_obj():
    conf = ['192.168.0.107','root','pwd','chinacms4']
    return MySQLdb.connect(conf[0], conf[1], conf[2], conf[3], charset="utf8")
es = Elasticsearch("192.168.0.107:9200")
# Index one document via the Elasticsearch Python client
def es_put(index_name, _id, type_name, data):
    global es
    try:
        es.index(index=index_name, doc_type=type_name, id=_id, body=data)
        print str(_id) + " added successfully"
    except Exception, e:
        print repr(e)
# POST data over HTTP
# param string url
# param string data (JSON body)
def http_put(url, data):
    request = urllib2.Request(url, data)
    request.add_header('User-Agent', "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36")
    try:
        response = urllib2.urlopen(request)
        rv = response.read()
        response.close()
        return rv
    except Exception, e:
        print "----------------------------------------"
        print url
        print data
        print e
        print "----------------------------------------"
def strip_tags(html):
    # Strip HTML tags, then collapse whitespace
    dr = re.compile(r'<([^>]+)>', re.S)
    html = dr.sub('', html)
    html = re.sub(r'[\n]+', '', html, flags=re.S)
    html = html.replace('\t', '').replace(' ', '')
    # Strip invisible bidi/format control characters and common mojibake bytes
    for cp in (8206, 8207, 8205, 8204, 8234, 8237, 8238, 8236, 8302, 8303,
               8299, 8298, 8301, 8300, 30, 13, 31, 299, 230, 228, 232, 233, 229):
        html = html.replace(unichr(cp).encode('utf-8'), '')
    # Strip any remaining ASCII control characters
    html = re.compile('[\\x00-\\x08\\x0b-\\x0c\\x0e-\\x1f]').sub('', html)
    return html.strip()
def create_index(start, end):
    # One MySQL connection per thread, reused for the whole id range
    db = db_obj()
    cur = db.cursor()
    for i in range(start, end):
        cur.execute("select itemid,content from cms_news_data where itemid = %s", (i,))
        result = cur.fetchone()
        if result is not None:
            content = strip_tags(str(result[1]))
            # Pass a dict as the body; the client serializes it to JSON,
            # so quotes in the content need no manual escaping
            es_put("chinacms4", result[0], "content", {"content": content})
        else:
            print str(i) + " null"
    db.close()
# Spawn 30 worker threads, each covering a range of 100000 ids
for i in range(0, 30):
    threading.Thread(target=create_index, args=(i*100000, (i+1)*100000)).start()
# Example: create a single test document via the HTTP API
# api_url = "http://192.168.0.107:9200/test/user/9"
# data = '{"username":"海莉·贝内特","age":28}'
# print http_put(api_url, data)
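The script above indexes one document per request, so the request round-trip becomes the real bottleneck. The official client also ships a bulk helper (elasticsearch.helpers.bulk) that packs many documents into a single request, which is usually much faster for an import like this. Below is a minimal sketch of that variant, not part of the original script: it reuses db_obj, strip_tags, and es from above, and the single BETWEEN range query and the chunk_size of 500 are assumptions.

from elasticsearch import helpers

def bulk_index(start, end):
    db = db_obj()
    cur = db.cursor()
    # One query for the whole id range instead of one query per id
    cur.execute("select itemid,content from cms_news_data where itemid between %s and %s", (start, end))
    actions = []
    for itemid, content in cur.fetchall():
        actions.append({
            "_index": "chinacms4",
            "_type": "content",
            "_id": itemid,
            "_source": {"content": strip_tags(str(content))},
        })
    # helpers.bulk batches the actions, here 500 documents per request
    helpers.bulk(es, actions, chunk_size=500)
    db.close()

Each worker thread would then call bulk_index over its id range instead of create_index.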
2. Using raw HTTP POST (urllib2)
#!/usr/bin/python
#coding:utf-8
import urllib2
import MySQLdb
import threading
import sys
import re
import os
import json
reload(sys)
sys.setdefaultencoding('utf-8')
def db_obj():
    conf = ['192.168.0.107','root','pwd','chinacms4']
    return MySQLdb.connect(conf[0], conf[1], conf[2], conf[3], charset="utf8")
# POST data over HTTP
# param string url
# param string data (JSON body)
def http_put(url, data):
    request = urllib2.Request(url, data)
    request.add_header('User-Agent', "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36")
    try:
        response = urllib2.urlopen(request)
        rv = response.read()
        response.close()
        return rv
    except Exception, e:
        print "----------------------------------------"
        print url
        print data
        print e
        print "----------------------------------------"
def strip_tags(html):
    # Strip HTML tags and collapse newlines
    dr = re.compile(r'<([^>]+)>', re.S)
    html = dr.sub('', html)
    html = re.sub(r'[\n]+', '', html, flags=re.S)
    return html.strip()
def create_index(start, end):
    # One MySQL connection per thread, reused for the whole id range
    db = db_obj()
    cur = db.cursor()
    for i in range(start, end):
        cur.execute("select itemid,content from cms_news_data where itemid = %s", (i,))
        result = cur.fetchone()
        if result is not None:
            api_url = "http://192.168.0.107:9200/chinacms4/data/" + str(result[0])
            content = strip_tags(str(result[1]))
            # json.dumps escapes quotes and control characters correctly
            data = json.dumps({"content": content})
            print http_put(api_url, data)
        else:
            print str(i) + " null"
    db.close()
# Spawn 6 worker threads, each covering a 500000-id range
threading.Thread(target=create_index, args=(1, 500000)).start()
threading.Thread(target=create_index, args=(500000, 1000000)).start()
threading.Thread(target=create_index, args=(1000000, 1500000)).start()
threading.Thread(target=create_index, args=(1500000, 2000000)).start()
threading.Thread(target=create_index, args=(2000000, 2500000)).start()
threading.Thread(target=create_index, args=(2500000, 3000000)).start()
# Example: create a single test document via the HTTP API
# api_url = "http://192.168.0.107:9200/test/user/9"
# data = '{"username":"海莉·贝内特","age":28}'
# print http_put(api_url, data)
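The raw-HTTP variant can be batched the same way through Elasticsearch's _bulk endpoint, which takes newline-delimited JSON: one action line followed by one source line per document, ending with a trailing newline. A minimal sketch under the same assumptions, reusing http_put and json from above (note that Elasticsearch 6+ also requires a Content-Type: application/x-ndjson header, which http_put does not set):

def bulk_payload(rows):
    # Build the newline-delimited JSON body for the _bulk endpoint:
    # an action line, then a source line, per (itemid, content) pair
    lines = []
    for itemid, content in rows:
        lines.append(json.dumps({"index": {"_index": "chinacms4", "_type": "data", "_id": itemid}}))
        lines.append(json.dumps({"content": content}))
    return "\n".join(lines) + "\n"

# rows would come from a single BETWEEN query over the thread's id range
# print http_put("http://192.168.0.107:9200/_bulk", bulk_payload(rows))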
Tags: multithreading, elasticsearch
Unless otherwise noted, all articles on this blog are the author's original work.
If you repost, please credit the source: https://www.isres.com/linux/64.html