view src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1551:9cc4cee39b8b

add LuanOpDoer
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 04 Oct 2020 16:29:54 -0600
parents 41c32da4cbd1
children 52241b69c339
line wrap: on
line source

package goodjava.lucene.logging;

import java.io.File;
import java.io.RandomAccessFile;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import goodjava.io.IoUtils;
import goodjava.lucene.api.GoodIndexWriter;
import goodjava.lucene.api.LuceneIndexWriter;
import goodjava.lucene.api.GoodCollector;
import goodjava.lucene.api.LuceneUtils;
import goodjava.logging.Logger;
import goodjava.logging.LoggerFactory;


public class LoggingIndexWriter implements GoodIndexWriter {
	private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);
	// Format version of the "index" file: written first by writeIndex() and
	// compared by the constructor before the log file names are read back.
	private static final int version = 2;
	// Operation codes, written as a single byte after each entry's timestamp
	// (see writeOp) and dispatched on when a log is replayed (see playOp).
	private static final int OP_DELETE_ALL = 1;
	private static final int OP_DELETE_DOCUMENTS = 2;
	private static final int OP_ADD_DOCUMENT = 3;
	private static final int OP_UPDATE_DOCUMENT = 4;
	private static final int OP_TAG = 5;
	// Supplies the random number used in log file names (see newLogFile).
	private static final Random rnd = new Random();

	// The wrapped writer; each mutating method applies the operation to it before logging it.
	public final LuceneIndexWriter indexWriter;
	// Set by the constructor: false when existing logs were loaded from the index file,
	// true when new logs had to be built.
	public boolean wasCreated;
	// Directory that holds the "index" file and the log files.
	private final File logDir;
	// Age threshold in milliseconds: commit() starts a merge once logs[1] was last
	// modified more than this long before System.currentTimeMillis().
	private final long logTime;
	// The three log files, replayed in array order; logs[2] is the one appended to (see setLog).
	protected final LogFile[] logs = new LogFile[3];
	// Output stream on logs[2]; replaced by setLog() whenever logs[2] changes.
	private LogOutputStream log;
	// The "index" file inside logDir; records the format version and the three log file names.
	private final File index;
	// Held while new logs are being built or a merge is running, so the two never overlap.
	private final SemaphoreLock mergeLock = new SemaphoreLock();

	/**
	 * Wraps {@code indexWriter} and attaches it to the logs kept in {@code logDir}.
	 *
	 * <p>If {@code logDir} contains an "index" file with the current format version,
	 * the three log files it names are reused, every other file in the directory is
	 * deleted, and {@link #wasCreated} is set to false.  Otherwise fresh logs are
	 * built from the current content of {@code indexWriter} and {@link #wasCreated}
	 * is set to true.
	 *
	 * @param indexWriter the writer that operations are applied to before being logged
	 * @param logDir directory for the index file and log files; created if missing
	 * @param logTime age threshold in milliseconds used by {@link #commit()} to decide when to merge logs
	 * @throws IOException if the index file or a log file cannot be read or written
	 * @throws RuntimeException if {@code logDir} is not a directory after the attempt to create it
	 */
	public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime)
		throws IOException
	{
		this.indexWriter = indexWriter;
		this.logDir = logDir;
		this.logTime = logTime;
		IoUtils.mkdirs(logDir);
		if( !logDir.isDirectory() )
			throw new RuntimeException("not a directory: "+logDir);
		index = new File(logDir,"index");
		if( index.exists() ) {
			// try-with-resources closes the stream on every path, including the early return
			try( DataInputStream dis = new DataInputStream(new FileInputStream(index)) ) {
				if( dis.readInt() == version ) {
					for( int i=0; i<logs.length; i++ ) {
						File file = new File( logDir, dis.readUTF() );
						logs[i] = new LogFile(file);
					}
					deleteUnusedFiles();
					setLog();
					wasCreated = false;
					return;
				}
			}
		}
		// no index file, or one written with a different format version: start over
		newLogs();
		wasCreated = true;
	}

	/**
	 * Opens a reader by delegating to the wrapped index writer.
	 *
	 * @return the reader returned by {@code indexWriter.openReader()}
	 * @throws IOException if the wrapped writer fails to open a reader
	 */
	public IndexReader openReader() throws IOException {
		IndexReader reader = indexWriter.openReader();
		return reader;
	}

	/**
	 * Returns the Lucene {@code IndexWriter} exposed by the wrapped index writer.
	 *
	 * @return the result of {@code indexWriter.getLuceneIndexWriter()}
	 */
	public IndexWriter getLuceneIndexWriter() {
		IndexWriter luceneWriter = indexWriter.getLuceneIndexWriter();
		return luceneWriter;
	}

	/**
	 * Points {@code log} at a new output stream on {@code logs[2]}, closing the
	 * previously open stream first if there is one.
	 *
	 * @throws IOException if closing the old stream or opening the new one fails
	 */
	private void setLog() throws IOException {
		LogOutputStream previous = log;
		if( previous != null ) {
			previous.close();
		}
		log = logs[2].output();
	}
/*
	public synchronized boolean isMerging() {
		return mergeLock.isLocked();
	}
*/
	/**
	 * Acquires {@code mergeLock}, waiting up to one minute.
	 *
	 * @throws RuntimeException if the lock is not obtained within the timeout, or if
	 *         the thread is interrupted while waiting (the interrupt status is
	 *         restored before the exception is thrown)
	 */
	private void getMergeLock() {
		try {
			if( !mergeLock.tryLock(1,TimeUnit.MINUTES) )
				throw new RuntimeException("failed to acquire lock");
		} catch(InterruptedException e) {
			// catching InterruptedException clears the flag; set it again so callers
			// further up the stack can still observe the interruption
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}
	}

	/**
	 * Discards the current logs and builds new ones from the content of the
	 * wrapped index.  The merge lock is held for the duration and is released
	 * even when building fails.
	 *
	 * @throws IOException if building or recording the new logs fails
	 * @throws RuntimeException if the merge lock cannot be acquired
	 */
	public synchronized void newLogs() throws IOException {
		getMergeLock();
		try {
			this.newLogs2();
		}
		finally {
			mergeLock.unlock();
		}
	}

	/**
	 * Replaces all three log files with new ones, writes a snapshot of the
	 * wrapped index into {@code logs[0]}, records the new file names in the index
	 * file and reopens {@code log} on the new {@code logs[2]}.
	 *
	 * @throws IOException if a log or the index file cannot be written
	 */
	private void newLogs2() throws IOException {
		logger.info("building new logs");
		for( int i=0; i<logs.length; i++ ) {
			logs[i] = newLogFile();
		}
		// named "snapshot" so it does not shadow the "log" field
		LogOutputStream snapshot = logs[0].output();
		try {
			logLucene( System.currentTimeMillis(), snapshot, indexWriter );
		} finally {
			// close even when the snapshot fails, so the stream is not leaked
			snapshot.close();
		}
		writeIndex();
		setLog();
		logger.info("done building new logs");
	}

	/**
	 * Appends a snapshot of the wrapped index to the current log: a delete-all
	 * entry followed by one add-document entry per document, all stamped with
	 * the current time.
	 *
	 * @throws IOException if reading the index or writing the log fails
	 */
	public synchronized void logLucene() throws IOException {
		// NOTE(review): the original author left an open question here about whether
		// log.rollback() should be called before writing the snapshot.
		final long now = System.currentTimeMillis();
		logLucene( now, log, indexWriter );
	}

	/**
	 * Writes a snapshot of {@code indexWriter}'s documents to {@code log}: one
	 * delete-all entry followed by an add-document entry for every document
	 * matched by a match-all query, then commits the log.
	 *
	 * @param time timestamp written in front of every entry
	 * @param log stream that receives the entries; it is committed but not closed
	 * @param indexWriter source of the documents
	 * @throws IOException if reading the index or writing the log fails
	 */
	private static void logLucene(long time,LogOutputStream log,LuceneIndexWriter indexWriter)
		throws IOException
	{
		log.writeLong(time);
		log.writeByte(OP_DELETE_ALL);
		IndexReader reader = indexWriter.openReader();
		try {
			final IndexSearcher searcher = new IndexSearcher(reader);
			Query query = new MatchAllDocsQuery();
			searcher.search( query, new GoodCollector(){
				public void collectDoc(int iDoc) throws IOException {
					Document doc = searcher.doc(iDoc);
					Map<String,Object> storedFields = LuceneUtils.toMap(doc);
					log.writeLong(time);
					log.writeByte(OP_ADD_DOCUMENT);
					log.writeMap(storedFields);
				}
			});
		} finally {
			// release the reader even when the search or a log write fails
			reader.close();
		}
		log.commit();
	}

	/**
	 * Creates a {@code LogFile} for a file in {@code logDir} that does not exist
	 * yet.  Names have the form {@code _N.log} with N drawn at random from 0..99;
	 * drawing repeats until an unused name is found.
	 *
	 * @return a log file object for the chosen name
	 * @throws IOException if constructing the {@code LogFile} fails
	 */
	private LogFile newLogFile() throws IOException {
		while( true ) {
			File candidate = new File(logDir,"_"+rnd.nextInt(100)+".log");
			if( !candidate.exists() )
				return new LogFile(candidate);
		}
	}

	/**
	 * Deletes everything in the log directory except the index file and this
	 * writer's current log files.
	 *
	 * @throws IOException if a deletion fails
	 */
	private void deleteUnusedFiles() throws IOException {
		deleteUnusedFiles(this.logs,this.index);
	}

	/**
	 * Deletes (recursively) every entry in {@code index}'s parent directory whose
	 * name is neither the index file's name nor the name of one of {@code logs}.
	 *
	 * @param logs the log files to keep
	 * @param index the index file; its parent directory is the one cleaned up
	 * @throws IOException if the directory cannot be determined or listed, or a deletion fails
	 */
	private static void deleteUnusedFiles(LogFile[] logs,File index) throws IOException {
		Set<String> used = new HashSet<String>();
		used.add( index.getName() );
		for( LogFile lf : logs ) {
			used.add( lf.file.getName() );
		}
		File dir = index.getParentFile();
		if( dir == null )
			throw new IOException("index file has no parent directory: "+index);
		// listFiles() returns null when the path is not a directory or an I/O error occurs
		File[] files = dir.listFiles();
		if( files == null )
			throw new IOException("cannot list directory: "+dir);
		for( File f : files ) {
			if( !used.contains(f.getName()) ) {
				IoUtils.deleteRecursively(f);
			}
		}
	}

	/**
	 * Records this writer's current log files in its index file.
	 *
	 * @throws IOException if the index file cannot be written
	 */
	private void writeIndex() throws IOException {
		writeIndex(this.logs,this.index);
	}

	/**
	 * Writes the index file: the format version followed by the file names of
	 * the three logs, then deletes every other entry in the index file's
	 * directory.
	 *
	 * @param logs exactly three log files, in replay order
	 * @param index the index file to write
	 * @throws IOException if the index file cannot be written or cleanup fails
	 * @throws RuntimeException if {@code logs} does not have exactly three elements
	 */
	public static void writeIndex(LogFile[] logs,File index) throws IOException {
		if( logs.length != 3 )
			throw new RuntimeException("expected 3 logs but got "+logs.length);
		// build the content in memory so the file is written with a single write call
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		DataOutputStream dos = new DataOutputStream(baos);
		dos.writeInt(version);
		for( LogFile lf : logs ) {
			String fileName = lf.file.getName();
			dos.writeUTF(fileName);
		}
		dos.close();
		byte[] bytes = baos.toByteArray();
		try( RandomAccessFile raf = new RandomAccessFile( index, "rwd" ) ) {
			raf.write(bytes);
			// the file is overwritten in place; drop any bytes left over from a longer previous version
			raf.setLength(bytes.length);
		}
		deleteUnusedFiles(logs,index);
	}

	/**
	 * Merges {@code logs[0]} and {@code logs[1]} into a single new log.
	 *
	 * <p>Both logs are replayed into a scratch index in {@code logDir/merge}; a
	 * snapshot of that index, stamped with the last-modified time of
	 * {@code logs[1]}, becomes the new {@code logs[0]}.  The old {@code logs[2]}
	 * moves to {@code logs[1]} and a new empty log becomes {@code logs[2]}.
	 * Does nothing (apart from logging an error) when the merge lock is not held.
	 *
	 * @throws IOException if replaying, snapshotting or writing the index file fails
	 * @throws RuntimeException if the scratch directory already exists
	 */
	private void mergeLogs() throws IOException {
		logger.info("merge");
		if( !mergeLock.isLocked() ) {
			logger.error("merge without lock");
			return;
		}
		LogFile first = logs[0];
		LogFile second = logs[1];
		long lastTime = second.file.lastModified();
		File dirFile = new File(logDir,"merge");
		if( dirFile.exists() )
			throw new RuntimeException("merge directory already exists: "+dirFile);
		Directory dir = FSDirectory.open(dirFile);
		LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
		LogFile merge;
		try {
			OpDoer opDoer = new BasicOpDoer(mergeWriter);
			playLog( first.input(), opDoer );
			playLog( second.input(), opDoer );
			mergeWriter.commit();
			merge = newLogFile();
			// named "mergeLog" so it does not shadow the "log" field
			LogOutputStream mergeLog = merge.output();
			try {
				logLucene( lastTime, mergeLog, mergeWriter );
			} finally {
				mergeLog.close();
			}
		} finally {
			// close the scratch writer even when replay or snapshot fails
			mergeWriter.close();
		}
		// swap the logs atomically with respect to the synchronized writer methods
		synchronized(this) {
			logs[0] = merge;
			logs[1] = logs[2];
			logs[2] = newLogFile();
			writeIndex();
			setLog();
		}
	}
	/**
	 * Task run on the background merge thread started by commit(): performs the
	 * merge and always releases the merge lock afterwards.  An IOException from
	 * the merge is rethrown wrapped in a RuntimeException.
	 */
	private final Runnable mergeLogs = () -> {
		try {
			mergeLogs();
		} catch(IOException e) {
			throw new RuntimeException(e);
		} finally {
			mergeLock.unlock();
		}
	};

	/**
	 * Forward-only cursor over the documents matching a query, in sort order.
	 * The search is run once in the constructor with a limit of 10,000,000 hits.
	 */
	private static class DocIter {
		final IndexReader reader;
		final TopDocs td;
		final int n;
		int i = 0;

		/**
		 * Runs the sorted search and fails if the total hit count differs from
		 * the number of hits actually returned.
		 */
		DocIter(IndexReader reader,Query query,Sort sort) throws IOException {
			this.reader = reader;
			this.td = new IndexSearcher(reader).search(query,10000000,sort);
			this.n = this.td.scoreDocs.length;
			if( this.td.totalHits != this.n )
				throw new RuntimeException();
		}

		/** Returns the next document, or null once all hits have been returned. */
		Document next() throws IOException {
			if( i >= n )
				return null;
			int docId = td.scoreDocs[i].doc;
			i++;
			return reader.document(docId);
		}
	}

	// Guards check() against concurrent runs; flipped atomically so that two
	// threads cannot both see "not checking" and proceed.
	private final AtomicBoolean isChecking = new AtomicBoolean(false);

	/**
	 * Verifies that replaying the logs reproduces the content of the wrapped
	 * index.  Only one check may run at a time.
	 *
	 * @param sortField field used to walk both indexes in the same order and
	 *        report individual differences, or null to compare by document count and hash only
	 * @return true if no difference was found
	 * @throws IOException if reading the index or the logs fails
	 * @throws RuntimeException if another check is already running
	 */
	public boolean check(SortField sortField) throws IOException {
		if( !isChecking.compareAndSet(false,true) )
			throw new RuntimeException("another check is running");
		try {
			return doCheck(sortField);
		} finally {
			isChecking.set(false);
		}
	}

	/**
	 * Rebuilds a scratch index in {@code logDir/check} by replaying the logs and
	 * compares it with the wrapped index.
	 *
	 * <p>The document counts are compared first.  With a null {@code sortField}
	 * the two indexes are then compared by a sum of stored-field hash codes
	 * (skipped when the counts already differ); otherwise both are walked in
	 * {@code sortField} order and every difference is logged.
	 *
	 * @param sortField field to sort and match documents by, or null for the hash comparison
	 * @return true if no difference was found
	 * @throws IOException if reading the index, the logs or the scratch index fails
	 */
	protected boolean doCheck(SortField sortField) throws IOException {
		boolean ok = true;
		IndexReader indexReader;
		LogInputStream[] logReaders;
		// take the index snapshot and the log readers under the writer's monitor
		// so that both refer to the same point in time
		synchronized(this) {
			indexReader = indexWriter.openReader();
			logReaders = logReaders(logs);
		}
		try {
			indexWriter.check();
			File dirFile = new File(logDir,"check");
			IoUtils.deleteRecursively(dirFile);
			Directory dir = FSDirectory.open(dirFile);
			LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
			try {
				playLogs(logReaders,new BasicOpDoer(checkWriter));
				IndexReader checkReader = checkWriter.openReader();
				try {
					int nCheck = checkReader.numDocs();
					int nOrig = indexReader.numDocs();
					if( nCheck != nOrig ) {
						logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck);
						ok = false;
					}
					if( sortField == null ) {
						if( ok && hash(indexReader) != hash(checkReader) ) {
							logger.error("hash mismatch");
							ok = false;
						}
					} else if( !checkSorted(indexReader,checkReader,sortField) ) {
						ok = false;
					}
				} finally {
					// release the scratch reader even when the comparison throws
					checkReader.close();
				}
			} finally {
				// release the scratch writer even when replay or comparison throws
				checkWriter.close();
			}
			IoUtils.deleteRecursively(dirFile);
		} finally {
			indexReader.close();
		}
		return ok;
	}

	/**
	 * Walks both readers in {@code sortField} order, like a merge of two sorted
	 * lists, logging every document that differs or exists on one side only.
	 *
	 * @param indexReader reader on the wrapped index
	 * @param checkReader reader on the index rebuilt from the logs
	 * @param sortField field both sides are sorted and matched by
	 * @return true if every document matched
	 * @throws IOException if searching or loading a document fails
	 */
	@SuppressWarnings({"rawtypes","unchecked"})  // stored sort-field values are compared as raw Comparable
	private static boolean checkSorted(IndexReader indexReader,IndexReader checkReader,SortField sortField)
		throws IOException
	{
		boolean ok = true;
		Sort sort = new Sort(sortField);
		String sortFieldName = sortField.getField();
		Query query = new PrefixQuery(new Term(sortFieldName));
		DocIter origIter = new DocIter(indexReader,query,sort);
		DocIter checkIter = new DocIter(checkReader,query,sort);
		// NOTE(review): relies on LuceneUtils.toMap returning null for a null document — confirm
		Map<String,Object> origFields = LuceneUtils.toMap(origIter.next());
		Map<String,Object> checkFields = LuceneUtils.toMap(checkIter.next());
		while( origFields!=null && checkFields!=null ) {
			Comparable origFld = (Comparable)origFields.get(sortFieldName);
			Comparable checkFld = (Comparable)checkFields.get(sortFieldName);
			int cmp = origFld.compareTo(checkFld);
			if( cmp==0 ) {
				if( !origFields.equals(checkFields) ) {
					logger.error(sortFieldName+" "+origFld+" not equal");
					logger.error("lucene = "+origFields);
					logger.error("logs = "+checkFields);
					ok = false;
				}
				origFields = LuceneUtils.toMap(origIter.next());
				checkFields = LuceneUtils.toMap(checkIter.next());
			} else if( cmp < 0 ) {
				logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
				ok = false;
				origFields = LuceneUtils.toMap(origIter.next());
			} else {  // >
				logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
				ok = false;
				checkFields = LuceneUtils.toMap(checkIter.next());
			}
		}
		// whatever remains on one side has no counterpart on the other
		while( origFields!=null ) {
			Comparable origFld = (Comparable)origFields.get(sortFieldName);
			logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
			ok = false;
			origFields = LuceneUtils.toMap(origIter.next());
		}
		while( checkFields!=null ) {
			Comparable checkFld = (Comparable)checkFields.get(sortFieldName);
			logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
			ok = false;
			checkFields = LuceneUtils.toMap(checkIter.next());
		}
		return ok;
	}

	/** Collector that accumulates an int total while documents are collected. */
	private static abstract class HashCollector extends GoodCollector {
		// running sum; starts at the int default of 0
		int total;
	}

	/**
	 * Computes a hash of an index as the sum of the hash codes of each
	 * document's stored-field map, over all documents matched by a match-all query.
	 *
	 * @param reader the index to hash
	 * @return the summed hash
	 * @throws IOException if searching or loading a document fails
	 */
	private static int hash(IndexReader reader) throws IOException {
		final IndexSearcher searcher = new IndexSearcher(reader);
		HashCollector collector = new HashCollector() {
			public void collectDoc(int iDoc) throws IOException {
				Map<String,Object> storedFields = LuceneUtils.toMap(searcher.doc(iDoc));
				total += storedFields.hashCode();
			}
		};
		searcher.search(new MatchAllDocsQuery(),collector);
		return collector.total;
	}

	/**
	 * Closes the wrapped index writer, then commits and closes the current log.
	 * The log stream is closed even when closing the index writer or committing
	 * the log fails.
	 *
	 * @throws IOException if closing the index writer or the log fails
	 */
	public synchronized void close() throws IOException {
		try {
			indexWriter.close();
			log.commit();
		} finally {
			// do not leak the log stream when one of the calls above throws
			log.close();
		}
	}

	/**
	 * Commits the wrapped index and the current log.  Afterwards, if no merge
	 * or log rebuild holds the merge lock and {@code logs[1]} was last modified
	 * more than {@code logTime} milliseconds ago, takes the merge lock and
	 * starts a background thread that merges the two oldest logs.
	 *
	 * @throws IOException if committing the index or the log fails
	 * @throws RuntimeException if the merge lock cannot be acquired
	 */
	public synchronized void commit() throws IOException {
		indexWriter.commit();
		log.commit();
		if( !mergeLock.isLocked() ) {
			long modified = logs[1].file.lastModified();
			long cutoff = System.currentTimeMillis() - logTime;
			if( modified < cutoff ) {
				// the lock is released by the mergeLogs task when it finishes
				getMergeLock();
				Thread merger = new Thread(mergeLogs);
				merger.start();
			}
		}
	}

	/**
	 * Rolls back the wrapped index writer and then the current log.
	 *
	 * @throws IOException if either rollback fails
	 */
	public synchronized void rollback() throws IOException {
		this.indexWriter.rollback();
		this.log.rollback();
	}

	/**
	 * Deletes all documents from the wrapped index and logs a delete-all entry.
	 *
	 * @throws IOException if the index operation or the log write fails
	 */
	public synchronized void deleteAll() throws IOException {
		this.indexWriter.deleteAll();
		this.writeOp(OP_DELETE_ALL);
	}

	/**
	 * Deletes the documents matching {@code query} from the wrapped index and
	 * logs a delete-documents entry containing the query.
	 *
	 * @param query selects the documents to delete
	 * @throws IOException if the index operation or the log write fails
	 */
	public synchronized void deleteDocuments(Query query) throws IOException {
		this.indexWriter.deleteDocuments(query);
		this.writeOp(OP_DELETE_DOCUMENTS);
		this.log.writeQuery(query);
	}

	/**
	 * Adds a document to the wrapped index and logs an add-document entry
	 * containing its stored fields.
	 *
	 * @param storedFields the document's stored fields
	 * @throws IOException if the index operation or the log write fails
	 */
	public synchronized void addDocument(Map<String,Object> storedFields) throws IOException {
		this.indexWriter.addDocument(storedFields);
		this.writeOp(OP_ADD_DOCUMENT);
		this.log.writeMap(storedFields);
	}

	/**
	 * Updates a document in the wrapped index and logs an update-document entry
	 * containing the key field name followed by the stored fields.
	 *
	 * @param keyFieldName name of the field passed to the wrapped writer as the update key
	 * @param storedFields the document's new stored fields
	 * @throws IOException if the index operation or the log write fails
	 */
	public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {
		this.indexWriter.updateDocument(keyFieldName,storedFields);
		this.writeOp(OP_UPDATE_DOCUMENT);
		this.log.writeUTF(keyFieldName);
		this.log.writeMap(storedFields);
	}

	/**
	 * Logs a tag entry.  Only the log is written; the wrapped index is not touched.
	 *
	 * @param tag the text stored in the entry
	 * @throws IOException if the log write fails
	 */
	public synchronized void tag(String tag) throws IOException {
		this.writeOp(OP_TAG);
		this.log.writeUTF(tag);
	}

	/**
	 * Delegates to the wrapped index writer's {@code reindexDocuments}.
	 * Nothing is written to the log for this operation.
	 *
	 * @param keyFieldName passed through to the wrapped writer
	 * @param query passed through to the wrapped writer
	 * @throws IOException if the wrapped writer fails
	 */
	public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException {
		this.indexWriter.reindexDocuments(keyFieldName,query);
	}

	/**
	 * Writes the header of a log entry: the current time in milliseconds as a
	 * long, followed by the operation code as a byte.
	 *
	 * @param op one of the OP_* codes
	 * @throws IOException if the log write fails
	 */
	private void writeOp(int op) throws IOException {
		final long now = System.currentTimeMillis();
		log.writeLong(now);
		log.writeByte(op);
	}

	/**
	 * Replays all three logs, in order, through {@code opDoer}.
	 *
	 * @param opDoer receives the replayed operations; when null, a
	 *        {@code BasicOpDoer} on the wrapped index writer is used
	 * @throws IOException if reading a log or applying an operation fails
	 * @throws RuntimeException if the target writer is not empty
	 */
	public synchronized void playLogs(OpDoer opDoer) throws IOException {
		OpDoer doer = opDoer != null ? opDoer : new BasicOpDoer(indexWriter);
		playLogs( logReaders(logs), doer );
	}

	/**
	 * Opens an input stream on each of the given log files.
	 *
	 * @param logs the log files to open
	 * @return one input stream per log file, in the same order
	 * @throws IOException if opening a log fails
	 */
	private static LogInputStream[] logReaders(LogFile[] logs) throws IOException {
		final int count = logs.length;
		LogInputStream[] readers = new LogInputStream[count];
		int next = 0;
		for( LogFile lf : logs ) {
			readers[next++] = lf.input();
		}
		return readers;
	}

	/**
	 * Replays each log stream in array order through {@code opDoer} and then
	 * commits it.  The target writer must contain no documents beforehand.
	 *
	 * @param logReaders the streams to replay; each is closed after it has been played
	 * @param opDoer receives the replayed operations
	 * @throws IOException if reading a log or applying an operation fails
	 * @throws RuntimeException if the target writer already contains documents
	 */
	private static void playLogs(LogInputStream[] logReaders,OpDoer opDoer)
		throws IOException
	{
		int existing = numDocs(opDoer.writer());
		if( existing != 0 ) {
			throw new RuntimeException ("not empty");
		}
		for( int i=0; i<logReaders.length; i++ ) {
			playLog(logReaders[i],opDoer);
		}
		opDoer.commit();
	}

	/**
	 * Counts the documents visible through a freshly opened reader on the
	 * given writer.
	 *
	 * @param indexWriter the writer to inspect
	 * @return the reader's document count
	 * @throws IOException if opening or closing the reader fails
	 */
	private static int numDocs(GoodIndexWriter indexWriter) throws IOException {
		IndexReader r = indexWriter.openReader();
		final int count = r.numDocs();
		r.close();
		return count;
	}

	/**
	 * Replays every entry remaining in {@code in} through {@code opDoer} and
	 * closes the stream, whether or not replay succeeds.
	 *
	 * @param in the log stream to replay; always closed on return
	 * @param opDoer receives the replayed operations
	 * @throws IOException if reading the log or applying an operation fails
	 */
	private static void playLog(LogInputStream in,OpDoer opDoer)
		throws IOException
	{
		try {
			while( in.available() > 0 ) {
				playOp(in,opDoer);
			}
		} finally {
			// do not leak the stream when an entry is corrupt or an operation fails
			in.close();
		}
	}

	/**
	 * Reads one log entry (timestamp, operation code, then the operation's
	 * arguments) from {@code in} and applies it to {@code opDoer}.
	 *
	 * @param in the log stream, positioned at the start of an entry
	 * @param opDoer receives the operation together with the entry's timestamp
	 * @throws IOException if reading the entry or applying the operation fails
	 * @throws RuntimeException if the operation code is not one of the OP_* codes
	 */
	private static void playOp(LogInputStream in,OpDoer opDoer)
		throws IOException
	{
		final long time = in.readLong();
		final int op = in.readByte();
		switch(op) {
		case OP_DELETE_ALL:
			opDoer.deleteAll(time);
			break;
		case OP_DELETE_DOCUMENTS: {
			Query query = in.readQuery();
			opDoer.deleteDocuments(time,query);
			break;
		}
		case OP_ADD_DOCUMENT: {
			Map storedFields = in.readMap();
			opDoer.addDocument(time,storedFields);
			break;
		}
		case OP_UPDATE_DOCUMENT: {
			// the key field name is stored before the field map
			String keyFieldName = in.readUTF();
			Map storedFields = in.readMap();
			opDoer.updateDocument(time,keyFieldName,storedFields);
			break;
		}
		case OP_TAG: {
			String tag = in.readUTF();
			opDoer.tag(time,tag);
			break;
		}
		default:
			throw new RuntimeException("invalid op "+op);
		}
	}

	/**
	 * Debugging aid: prints the total hit count and the stored fields of up to
	 * 100 documents of the given index to standard output.
	 *
	 * @param indexWriter the writer whose index is dumped
	 * @throws IOException if searching or loading a document fails
	 */
	private static void dump(LuceneIndexWriter indexWriter) throws IOException {
		IndexReader reader = indexWriter.openReader();
		try {
			IndexSearcher searcher = new IndexSearcher(reader);
			Query query = new MatchAllDocsQuery();
			TopDocs td = searcher.search(query,100);
			System.out.println("totalHits = "+td.totalHits);
			for( int i=0; i<td.scoreDocs.length; i++ ) {
				Document doc = searcher.doc(td.scoreDocs[i].doc);
				System.out.println(LuceneUtils.toMap(doc));
			}
			System.out.println();
		} finally {
			// release the reader even when the search or a document load fails
			reader.close();
		}
	}
 
}