感觉 Elasticsearch 官方的 PHP 客户端没有自己写的 curl 请求快,因此自己封装了一个 curl 类。
<?php
/**
* Created by PhpStorm.
* User: shw
* Date: 2017/12/28
* Time: 10:37
*/
/**
 * Thin Elasticsearch HTTP client built directly on PHP's cURL extension.
 *
 * Every public method performs exactly one HTTP request against a typed
 * endpoint ({index}/{type}/{id}) and returns the raw response body as a
 * string, or false when the request could not be sent or its body could not
 * be JSON-encoded (an E_USER_WARNING describing the failure is raised).
 */
class Elasticsearch{
    private $host = "192.168.0.186";
    private $port = "9200";
    private $user = "elastic";
    // NOTE(review): a real credential is committed here; prefer passing it to
    // the constructor from configuration or the environment.
    private $password = "3Y~KyvQ1nkt~I7%PZU^3";
    /** @var string|null Base URL (scheme, host and port, no trailing slash). */
    private $es = null;

    /**
     * Build the base URL of the cluster.
     *
     * All arguments are optional; an omitted (null) argument keeps the
     * built-in default, so `new Elasticsearch()` behaves as before.
     *
     * @param string|null $host     Host name or IP, optionally with a scheme.
     * @param string|null $port     TCP port.
     * @param string|null $user     Basic-auth user name.
     * @param string|null $password Basic-auth password.
     */
    public function __construct($host = null, $port = null, $user = null, $password = null){
        // Emitting a header after output has started only produces a warning.
        if (!headers_sent()) {
            header("Content-type: text/html; charset=utf-8");
        }
        if ($host !== null) {
            $this->host = $host;
        }
        if ($port !== null) {
            $this->port = $port;
        }
        if ($user !== null) {
            $this->user = $user;
        }
        if ($password !== null) {
            $this->password = $password;
        }
        $scheme = strpos($this->host, '://') === false ? 'http://' : '';
        $this->es = $scheme . $this->host . ':' . $this->port;
    }

    /**
     * JSON-encode a request body, keeping non-ASCII characters unescaped.
     *
     * @param mixed $data Value to encode.
     * @return string|false JSON text, or false (with a warning) on failure.
     */
    private function to_json($data){
        $json = json_encode($data, JSON_UNESCAPED_UNICODE);
        if ($json === false) {
            trigger_error(
                'Elasticsearch: request body could not be JSON-encoded (json_last_error=' . json_last_error() . ')',
                E_USER_WARNING
            );
        }
        return $json;
    }

    /**
     * Send one HTTP request and return the raw response body.
     *
     * @param string            $method   HTTP verb (POST, PUT, DELETE).
     * @param array             $segments Path segments; each one is URL-encoded.
     * @param string|false|null $payload  JSON body, null for no body, or false
     *                                    when encoding already failed.
     * @param string            $query    Literal query string including "?".
     * @return string|false Response body, or false on failure.
     */
    private function request($method, array $segments, $payload = null, $query = ''){
        if ($payload === false) {
            return false;
        }

        $url = $this->es;
        foreach ($segments as $segment) {
            // Encode so ids containing "/", "?", "#" or spaces stay one segment.
            $url .= '/' . rawurlencode((string) $segment);
        }
        $url .= $query;

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        // Credentials go through CURLOPT_USERPWD rather than the URL so that
        // reserved characters in the password cannot corrupt URL parsing.
        curl_setopt($ch, CURLOPT_USERPWD, $this->user . ':' . $this->password);
        curl_setopt($ch, CURLOPT_HTTPHEADER, [
            'Accept:application/json',
            'Content-Type:application/json;charset=utf-8',
        ]);
        if ($method === 'POST') {
            curl_setopt($ch, CURLOPT_POST, 1);
        } else {
            curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
        }
        if ($payload !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
        }

        $result = curl_exec($ch);
        if ($result === false) {
            trigger_error('Elasticsearch: ' . $method . ' ' . $url . ' failed: ' . curl_error($ch), E_USER_WARNING);
        }
        curl_close($ch);
        return $result;
    }

    /**
     * POST a search body to {index}/{type}/_search.
     *
     * @param string $index_name Index name.
     * @param string $type       Document type.
     * @param mixed  $data       Search request body (JSON-encoded before sending).
     * @return string|false Raw response body.
     */
    public function curl_post($index_name,$type,$data){
        return $this->request('POST', [$index_name, $type, '_search'], $this->to_json($data));
    }

    /**
     * POST a mapping definition to {index}/{doctype}/_mapping?pretty.
     *
     * @param string $index_name Index name.
     * @param string $doctype    Document type the mapping applies to.
     * @param mixed  $mapping    Mapping body (JSON-encoded before sending).
     * @return string|false Raw response body.
     */
    public function create_index_mapping($index_name,$doctype,$mapping){
        return $this->request('POST', [$index_name, $doctype, '_mapping'], $this->to_json($mapping), '?pretty');
    }

    /**
     * Delete a whole index (DELETE {index}).
     *
     * @param string $index_name Index name.
     * @return string|false Raw response body.
     */
    public function remove_index($index_name){
        return $this->request('DELETE', [$index_name]);
    }

    /**
     * Create an index with default settings (PUT {index}, no body).
     *
     * @param string $index_name Index name.
     * @return string|false Raw response body.
     */
    public function create_index($index_name){
        return $this->request('PUT', [$index_name]);
    }

    /**
     * Index (create or replace) a document (PUT {index}/{type}/{id}).
     *
     * @param string     $index_name Index name.
     * @param string     $type       Document type.
     * @param mixed      $data       Document source (JSON-encoded before sending).
     * @param string|int $_id        Document id.
     * @return string|false Raw response body.
     */
    public function insert($index_name,$type,$data,$_id){
        return $this->request('PUT', [$index_name, $type, $_id], $this->to_json($data));
    }

    /**
     * Send an update request (POST {index}/{type}/{id}/_update).
     *
     * $data is sent unchanged, so the caller supplies the complete _update
     * body, e.g. ['doc' => ['field' => 'value']] -- unlike insert(), which
     * takes the bare document.
     *
     * @param string     $index_name Index name.
     * @param string     $type       Document type.
     * @param mixed      $data       Complete _update request body.
     * @param string|int $_id        Document id.
     * @return string|false Raw response body.
     */
    public function update($index_name,$type,$data,$_id){
        return $this->request('POST', [$index_name, $type, $_id, '_update'], $this->to_json($data));
    }

    /**
     * Delete a single document (DELETE {index}/{type}/{id}).
     *
     * @param string     $index_name Index name.
     * @param string     $type       Document type.
     * @param string|int $_id        Document id.
     * @return string|false Raw response body.
     */
    public function delete($index_name,$type,$_id){
        return $this->request('DELETE', [$index_name, $type, $_id]);
    }

    /**
     * Run a search; an alias of curl_post().
     *
     * @param string $index_name Index name.
     * @param string $doctype    Document type.
     * @param array  $data       Full search body (query, size, sort, ...).
     * @return string|false Raw response body.
     */
    public function search($index_name,$doctype,$data){
        return $this->curl_post($index_name,$doctype,$data);
    }

    /**
     * Search chinacms/cms_news_main for the first 10 hits of $query ordered
     * by ascending _id, and print the decoded response with print_r().
     *
     * @param array $query Query clause placed under the "query" key.
     * @return void
     */
    private function print_news_search(array $query){
        $data = [
            'size' => 10,
            'query' => $query,
            'sort'=> ['_id'=>['order'=>'asc']],
        ];
        print_r(json_decode($this->curl_post('chinacms','cms_news_main',$data),true));
    }

    /**
     * Demo: "match" query on title. (match matches any analysed term,
     * whereas match_phrase requires the whole phrase.)
     *
     * @return void
     */
    public function demo(){
        $this->print_news_search([
            "bool" => [
                "should" => [
                    "match" => ['title'=>'电视机'],
                ],
            ],
        ]);
    }

    /**
     * Demo: phrase match on title combined with a terms filter on status.
     *
     * @return void
     */
    public function demo2(){
        $this->print_news_search([
            "bool" => [
                "must" => [
                    "match_phrase" => ['title'=>'电视机'],
                ],
                "filter" => [
                    "terms" => [
                        "status" => [2,3],
                    ],
                ],
            ],
        ]);
    }

    /**
     * Demo: several "must" clauses (phrase match, term on hangye_id, terms on status).
     *
     * @return void
     */
    public function demo3(){
        $this->print_news_search([
            "bool" => [
                "must" => [
                    ["match_phrase" => ['title'=>'电视机']],
                    ["term" => ["hangye_id"=>122]],
                    ["terms" => ["status"=>[2,3]]],
                ],
            ],
        ]);
    }

    /**
     * Look up news documents by exact itemid and print the decoded response.
     *
     * @param int|string $itemid Value matched against the "itemid" field.
     * @return void The result is printed, not returned.
     */
    public function get_by_itemid($itemid){
        $this->print_news_search([
            "bool" => [
                "must" => [
                    ["term" => ["itemid"=>$itemid]],
                ],
            ],
        ]);
    }
}
// ---------------------------------------------------------------------------
// Usage script: creates the "photo" index and its mapping, then prints the
// mapping response. Earlier steps are kept below as commented-out examples.
// ---------------------------------------------------------------------------
$es = new Elasticsearch();

//$es->remove_index('chinacms');

// Create the index "chinacms":
//$result = $es->create_index('chinacms');

// Define its field structure
// (itemid, hangye_id, status, userid, title, sources, class):
// $mapping = array(
// 'cms_news_main' => array(
// "properties" => array(
// "itemid" => ['type'=>'integer'],
// "hangye_id" => ['type'=>'integer'],
// "status" => ['type'=>'integer'],
// "userid" => ['type'=>'integer'],
// "title" => ['type'=>'text'],
// "sources" => ['type'=>'text'],
// "class" => ['type'=>'text']
// )
// )
// );
// $result = $es->create_index_mapping('chinacms','cms_news_main',$mapping);
// Expected response: { "acknowledged" : true }

// Product index, type cms_sell_main
// (itemid, title, hangye_id, status, sources, catid, userid):
// $sell_mapping = array(
// 'cms_sell_main' => array(
// "properties" => array(
// "itemid" => ['type'=>'integer'],
// "hangye_id" => ['type'=>'integer'],
// "status" => ['type'=>'integer'],
// "userid" => ['type'=>'integer'],
// "title" => ['type'=>'text'],
// "sources" => ['type'=>'text'],
// "catid" => ['type'=>'integer']
// )
// )
// );
// $es->create_index('product');
// $result = $es->create_index_mapping('product','cms_sell_main',$sell_mapping);

// Photo-gallery index: field mapping for type cms_photo_main.
$photo_mapping = [
    'cms_photo_main' => [
        'properties' => [
            'itemid'        => ['type' => 'integer'],
            'hangye_id'     => ['type' => 'integer'],
            'status'        => ['type' => 'integer'],
            'userid'        => ['type' => 'integer'],
            'title'         => ['type' => 'text'],
            'sources'       => ['type' => 'text'],
            'catid'         => ['type' => 'integer'],
            'brandusername' => ['type' => 'text'],
            'lanmu'         => ['type' => 'text'],
        ],
    ],
];

//$result = $es->remove_index('photo');

// Create the index first (its response is discarded), then apply the mapping.
$es->create_index('photo');
$result = $es->create_index_mapping('photo', 'cms_photo_main', $photo_mapping);

//$es->demo3();
//$result = $es->create_index('hq');

/*
Example: index one document.
$doctype = "user";
$id = 15;
$data = array(
'username'=>'李绮婷',
'userage'=>16,
'usertel'=>'13779531261'
);
$result = $es->insert('hq',$doctype,$data,$id);
*/

// Example: delete one document.
//$result = $es->delete('hq','user',11);

/*
Example: partial update; the changed fields are wrapped in 'doc'.
$doctype = "user";
$id = 15;
$data = array(
'doc' => array(
//'username'=>'李绮婷',
//'userage'=>16,
'usertel'=>'123456789'
)
);
$result = $es->update('hq',$doctype,$data,$id);
*/

//$result = $es->get_by_itemid(1014287);

// Print the raw response of the last request stored in $result.
print_r($result);
#20180710补充:需要注意 insert 方法和 update 方法接收的数组结构不一样。如果想给 update 方法传入与 insert 方法一样的数组,
#需要把 update 方法调整为如下实现(在方法内部用 'doc' 包裹数据):
/**
 * Partially update an indexed document.
 *
 * Takes the same bare field array that insert() accepts: the fields are
 * wrapped under the 'doc' key here, JSON-encoded, and POSTed to
 * {index}/{type}/{id}/_update.
 *
 * @param string     $index_name Index name.
 * @param string     $type       Document type.
 * @param array      $data       Fields to change (without the 'doc' wrapper).
 * @param string|int $_id        Document id.
 * @return string|false Raw response body as returned by curl_exec().
 */
public function update($index_name,$type,$data,$_id){
    $endpoint = $this->es.'/'.$index_name.'/'.$type.'/'.$_id.'/_update';
    $handle = curl_init();
    // Wrap the caller's fields the way the _update endpoint expects them.
    $payload = json_encode(array('doc'=>$data),JSON_UNESCAPED_UNICODE);
    curl_setopt_array($handle, array(
        CURLOPT_URL => $endpoint,
        CURLOPT_RETURNTRANSFER => 1,
        CURLOPT_POST => 1,
        CURLOPT_POSTFIELDS => $payload,
        CURLOPT_HTTPHEADER => array(
            'Accept:application/json',
            'Content-Type:application/json;charset=utf-8',
        ),
    ));
    $response = curl_exec($handle);
    curl_close($handle);
    return $response;
}
使用 Python 脚本初始化索引数据:
#!/usr/bin/python
#coding:utf-8
import urllib2
import MySQLdb
import threading
import sys
import re
import os
import json
from elasticsearch import Elasticsearch
# Python 2 only: switch the interpreter's default string encoding to UTF-8 so
# implicit str/unicode conversions of non-ASCII text (e.g. str() on a unicode
# title) do not raise. reload(sys) must come first; setdefaultencoding is not
# normally accessible on the sys module after interpreter start-up.
reload(sys)
sys.setdefaultencoding('utf-8')
def db_obj():
    """Open and return a new MySQLdb connection.

    Connects to database ``chinacms4`` on ``localhost`` as ``root`` using the
    ``utf8`` connection charset. The caller is responsible for closing it.
    """
    host, user, passwd, database = 'localhost', 'root', 'pwd', 'chinacms4'
    return MySQLdb.connect(host, user, passwd, database, charset="utf8")
# Module-wide Elasticsearch client (local node, HTTP basic auth), used by
# es_put() and create_index().
# NOTE(review): the password is hard-coded; consider loading it from configuration.
es = Elasticsearch(
    "127.0.0.1:9200",
    http_auth=('elastic', '3Y~KyvQ1nkt~I7%PZU^3'),
)
def es_put(index_name,_id,type_name,data):
    """Index one document through the module-level ``es`` client.

    Prints "<id>add success!" on success. Any exception is caught and its
    repr() printed; nothing is raised or returned.

    :param index_name: target index
    :param _id: document id
    :param type_name: document type
    :param data: document body handed to ``es.index``
    """
    try:
        es.index(body=data, id=_id, doc_type=type_name, index=index_name)
        message = str(_id) + "add success!"
        print(message)
    except Exception as e:
        print(repr(e))
def http_put(url,data):
    """Send ``data`` to ``url`` with a browser User-Agent and return the body.

    Despite the name, the request is built with ``urllib2.Request(url, data)``,
    which urllib2 sends as a POST when ``data`` is not None.

    :param url: target URL
    :param data: request body
    :return: the response body, or None if the request failed (the URL,
             payload and exception are printed between separator lines)
    """
    separator = "............................................................................................"
    request = urllib2.Request(url,data)
    request.add_header('User-Agent', "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36")
    try:
        response = urllib2.urlopen(request)
        try:
            return response.read()
        finally:
            # Close the connection even when read() raises.
            response.close()
    except Exception as e:
        print(separator)
        print(url)
        print(data)
        print(e)
        print(separator)
        return None
# Characters deleted outright by strip_tags(). Kept as unicode literals so the
# same table serves both text input and UTF-8 encoded byte input.
_STRIP_CHARS = (
    u'\u200e', u'\u200f', u'\u200d', u'\u200c',   # LRM, RLM, ZWJ, ZWNJ
    u'\u202a', u'\u202d', u'\u202e', u'\u202c',   # bidi embedding / override / pop
    u'\u206e', u'\u206f', u'\u206b', u'\u206a', u'\u206d', u'\u206c',  # U+206A..U+206F format chars
    # NOTE(review): these accented Latin letters were removed by the original
    # code as well; presumably mojibake residue -- confirm they are unwanted.
    u'\u012b', u'\u00e6', u'\u00e4', u'\u00e8', u'\u00e9', u'\u00e5',
)


def strip_tags(html):
    """Reduce an HTML fragment to compact plain text.

    Steps, in order:
      1. delete everything that looks like a tag (``<...>``);
      2. delete every character in U+0000..U+0020 (control characters, tab,
         newline, carriage return and the ASCII space);
      3. delete the invisible/format characters listed in ``_STRIP_CHARS``;
      4. strip remaining leading/trailing whitespace.

    The earlier version also deleted code points 102 and 110, i.e. the plain
    letters "f" and "n", which mangled any Latin text; that is no longer done.

    :param html: text, or UTF-8 encoded bytes
    :return: cleaned value of the same type as ``html``
    """
    is_bytes = isinstance(html, bytes)

    def native(text):
        # Patterns are written once as unicode; encode them for byte input.
        return text.encode('utf-8') if is_bytes else text

    empty = native(u'')
    html = re.sub(native(u'<[^>]+>'), empty, html)
    html = re.sub(native(u'[\\x00-\\x20]'), empty, html)
    for char in _STRIP_CHARS:
        html = html.replace(native(char), empty)
    return html.strip()
def create_index(start,end):
    """Copy news rows with ``start <= itemid < end`` from MySQL into Elasticsearch.

    For each itemid the row is read from ``cms_news_main`` and indexed into
    ``caijiqi_news_title`` / ``title`` using the itemid as document id. Every
    column value is stored as its ``str()`` form. Ids whose indexing fails are
    appended to ``/scripts/news_title.log.txt``; ids without a row are only
    reported on stdout.

    NOTE(review): the log file is opened with mode 'w', so concurrent callers
    (the thread launcher in this file starts many) truncate and overwrite each
    other's entries.

    :param start: first itemid (inclusive)
    :param end: last itemid (exclusive)
    """
    columns = ("itemid", "hangye_id", "status", "userid", "title", "sources", "class")
    sql = "select itemid,hangye_id,status,userid,title,sources,class from cms_news_main where itemid = %s"
    f = open('/scripts/news_title.log.txt', 'w')
    try:
        # One connection for the whole range instead of one per itemid.
        db = db_obj()
        try:
            cur = db.cursor()
            for i in range(start,end):
                # Let the driver bind the parameter instead of formatting SQL.
                cur.execute(sql, (i,))
                result = cur.fetchone()
                if result is None:
                    print("itemid"+str(i)+"无数据")
                    continue
                # json.dumps escapes quotes, backslashes and control characters
                # that would corrupt a hand-concatenated JSON string.
                doc = dict((name, str(value)) for name, value in zip(columns, result))
                data = json.dumps(doc)
                try:
                    es.index(index='caijiqi_news_title', doc_type='title', refresh=True, body=data,id=result[0])
                    print(str(i)+">>>>>>>>>>>>>>>>> 添加成功")
                except Exception as e:
                    print(e)
                    f.write(str(i)+"\r\n\n")
        finally:
            db.close()
    finally:
        f.close()
# Earlier experiments, kept for reference:
#data = '{"content":"五千年的风和雨啊"}'
#es_put("chinacms4",1,"content",data)
#es.index(index='blog', doc_type='blog_content', refresh=True, body=data,id=1)
#print data
#es.index(index="chinacms4",doc_type="content",data,id=result[0])

# Start 100 worker threads; worker k indexes itemids [k*10000, (k+1)*10000),
# so together they cover itemids 0..999999.
for worker in range(100):
    lower = worker * 10000
    threading.Thread(target=create_index, args=(lower, lower + 10000)).start()

# Alternative split: 30 threads of 100000 ids each.
#for i in range(0,30):
# threading.Thread(target=create_index,args=(i*100000,(i+1)*100000)).start()
标签: elasticsearch
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:https://www.isres.com/linux/84.html