感觉 Elasticsearch 官方的 PHP 客户端没有自己直接用 curl 请求快,因此封装了一个基于 curl 的类。

<?php
/**
 * Created by PhpStorm.
 * User: shw
 * Date: 2017/12/28
 * Time: 10:37
 */
/**
 * Minimal Elasticsearch REST client built directly on the cURL extension.
 *
 * Every public method sends one HTTP request and returns the raw response
 * body as a string, or false when the request could not be sent. After a
 * false return, get_last_error() gives a short description of the failure.
 */
class Elasticsearch{
    // NOTE(review): these defaults (including the password) are committed in
    // source; prefer passing real values to the constructor from configuration.
    private $host = "192.168.0.186";
    private $port = "9200";
    private $user = "elastic";
    private $password = "3Y~KyvQ1nkt~I7%PZU^3";
    // Base URL including percent-encoded credentials, e.g. http://user:pass@host:port
    private $es = null;
    // Description of the most recent failure, or null when the last call succeeded.
    private $last_error = null;

    /**
     * Build the base URL used by every request.
     *
     * All parameters are optional; a null value keeps the built-in default,
     * so `new Elasticsearch()` behaves as before.
     *
     * @param string|null     $host     server host name or IP
     * @param string|int|null $port     server port
     * @param string|null     $user     basic-auth user name
     * @param string|null     $password basic-auth password
     */
    public function __construct($host = null, $port = null, $user = null, $password = null){
        // Emitting the header after output has started would only raise a warning.
        if (!headers_sent()) {
            header("Content-type: text/html; charset=utf-8");
        }
        if ($host !== null) {
            $this->host = $host;
        }
        if ($port !== null) {
            $this->port = $port;
        }
        if ($user !== null) {
            $this->user = $user;
        }
        if ($password !== null) {
            $this->password = $password;
        }
        // Credentials are percent-encoded: characters such as % ^ @ : / would
        // otherwise make the URL invalid or change how it is parsed.
        $this->es = 'http://'.rawurlencode($this->user).':'.rawurlencode($this->password)
            .'@'.$this->host.':'.$this->port;
    }

    /**
     * Describe why the most recent request returned false.
     *
     * @return string|null error text, or null if the last request was sent successfully
     */
    public function get_last_error(){
        return $this->last_error;
    }

    /**
     * Send one request to the server and return the raw response body.
     *
     * @param  string $method   HTTP verb ("POST", "PUT", "DELETE")
     * @param  string $path     path (and query string) appended to the base URL
     * @param  bool   $has_body whether $data is JSON-encoded and sent as the body
     * @param  mixed  $data     payload to encode when $has_body is true
     * @return string|false     response body, or false on encoding/transport failure
     */
    private function request($method, $path, $has_body = false, $data = null){
        $this->last_error = null;

        $body = null;
        if ($has_body) {
            $body = json_encode($data, JSON_UNESCAPED_UNICODE);
            if ($body === false) {
                // Typically invalid UTF-8 in $data; do not send an empty body instead.
                $this->last_error = 'json_encode failed (error code '.json_last_error().')';
                return false;
            }
        }

        $ch = curl_init();
        if ($ch === false) {
            $this->last_error = 'curl_init failed';
            return false;
        }
        curl_setopt($ch, CURLOPT_URL, $this->es.$path);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        if ($method === 'POST') {
            curl_setopt($ch, CURLOPT_POST, 1);
        } else {
            curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
        }
        if ($body !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
        }
        curl_setopt($ch, CURLOPT_HTTPHEADER, array(
            'Accept:application/json',
            'Content-Type:application/json;charset=utf-8',
        ));

        $result = curl_exec($ch);
        if ($result === false) {
            $this->last_error = curl_error($ch);
        }
        curl_close($ch);
        return $result;
    }

    /**
     * POST a search body to /{index}/{type}/_search.
     *
     * @param  string $index_name index name
     * @param  string $type       document type
     * @param  array  $data       search request body (query, size, sort, ...)
     * @return string|false       raw JSON response, or false on failure
     */
    public function curl_post($index_name,$type,$data){
        return $this->request('POST', '/'.$index_name.'/'.$type.'/_search', true, $data);
    }

    /**
     * POST a mapping definition to /{index}/{doctype}/_mapping?pretty.
     *
     * @param  string $index_name index name
     * @param  string $doctype    document type the mapping applies to
     * @param  array  $mapping    mapping definition
     * @return string|false       raw JSON response, or false on failure
     */
    public function create_index_mapping($index_name,$doctype,$mapping){
        return $this->request('POST', '/'.$index_name.'/'.$doctype.'/_mapping?pretty', true, $mapping);
    }

    /**
     * Delete an index (DELETE /{index}).
     *
     * @param  string $index_name index name
     * @return string|false       raw JSON response, or false on failure
     */
    public function remove_index($index_name){
        return $this->request('DELETE', '/'.$index_name);
    }

    /**
     * Create an index (PUT /{index}, no request body).
     *
     * @param  string $index_name index name
     * @return string|false       raw JSON response, or false on failure
     */
    public function create_index($index_name){
        return $this->request('PUT', '/'.$index_name);
    }

    /**
     * Index one document (PUT /{index}/{type}/{id}).
     *
     * @param  string     $index_name index name
     * @param  string     $type       document type
     * @param  array      $data       document fields
     * @param  string|int $_id        document id
     * @return string|false           raw JSON response, or false on failure
     */
    public function insert($index_name,$type,$data,$_id){
        return $this->request('PUT', '/'.$index_name.'/'.$type.'/'.$_id, true, $data);
    }

    /**
     * Update one document (POST /{index}/{type}/{id}/_update).
     *
     * $data is sent as-is, so the caller supplies the complete _update body,
     * e.g. array('doc' => array('field' => 'value')).
     *
     * @param  string     $index_name index name
     * @param  string     $type       document type
     * @param  array      $data       complete _update request body
     * @param  string|int $_id        document id
     * @return string|false           raw JSON response, or false on failure
     */
    public function update($index_name,$type,$data,$_id){
        return $this->request('POST', '/'.$index_name.'/'.$type.'/'.$_id.'/_update', true, $data);
    }

    /**
     * Delete one document (DELETE /{index}/{type}/{id}).
     *
     * @param  string     $index_name index name
     * @param  string     $type       document type
     * @param  string|int $_id        document id
     * @return string|false           raw JSON response, or false on failure
     */
    public function delete($index_name,$type,$_id){
        return $this->request('DELETE', '/'.$index_name.'/'.$type.'/'.$_id);
    }

    /**
     * Run a search; thin alias of curl_post().
     *
     * @param  string $index_name index name
     * @param  string $doctype    document type
     * @param  array  $data       search request body (query, size, sort, ...)
     * @return string|false       raw JSON response, or false on failure
     */
    public function search($index_name,$doctype,$data){
        return $this->curl_post($index_name,$doctype,$data);
    }

    /**
     * Search chinacms/cms_news_main with $query (first 10 hits, ascending _id)
     * and print the decoded response.
     *
     * @param array $query value for the "query" key of the search body
     */
    private function print_news_search($query){
        $data = array(
            'size' => 10,
            'query' => $query,
            'sort'=> array('_id'=>array('order'=>'asc')),
        );
        print_r(json_decode($this->curl_post('chinacms','cms_news_main',$data),true));
    }

    // match_phrase matches the whole phrase; match matches on any analyzed term.

    /** Demo: bool/should with a single match clause on title. */
    public function demo(){
        $this->print_news_search(array(
            "bool" => array(
                "should" => array(
                    "match" => array('title'=>'电视机'),
                ),
            ),
        ));
    }

    /** Demo: match_phrase on title, filtered to status 2 or 3. */
    public function demo2(){
        $this->print_news_search(array(
            "bool" => array(
                "must" => array(
                    "match_phrase" => array('title'=>'电视机'),
                ),
                "filter" => array(
                    "terms" => array(
                        "status" => array(2,3),
                    ),
                ),
            ),
        ));
    }

    /** Demo: several must clauses (phrase on title, exact hangye_id, status 2 or 3). */
    public function demo3(){
        $this->print_news_search(array(
            "bool" => array(
                "must" => array(
                    array("match_phrase" => array('title'=>'电视机')),
                    array("term" => array("hangye_id"=>122)),
                    array("terms" => array("status"=>array(2,3))),
                ),
            ),
        ));
    }

    /**
     * Look up documents by exact itemid and print the decoded response.
     *
     * @param int $itemid item id to match
     */
    public function get_by_itemid($itemid){
        $this->print_news_search(array(
            "bool" => array(
                "must" => array(
                    array("term" => array("itemid"=>$itemid)),
                ),
            ),
        ));
    }
}

$es = new Elasticsearch();
// Uncomment to drop the news index before rebuilding it.
//$es->remove_index('chinacms');

// --- News index: "chinacms", type "cms_news_main" ---
// Step 1: create the index.
//$result = $es->create_index('chinacms');
// Step 2: define the field mapping.
// Fields: itemid,hangye_id,status,userid,title,sources,class
// $mapping = array(
//         'cms_news_main' => array(
//                 "properties" => array(
//                         "itemid" => ['type'=>'integer'],
//                         "hangye_id" => ['type'=>'integer'],
//                         "status" => ['type'=>'integer'],
//                         "userid" => ['type'=>'integer'],
//                         "title" => ['type'=>'text'],
//                         "sources" => ['type'=>'text'],
//                         "class" => ['type'=>'text']
//                     )
//             )
//     );
// $result = $es->create_index_mapping('chinacms','cms_news_main',$mapping);
// Response seen on success: { "acknowledged" : true }
//-----------------------------------------------------------------------
//
// --- Product index: "product", type "cms_sell_main" ---
// Fields: itemid,title,hangye_id,status,sources,catid,userid

// $sell_mapping = array(
//     'cms_sell_main' => array(
//             "properties" => array(
//                     "itemid" => ['type'=>'integer'],
//                     "hangye_id" => ['type'=>'integer'],
//                     "status" => ['type'=>'integer'],
//                     "userid" => ['type'=>'integer'],
//                     "title" => ['type'=>'text'],
//                     "sources" => ['type'=>'text'],
//                     "catid" => ['type'=>'integer']
//                 )
//         )
//     );

// $es->create_index('product');
// $result = $es->create_index_mapping('product','cms_sell_main',$sell_mapping);

// --- Photo gallery index: "photo", type "cms_photo_main" ---
// This is the only section that currently runs: it creates the index and
// then uploads the mapping below.
$photo_mapping = [
    'cms_photo_main' => [
        'properties' => [
            'itemid'        => ['type' => 'integer'],
            'hangye_id'     => ['type' => 'integer'],
            'status'        => ['type' => 'integer'],
            'userid'        => ['type' => 'integer'],
            'title'         => ['type' => 'text'],
            'sources'       => ['type' => 'text'],
            'catid'         => ['type' => 'integer'],
            'brandusername' => ['type' => 'text'],
            'lanmu'         => ['type' => 'text'],
        ],
    ],
];
//$result = $es->remove_index('photo');
$es->create_index('photo');
$result = $es->create_index_mapping('photo', 'cms_photo_main', $photo_mapping);

// Further usage examples, all disabled:
//$es->demo3();
//$result = $es->create_index('hq');
/**
Example: index one document with insert().
$doctype = "user";
$id = 15;
$data = array(
        'username'=>'李绮婷',
        'userage'=>16,
        'usertel'=>'13779531261'
    );
$result = $es->insert('hq',$doctype,$data,$id);
**/
//
// Example: delete one document.
//$result = $es->delete('hq','user',11);
//
/**
Example: partial update; the changed fields are wrapped in a 'doc' key.
$doctype = "user";
$id = 15;
$data = array(
        'doc' => array(
            //'username'=>'李绮婷',
            //'userage'=>16,
            'usertel'=>'123456789'
        )
    );
$result = $es->update('hq',$doctype,$data,$id);
**/

//$result = $es->get_by_itemid(1014287);
// Show the raw response of the last enabled call (the photo mapping request).
print_r($result);

#20180710补充:需要注意 insert 方法和 update 方法传入的数组结构不一样。如果希望 update 方法接收与 insert 方法一样的数组,
#需要把 update 方法调整为如下形式:
/**
     * Partially update one document via POST {base}/{index}/{type}/{id}/_update.
     *
     * Unlike the earlier version, the caller passes the plain field array (the
     * same shape insert() takes); it is wrapped in a 'doc' key here.
     *
     * @param  string     $index_name index name
     * @param  string     $type       document type
     * @param  array      $data       fields to change, without the 'doc' wrapper
     * @param  string|int $_id        document id
     * @return string|false           raw response body, or false if curl_exec fails
     */
    public function update($index_name,$type,$data,$_id){
        // Wrap the fields in the envelope the _update endpoint is sent here.
        $payload = json_encode(array('doc' => $data), JSON_UNESCAPED_UNICODE);
        $endpoint = $this->es.'/'.$index_name.'/'.$type.'/'.$_id.'/_update';
        $headers = array(
            'Accept:application/json',
            'Content-Type:application/json;charset=utf-8',
        );

        $ch = curl_init();
        curl_setopt_array($ch, array(
            CURLOPT_URL            => $endpoint,
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_POST           => 1,
            CURLOPT_POSTFIELDS     => $payload,
            CURLOPT_HTTPHEADER     => $headers,
        ));
        $response = curl_exec($ch);
        curl_close($ch);

        return $response;
    }

python初始化索引数据

#!/usr/bin/python
#coding:utf-8
import urllib2
import MySQLdb
import threading
import sys
import re
import os
import json
from elasticsearch import Elasticsearch
# Python 2-only idiom: make UTF-8 the process-wide default codec so implicit
# str <-> unicode conversions (str(...) on unicode values, mixed concatenation)
# do not fail on non-ASCII text. sys must be reloaded first because
# setdefaultencoding is not available on the already-imported module.
reload(sys) 
sys.setdefaultencoding('utf-8')

def db_obj():
    """Open and return a new MySQLdb connection to the ``chinacms4`` database.

    Host, user, password and database name are fixed in this function; the
    connection uses the ``utf8`` character set. The caller owns the connection
    and is responsible for closing it.
    """
    host, user, password, database = 'localhost', 'root', 'pwd', 'chinacms4'
    return MySQLdb.connect(host, user, password, database, charset="utf8")

# Module-level Elasticsearch client used by es_put() and create_index().
# NOTE(review): the address and basic-auth credentials are hard-coded here;
# consider loading them from configuration or the environment.
es = Elasticsearch( "127.0.0.1:9200",http_auth=('elastic', '3Y~KyvQ1nkt~I7%PZU^3'))

def es_put(index_name,_id,type_name,data):
    """Index one document through the module-level ``es`` client.

    index_name -- target index
    _id        -- document id
    type_name  -- document type
    data       -- document body passed to ``es.index``

    Prints a success line when the call completes. Any exception is caught and
    its ``repr`` printed, so nothing is raised to the caller; returns None.
    """
    try:
        es.index(body=data, id=_id, doc_type=type_name, index=index_name)
        message = str(_id) + "add success!"
        print(message)
    except Exception as err:
        print(repr(err))
def http_put(url,data):
    """Send ``data`` to ``url`` with urllib2 and return the response body.

    Despite the name, the request is built with ``urllib2.Request(url, data)``
    and no method override, which urllib2 documents as a POST when ``data``
    is supplied.

    Returns the body as a string, or None if opening, reading or closing the
    response raised; in that case the URL, payload and exception are printed
    between two separator lines.
    """
    request = urllib2.Request(url,data)
    request.add_header('User-Agent', "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36")
    try:
        response = urllib2.urlopen(request)
        try:
            return response.read()
        finally:
            # Release the connection even when read() fails.
            response.close()
    except Exception as e:
        print("............................................................................................")
        print(url)
        print(data)
        print(e)
        print("............................................................................................")
        return None
def strip_tags(html):
    """Remove markup, whitespace and invisible characters from ``html``.

    Steps, in order:
      1. delete every ``<...>`` tag;
      2. delete all tabs, newlines and plain spaces (they are removed, not
         collapsed to a single space);
      3. delete the UTF-8 encodings of the code points listed below;
      4. delete the remaining ASCII control characters;
      5. strip leading/trailing whitespace.

    Python 2 only: relies on ``unichr`` and on byte-string ``replace``.

    The previous version also deleted code points 102 and 110, which are the
    ordinary letters "f" and "n", so every f/n in the text was lost. Those two
    entries were removed; form feed and newline are still deleted by the
    control-character pattern and by step 2.
    """
    # Anything between "<" and the next ">" is treated as a tag.
    html = re.compile(r'<([^>]+)>',re.S).sub('',html)
    html = html.replace('\t','').replace('\n','').replace(' ','')

    unwanted_code_points = (
        8206, 8207, 8205, 8204,              # U+200E/200F/200D/200C: LRM, RLM, ZWJ, ZWNJ
        8234, 8237, 8238, 8236,              # U+202A/202D/202E/202C: bidi embedding/override controls
        8302, 8303, 8299, 8298, 8301, 8300,  # U+206A..U+206F: deprecated format characters
        30,                                  # U+001E
        13,                                  # carriage return
        # NOTE(review): the next six are printable Latin letters (U+012B, ae
        # ligature, a-umlaut, e-grave, e-acute, a-ring). They look like residue
        # from mis-decoded text; kept as in the original -- confirm still wanted.
        299, 230, 228, 232, 233, 229,
        31,                                  # U+001F
    )
    for code_point in unwanted_code_points:
        html = html.replace(unichr(code_point).encode('utf-8'), '')

    # Remaining ASCII control characters, except tab, newline and carriage return
    # (already handled above).
    html = re.compile('[\\x00-\\x08\\x0b-\\x0c\\x0e-\\x1f]').sub('',html)
    return html.strip()

def create_index(start,end):
    """Copy rows of ``cms_news_main`` into Elasticsearch for itemid in [start, end).

    For each itemid the row is fetched from MySQL, serialised to JSON with
    every column converted via ``str`` (so all values are JSON strings, as
    before), and indexed into ``caijiqi_news_title`` / ``title`` with the
    itemid as document id.

    An itemid whose row cannot be serialised or indexed is printed and appended
    to /scripts/news_title.log.txt. The log is opened in append mode because
    many worker threads call this function at once; with ``'w'`` each thread
    truncated the file and overwrote the others' entries. As a consequence the
    log now also keeps entries from earlier runs.

    Database errors are not caught and propagate to the caller; the connection
    and the log file are closed in every case.
    """
    columns = ('itemid', 'hangye_id', 'status', 'userid', 'title', 'sources', 'class')
    sql = "select itemid,hangye_id,status,userid,title,sources,class from cms_news_main where itemid = %s"

    f = open('/scripts/news_title.log.txt', 'a')
    try:
        # One connection per call instead of one per row.
        db = db_obj()
        try:
            cur = db.cursor()
            for i in range(start, end):
                # Parameterised query: the driver does the quoting.
                cur.execute(sql, (i,))
                result = cur.fetchone()
                if result is None:
                    print("itemid"+str(i)+"无数据")
                    continue
                try:
                    # json.dumps escapes quotes/backslashes/control characters that
                    # broke the hand-concatenated JSON string.
                    data = json.dumps(dict((name, str(value)) for name, value in zip(columns, result)))
                    es.index(index='caijiqi_news_title', doc_type='title', refresh=True, body=data, id=result[0])
                    print(str(i)+">>>>>>>>>>>>>>>>> 添加成功")
                except Exception as e:
                    print(e)
                    # Same bytes the old ``print >> f`` produced for a failed id.
                    f.write(str(i) + "\r\n\n")
                    f.flush()
        finally:
            db.close()
    finally:
        f.close()
#data = '{"content":"五千年的风和雨啊"}'
#es_put("chinacms4",1,"content",data)           
#es.index(index='blog', doc_type='blog_content', refresh=True, body=data,id=1)
#print data
#es.index(index="chinacms4",doc_type="content",data,id=result[0])

# Start 100 worker threads; worker i indexes itemids in
# [i * 10000, (i + 1) * 10000), covering 0..999999 in total.
BATCH_SIZE = 10000
for i in range(100):
    worker = threading.Thread(target=create_index, args=(i * BATCH_SIZE, (i + 1) * BATCH_SIZE))
    worker.start()

#for i in range(0,30):
#        threading.Thread(target=create_index,args=(i*100000,(i+1)*100000)).start()