Es索引的我们可以理解为数据入库的一个过程。我们知道Es是基于Lucene框架的一个分布式检索平台。索引的同样也是基于Lucene创建的,只不过在其上层做了一些封闭。

         Es的索引过程比较通用的大体上有两种方式,其一是得用自身Rvier从数据库中拉数据,当然现在已经有了很多相关插件,Mysql、MDB等数据库。这种方式可以做到近时实索引,因为River是定时从数据库拉数据与索引数据进行比对。这种方式经较适合数据有周期的更新。

         下面以Mysql-River  plugins为例:

1、    安装Mysql-River 插件

bin/plugin -install /path/to/plugin/river-mysql.zip

2、    当安装好Mysql-River plugin 后,一般可以马上使用,但建立重新加载Es集群。查看log中是否正确的加载了Mysql-River Plugin(在后面我们讲到如何开发相关Plugin)。

3、    配置Es索引与Mysql 数据之间的对应关系。

建立索引(相关Mapping 信息如下:)

curl -XPUT 127.0.0.1:9200/elasticsearchindexname/elasticsearchtypename/_mapping -d

"elasticsearchtypename" : {

                   "_timestamp":{

                            "enabled":true

                   }

}

                   将River索引的配置也提交到Es集群中:

                   curl -XPUT 127.0.0.1:9200/_river/river-mysql/_meta –d

                   {

             "type":"mysql",

                "mysql":{

        "index":"elasticsearchindexname",(索引名称)

        "type":"elasticsearchtypename",(类型)

        "hostname":"127.0.0.1:3306",(服务器)

        "database":"ESDATA",(数据库名称)

        "username":"root",(用户名)

        "password":"",(密码)

        "uniqueIdField":"_ID",(标识)

        "query":"select RID,PNAME FROM wf_mds_chn_biaozhun",(SQL语句)

        "deleteOldEntries":"false",

        "interval":"60000"(更新周期)

    }

}

同时你会在Es看到你的索引中开始导数据了,当然些时也会出现一个对应的保存配置的索引,现在很多River都只能索引字段与数据库的字段一一对应。如果需要个性化定制,可以到Github上下载相关代码进行修改。我们可以看到只要继续River(接口)和AbstractRiverComponent(类)便可以进行相关开发了。

public class MysqlRiver extends AbstractRiverComponent implements River

 

         另外一种索引方式当然就是我们把数据Put到Es中去了,最简单的我们可以用下面命令就完成:

$ curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{

    "user" : "kimchy",

    "post_date" : "2009-11-15T14:12:12",

    "message" : "trying out Elastic Search"

}'

对上面的命令解释一下:

Twitter:索引名称

Tweet:类型名称

1:ID值

具体我会在下篇中讲解索引名称和类型的关系,当然-d 后面的就是值了。这是单条Put数据的方式虽然简单,但是如果说数据量很大的情况下还是不建议用这种方式,可以改用批量导入的方式也就是传说中的Bluk了,Bluk原量很简单,我们把数据放到缓存中,一次Put多条数据到Es集群中去,Bluk当然要用代码实现了,给出一个例子如下:

public static void Index() throws ElasticSearchException, IOException, NumberFormatException, SQLException {

                   // TODO Auto-generated method stub

                   // Node node = nodeBuilder().client(true).node();

                   Settings settings = ImmutableSettings.settingsBuilder()

                                     .put("cluster.name", "elasticsearch_wf").build();

                   Client client = new TransportClient(settings)

                                     .addTransportAddress(new InetSocketTransportAddress(

                                                        "168.160.200.250", 9300));

                  

                   ////������е��ܼ�¼��ֳ�5000��һ�����ѯ

                   int countRe=100000000; //MySqlClass.getCount("select count(*) from test");

                   if(countRe>0)

                   {

                            int readercount=1;

                            if(countRe>5000)

                            {

                                     readercount=countRe%5000==0?countRe/5000:countRe/5000+1;

                            }

                           

                            ////ÿ�ζ�ȡ5000���¼

                            for(int j=0;j<readercount;j++)

                            {

                                     ResultSet rs = MySqlClass.executeQuery("select * from test");

                                     BulkRequestBuilder bulkRequest = client.prepareBulk();

                                     try {

 

                                               if (rs != null) {

                                                        int i = 1;

                                                        while (rs.next()) {

                                                                 bulkRequest.add(client.prepareIndex("qtest", String.valueOf(i++)).setSource(

                                                                                    jsonBuilder().startObject()

                                                                                                       .field("id", rs.getInt("id"))

                                                                                                       .field("�й�", rs.getString("title"))

                                                                                                       .field("AB_EN", rs.getString("descript"))

                                                                                                       .field("AF_CN",rs.getString("text"))

                                                                                                       .endObject()));

                                                        }

                                                        BulkResponse bulkResponse = bulkRequest.execute().actionGet();

                                                        if (bulkResponse.hasFailures()) {

                                                                 /* has Failures handler Error */

                                                        }

                                               }

                                     } catch (Exception e) {

                                               e.printStackTrace();

                                     }

                            }

                   }

                   client.close();

         }

上面只是一个简单的例子,大量可以考虑用从线程方式,另外Client链接数其实还是比较占资源的,大家可以考虑将出封闭到一个链接池中,提供效率。

         整个建索引的过程Es在Lucene的基础上还是做了很多的优化,但主体上我们对应到Lucene里面基实就是如下代码:

         public class Index {

         private IndexWriter writer = null;

         private static Analyzer ANALYZER = new IKAnalyzer();

         private String FilePath = null;

 

         public Index(String FilePath, String IndexPath) {

                   try {

                            IndexWriterConfig writerConfig = new IndexWriterConfig(

                                               Version.LUCENE_36, ANALYZER);

                            this.writer = new IndexWriter(

                                               FSDirectory.open(new File(IndexPath)), writerConfig);

                            this.FilePath = FilePath;

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         /*

          * Init Create Index

          */

         public void Init() {

                   try {

                            if (FilePath.length() > 0) {

                                     // 读目录中txt文件

                                     File file = new File(FilePath);

                                     List<File> files = new ArrayList<File>();

                                     this.ListAllFile(file, files);

 

                                     // //将File转换为 Document对象

                                     for (File sfs : files) {

                                               this.writer.addDocument(this.getDocument(sfs));

                                     }

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         /*

          * Close Index

          */

         public void Close() {

                   try {

                            this.writer.commit();

                            this.writer.close();

                   } catch (Exception e) {

                            e.printStackTrace();

                   }

         }

 

         /*

          * 获取所有txt文件

          */

         private List<File> ListAllFile(File fileOrDir, List<File> files)

                            throws Exception {

                   if (fileOrDir != null && files != null) {

                            if (fileOrDir.isDirectory()) {

                                     File[] fs = fileOrDir.listFiles();

                                     for (File sfs : fs) {

                                               if (sfs.isDirectory())

                                                        this.ListAllFile(sfs, files);

                                               else files.add(sfs);

                                     }

                            } else {

                                     files.add(fileOrDir);

                            }

                   }

                   return null;

         }

 

         /*

          * Get Document

          */

         private Document getDocument(File f) throws Exception {

                   Document doc = new Document();

                   FileInputStream  is = new FileInputStream(f);

                   byte[] buf = new byte[is.available()];

                   is.read(buf);

                   String contentStr = new String(buf,"GBK");

                   Field content = new Field("content", contentStr, Field.Store.YES,

                                     Field.Index.ANALYZED);

                   doc.add(content);

                   Field path = new Field("path", f.getAbsolutePath(), Field.Store.YES,

                                     Field.Index.ANALYZED);

                   Field size=new Field("size",String.valueOf(f.getTotalSpace()),Field.Store.YES,Field.Index.NOT_ANALYZED);

                   doc.add(size);

                   Random rm=new Random();

                   int year=rm.nextInt(20);

                   Field time=new Field("time",String.valueOf(1990+year),Field.Store.YES,Field.Index.NOT_ANALYZED);

                  doc.add(time);

                   doc.add(path);

                   is.close();

                   return doc;

         }

}