Mercurial Hosting > luan
diff src/luan/modules/lucene/LuceneIndex.java @ 775:1a68fc55a80c
simplify dir structure
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Fri, 26 Aug 2016 14:36:40 -0600 |
parents | lucene/src/luan/modules/lucene/LuceneIndex.java@9092e52f94eb |
children | 6b8ea0a9b7c9 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/luan/modules/lucene/LuceneIndex.java Fri Aug 26 14:36:40 2016 -0600 @@ -0,0 +1,631 @@ +package luan.modules.lucene; + +import java.io.Closeable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import java.util.Collections; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.zip.ZipOutputStream; +import java.util.zip.ZipEntry; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.core.KeywordAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.document.IntField; +import org.apache.lucene.document.LongField; +import org.apache.lucene.document.DoubleField; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.Version; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.NumericUtils; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.highlight.Formatter; +import org.apache.lucene.search.highlight.Highlighter; +import org.apache.lucene.search.highlight.InvalidTokenOffsetsException; +import org.apache.lucene.search.highlight.NullFragmenter; +import org.apache.lucene.search.highlight.QueryScorer; +import org.apache.lucene.search.highlight.TokenGroup; +import luan.modules.lucene.queryparser.SaneQueryParser; +import luan.modules.lucene.queryparser.FieldParser; +import luan.modules.lucene.queryparser.MultiFieldParser; +import luan.modules.lucene.queryparser.StringFieldParser; +import luan.modules.lucene.queryparser.NumberFieldParser; +import luan.modules.lucene.queryparser.ParseException; +import luan.modules.Utils; +import luan.Luan; +import luan.LuanState; +import luan.LuanTable; +import luan.LuanFunction; +import luan.LuanException; +import luan.LuanMeta; +import luan.LuanRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public final class LuceneIndex implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class); + + private static final String FLD_NEXT_ID = "nextId"; + public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer()); + + private static final Version version = Version.LUCENE_4_9; + private final ReentrantLock writeLock = new ReentrantLock(); + private final File indexDir; + private SnapshotDeletionPolicy snapshotDeletionPolicy; + private IndexWriter writer; + private DirectoryReader reader; + private IndexSearcher searcher; + private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>(); + private boolean isClosed = true; + private final MultiFieldParser mfp; + public final LuanTable indexed_only_fields = new LuanTable(); + private final Analyzer analyzer; + + private static ConcurrentMap<File,AtomicInteger> globalWriteCounters = new ConcurrentHashMap<File,AtomicInteger>(); + private File fileDir; + private int writeCount; + + public LuceneIndex(LuanState luan,String indexDirStr,FieldParser defaultFieldParser,String[] defaultFields) throws LuanException, IOException { + mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields); + mfp.fields.put( "type", STRING_FIELD_PARSER ); + mfp.fields.put( "id", NumberFieldParser.LONG ); + File indexDir = new File(indexDirStr); + this.indexDir = indexDir; + Analyzer analyzer = STRING_FIELD_PARSER.analyzer; + if( defaultFieldParser instanceof StringFieldParser ) { + StringFieldParser sfp = (StringFieldParser)defaultFieldParser; + analyzer = sfp.analyzer; + } + this.analyzer = analyzer; + luan.onClose(this); + reopen(); + } + + public void reopen() throws LuanException, IOException { + if( !isClosed ) throw new RuntimeException(); + isClosed = false; + IndexWriterConfig conf = new IndexWriterConfig(version,analyzer); + snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()); + conf.setIndexDeletionPolicy(snapshotDeletionPolicy); + FSDirectory dir = FSDirectory.open(indexDir); + fileDir = dir.getDirectory(); + globalWriteCounters.putIfAbsent(fileDir,new AtomicInteger()); + writer = new IndexWriter(dir,conf); + writer.commit(); // commit index creation + reader = DirectoryReader.open(dir); + searcher = new IndexSearcher(reader); + initId(); + } + + private int globalWriteCount() { + return globalWriteCounters.get(fileDir).get(); + } + + private void wrote() { + globalWriteCounters.get(fileDir).incrementAndGet(); + } + + public void delete_all() throws IOException { + boolean commit = !writeLock.isHeldByCurrentThread(); + writeLock.lock(); + try { + writer.deleteAll(); + id = idLim = 0; + if(commit) writer.commit(); + } finally { + wrote(); + writeLock.unlock(); + } + } + + private static Term term(String key,long value) { + BytesRef br = new BytesRef(); + NumericUtils.longToPrefixCoded(value,0,br); + return new Term(key,br); + } + + public void delete(LuanState luan,String queryStr) throws LuanException, IOException, ParseException { + Query query = SaneQueryParser.parseQuery(mfp,queryStr); + + boolean commit = !writeLock.isHeldByCurrentThread(); + writeLock.lock(); + try { + writer.deleteDocuments(query); + if(commit) writer.commit(); + } finally { + wrote(); + writeLock.unlock(); + } + } + + public void save(LuanState luan,LuanTable doc) throws LuanException, IOException { + Set indexedOnlySet = new HashSet(); + Object typeObj = doc.get(luan,"type"); + if( typeObj==null ) + throw new LuanException("missing 'type' field"); + if( !(typeObj instanceof String) ) + throw new LuanException("type must be string"); + String type = (String)typeObj; + Object indexedOnlyObj = indexed_only_fields.get(luan,type); + if( indexedOnlyObj != null ) { + if( !(indexedOnlyObj instanceof LuanTable) ) + throw new LuanException("indexed_only_fields elements must be tables"); + LuanTable indexedOnly = (LuanTable)indexedOnlyObj; + for( Map.Entry<Object,Object> entry : indexedOnly.iterable(luan) ) { + Object key = entry.getKey(); + if( !(key instanceof String) ) + throw new LuanException("indexed_only_fields."+type+" entries must be strings"); + String name = (String)key; + Object value = entry.getValue(); + if( !(value instanceof LuanFunction) ) + throw new LuanException("indexed_only_fields."+type+" values must be functions"); + LuanFunction fn = (LuanFunction)value; + value = Luan.first(fn.call(luan,new Object[]{doc})); + doc.put(luan, name, value ); + indexedOnlySet.add(name); + } + } + Object obj = doc.get(luan,"id"); + Long id; + try { + id = (Long)obj; + } catch(ClassCastException e) { + throw new LuanException("id should be Long but is "+obj.getClass().getSimpleName()); + } + + boolean commit = !writeLock.isHeldByCurrentThread(); + writeLock.lock(); + try { + if( id == null ) { + id = nextId(luan); + doc.put(luan,"id",id); + writer.addDocument(toLucene(luan,doc,indexedOnlySet)); + } else { + writer.updateDocument( term("id",id), toLucene(luan,doc,indexedOnlySet) ); + } + if(commit) writer.commit(); + } finally { + wrote(); + writeLock.unlock(); + } + } + + public void update_in_transaction(LuanState luan,LuanFunction fn) throws IOException, LuanException { + boolean commit = !writeLock.isHeldByCurrentThread(); + writeLock.lock(); + try { + fn.call(luan); + if(commit) writer.commit(); + } finally { + wrote(); + writeLock.unlock(); + } + } + + public void run_in_lock(LuanState luan,LuanFunction fn) throws IOException, LuanException { + if( writeLock.isHeldByCurrentThread() ) + throw new RuntimeException(); + writeLock.lock(); + try { + synchronized(this) { + fn.call(luan); + } + } finally { + wrote(); + writeLock.unlock(); + } + } + + + private long id; + private long idLim; + private final int idBatch = 10; + + private void initId() throws LuanException, IOException { + TopDocs td = searcher.search(new TermQuery(new Term("type","next_id")),1); + switch(td.totalHits) { + case 0: + id = 0; + idLim = 0; + break; + case 1: + idLim = (Long)searcher.doc(td.scoreDocs[0].doc).getField(FLD_NEXT_ID).numericValue(); + id = idLim; + break; + default: + throw new RuntimeException(); + } + } + + public synchronized long nextId(LuanState luan) throws LuanException, IOException { + if( ++id > idLim ) { + idLim += idBatch; + LuanTable doc = new LuanTable(); + doc.rawPut( "type", "next_id" ); + doc.rawPut( FLD_NEXT_ID, idLim ); + writer.updateDocument(new Term("type","next_id"),toLucene(luan,doc,Collections.EMPTY_SET)); + wrote(); + } + return id; + } + +/* + public void backup(String zipFile) throws LuanException, IOException { + if( !zipFile.endsWith(".zip") ) + throw new LuanException("file "+zipFile+" doesn't end with '.zip'"); + IndexCommit ic = snapshotDeletionPolicy.snapshot(); + try { + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile)); + for( String fileName : ic.getFileNames() ) { + out.putNextEntry(new ZipEntry(fileName)); + FileInputStream in = new FileInputStream(new File(indexDir,fileName)); + Utils.copyAll(in,out); + in.close(); + out.closeEntry(); + } + out.close(); + } finally { + snapshotDeletionPolicy.release(ic); + } + } +*/ + public Object snapshot(LuanState luan,LuanFunction fn) throws LuanException, IOException { + IndexCommit ic = snapshotDeletionPolicy.snapshot(); + try { + String dir = fileDir.toString(); + LuanTable fileNames = new LuanTable(new ArrayList(ic.getFileNames())); + return fn.call(luan,new Object[]{dir,fileNames}); + } finally { + snapshotDeletionPolicy.release(ic); + } + } + + + + public String to_string() { + return writer.getDirectory().toString(); + } + + public void close() throws IOException { + if( !isClosed ) { + writer.close(); + reader.close(); + isClosed = true; + } + } + + protected void finalize() throws Throwable { + if( !isClosed ) { + logger.error("not closed"); + close(); + } + super.finalize(); + } + + + + private static class DocFn extends LuanFunction { + final IndexSearcher searcher; + int docID; + + DocFn(IndexSearcher searcher) { + this.searcher = searcher; + } + + @Override public Object call(LuanState luan,Object[] args) throws LuanException { + try { + return toTable(searcher.doc(docID)); + } catch(IOException e) { + throw new LuanException(e); + } + } + } + + private static abstract class MyCollector extends Collector { + int docBase; + int i = 0; + + @Override public void setScorer(Scorer scorer) {} + @Override public void setNextReader(AtomicReaderContext context) { + this.docBase = context.docBase; + } + @Override public boolean acceptsDocsOutOfOrder() { + return true; + } + } + + private synchronized IndexSearcher openSearcher() throws IOException { + int gwc = globalWriteCount(); + if( writeCount != gwc ) { + writeCount = gwc; + DirectoryReader newReader = DirectoryReader.openIfChanged(reader); + if( newReader != null ) { + reader.decRef(); + reader = newReader; + searcher = new IndexSearcher(reader); + } + } + reader.incRef(); + return searcher; + } + + // call in finally block + private static void close(IndexSearcher searcher) throws IOException { + searcher.getIndexReader().decRef(); + } + + public void ensure_open() throws IOException { + close(openSearcher()); + } + + public int advanced_search( final LuanState luan, String queryStr, LuanFunction fn, Integer n, String sortStr ) throws LuanException, IOException, ParseException { + Utils.checkNotNull(queryStr); + Query query = SaneQueryParser.parseQuery(mfp,queryStr); + IndexSearcher searcher = threadLocalSearcher.get(); + boolean inTransaction = searcher != null; + if( !inTransaction ) + searcher = openSearcher(); + try { + if( fn!=null && n==null ) { + if( sortStr != null ) + throw new LuanException("sort must be nil when n is nil"); + final DocFn docFn = new DocFn(searcher); + MyCollector col = new MyCollector() { + @Override public void collect(int doc) { + try { + docFn.docID = docBase + doc; + fn.call(luan,new Object[]{++i,docFn}); + } catch(LuanException e) { + throw new LuanRuntimeException(e); + } + } + }; + try { + searcher.search(query,col); + } catch(LuanRuntimeException e) { + throw (LuanException)e.getCause(); + } + return col.i; + } + if( fn==null || n==0 ) { + TotalHitCountCollector thcc = new TotalHitCountCollector(); + searcher.search(query,thcc); + return thcc.getTotalHits(); + } + Sort sort = sortStr==null ? null : SaneQueryParser.parseSort(mfp,sortStr); + TopDocs td = sort==null ? searcher.search(query,n) : searcher.search(query,n,sort); + final ScoreDoc[] scoreDocs = td.scoreDocs; + DocFn docFn = new DocFn(searcher); + for( int i=0; i<scoreDocs.length; i++ ) { + docFn.docID = scoreDocs[i].doc; + fn.call(luan,new Object[]{i+1,docFn}); + } + return td.totalHits; + } finally { + if( !inTransaction ) + close(searcher); + } + } + + public Object search_in_transaction(LuanState luan,LuanFunction fn) throws LuanException, IOException { + if( threadLocalSearcher.get() != null ) + throw new LuanException("can't nest search_in_transaction calls"); + IndexSearcher searcher = openSearcher(); + threadLocalSearcher.set(searcher); + try { + return fn.call(luan); + } finally { + threadLocalSearcher.set(null); + close(searcher); + } + } + + + + public final LuanMeta indexedFieldsMeta = new LuanMeta() { + + @Override public boolean canNewindex() { + return true; + } + + @Override public Object __index(LuanState luan,LuanTable tbl,Object key) { + return mfp.fields.get(key); + } + + @Override public void __new_index(LuanState luan,LuanTable tbl,Object key,Object value) throws LuanException { + if( !(key instanceof String) ) + throw new LuanException("key must be string"); + String field = (String)key; + if( value==null ) { // delete + mfp.fields.remove(field); + return; + } + if( !(value instanceof FieldParser) ) + throw new LuanException("value must be FieldParser like the values of Lucene.type"); + FieldParser parser = (FieldParser)value; + mfp.fields.put( field, parser ); + } + + @Override public final Iterator keys(LuanTable tbl) { + return mfp.fields.keySet().iterator(); + } + + @Override protected String type(LuanTable tbl) { + return "lucene-indexed-fields"; + } + + }; + + + + private IndexableField newField(String name,Object value,Field.Store store,Set<String> indexed) + throws LuanException + { + if( value instanceof String ) { + String s = (String)value; + FieldParser fp = mfp.fields.get(name); + if( fp != null ) { + if( fp instanceof StringFieldParser && fp != STRING_FIELD_PARSER ) { + return new TextField(name, s, store); + } else { + return new StringField(name, s, store); + } + } else { + return new StoredField(name, s); + } + } else if( value instanceof Integer ) { + int i = (Integer)value; + if( indexed.contains(name) ) { + return new IntField(name, i, store); + } else { + return new StoredField(name, i); + } + } else if( value instanceof Long ) { + long i = (Long)value; + if( indexed.contains(name) ) { + return new LongField(name, i, store); + } else { + return new StoredField(name, i); + } + } else if( value instanceof Double ) { + double i = (Double)value; + if( indexed.contains(name) ) { + return new DoubleField(name, i, store); + } else { + return new StoredField(name, i); + } + } else if( value instanceof byte[] ) { + byte[] b = (byte[])value; + return new StoredField(name, b); + } else + throw new LuanException("invalid value type "+value.getClass()+"' for '"+name+"'"); + } + + private Document toLucene(LuanState luan,LuanTable table,Set indexOnly) throws LuanException { + Set<String> indexed = mfp.fields.keySet(); + Document doc = new Document(); + for( Map.Entry<Object,Object> entry : table.iterable(luan) ) { + Object key = entry.getKey(); + if( !(key instanceof String) ) + throw new LuanException("key must be string"); + String name = (String)key; + Object value = entry.getValue(); + Field.Store store = indexOnly.contains(name) ? Field.Store.NO : Field.Store.YES; + if( !(value instanceof LuanTable) ) { + doc.add(newField(name, value, store, indexed)); + } else { // list + LuanTable list = (LuanTable)value; + for( Object el : list.asList() ) { + doc.add(newField(name, el, store, indexed)); + } + } + } + return doc; + } + + private static Object getValue(IndexableField ifld) throws LuanException { + BytesRef br = ifld.binaryValue(); + if( br != null ) + return br.bytes; + Number n = ifld.numericValue(); + if( n != null ) + return n; + String s = ifld.stringValue(); + if( s != null ) + return s; + throw new LuanException("invalid field type for "+ifld); + } + + private static LuanTable toTable(Document doc) throws LuanException { + if( doc==null ) + return null; + LuanTable table = new LuanTable(); + for( IndexableField ifld : doc ) { + String name = ifld.name(); + Object value = getValue(ifld); + Object old = table.rawGet(name); + if( old == null ) { + table.rawPut(name,value); + } else { + LuanTable list; + if( old instanceof LuanTable ) { + list = (LuanTable)old; + } else { + list = new LuanTable(); + list.rawPut(1,old); + table.rawPut(name,list); + } + list.rawPut(list.rawLength()+1,value); + } + } + return table; + } + + + public LuanFunction highlighter(LuanState luan,String queryStr,LuanFunction formatter) throws ParseException { + Query query = SaneQueryParser.parseQuery(mfp,queryStr); + Formatter fmt = new Formatter() { + public String highlightTerm(String originalText,TokenGroup tokenGroup) { + if( tokenGroup.getTotalScore() <= 0 ) + return originalText; + try { + return (String)Luan.first(formatter.call(luan,new Object[]{originalText})); + } catch(LuanException e) { + throw new LuanRuntimeException(e); + } + } + }; + Highlighter hl = new Highlighter( fmt, new QueryScorer(query) ); + hl.setTextFragmenter( new NullFragmenter() ); + return new LuanFunction() { + @Override public String call(LuanState luan,Object[] args) throws LuanException { + String text = (String)args[0]; + try { + String s = hl.getBestFragment(analyzer,null,text); + return s!=null ? s : text; + } catch(LuanRuntimeException e) { + throw (LuanException)e.getCause(); + } catch(IOException e) { + throw new RuntimeException(e); + } catch(InvalidTokenOffsetsException e) { + throw new RuntimeException(e); + } + } + }; + } +}