comparison src/luan/modules/lucene/LuceneIndex.java @ 775:1a68fc55a80c

simplify dir structure
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 26 Aug 2016 14:36:40 -0600
parents lucene/src/luan/modules/lucene/LuceneIndex.java@9092e52f94eb
children 6b8ea0a9b7c9
comparison
equal deleted inserted replaced
774:3e30cf310e56 775:1a68fc55a80c
1 package luan.modules.lucene;
2
3 import java.io.Closeable;
4 import java.io.File;
5 import java.io.FileOutputStream;
6 import java.io.FileInputStream;
7 import java.io.IOException;
8 import java.util.Iterator;
9 import java.util.Map;
10 import java.util.List;
11 import java.util.ArrayList;
12 import java.util.Set;
13 import java.util.HashSet;
14 import java.util.Collections;
15 import java.util.concurrent.ConcurrentMap;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.atomic.AtomicInteger;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.zip.ZipOutputStream;
21 import java.util.zip.ZipEntry;
22 import org.apache.lucene.analysis.Analyzer;
23 import org.apache.lucene.analysis.core.KeywordAnalyzer;
24 import org.apache.lucene.document.Document;
25 import org.apache.lucene.document.Field;
26 import org.apache.lucene.document.StoredField;
27 import org.apache.lucene.document.StringField;
28 import org.apache.lucene.document.TextField;
29 import org.apache.lucene.document.IntField;
30 import org.apache.lucene.document.LongField;
31 import org.apache.lucene.document.DoubleField;
32 import org.apache.lucene.index.IndexableField;
33 import org.apache.lucene.index.IndexWriter;
34 import org.apache.lucene.index.IndexWriterConfig;
35 import org.apache.lucene.index.DirectoryReader;
36 import org.apache.lucene.index.Term;
37 import org.apache.lucene.index.SnapshotDeletionPolicy;
38 import org.apache.lucene.index.IndexCommit;
39 import org.apache.lucene.index.AtomicReaderContext;
40 import org.apache.lucene.store.Directory;
41 import org.apache.lucene.store.FSDirectory;
42 import org.apache.lucene.util.Version;
43 import org.apache.lucene.util.BytesRef;
44 import org.apache.lucene.util.NumericUtils;
45 import org.apache.lucene.search.Query;
46 import org.apache.lucene.search.TermQuery;
47 import org.apache.lucene.search.TopDocs;
48 import org.apache.lucene.search.Sort;
49 import org.apache.lucene.search.SortField;
50 import org.apache.lucene.search.IndexSearcher;
51 import org.apache.lucene.search.TotalHitCountCollector;
52 import org.apache.lucene.search.ScoreDoc;
53 import org.apache.lucene.search.Collector;
54 import org.apache.lucene.search.Scorer;
55 import org.apache.lucene.search.BooleanClause;
56 import org.apache.lucene.search.highlight.Formatter;
57 import org.apache.lucene.search.highlight.Highlighter;
58 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
59 import org.apache.lucene.search.highlight.NullFragmenter;
60 import org.apache.lucene.search.highlight.QueryScorer;
61 import org.apache.lucene.search.highlight.TokenGroup;
62 import luan.modules.lucene.queryparser.SaneQueryParser;
63 import luan.modules.lucene.queryparser.FieldParser;
64 import luan.modules.lucene.queryparser.MultiFieldParser;
65 import luan.modules.lucene.queryparser.StringFieldParser;
66 import luan.modules.lucene.queryparser.NumberFieldParser;
67 import luan.modules.lucene.queryparser.ParseException;
68 import luan.modules.Utils;
69 import luan.Luan;
70 import luan.LuanState;
71 import luan.LuanTable;
72 import luan.LuanFunction;
73 import luan.LuanException;
74 import luan.LuanMeta;
75 import luan.LuanRuntimeException;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79
80 public final class LuceneIndex implements Closeable {
81 private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class);
82
83 private static final String FLD_NEXT_ID = "nextId";
84 public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer());
85
86 private static final Version version = Version.LUCENE_4_9;
87 private final ReentrantLock writeLock = new ReentrantLock();
88 private final File indexDir;
89 private SnapshotDeletionPolicy snapshotDeletionPolicy;
90 private IndexWriter writer;
91 private DirectoryReader reader;
92 private IndexSearcher searcher;
93 private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>();
94 private boolean isClosed = true;
95 private final MultiFieldParser mfp;
96 public final LuanTable indexed_only_fields = new LuanTable();
97 private final Analyzer analyzer;
98
99 private static ConcurrentMap<File,AtomicInteger> globalWriteCounters = new ConcurrentHashMap<File,AtomicInteger>();
100 private File fileDir;
101 private int writeCount;
102
/**
 * Builds the field-parser registry, registers this index with {@code luan}
 * for closing, and opens the index stored under {@code indexDirStr}.
 *
 * @param luan               state that is asked to close this index via onClose
 * @param indexDirStr        path of the index directory
 * @param defaultFieldParser parser for the default fields, or null for none
 * @param defaultFields      default field names passed to the MultiFieldParser
 */
public LuceneIndex(LuanState luan,String indexDirStr,FieldParser defaultFieldParser,String[] defaultFields) throws LuanException, IOException {
    if( defaultFieldParser == null ) {
        mfp = new MultiFieldParser();
    } else {
        mfp = new MultiFieldParser(defaultFieldParser,defaultFields);
    }
    mfp.fields.put( "type", STRING_FIELD_PARSER );
    mfp.fields.put( "id", NumberFieldParser.LONG );
    this.indexDir = new File(indexDirStr);
    // Use the analyzer of a string default parser when there is one,
    // otherwise fall back to the keyword analyzer of STRING_FIELD_PARSER.
    this.analyzer = defaultFieldParser instanceof StringFieldParser
        ? ((StringFieldParser)defaultFieldParser).analyzer
        : STRING_FIELD_PARSER.analyzer;
    luan.onClose(this);
    reopen();
}
118
119 public void reopen() throws LuanException, IOException {
120 if( !isClosed ) throw new RuntimeException();
121 isClosed = false;
122 IndexWriterConfig conf = new IndexWriterConfig(version,analyzer);
123 snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy());
124 conf.setIndexDeletionPolicy(snapshotDeletionPolicy);
125 FSDirectory dir = FSDirectory.open(indexDir);
126 fileDir = dir.getDirectory();
127 globalWriteCounters.putIfAbsent(fileDir,new AtomicInteger());
128 writer = new IndexWriter(dir,conf);
129 writer.commit(); // commit index creation
130 reader = DirectoryReader.open(dir);
131 searcher = new IndexSearcher(reader);
132 initId();
133 }
134
135 private int globalWriteCount() {
136 return globalWriteCounters.get(fileDir).get();
137 }
138
139 private void wrote() {
140 globalWriteCounters.get(fileDir).incrementAndGet();
141 }
142
143 public void delete_all() throws IOException {
144 boolean commit = !writeLock.isHeldByCurrentThread();
145 writeLock.lock();
146 try {
147 writer.deleteAll();
148 id = idLim = 0;
149 if(commit) writer.commit();
150 } finally {
151 wrote();
152 writeLock.unlock();
153 }
154 }
155
156 private static Term term(String key,long value) {
157 BytesRef br = new BytesRef();
158 NumericUtils.longToPrefixCoded(value,0,br);
159 return new Term(key,br);
160 }
161
162 public void delete(LuanState luan,String queryStr) throws LuanException, IOException, ParseException {
163 Query query = SaneQueryParser.parseQuery(mfp,queryStr);
164
165 boolean commit = !writeLock.isHeldByCurrentThread();
166 writeLock.lock();
167 try {
168 writer.deleteDocuments(query);
169 if(commit) writer.commit();
170 } finally {
171 wrote();
172 writeLock.unlock();
173 }
174 }
175
176 public void save(LuanState luan,LuanTable doc) throws LuanException, IOException {
177 Set indexedOnlySet = new HashSet();
178 Object typeObj = doc.get(luan,"type");
179 if( typeObj==null )
180 throw new LuanException("missing 'type' field");
181 if( !(typeObj instanceof String) )
182 throw new LuanException("type must be string");
183 String type = (String)typeObj;
184 Object indexedOnlyObj = indexed_only_fields.get(luan,type);
185 if( indexedOnlyObj != null ) {
186 if( !(indexedOnlyObj instanceof LuanTable) )
187 throw new LuanException("indexed_only_fields elements must be tables");
188 LuanTable indexedOnly = (LuanTable)indexedOnlyObj;
189 for( Map.Entry<Object,Object> entry : indexedOnly.iterable(luan) ) {
190 Object key = entry.getKey();
191 if( !(key instanceof String) )
192 throw new LuanException("indexed_only_fields."+type+" entries must be strings");
193 String name = (String)key;
194 Object value = entry.getValue();
195 if( !(value instanceof LuanFunction) )
196 throw new LuanException("indexed_only_fields."+type+" values must be functions");
197 LuanFunction fn = (LuanFunction)value;
198 value = Luan.first(fn.call(luan,new Object[]{doc}));
199 doc.put(luan, name, value );
200 indexedOnlySet.add(name);
201 }
202 }
203 Object obj = doc.get(luan,"id");
204 Long id;
205 try {
206 id = (Long)obj;
207 } catch(ClassCastException e) {
208 throw new LuanException("id should be Long but is "+obj.getClass().getSimpleName());
209 }
210
211 boolean commit = !writeLock.isHeldByCurrentThread();
212 writeLock.lock();
213 try {
214 if( id == null ) {
215 id = nextId(luan);
216 doc.put(luan,"id",id);
217 writer.addDocument(toLucene(luan,doc,indexedOnlySet));
218 } else {
219 writer.updateDocument( term("id",id), toLucene(luan,doc,indexedOnlySet) );
220 }
221 if(commit) writer.commit();
222 } finally {
223 wrote();
224 writeLock.unlock();
225 }
226 }
227
228 public void update_in_transaction(LuanState luan,LuanFunction fn) throws IOException, LuanException {
229 boolean commit = !writeLock.isHeldByCurrentThread();
230 writeLock.lock();
231 try {
232 fn.call(luan);
233 if(commit) writer.commit();
234 } finally {
235 wrote();
236 writeLock.unlock();
237 }
238 }
239
240 public void run_in_lock(LuanState luan,LuanFunction fn) throws IOException, LuanException {
241 if( writeLock.isHeldByCurrentThread() )
242 throw new RuntimeException();
243 writeLock.lock();
244 try {
245 synchronized(this) {
246 fn.call(luan);
247 }
248 } finally {
249 wrote();
250 writeLock.unlock();
251 }
252 }
253
254
255 private long id;
256 private long idLim;
257 private final int idBatch = 10;
258
259 private void initId() throws LuanException, IOException {
260 TopDocs td = searcher.search(new TermQuery(new Term("type","next_id")),1);
261 switch(td.totalHits) {
262 case 0:
263 id = 0;
264 idLim = 0;
265 break;
266 case 1:
267 idLim = (Long)searcher.doc(td.scoreDocs[0].doc).getField(FLD_NEXT_ID).numericValue();
268 id = idLim;
269 break;
270 default:
271 throw new RuntimeException();
272 }
273 }
274
275 public synchronized long nextId(LuanState luan) throws LuanException, IOException {
276 if( ++id > idLim ) {
277 idLim += idBatch;
278 LuanTable doc = new LuanTable();
279 doc.rawPut( "type", "next_id" );
280 doc.rawPut( FLD_NEXT_ID, idLim );
281 writer.updateDocument(new Term("type","next_id"),toLucene(luan,doc,Collections.EMPTY_SET));
282 wrote();
283 }
284 return id;
285 }
286
287 /*
288 public void backup(String zipFile) throws LuanException, IOException {
289 if( !zipFile.endsWith(".zip") )
290 throw new LuanException("file "+zipFile+" doesn't end with '.zip'");
291 IndexCommit ic = snapshotDeletionPolicy.snapshot();
292 try {
293 ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile));
294 for( String fileName : ic.getFileNames() ) {
295 out.putNextEntry(new ZipEntry(fileName));
296 FileInputStream in = new FileInputStream(new File(indexDir,fileName));
297 Utils.copyAll(in,out);
298 in.close();
299 out.closeEntry();
300 }
301 out.close();
302 } finally {
303 snapshotDeletionPolicy.release(ic);
304 }
305 }
306 */
307 public Object snapshot(LuanState luan,LuanFunction fn) throws LuanException, IOException {
308 IndexCommit ic = snapshotDeletionPolicy.snapshot();
309 try {
310 String dir = fileDir.toString();
311 LuanTable fileNames = new LuanTable(new ArrayList(ic.getFileNames()));
312 return fn.call(luan,new Object[]{dir,fileNames});
313 } finally {
314 snapshotDeletionPolicy.release(ic);
315 }
316 }
317
318
319
320 public String to_string() {
321 return writer.getDirectory().toString();
322 }
323
324 public void close() throws IOException {
325 if( !isClosed ) {
326 writer.close();
327 reader.close();
328 isClosed = true;
329 }
330 }
331
332 protected void finalize() throws Throwable {
333 if( !isClosed ) {
334 logger.error("not closed");
335 close();
336 }
337 super.finalize();
338 }
339
340
341
342 private static class DocFn extends LuanFunction {
343 final IndexSearcher searcher;
344 int docID;
345
346 DocFn(IndexSearcher searcher) {
347 this.searcher = searcher;
348 }
349
350 @Override public Object call(LuanState luan,Object[] args) throws LuanException {
351 try {
352 return toTable(searcher.doc(docID));
353 } catch(IOException e) {
354 throw new LuanException(e);
355 }
356 }
357 }
358
359 private static abstract class MyCollector extends Collector {
360 int docBase;
361 int i = 0;
362
363 @Override public void setScorer(Scorer scorer) {}
364 @Override public void setNextReader(AtomicReaderContext context) {
365 this.docBase = context.docBase;
366 }
367 @Override public boolean acceptsDocsOutOfOrder() {
368 return true;
369 }
370 }
371
372 private synchronized IndexSearcher openSearcher() throws IOException {
373 int gwc = globalWriteCount();
374 if( writeCount != gwc ) {
375 writeCount = gwc;
376 DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
377 if( newReader != null ) {
378 reader.decRef();
379 reader = newReader;
380 searcher = new IndexSearcher(reader);
381 }
382 }
383 reader.incRef();
384 return searcher;
385 }
386
387 // call in finally block
388 private static void close(IndexSearcher searcher) throws IOException {
389 searcher.getIndexReader().decRef();
390 }
391
392 public void ensure_open() throws IOException {
393 close(openSearcher());
394 }
395
396 public int advanced_search( final LuanState luan, String queryStr, LuanFunction fn, Integer n, String sortStr ) throws LuanException, IOException, ParseException {
397 Utils.checkNotNull(queryStr);
398 Query query = SaneQueryParser.parseQuery(mfp,queryStr);
399 IndexSearcher searcher = threadLocalSearcher.get();
400 boolean inTransaction = searcher != null;
401 if( !inTransaction )
402 searcher = openSearcher();
403 try {
404 if( fn!=null && n==null ) {
405 if( sortStr != null )
406 throw new LuanException("sort must be nil when n is nil");
407 final DocFn docFn = new DocFn(searcher);
408 MyCollector col = new MyCollector() {
409 @Override public void collect(int doc) {
410 try {
411 docFn.docID = docBase + doc;
412 fn.call(luan,new Object[]{++i,docFn});
413 } catch(LuanException e) {
414 throw new LuanRuntimeException(e);
415 }
416 }
417 };
418 try {
419 searcher.search(query,col);
420 } catch(LuanRuntimeException e) {
421 throw (LuanException)e.getCause();
422 }
423 return col.i;
424 }
425 if( fn==null || n==0 ) {
426 TotalHitCountCollector thcc = new TotalHitCountCollector();
427 searcher.search(query,thcc);
428 return thcc.getTotalHits();
429 }
430 Sort sort = sortStr==null ? null : SaneQueryParser.parseSort(mfp,sortStr);
431 TopDocs td = sort==null ? searcher.search(query,n) : searcher.search(query,n,sort);
432 final ScoreDoc[] scoreDocs = td.scoreDocs;
433 DocFn docFn = new DocFn(searcher);
434 for( int i=0; i<scoreDocs.length; i++ ) {
435 docFn.docID = scoreDocs[i].doc;
436 fn.call(luan,new Object[]{i+1,docFn});
437 }
438 return td.totalHits;
439 } finally {
440 if( !inTransaction )
441 close(searcher);
442 }
443 }
444
445 public Object search_in_transaction(LuanState luan,LuanFunction fn) throws LuanException, IOException {
446 if( threadLocalSearcher.get() != null )
447 throw new LuanException("can't nest search_in_transaction calls");
448 IndexSearcher searcher = openSearcher();
449 threadLocalSearcher.set(searcher);
450 try {
451 return fn.call(luan);
452 } finally {
453 threadLocalSearcher.set(null);
454 close(searcher);
455 }
456 }
457
458
459
460 public final LuanMeta indexedFieldsMeta = new LuanMeta() {
461
462 @Override public boolean canNewindex() {
463 return true;
464 }
465
466 @Override public Object __index(LuanState luan,LuanTable tbl,Object key) {
467 return mfp.fields.get(key);
468 }
469
470 @Override public void __new_index(LuanState luan,LuanTable tbl,Object key,Object value) throws LuanException {
471 if( !(key instanceof String) )
472 throw new LuanException("key must be string");
473 String field = (String)key;
474 if( value==null ) { // delete
475 mfp.fields.remove(field);
476 return;
477 }
478 if( !(value instanceof FieldParser) )
479 throw new LuanException("value must be FieldParser like the values of Lucene.type");
480 FieldParser parser = (FieldParser)value;
481 mfp.fields.put( field, parser );
482 }
483
484 @Override public final Iterator keys(LuanTable tbl) {
485 return mfp.fields.keySet().iterator();
486 }
487
488 @Override protected String type(LuanTable tbl) {
489 return "lucene-indexed-fields";
490 }
491
492 };
493
494
495
496 private IndexableField newField(String name,Object value,Field.Store store,Set<String> indexed)
497 throws LuanException
498 {
499 if( value instanceof String ) {
500 String s = (String)value;
501 FieldParser fp = mfp.fields.get(name);
502 if( fp != null ) {
503 if( fp instanceof StringFieldParser && fp != STRING_FIELD_PARSER ) {
504 return new TextField(name, s, store);
505 } else {
506 return new StringField(name, s, store);
507 }
508 } else {
509 return new StoredField(name, s);
510 }
511 } else if( value instanceof Integer ) {
512 int i = (Integer)value;
513 if( indexed.contains(name) ) {
514 return new IntField(name, i, store);
515 } else {
516 return new StoredField(name, i);
517 }
518 } else if( value instanceof Long ) {
519 long i = (Long)value;
520 if( indexed.contains(name) ) {
521 return new LongField(name, i, store);
522 } else {
523 return new StoredField(name, i);
524 }
525 } else if( value instanceof Double ) {
526 double i = (Double)value;
527 if( indexed.contains(name) ) {
528 return new DoubleField(name, i, store);
529 } else {
530 return new StoredField(name, i);
531 }
532 } else if( value instanceof byte[] ) {
533 byte[] b = (byte[])value;
534 return new StoredField(name, b);
535 } else
536 throw new LuanException("invalid value type "+value.getClass()+"' for '"+name+"'");
537 }
538
539 private Document toLucene(LuanState luan,LuanTable table,Set indexOnly) throws LuanException {
540 Set<String> indexed = mfp.fields.keySet();
541 Document doc = new Document();
542 for( Map.Entry<Object,Object> entry : table.iterable(luan) ) {
543 Object key = entry.getKey();
544 if( !(key instanceof String) )
545 throw new LuanException("key must be string");
546 String name = (String)key;
547 Object value = entry.getValue();
548 Field.Store store = indexOnly.contains(name) ? Field.Store.NO : Field.Store.YES;
549 if( !(value instanceof LuanTable) ) {
550 doc.add(newField(name, value, store, indexed));
551 } else { // list
552 LuanTable list = (LuanTable)value;
553 for( Object el : list.asList() ) {
554 doc.add(newField(name, el, store, indexed));
555 }
556 }
557 }
558 return doc;
559 }
560
561 private static Object getValue(IndexableField ifld) throws LuanException {
562 BytesRef br = ifld.binaryValue();
563 if( br != null )
564 return br.bytes;
565 Number n = ifld.numericValue();
566 if( n != null )
567 return n;
568 String s = ifld.stringValue();
569 if( s != null )
570 return s;
571 throw new LuanException("invalid field type for "+ifld);
572 }
573
574 private static LuanTable toTable(Document doc) throws LuanException {
575 if( doc==null )
576 return null;
577 LuanTable table = new LuanTable();
578 for( IndexableField ifld : doc ) {
579 String name = ifld.name();
580 Object value = getValue(ifld);
581 Object old = table.rawGet(name);
582 if( old == null ) {
583 table.rawPut(name,value);
584 } else {
585 LuanTable list;
586 if( old instanceof LuanTable ) {
587 list = (LuanTable)old;
588 } else {
589 list = new LuanTable();
590 list.rawPut(1,old);
591 table.rawPut(name,list);
592 }
593 list.rawPut(list.rawLength()+1,value);
594 }
595 }
596 return table;
597 }
598
599
600 public LuanFunction highlighter(LuanState luan,String queryStr,LuanFunction formatter) throws ParseException {
601 Query query = SaneQueryParser.parseQuery(mfp,queryStr);
602 Formatter fmt = new Formatter() {
603 public String highlightTerm(String originalText,TokenGroup tokenGroup) {
604 if( tokenGroup.getTotalScore() <= 0 )
605 return originalText;
606 try {
607 return (String)Luan.first(formatter.call(luan,new Object[]{originalText}));
608 } catch(LuanException e) {
609 throw new LuanRuntimeException(e);
610 }
611 }
612 };
613 Highlighter hl = new Highlighter( fmt, new QueryScorer(query) );
614 hl.setTextFragmenter( new NullFragmenter() );
615 return new LuanFunction() {
616 @Override public String call(LuanState luan,Object[] args) throws LuanException {
617 String text = (String)args[0];
618 try {
619 String s = hl.getBestFragment(analyzer,null,text);
620 return s!=null ? s : text;
621 } catch(LuanRuntimeException e) {
622 throw (LuanException)e.getCause();
623 } catch(IOException e) {
624 throw new RuntimeException(e);
625 } catch(InvalidTokenOffsetsException e) {
626 throw new RuntimeException(e);
627 }
628 }
629 };
630 }
631 }