view lucene/src/luan/modules/lucene/LuceneIndex.java @ 618:5e495e4e560b

add lucene indexed_only_fields
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 01 Jan 2016 01:24:10 -0700
parents e54c1646eed0
children 89eb02f9827f
line wrap: on
line source

package luan.modules.lucene;

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.ZipOutputStream;
import java.util.zip.ZipEntry;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.DoubleField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Scorer;
import sane.lucene.queryparser.SaneQueryParser;
import sane.lucene.queryparser.FieldParser;
import sane.lucene.queryparser.MultiFieldParser;
import sane.lucene.queryparser.StringFieldParser;
import sane.lucene.queryparser.NumberFieldParser;
import sane.lucene.queryparser.ParseException;
import luan.modules.Utils;
import luan.Luan;
import luan.LuanState;
import luan.LuanTable;
import luan.LuanFunction;
import luan.LuanJavaFunction;
import luan.LuanException;
import luan.LuanMeta;
import luan.LuanRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public final class LuceneIndex implements Closeable {
	// Class logger; in this file it is only used by finalize() to report an index that was never closed.
	private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class);

	// Name of the field, on the single document of type "next_id", that holds the persisted id limit.
	private static final String FLD_NEXT_ID = "nextId";
	// String parser backed by a KeywordAnalyzer. It is pre-registered for the "type" field, and
	// toLucene() writes string fields registered with this exact instance as StringField (not TextField).
	public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer());

	// Serializes index modifications; re-entrancy is used to detect "already inside a transaction".
	private final ReentrantLock writeLock = new ReentrantLock();
	// Directory holding the index files; backup() reads the snapshot's files from here.
	private final File indexDir;
	// Wraps the writer's deletion policy so backup() can take and release a snapshot.
	final SnapshotDeletionPolicy snapshotDeletionPolicy;
	private final IndexWriter writer;
	// Current reader/searcher pair; replaced by openSearcher() when the index has changed.
	private DirectoryReader reader;
	private IndexSearcher searcher;
	// Searcher pinned for the current thread by search_in_transaction(); null outside such a call.
	private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>();
	private boolean isClosed = false;
	// Maps field names to their parsers; its key set is what toLucene() treats as "indexed".
	private final MultiFieldParser mfp;
	// Keyed by document type; each value must be a table of field name -> function.
	// save() calls each function with the document and indexes the result without storing it.
	public final LuanTable indexed_only_fields = new LuanTable();

	/**
	 * Opens (creating it if necessary) the index stored in {@code indexDirStr}.
	 *
	 * @param luan               state on which this index registers itself via {@code onClose}
	 * @param indexDirStr        path of the index directory
	 * @param defaultFieldParser parser for the default fields, or null for none; when it is a
	 *                           StringFieldParser its analyzer is also used by the writer
	 * @param defaultFields      default field names handed to the MultiFieldParser
	 */
	public LuceneIndex(LuanState luan,String indexDirStr,FieldParser defaultFieldParser,String[] defaultFields) throws LuanException, IOException {
		if( defaultFieldParser == null ) {
			mfp = new MultiFieldParser();
		} else {
			mfp = new MultiFieldParser(defaultFieldParser,defaultFields);
		}
		// "type" and "id" are always indexed.
		mfp.fields.put( "type", STRING_FIELD_PARSER );
		mfp.fields.put( "id", NumberFieldParser.LONG );

		this.indexDir = new File(indexDirStr);
		Directory directory = FSDirectory.open(this.indexDir);

		Analyzer writerAnalyzer = defaultFieldParser instanceof StringFieldParser
			? ((StringFieldParser)defaultFieldParser).analyzer
			: STRING_FIELD_PARSER.analyzer;
		IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_9,writerAnalyzer);
		snapshotDeletionPolicy = new SnapshotDeletionPolicy(config.getIndexDeletionPolicy());
		config.setIndexDeletionPolicy(snapshotDeletionPolicy);

		writer = new IndexWriter(directory,config);
		writer.commit();  // commit index creation so the reader below has something to open
		reader = DirectoryReader.open(directory);
		luan.onClose(this);
		searcher = new IndexSearcher(reader);
		initId(luan);
	}



	/**
	 * Deletes every document in the index and resets the in-memory id counters to 0.
	 * The change is committed here unless the calling thread already holds the write
	 * lock (for example when called from inside update_in_transaction).
	 */
	public void delete_all() throws IOException {
		final boolean ownsCommit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			writer.deleteAll();
			idLim = 0;
			id = 0;
			if( ownsCommit ) {
				writer.commit();
			}
		} finally {
			writeLock.unlock();
		}
	}

	/** Builds a Term for {@code key} whose bytes are {@code value} prefix-coded with shift 0. */
	private static Term term(String key,long value) {
		final BytesRef encoded = new BytesRef();
		NumericUtils.longToPrefixCoded(value,0,encoded);
		Term result = new Term(key,encoded);
		return result;
	}

	/**
	 * Deletes all documents matching {@code queryStr}.
	 * The query is parsed before the write lock is taken. The deletion is committed
	 * here unless the calling thread already holds the write lock.
	 */
	public void delete(LuanState luan,String queryStr) throws LuanException, IOException, ParseException {
		final Query toDelete = SaneQueryParser.parseQuery(mfp,queryStr);
		final boolean ownsCommit = !writeLock.isHeldByCurrentThread();

		writeLock.lock();
		try {
			writer.deleteDocuments(toDelete);
			if( ownsCommit ) {
				writer.commit();
			}
		} finally {
			writeLock.unlock();
		}
	}

	/**
	 * Adds or updates a document.
	 * <p>
	 * The document must have a string "type". If {@code indexed_only_fields} has an entry for
	 * that type, each of its functions is called with the document and the result is put into
	 * the document under the entry's name; those fields are then written with Store.NO.
	 * Note that the computed values remain in the caller's {@code doc} table afterwards.
	 * <p>
	 * A document without "id" gets a fresh id (written back into {@code doc}) and is added;
	 * otherwise the existing document with that id is replaced. The change is committed here
	 * unless the calling thread already holds the write lock.
	 *
	 * @throws LuanException if "type" is missing or not a string, if the indexed_only_fields
	 *                       entry is malformed, or if "id" is present but not a Long
	 */
	public void save(LuanState luan,LuanTable doc) throws LuanException, IOException {
		Object typeObj = doc.get(luan,"type");
		if( typeObj==null )
			throw new LuanException(luan,"missing 'type' field");
		if( !(typeObj instanceof String) )
			throw new LuanException(luan,"type must be string");
		String type = (String)typeObj;

		// Names of the fields that must be indexed but not stored.
		Set<String> indexedOnlySet = new HashSet<String>();
		Object indexedOnlyObj = indexed_only_fields.get(luan,type);
		if( indexedOnlyObj != null ) {
			if( !(indexedOnlyObj instanceof LuanTable) )
				throw new LuanException(luan,"indexed_only_fields elements must be tables");
			LuanTable indexedOnly = (LuanTable)indexedOnlyObj;
			for( Map.Entry<Object,Object> entry : indexedOnly.iterable(luan) ) {
				Object key = entry.getKey();
				if( !(key instanceof String) )
					throw new LuanException(luan,"indexed_only_fields."+type+" entries must be strings");
				String name = (String)key;
				Object value = entry.getValue();
				if( !(value instanceof LuanFunction) )
					throw new LuanException(luan,"indexed_only_fields."+type+" values must be functions");
				LuanFunction fn = (LuanFunction)value;
				value = Luan.first(fn.call(luan,new Object[]{doc}));
				doc.put(luan, name, value );
				indexedOnlySet.add(name);
			}
		}

		// Validate the id type explicitly instead of relying on a ClassCastException.
		Object obj = doc.get(luan,"id");
		if( obj != null && !(obj instanceof Long) )
			throw new LuanException(luan,"id should be Long but is "+obj.getClass().getSimpleName());
		Long id = (Long)obj;

		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			if( id == null ) {
				id = nextId(luan);
				doc.put(luan,"id",id);
				writer.addDocument(toLucene(luan,doc,indexedOnlySet));
			} else {
				writer.updateDocument( term("id",id), toLucene(luan,doc,indexedOnlySet) );
			}
			if(commit) writer.commit();
		} finally {
			writeLock.unlock();
		}
	}

	/**
	 * Runs {@code fn} while holding the write lock. Because the lock is re-entrant, index
	 * modifications made by {@code fn} on this thread see the lock as held and skip their
	 * own commit; the outermost call commits once after {@code fn} returns normally.
	 */
	public void update_in_transaction(LuanState luan,LuanFunction fn) throws IOException, LuanException {
		final boolean outermost = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			fn.call(luan);
			if( outermost ) {
				writer.commit();
			}
		} finally {
			writeLock.unlock();
		}
	}


	// Last id handed out by nextId().
	private long id = 0;
	// Highest id covered by the limit written to the "next_id" document; nextId() raises it when id passes it.
	private long idLim = 0;
	// Amount by which nextId() raises idLim each time the current range is exhausted.
	private final int idBatch = 10;

	/**
	 * Restores the id counters from the index. Looks up the document of type "next_id";
	 * if there is none the counters keep their initial value of 0, otherwise both
	 * {@code id} and {@code idLim} are set to the stored limit.
	 *
	 * @throws IllegalStateException if the "next_id" document lacks a numeric
	 *                               {@code nextId} field or more than one such document exists
	 */
	private void initId(LuanState luan) throws LuanException, IOException {
		TopDocs td = searcher.search(new TermQuery(new Term("type","next_id")),1);
		switch(td.totalHits) {
		case 0:
			break;  // nothing persisted yet
		case 1: {
			IndexableField fld = searcher.doc(td.scoreDocs[0].doc).getField(FLD_NEXT_ID);
			Number limit = fld==null ? null : fld.numericValue();
			if( limit == null )
				throw new IllegalStateException("'next_id' document has no numeric '"+FLD_NEXT_ID+"' field");
			idLim = limit.longValue();
			id = idLim;
			break;
		}
		default:
			throw new IllegalStateException("expected at most one 'next_id' document but found "+td.totalHits);
		}
	}

	/**
	 * Returns the next document id. When the current range is used up, the limit is
	 * raised by {@code idBatch} and written to the "next_id" document. This method
	 * does not commit.
	 */
	public synchronized long nextId(LuanState luan) throws LuanException, IOException {
		id++;
		if( id <= idLim ) {
			return id;
		}
		// Range exhausted: reserve another batch and record the new limit in the index.
		idLim += idBatch;
		LuanTable limitDoc = new LuanTable();
		limitDoc.rawPut( "type", "next_id" );
		limitDoc.rawPut( FLD_NEXT_ID, idLim );
		Term selector = new Term("type","next_id");
		writer.updateDocument(selector,toLucene(luan,limitDoc,Collections.EMPTY_SET));
		return id;
	}


	/**
	 * Writes a snapshot of the index to {@code zipFile}, one zip entry per index file.
	 * The snapshot is always released, and every stream is closed even when copying fails.
	 *
	 * @param zipFile destination path; must end with ".zip"
	 * @throws LuanException if {@code zipFile} does not end with ".zip"
	 */
	public void backup(LuanState luan,String zipFile) throws LuanException, IOException {
		if( !zipFile.endsWith(".zip") )
			throw new LuanException(luan,"file "+zipFile+" doesn't end with '.zip'");
		IndexCommit ic = snapshotDeletionPolicy.snapshot();
		try {
			// try-with-resources closes the zip stream (and each input) on both success and failure
			try( ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile)) ) {
				for( String fileName : ic.getFileNames() ) {
					out.putNextEntry(new ZipEntry(fileName));
					try( FileInputStream in = new FileInputStream(new File(indexDir,fileName)) ) {
						Utils.copyAll(in,out);
					}
					out.closeEntry();
				}
			}
		} finally {
			snapshotDeletionPolicy.release(ic);
		}
	}



	/** Returns the string form of the writer's Directory. */
	public String to_string() {
		final Directory directory = writer.getDirectory();
		return directory.toString();
	}

	/**
	 * Closes the writer and the reader. Calling it again is a no-op.
	 * The index is marked closed before anything can throw, and the reader is closed
	 * even if closing the writer fails, so a failure cannot leave the reader open.
	 */
	public void close() throws IOException {
		if( isClosed )
			return;
		isClosed = true;
		try {
			writer.close();
		} finally {
			reader.close();
		}
	}

	/**
	 * Safety net: logs an error and closes the index if it was never closed explicitly.
	 * {@code super.finalize()} runs even when closing throws.
	 */
	@Override protected void finalize() throws Throwable {
		try {
			if( !isClosed ) {
				logger.error("not closed");
				close();
			}
		} finally {
			super.finalize();
		}
	}



	/**
	 * Luan function handed to search callbacks. When called, it loads the document
	 * currently selected by {@code docID} from {@code searcher} and returns it as a table.
	 * The search loops reuse a single instance and update {@code docID} before each callback.
	 */
	private static class DocFn extends LuanFunction {
		final IndexSearcher searcher;
		int docID;

		DocFn(IndexSearcher searcher) {
			this.searcher = searcher;
		}

		@Override public Object call(LuanState luan,Object[] args) throws LuanException {
			final Document stored;
			try {
				stored = searcher.doc(docID);
			} catch(IOException e) {
				throw new LuanException(luan,e);
			}
			return toTable(luan,stored);
		}
	}

	/**
	 * Base collector for streaming searches. It records the doc base of the reader
	 * announced by setNextReader, ignores the scorer, and accepts documents in any order.
	 * Subclasses implement collect(); {@code i} is a counter left for them to maintain.
	 */
	private static abstract class MyCollector extends Collector {
		int docBase;
		int i = 0;

		@Override public boolean acceptsDocsOutOfOrder() {
			return true;
		}

		@Override public void setNextReader(AtomicReaderContext context) {
			docBase = context.docBase;
		}

		@Override public void setScorer(Scorer scorer) {
			// scores are not used
		}
	}

	/**
	 * Returns a searcher over the current state of the index, swapping in a new
	 * reader/searcher pair first if the index has changed. The reader's reference count
	 * is incremented for the caller, who must release it with close(IndexSearcher).
	 */
	private synchronized IndexSearcher openSearcher() throws IOException {
		final DirectoryReader refreshed = DirectoryReader.openIfChanged(reader);
		final boolean indexChanged = refreshed != null;
		if( indexChanged ) {
			reader.decRef();  // drop this object's reference to the previous reader
			reader = refreshed;
			searcher = new IndexSearcher(refreshed);
		}
		reader.incRef();  // reference owned by the caller
		return searcher;
	}

	// Releases the reader reference taken by openSearcher(); call in finally block
	private static void close(IndexSearcher searcher) throws IOException {
		final IndexReader underlying = searcher.getIndexReader();
		underlying.decRef();
	}

	/** Refreshes the shared reader/searcher if the index changed, then releases the reference it took. */
	public void ensure_open() throws IOException {
		final IndexSearcher refreshed = openSearcher();
		close(refreshed);
	}

	/**
	 * Runs {@code queryStr} in one of three modes, chosen by {@code fn} and {@code n}:
	 * <ul>
	 * <li>{@code fn} given, {@code n} nil: every hit is passed to {@code fn} as
	 *     (counter, docFn); sorting is not allowed. Returns the number of hits visited.</li>
	 * <li>{@code fn} nil, or {@code n} is 0: only counts. Returns the total hit count.</li>
	 * <li>otherwise: the top {@code n} hits (optionally sorted by {@code sortStr}) are passed
	 *     to {@code fn} as (rank starting at 1, docFn). Returns the total hit count.</li>
	 * </ul>
	 * Uses the thread's transaction searcher when search_in_transaction is active; otherwise
	 * it opens a searcher for the duration of the call and releases it afterwards.
	 *
	 * @throws LuanException if {@code sortStr} is given while {@code n} is nil, or if the
	 *                       callback throws
	 */
	public int advanced_search( final LuanState luan, String queryStr, LuanFunction fn, Integer n, String sortStr ) throws LuanException, IOException, ParseException {
		Utils.checkNotNull(luan,queryStr);
		final Query query = SaneQueryParser.parseQuery(mfp,queryStr);
		IndexSearcher searcher = threadLocalSearcher.get();
		final boolean ownsSearcher = searcher == null;
		if( ownsSearcher ) {
			searcher = openSearcher();
		}
		try {
			// Mode 1: callback without a limit -> visit every hit.
			if( fn!=null && n==null ) {
				if( sortStr != null )
					throw new LuanException(luan,"sort must be nil when n is nil");
				final DocFn currentDoc = new DocFn(searcher);
				MyCollector visitor = new MyCollector() {
					@Override public void collect(int doc) {
						currentDoc.docID = docBase + doc;
						i++;
						try {
							fn.call(luan,new Object[]{i,currentDoc});
						} catch(LuanException e) {
							// collect() cannot throw checked exceptions; unwrapped again below
							throw new LuanRuntimeException(e);
						}
					}
				};
				try {
					searcher.search(query,visitor);
				} catch(LuanRuntimeException e) {
					throw (LuanException)e.getCause();
				}
				return visitor.i;
			}

			// Mode 2: count only.
			if( fn==null || n==0 ) {
				TotalHitCountCollector counter = new TotalHitCountCollector();
				searcher.search(query,counter);
				return counter.getTotalHits();
			}

			// Mode 3: top n hits, optionally sorted.
			Sort sort = null;
			if( sortStr != null ) {
				sort = SaneQueryParser.parseSort(mfp,sortStr);
			}
			final TopDocs top;
			if( sort == null ) {
				top = searcher.search(query,n);
			} else {
				top = searcher.search(query,n,sort);
			}
			DocFn currentDoc = new DocFn(searcher);
			int rank = 0;
			for( ScoreDoc hit : top.scoreDocs ) {
				currentDoc.docID = hit.doc;
				fn.call(luan,new Object[]{++rank,currentDoc});
			}
			return top.totalHits;
		} finally {
			if( ownsSearcher ) {
				close(searcher);
			}
		}
	}

	/**
	 * Runs {@code fn} with a single searcher pinned to the current thread, so that every
	 * advanced_search call made by {@code fn} on this thread uses the same searcher.
	 * The searcher is unpinned and released when {@code fn} finishes, normally or not.
	 *
	 * @return whatever {@code fn} returns
	 * @throws LuanException if a transaction is already active on this thread
	 */
	public Object search_in_transaction(LuanState luan,LuanFunction fn) throws LuanException, IOException {
		final boolean alreadyActive = threadLocalSearcher.get() != null;
		if( alreadyActive )
			throw new LuanException(luan,"can't nest search_in_transaction calls");
		final IndexSearcher pinned = openSearcher();
		threadLocalSearcher.set(pinned);
		try {
			Object result = fn.call(luan);
			return result;
		} finally {
			threadLocalSearcher.set(null);
			close(pinned);
		}
	}



	/**
	 * Metatable that exposes the field-name -> parser map ({@code mfp.fields}) as a table:
	 * reading a key returns its parser, assigning a FieldParser registers it, assigning
	 * nil removes the field, and key iteration lists the registered field names.
	 */
	public final LuanMeta indexedFieldsMeta = new LuanMeta() {

		@Override protected String type(LuanTable tbl) {
			return "lucene-indexed-fields";
		}

		@Override public boolean canNewindex() {
			return true;
		}

		@Override public final Iterator keys(LuanTable tbl) {
			Set<String> names = mfp.fields.keySet();
			return names.iterator();
		}

		@Override public Object __index(LuanState luan,LuanTable tbl,Object key) {
			return mfp.fields.get(key);
		}

		@Override public void __new_index(LuanState luan,LuanTable tbl,Object key,Object value) throws LuanException {
			if( !(key instanceof String) )
				throw new LuanException(luan,"key must be string");
			final String fieldName = (String)key;
			if( value instanceof FieldParser ) {
				mfp.fields.put( fieldName, (FieldParser)value );
			} else if( value == null ) {
				// assigning nil deletes the field
				mfp.fields.remove(fieldName);
			} else {
				throw new LuanException(luan,"value must be FieldParser like the values of Lucene.type");
			}
		}

	};




	/**
	 * Converts a Luan table into a Lucene Document.
	 * <p>
	 * Every key must be a string. A field whose name is registered in {@code mfp.fields}
	 * is indexed: strings become a TextField when the parser is a StringFieldParser other
	 * than STRING_FIELD_PARSER and a StringField otherwise; Integer, Long and Double values
	 * become IntField, LongField and DoubleField. Indexed fields are stored unless their
	 * name is in {@code indexOnly}. Fields that are not registered, and all byte[] values,
	 * are written as StoredField.
	 *
	 * @param indexOnly names of fields to index without storing
	 * @throws LuanException if a key is not a string or a value has an unsupported type
	 */
	private Document toLucene(LuanState luan,LuanTable table,Set<?> indexOnly) throws LuanException {
		Set<String> indexed = mfp.fields.keySet();
		Document doc = new Document();
		for( Map.Entry<Object,Object> entry : table.iterable(luan) ) {
			Object key = entry.getKey();
			if( !(key instanceof String) )
				throw new LuanException(luan,"key must be string");
			String name = (String)key;
			Object value = entry.getValue();
			// NOTE(review): 'store' only affects indexed fields; an index-only name that is not
			// registered in mfp.fields still ends up as a StoredField below - confirm this is intended.
			Field.Store store = indexOnly.contains(name) ? Field.Store.NO : Field.Store.YES;
			if( value instanceof String ) {
				String s = (String)value;
				FieldParser fp = mfp.fields.get(name);
				if( fp != null ) {
					if( fp instanceof StringFieldParser && fp != STRING_FIELD_PARSER ) {
						doc.add(new TextField(name, s, store));
					} else {
						doc.add(new StringField(name, s, store));
					}
				} else {
					doc.add(new StoredField(name, s));
				}
			} else if( value instanceof Integer ) {
				int i = (Integer)value;
				if( indexed.contains(name) ) {
					doc.add(new IntField(name, i, store));
				} else {
					doc.add(new StoredField(name, i));
				}
			} else if( value instanceof Long ) {
				long i = (Long)value;
				if( indexed.contains(name) ) {
					doc.add(new LongField(name, i, store));
				} else {
					doc.add(new StoredField(name, i));
				}
			} else if( value instanceof Double ) {
				double i = (Double)value;
				if( indexed.contains(name) ) {
					doc.add(new DoubleField(name, i, store));
				} else {
					doc.add(new StoredField(name, i));
				}
			} else if( value instanceof byte[] ) {
				byte[] b = (byte[])value;
				doc.add(new StoredField(name, b));
			} else {
				// Quote the type consistently and avoid an NPE if the value is null.
				String typeName = value==null ? "null" : value.getClass().toString();
				throw new LuanException(luan,"invalid value type '"+typeName+"' for '"+name+"'");
			}
		}
		return doc;
	}

	/**
	 * Converts a Lucene Document into a Luan table, one entry per field.
	 * Each field is mapped from its binary value if it has one, else its numeric value,
	 * else its string value. Binary values are copied so that only the bytes in the
	 * BytesRef's [offset, offset+length) range are returned.
	 *
	 * @return the table, or null if {@code doc} is null
	 * @throws LuanException if a field has none of the three value kinds
	 */
	private static LuanTable toTable(LuanState luan,Document doc) throws LuanException {
		if( doc==null )
			return null;
		LuanTable table = new LuanTable();
		for( IndexableField ifld : doc ) {
			String name = ifld.name();
			BytesRef br = ifld.binaryValue();
			if( br != null ) {
				// br.bytes may be larger than the value; honor offset and length
				byte[] bytes = new byte[br.length];
				System.arraycopy(br.bytes,br.offset,bytes,0,br.length);
				table.rawPut(name,bytes);
				continue;
			}
			Number n = ifld.numericValue();
			if( n != null ) {
				table.rawPut(name,n);
				continue;
			}
			String s = ifld.stringValue();
			if( s != null ) {
				table.rawPut(name,s);
				continue;
			}
			throw new LuanException(luan,"invalid field type for "+ifld);
		}
		return table;
	}

}