Creating Elasticsearch indexes in Python: multithreaded HTTP POST and the Python client

The content below is outdated; please refer to the newer post:

Using Python multithreading to quickly import large amounts of MySQL data into Elasticsearch

The content below is outdated; please refer to the newer post linked above.

1. Using the Python Elasticsearch client


#!/usr/bin/python
#coding:utf-8
import urllib2
import MySQLdb
import threading
import sys
import re
import json
from elasticsearch import Elasticsearch
reload(sys) 
sys.setdefaultencoding('utf-8')

#conf = ['192.168.0.107','root','pwd','chinacms4']
#db = MySQLdb.connect(conf[0],conf[1],conf[2],conf[3],charset="utf8")

def db_obj():
    conf = ['192.168.0.107','root','pwd','chinacms4']
    return MySQLdb.connect(conf[0],conf[1],conf[2],conf[3],charset="utf8")
# One shared client; the elasticsearch-py client is safe to use across threads
es = Elasticsearch("192.168.0.107:9200")

# Index one document through the elasticsearch-py client
def es_put(index_name, _id, type_name, data):
    global es
    try:
        es.index(index=index_name, doc_type=type_name, id=_id, body=data)
        print str(_id) + " add success!"
    except Exception, e:
        print repr(e)

# HTTP POST data to a URL
# param string url
# param string data (JSON document body)

def http_put(url,data):
    request = urllib2.Request(url,data)
    request.add_header('User-Agent', "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36")
    try:
        response = urllib2.urlopen(request)
        rv = response.read()
        response.close()
        return rv
    except Exception,e:
        print "............................................................................................"
        print url
        print data
        print e
        print "............................................................................................"


def strip_tags(html):
    # Strip HTML tags
    html = re.sub(r'<([^>]+)>', '', html, flags=re.S)
    # Drop tabs, newlines and spaces (dropping spaces is fine for CJK text)
    html = html.replace('\t', '').replace('\n', '').replace(' ', '')
    # Strip invisible bidi/format characters (LRM, RLM, ZWJ, ZWNJ,
    # bidi embedding/override marks, etc.) that corrupt JSON payloads
    for cp in (8204, 8205, 8206, 8207, 8234, 8236, 8237, 8238,
               8298, 8299, 8300, 8301, 8302, 8303):
        html = html.replace(unichr(cp).encode('utf-8'), '')
    # Drop remaining ASCII control characters (including \r)
    html = re.sub(r'[\x00-\x08\x0b-\x1f]', '', html)
    return html.strip()


def create_index(start, end):
    # One connection per thread; a MySQLdb connection must not be shared across threads
    db = db_obj()
    cur = db.cursor()
    for i in range(start, end):
        cur.execute("select itemid,content from cms_news_data where itemid = %s", (i,))
        result = cur.fetchone()
        if result is not None:
            content = strip_tags(str(result[1]))
            # json.dumps handles quoting and escaping; a hand-built JSON string
            # breaks as soon as the content contains a double quote
            data = json.dumps({"content": content})
            es_put("chinacms4", result[0], "content", data)
            # HTTP alternative:
            # print http_put("http://192.168.0.107:9200/chinacms4/data/" + str(result[0]), data)
        else:
            print str(i) + " null"
    db.close()

# Spawn 30 threads, each covering a range of 100,000 itemids

for i in range(0, 30):
    threading.Thread(target=create_index, args=(i*100000, (i+1)*100000)).start()


# Creating a single test document through the HTTP API:

# api_url = "http://192.168.0.107:9200/test/user/9"
# print api_url
# data = '{"username":"海莉·贝内特","age":28}'
# result = http_put(api_url, data)
# print result
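Indexing one document per request, with one MySQL query per itemid, is the main bottleneck in the script above; the newer post linked at the top switches to bulk imports. As a rough illustration, here is a minimal sketch using the bulk helper from elasticsearch-py, assuming the db_obj() and strip_tags() helpers defined above and the same table layout:

#!/usr/bin/python
#coding:utf-8
# Minimal bulk-indexing sketch (assumes db_obj() and strip_tags() from above)
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch("192.168.0.107:9200")

def bulk_index(start, end, chunk=1000):
    db = db_obj()
    cur = db.cursor()
    # One range query instead of one point lookup per itemid
    cur.execute("select itemid,content from cms_news_data where itemid >= %s and itemid < %s",
                (start, end))
    actions = []
    for itemid, content in cur:
        actions.append({
            "_index": "chinacms4",
            "_type": "content",
            "_id": itemid,
            "_source": {"content": strip_tags(str(content))},
        })
        if len(actions) >= chunk:
            helpers.bulk(es, actions)   # one HTTP request per chunk
            actions = []
    if actions:
        helpers.bulk(es, actions)
    db.close()

One range query replaces millions of point lookups, and each bulk call ships a whole chunk of documents in a single HTTP request; the chunk size of 1000 is only a starting point for tuning.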
2. HTTP POST over the network

#!/usr/bin/python
#coding:utf-8
import urllib2
import MySQLdb
import threading
import sys
import re
import json
reload(sys) 
sys.setdefaultencoding('utf-8')

#conf = ['192.168.0.107','root','pwd','chinacms4']
#db = MySQLdb.connect(conf[0],conf[1],conf[2],conf[3],charset="utf8")

def db_obj():
    conf = ['192.168.0.107','root','pwd','chinacms4']
    return MySQLdb.connect(conf[0],conf[1],conf[2],conf[3],charset="utf8")

# HTTP POST data to a URL
# param string url
# param string data (JSON document body)

def http_put(url,data):
    request = urllib2.Request(url,data)
    request.add_header('User-Agent', "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36")
    try:
        response = urllib2.urlopen(request)
        rv = response.read()
        response.close()
        return rv
    except Exception,e:
        print "............................................................................................"
        print url
        print data
        print e
        print "............................................................................................"


def strip_tags(html):
    # Strip HTML tags and newlines
    html = re.sub(r'<([^>]+)>', '', html, flags=re.S)
    html = re.sub(r'\n+', '', html)
    return html.strip()


def create_index(start, end):
    # One connection per thread
    db = db_obj()
    cur = db.cursor()
    for i in range(start, end):
        cur.execute("select itemid,content from cms_news_data where itemid = %s", (i,))
        result = cur.fetchone()
        if result is not None:
            api_url = "http://192.168.0.107:9200/chinacms4/data/" + str(result[0])
            content = strip_tags(str(result[1]))
            # json.dumps handles quoting and escaping for us
            data = json.dumps({"content": content})
            print http_put(api_url, data)
        else:
            print str(i) + " null"
    db.close()

# Five threads, one per itemid range
# (note: the 2000000-2500000 range is not covered by the list below)

threading.Thread(target=create_index, args=(1, 500000)).start()
threading.Thread(target=create_index, args=(500000, 1000000)).start()
threading.Thread(target=create_index, args=(1000000, 1500000)).start()
threading.Thread(target=create_index, args=(1500000, 2000000)).start()
threading.Thread(target=create_index, args=(2500000, 3000000)).start()


# Creating a single test document through the HTTP API:

# api_url = "http://192.168.0.107:9200/test/user/9"
# print api_url
# data = '{"username":"海莉·贝内特","age":28}'
# result = http_put(api_url, data)
# print result
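The threads above are fired and forgotten, so the script gives no clear signal of when the import has actually finished. Here is a minimal variant with explicit joins, assuming the create_index() above; the ranges are illustrative and include the 2000000-2500000 span the original list skips:

#!/usr/bin/python
#coding:utf-8
import threading

# Illustrative ranges; adjust to the actual itemid span in cms_news_data
ranges = [(1, 500000), (500000, 1000000), (1000000, 1500000),
          (1500000, 2000000), (2000000, 2500000), (2500000, 3000000)]

threads = []
for start, end in ranges:
    t = threading.Thread(target=create_index, args=(start, end))
    t.start()
    threads.append(t)

# Block until every worker finishes, so the script ends only when indexing does
for t in threads:
    t.join()

join() also makes it easy to time the whole import or to run a verification pass once every range is done.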

Tags: multithreading, elasticsearch
