diff src/luan/modules/lucene/LuceneIndex.java @ 775:1a68fc55a80c

simplify dir structure
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 26 Aug 2016 14:36:40 -0600
parents lucene/src/luan/modules/lucene/LuceneIndex.java@9092e52f94eb
children 6b8ea0a9b7c9
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/luan/modules/lucene/LuceneIndex.java	Fri Aug 26 14:36:40 2016 -0600
@@ -0,0 +1,631 @@
+package luan.modules.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.zip.ZipOutputStream;
+import java.util.zip.ZipEntry;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.document.IntField;
+import org.apache.lucene.document.LongField;
+import org.apache.lucene.document.DoubleField;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.NumericUtils;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TotalHitCountCollector;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.highlight.Formatter;
+import org.apache.lucene.search.highlight.Highlighter;
+import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
+import org.apache.lucene.search.highlight.NullFragmenter;
+import org.apache.lucene.search.highlight.QueryScorer;
+import org.apache.lucene.search.highlight.TokenGroup;
+import luan.modules.lucene.queryparser.SaneQueryParser;
+import luan.modules.lucene.queryparser.FieldParser;
+import luan.modules.lucene.queryparser.MultiFieldParser;
+import luan.modules.lucene.queryparser.StringFieldParser;
+import luan.modules.lucene.queryparser.NumberFieldParser;
+import luan.modules.lucene.queryparser.ParseException;
+import luan.modules.Utils;
+import luan.Luan;
+import luan.LuanState;
+import luan.LuanTable;
+import luan.LuanFunction;
+import luan.LuanException;
+import luan.LuanMeta;
+import luan.LuanRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public final class LuceneIndex implements Closeable {
+	private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class);
+
+	private static final String FLD_NEXT_ID = "nextId";
+	public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer());
+
+	private static final Version version = Version.LUCENE_4_9;
+	private final ReentrantLock writeLock = new ReentrantLock();
+	private final File indexDir;
+	private SnapshotDeletionPolicy snapshotDeletionPolicy;
+	private IndexWriter writer;
+	private DirectoryReader reader;
+	private IndexSearcher searcher;
+	private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>();
+	private boolean isClosed = true;
+	private final MultiFieldParser mfp;
+	public final LuanTable indexed_only_fields = new LuanTable();
+	private final Analyzer analyzer;
+
+	private static ConcurrentMap<File,AtomicInteger> globalWriteCounters = new ConcurrentHashMap<File,AtomicInteger>();
+	private File fileDir;
+	private int writeCount;
+
+	public LuceneIndex(LuanState luan,String indexDirStr,FieldParser defaultFieldParser,String[] defaultFields) throws LuanException, IOException {
+		mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields);
+		mfp.fields.put( "type", STRING_FIELD_PARSER );
+		mfp.fields.put( "id", NumberFieldParser.LONG );
+		File indexDir = new File(indexDirStr);
+		this.indexDir = indexDir;
+		Analyzer analyzer = STRING_FIELD_PARSER.analyzer;
+		if( defaultFieldParser instanceof StringFieldParser ) {
+			StringFieldParser sfp = (StringFieldParser)defaultFieldParser;
+			analyzer = sfp.analyzer;
+		}
+		this.analyzer = analyzer;
+		luan.onClose(this);
+		reopen();
+	}
+
+	public void reopen() throws LuanException, IOException {
+		if( !isClosed )  throw new RuntimeException();
+		isClosed = false;
+		IndexWriterConfig conf = new IndexWriterConfig(version,analyzer);
+		snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy());
+		conf.setIndexDeletionPolicy(snapshotDeletionPolicy);
+		FSDirectory dir = FSDirectory.open(indexDir);
+		fileDir = dir.getDirectory();
+		globalWriteCounters.putIfAbsent(fileDir,new AtomicInteger());
+		writer = new IndexWriter(dir,conf);
+		writer.commit();  // commit index creation
+		reader = DirectoryReader.open(dir);
+		searcher = new IndexSearcher(reader);
+		initId();
+	}
+
+	private int globalWriteCount() {
+		return globalWriteCounters.get(fileDir).get();
+	}
+
+	private void wrote() {
+		globalWriteCounters.get(fileDir).incrementAndGet();
+	}
+
+	public void delete_all() throws IOException {
+		boolean commit = !writeLock.isHeldByCurrentThread();
+		writeLock.lock();
+		try {
+			writer.deleteAll();
+			id = idLim = 0;
+			if(commit) writer.commit();
+		} finally {
+			wrote();
+			writeLock.unlock();
+		}
+	}
+
+	private static Term term(String key,long value) {
+		BytesRef br = new BytesRef();
+		NumericUtils.longToPrefixCoded(value,0,br);
+		return new Term(key,br);
+	}
+
+	public void delete(LuanState luan,String queryStr) throws LuanException, IOException, ParseException {
+		Query query = SaneQueryParser.parseQuery(mfp,queryStr);
+
+		boolean commit = !writeLock.isHeldByCurrentThread();
+		writeLock.lock();
+		try {
+			writer.deleteDocuments(query);
+			if(commit) writer.commit();
+		} finally {
+			wrote();
+			writeLock.unlock();
+		}
+	}
+
+	public void save(LuanState luan,LuanTable doc) throws LuanException, IOException {
+		Set indexedOnlySet = new HashSet();
+		Object typeObj = doc.get(luan,"type");
+		if( typeObj==null )
+			throw new LuanException("missing 'type' field");
+		if( !(typeObj instanceof String) )
+			throw new LuanException("type must be string");
+		String type = (String)typeObj;
+		Object indexedOnlyObj = indexed_only_fields.get(luan,type);
+		if( indexedOnlyObj != null ) {
+			if( !(indexedOnlyObj instanceof LuanTable) )
+				throw new LuanException("indexed_only_fields elements must be tables");
+			LuanTable indexedOnly = (LuanTable)indexedOnlyObj;
+			for( Map.Entry<Object,Object> entry : indexedOnly.iterable(luan) ) {
+				Object key = entry.getKey();
+				if( !(key instanceof String) )
+					throw new LuanException("indexed_only_fields."+type+" entries must be strings");
+				String name = (String)key;
+				Object value = entry.getValue();
+				if( !(value instanceof LuanFunction) )
+					throw new LuanException("indexed_only_fields."+type+" values must be functions");
+				LuanFunction fn = (LuanFunction)value;
+				value = Luan.first(fn.call(luan,new Object[]{doc}));
+				doc.put(luan, name, value );
+				indexedOnlySet.add(name);
+			}
+		}
+		Object obj = doc.get(luan,"id");
+		Long id;
+		try {
+			id = (Long)obj;
+		} catch(ClassCastException e) {
+			throw new LuanException("id should be Long but is "+obj.getClass().getSimpleName());
+		}
+
+		boolean commit = !writeLock.isHeldByCurrentThread();
+		writeLock.lock();
+		try {
+			if( id == null ) {
+				id = nextId(luan);
+				doc.put(luan,"id",id);
+				writer.addDocument(toLucene(luan,doc,indexedOnlySet));
+			} else {
+				writer.updateDocument( term("id",id), toLucene(luan,doc,indexedOnlySet) );
+			}
+			if(commit) writer.commit();
+		} finally {
+			wrote();
+			writeLock.unlock();
+		}
+	}
+
+	public void update_in_transaction(LuanState luan,LuanFunction fn) throws IOException, LuanException {
+		boolean commit = !writeLock.isHeldByCurrentThread();
+		writeLock.lock();
+		try {
+			fn.call(luan);
+			if(commit) writer.commit();
+		} finally {
+			wrote();
+			writeLock.unlock();
+		}
+	}
+
+	public void run_in_lock(LuanState luan,LuanFunction fn) throws IOException, LuanException {
+		if( writeLock.isHeldByCurrentThread() )
+			throw new RuntimeException();
+		writeLock.lock();
+		try {
+			synchronized(this) {
+				fn.call(luan);
+			}
+		} finally {
+			wrote();
+			writeLock.unlock();
+		}
+	}
+
+
+	private long id;
+	private long idLim;
+	private final int idBatch = 10;
+
+	private void initId() throws LuanException, IOException {
+		TopDocs td = searcher.search(new TermQuery(new Term("type","next_id")),1);
+		switch(td.totalHits) {
+		case 0:
+			id = 0;
+			idLim = 0;
+			break;
+		case 1:
+			idLim = (Long)searcher.doc(td.scoreDocs[0].doc).getField(FLD_NEXT_ID).numericValue();
+			id = idLim;
+			break;
+		default:
+			throw new RuntimeException();
+		}
+	}
+
+	public synchronized long nextId(LuanState luan) throws LuanException, IOException {
+		if( ++id > idLim ) {
+			idLim += idBatch;
+			LuanTable doc = new LuanTable();
+			doc.rawPut( "type", "next_id" );
+			doc.rawPut( FLD_NEXT_ID, idLim );
+			writer.updateDocument(new Term("type","next_id"),toLucene(luan,doc,Collections.EMPTY_SET));
+			wrote();
+		}
+		return id;
+	}
+
+/*
+	public void backup(String zipFile) throws LuanException, IOException {
+		if( !zipFile.endsWith(".zip") )
+			throw new LuanException("file "+zipFile+" doesn't end with '.zip'");
+		IndexCommit ic = snapshotDeletionPolicy.snapshot();
+		try {
+			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile));
+			for( String fileName : ic.getFileNames() ) {
+				out.putNextEntry(new ZipEntry(fileName));
+				FileInputStream in = new FileInputStream(new File(indexDir,fileName));
+				Utils.copyAll(in,out);
+				in.close();
+				out.closeEntry();
+			}
+			out.close();
+		} finally {
+			snapshotDeletionPolicy.release(ic);
+		}
+	}
+*/
+	public Object snapshot(LuanState luan,LuanFunction fn) throws LuanException, IOException {
+		IndexCommit ic = snapshotDeletionPolicy.snapshot();
+		try {
+			String dir = fileDir.toString();
+			LuanTable fileNames = new LuanTable(new ArrayList(ic.getFileNames()));
+			return fn.call(luan,new Object[]{dir,fileNames});
+		} finally {
+			snapshotDeletionPolicy.release(ic);
+		}
+	}
+
+
+
+	public String to_string() {
+		return writer.getDirectory().toString();
+	}
+
+	public void close() throws IOException {
+		if( !isClosed ) {
+			writer.close();
+			reader.close();
+			isClosed = true;
+		}
+	}
+
+	protected void finalize() throws Throwable {
+		if( !isClosed ) {
+			logger.error("not closed");
+			close();
+		}
+		super.finalize();
+	}
+
+
+
+	private static class DocFn extends LuanFunction {
+		final IndexSearcher searcher;
+		int docID;
+
+		DocFn(IndexSearcher searcher) {
+			this.searcher = searcher;
+		}
+
+		@Override public Object call(LuanState luan,Object[] args) throws LuanException {
+			try {
+				return toTable(searcher.doc(docID));
+			} catch(IOException e) {
+				throw new LuanException(e);
+			}
+		}
+	}
+
+	private static abstract class MyCollector extends Collector {
+		int docBase;
+		int i = 0;
+
+		@Override public void setScorer(Scorer scorer) {}
+		@Override public void setNextReader(AtomicReaderContext context) {
+			this.docBase = context.docBase;
+		}
+		@Override public boolean acceptsDocsOutOfOrder() {
+			return true;
+		}
+	}
+
+	private synchronized IndexSearcher openSearcher() throws IOException {
+		int gwc = globalWriteCount();
+		if( writeCount != gwc ) {
+			writeCount = gwc;
+			DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
+			if( newReader != null ) {
+				reader.decRef();
+				reader = newReader;
+				searcher = new IndexSearcher(reader);
+			}
+		}
+		reader.incRef();
+		return searcher;
+	}
+
+	// call in finally block
+	private static void close(IndexSearcher searcher) throws IOException {
+		searcher.getIndexReader().decRef();
+	}
+
+	public void ensure_open() throws IOException {
+		close(openSearcher());
+	}
+
+	public int advanced_search( final LuanState luan, String queryStr, LuanFunction fn, Integer n, String sortStr ) throws LuanException, IOException, ParseException {
+		Utils.checkNotNull(queryStr);
+		Query query = SaneQueryParser.parseQuery(mfp,queryStr);
+		IndexSearcher searcher = threadLocalSearcher.get();
+		boolean inTransaction = searcher != null;
+		if( !inTransaction )
+			searcher = openSearcher();
+		try {
+			if( fn!=null && n==null ) {
+				if( sortStr != null )
+					throw new LuanException("sort must be nil when n is nil");
+				final DocFn docFn = new DocFn(searcher);
+				MyCollector col = new MyCollector() {
+					@Override public void collect(int doc) {
+						try {
+							docFn.docID = docBase + doc;
+							fn.call(luan,new Object[]{++i,docFn});
+						} catch(LuanException e) {
+							throw new LuanRuntimeException(e);
+						}
+					}
+				};
+				try {
+					searcher.search(query,col);
+				} catch(LuanRuntimeException e) {
+					throw (LuanException)e.getCause();
+				}
+				return col.i;
+			}
+			if( fn==null || n==0 ) {
+				TotalHitCountCollector thcc = new TotalHitCountCollector();
+				searcher.search(query,thcc);
+				return thcc.getTotalHits();
+			}
+			Sort sort = sortStr==null ? null : SaneQueryParser.parseSort(mfp,sortStr);
+			TopDocs td = sort==null ? searcher.search(query,n) : searcher.search(query,n,sort);
+			final ScoreDoc[] scoreDocs = td.scoreDocs;
+			DocFn docFn = new DocFn(searcher);
+			for( int i=0; i<scoreDocs.length; i++ ) {
+				docFn.docID = scoreDocs[i].doc;
+				fn.call(luan,new Object[]{i+1,docFn});
+			}
+			return td.totalHits;
+		} finally {
+			if( !inTransaction )
+				close(searcher);
+		}
+	}
+
+	public Object search_in_transaction(LuanState luan,LuanFunction fn) throws LuanException, IOException {
+		if( threadLocalSearcher.get() != null )
+			throw new LuanException("can't nest search_in_transaction calls");
+		IndexSearcher searcher = openSearcher();
+		threadLocalSearcher.set(searcher);
+		try {
+			return fn.call(luan);
+		} finally {
+			threadLocalSearcher.set(null);
+			close(searcher);
+		}
+	}
+
+
+
+	public final LuanMeta indexedFieldsMeta = new LuanMeta() {
+
+		@Override public boolean canNewindex() {
+			return true;
+		}
+
+		@Override public Object __index(LuanState luan,LuanTable tbl,Object key) {
+			return mfp.fields.get(key);
+		}
+
+		@Override public void __new_index(LuanState luan,LuanTable tbl,Object key,Object value) throws LuanException {
+			if( !(key instanceof String) )
+				throw new LuanException("key must be string");
+			String field = (String)key;
+			if( value==null ) {  // delete
+				mfp.fields.remove(field);
+				return;
+			}
+			if( !(value instanceof FieldParser) )
+				throw new LuanException("value must be FieldParser like the values of Lucene.type");
+			FieldParser parser = (FieldParser)value;
+			mfp.fields.put( field, parser );
+		}
+
+		@Override public final Iterator keys(LuanTable tbl) {
+			return mfp.fields.keySet().iterator();
+		}
+
+		@Override protected String type(LuanTable tbl) {
+			return "lucene-indexed-fields";
+		}
+
+	};
+
+
+
+	private IndexableField newField(String name,Object value,Field.Store store,Set<String> indexed)
+		throws LuanException
+	{
+		if( value instanceof String ) {
+			String s = (String)value;
+			FieldParser fp = mfp.fields.get(name);
+			if( fp != null ) {
+				if( fp instanceof StringFieldParser && fp != STRING_FIELD_PARSER ) {
+					return new TextField(name, s, store);
+				} else {
+					return new StringField(name, s, store);
+				}
+			} else {
+				return new StoredField(name, s);
+			}
+		} else if( value instanceof Integer ) {
+			int i = (Integer)value;
+			if( indexed.contains(name) ) {
+				return new IntField(name, i, store);
+			} else {
+				return new StoredField(name, i);
+			}
+		} else if( value instanceof Long ) {
+			long i = (Long)value;
+			if( indexed.contains(name) ) {
+				return new LongField(name, i, store);
+			} else {
+				return new StoredField(name, i);
+			}
+		} else if( value instanceof Double ) {
+			double i = (Double)value;
+			if( indexed.contains(name) ) {
+				return new DoubleField(name, i, store);
+			} else {
+				return new StoredField(name, i);
+			}
+		} else if( value instanceof byte[] ) {
+			byte[] b = (byte[])value;
+			return new StoredField(name, b);
+		} else
+			throw new LuanException("invalid value type "+value.getClass()+"' for '"+name+"'");
+	}
+
+	private Document toLucene(LuanState luan,LuanTable table,Set indexOnly) throws LuanException {
+		Set<String> indexed = mfp.fields.keySet();
+		Document doc = new Document();
+		for( Map.Entry<Object,Object> entry : table.iterable(luan) ) {
+			Object key = entry.getKey();
+			if( !(key instanceof String) )
+				throw new LuanException("key must be string");
+			String name = (String)key;
+			Object value = entry.getValue();
+			Field.Store store = indexOnly.contains(name) ? Field.Store.NO : Field.Store.YES;
+			if( !(value instanceof LuanTable) ) {
+				doc.add(newField(name, value, store, indexed));
+			} else { // list
+				LuanTable list = (LuanTable)value;
+				for( Object el : list.asList() ) {
+					doc.add(newField(name, el, store, indexed));
+				}
+			}
+		}
+		return doc;
+	}
+
+	private static Object getValue(IndexableField ifld) throws LuanException {
+		BytesRef br = ifld.binaryValue();
+		if( br != null )
+			return br.bytes;
+		Number n = ifld.numericValue();
+		if( n != null )
+			return n;
+		String s = ifld.stringValue();
+		if( s != null )
+			return s;
+		throw new LuanException("invalid field type for "+ifld);
+	}
+
+	private static LuanTable toTable(Document doc) throws LuanException {
+		if( doc==null )
+			return null;
+		LuanTable table = new LuanTable();
+		for( IndexableField ifld : doc ) {
+			String name = ifld.name();
+			Object value = getValue(ifld);
+			Object old = table.rawGet(name);
+			if( old == null ) {
+				table.rawPut(name,value);
+			} else {
+				LuanTable list;
+				if( old instanceof LuanTable ) {
+					list = (LuanTable)old;
+				} else {
+					list = new LuanTable();
+					list.rawPut(1,old);
+					table.rawPut(name,list);
+				}
+				list.rawPut(list.rawLength()+1,value);
+			}
+		}
+		return table;
+	}
+
+
+	public LuanFunction highlighter(LuanState luan,String queryStr,LuanFunction formatter) throws ParseException {
+		Query query = SaneQueryParser.parseQuery(mfp,queryStr);
+		Formatter fmt = new Formatter() {
+			public String highlightTerm(String originalText,TokenGroup tokenGroup) {
+				if( tokenGroup.getTotalScore() <= 0 )
+					return originalText;
+				try {
+					return (String)Luan.first(formatter.call(luan,new Object[]{originalText}));
+				} catch(LuanException e) {
+					throw new LuanRuntimeException(e);
+				}
+			}
+		};
+		Highlighter hl = new Highlighter( fmt, new QueryScorer(query) );
+		hl.setTextFragmenter( new NullFragmenter() );
+		return new LuanFunction() {
+			@Override public String call(LuanState luan,Object[] args) throws LuanException {
+				String text = (String)args[0];
+				try {
+					String s = hl.getBestFragment(analyzer,null,text);
+					return s!=null ? s : text;
+				} catch(LuanRuntimeException e) {
+					throw (LuanException)e.getCause();
+				} catch(IOException e) {
+					throw new RuntimeException(e);
+				} catch(InvalidTokenOffsetsException e) {
+					throw new RuntimeException(e);
+				}
+			}
+		};
+	}
+}