diff src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1548:736ec76bbf42

lucene log work
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 27 Sep 2020 22:07:18 -0600
parents 35601f15ecc3
children 41c32da4cbd1
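
For context, a minimal usage sketch of the API after this change. Only the three-argument constructor, the logTime-driven merge, and playLogs(GoodWriter) come from this diff; the helper method name, the log directory path, and the one-hour interval below are illustrative assumptions, and obtaining the wrapped LuceneIndexWriter is outside the scope of this change.

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import goodjava.lucene.api.LuceneIndexWriter;
import goodjava.lucene.logging.LoggingIndexWriter;

public class LoggingExample {
	// "indexWriter" and "logDir" are assumed to come from existing application code.
	static LoggingIndexWriter open(LuceneIndexWriter indexWriter, File logDir)
		throws IOException
	{
		// New in this change: the constructor takes a logTime interval in milliseconds.
		// The writer now keeps a fixed set of three log files; once logs[1] is older
		// than logTime, a background thread merges logs[0] and logs[1].
		long logTime = TimeUnit.HOURS.toMillis(1);  // illustrative value
		LoggingIndexWriter logging = new LoggingIndexWriter(indexWriter, logDir, logTime);

		// playLogs() no longer stops at a tag; each OP_TAG entry is forwarded to
		// GoodWriter.tag(). Passing null replays into the wrapped indexWriter.
		logging.playLogs(null);
		return logging;
	}
}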
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java	Thu Sep 24 15:33:56 2020 -0600
+++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java	Sun Sep 27 22:07:18 2020 -0600
@@ -10,8 +10,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
-import java.util.List;
-import java.util.ArrayList;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.lucene.document.Document;
@@ -29,6 +27,7 @@
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import goodjava.io.IoUtils;
+import goodjava.lucene.api.GoodWriter;
 import goodjava.lucene.api.GoodIndexWriter;
 import goodjava.lucene.api.LuceneIndexWriter;
 import goodjava.lucene.api.GoodCollector;
@@ -39,7 +38,7 @@
 
 public class LoggingIndexWriter implements GoodIndexWriter {
 	private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);
-	private static final int version = 1;
+	private static final int version = 2;
 	private static final int OP_DELETE_ALL = 1;
 	private static final int OP_DELETE_DOCUMENTS = 2;
 	private static final int OP_ADD_DOCUMENT = 3;
@@ -50,14 +49,18 @@
 	public final LuceneIndexWriter indexWriter;
 	public boolean wasCreated;
 	private final File logDir;
-	protected final List<LogFile> logs = new ArrayList<LogFile>();
+	private final long logTime;
+	protected final LogFile[] logs = new LogFile[3];
 	private LogOutputStream log;
 	private final File index;
 	private final SemaphoreLock mergeLock = new SemaphoreLock();
 
-	public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException {
+	public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime)
+		throws IOException
+	{
 		this.indexWriter = indexWriter;
 		this.logDir = logDir;
+		this.logTime = logTime;
 		IoUtils.mkdirs(logDir);
 		if( !logDir.isDirectory() )
 			throw new RuntimeException();
@@ -66,10 +69,9 @@
 			DataInputStream dis = new DataInputStream(new FileInputStream(index));
 			try {
 				if( dis.readInt() == version ) {
-					final int n = dis.readInt();
-					for( int i=0; i<n; i++ ) {
+					for( int i=0; i<logs.length; i++ ) {
 						File file = new File( logDir, dis.readUTF() );
-						logs.add( new LogFile(file) );
+						logs[i] = new LogFile(file);
 					}
 					deleteUnusedFiles();
 					setLog();
@@ -84,6 +86,10 @@
 		wasCreated = true;
 	}
 
+	public IndexReader openReader() throws IOException {
+		return indexWriter.openReader();
+	}
+
 	public IndexWriter getLuceneIndexWriter() {
 		return indexWriter.getLuceneIndexWriter();
 	}
@@ -91,7 +97,7 @@
 	private void setLog() throws IOException {
 		if( log != null )
 			log.close();
-		log = logs.get(logs.size()-1).output();
+		log = logs[2].output();
 	}
 /*
 	public synchronized boolean isMerging() {
@@ -118,18 +124,29 @@
 
 	private void newLogs2() throws IOException {
 		logger.info("building new logs");
-		logs.clear();
-		for( int i=0; i<2; i++ ) {
-			logs.add( newLogFile() );
+		for( int i=0; i<logs.length; i++ ) {
+			logs[i] = newLogFile();
 		}
-		logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
+		LogOutputStream log = logs[0].output();
+		logLucene( System.currentTimeMillis(), log, indexWriter );
+		log.close();
 		writeIndex();
 		setLog();
 		logger.info("done building new logs");
 	}
 
-	private static void logLucene(long time,LogFile logLucene,LuceneIndexWriter indexWriter) throws IOException {
-		LogOutputStream log = logLucene.output();
+	public synchronized void logLucene()
+		throws IOException
+	{
+		//log.rollback();  ?
+		logLucene( System.currentTimeMillis(), log, indexWriter );
+	}
+
+	private static void logLucene(long time,LogOutputStream log,LuceneIndexWriter indexWriter)
+		throws IOException
+	{
+		log.writeLong(time);
+		log.writeByte(OP_DELETE_ALL);
 		IndexReader reader = indexWriter.openReader();
 		final IndexSearcher searcher = new IndexSearcher(reader);
 		Query query = new MatchAllDocsQuery();
@@ -144,7 +161,6 @@
 		});
 		reader.close();
 		log.commit();
-		log.close();
 	}
 
 	private LogFile newLogFile() throws IOException {
@@ -159,7 +175,7 @@
 		deleteUnusedFiles(logs,index);
 	}
 
-	private static void deleteUnusedFiles(List<LogFile> logs,File index) throws IOException {
+	private static void deleteUnusedFiles(LogFile[] logs,File index) throws IOException {
 		Set<String> used = new HashSet<String>();
 		used.add( index.getName() );
 		for( LogFile lf : logs ) {
@@ -176,11 +192,12 @@
 		writeIndex(logs,index);
 	}
 
-	public static void writeIndex(List<LogFile> logs,File index) throws IOException {
+	public static void writeIndex(LogFile[] logs,File index) throws IOException {
+		if( logs.length != 3 )
+			throw new RuntimeException();
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		DataOutputStream dos = new DataOutputStream(baos);
 		dos.writeInt(version);
-		dos.writeInt(logs.size());
 		for( LogFile lf : logs ) {
 			String fileName = lf.file.getName();
 			dos.writeUTF(fileName);
@@ -194,28 +211,34 @@
 	}
 
 	private void mergeLogs() throws IOException {
-		//logger.info("merge");
-		if( logs.size() <= 3 )
+		logger.info("merge");
+		if( !mergeLock.isLocked() ) {
+			logger.error("merge without lock");
 			return;
-		LogFile first = logs.get(0);
-		LogFile second = logs.get(1);
+		}
+		LogFile first = logs[0];
+		LogFile second = logs[1];
 		long lastTime = second.file.lastModified();
 		File dirFile = new File(logDir,"merge");
 		if( dirFile.exists() )
 			throw new RuntimeException();
 		Directory dir = FSDirectory.open(dirFile);
 		LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
-		playLog( first.input(), mergeWriter, null );
-		playLog( second.input(), mergeWriter, null );
+		playLog( first.input(), mergeWriter );
+		playLog( second.input(), mergeWriter );
 		mergeWriter.commit();
 		LogFile merge = newLogFile();
-		logLucene( lastTime, merge, mergeWriter );
+		LogOutputStream log = merge.output();
+		logLucene( lastTime, log, mergeWriter );
+		log.close();
 		mergeWriter.close();
 		synchronized(this) {
 			//check();
-			logs.remove(0);
-			logs.set(0,merge);
+			logs[0] = merge;
+			logs[1] = logs[2];
+			logs[2] = newLogFile();
 			writeIndex();
+			setLog();
 			//check(null);
 		}
 	}
@@ -265,7 +288,7 @@
 	protected boolean doCheck(SortField sortField) throws IOException {
 		boolean ok = true;
 		IndexReader indexReader;
-		List<LogInputStream> logReaders;
+		LogInputStream[] logReaders;
 		synchronized(this) {
 			indexReader = indexWriter.openReader();
 			logReaders = logReaders(logs);
@@ -277,7 +300,7 @@
 			IoUtils.deleteRecursively(dirFile);
 			Directory dir = FSDirectory.open(dirFile);
 			LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
-			playLogs(logReaders,checkWriter,null);
+			playLogs(logReaders,checkWriter);
 			//logger.info("check lucene");
 			IndexReader checkReader = checkWriter.openReader();
 			int nCheck = checkReader.numDocs();
@@ -375,12 +398,7 @@
 		log.commit();
 		if( mergeLock.isLocked() )
 			return;
-		if( log.logFile.end() > logs.get(0).end() ) {
-			logs.add( newLogFile() );
-			writeIndex();
-			setLog();
-		}
-		if( logs.size() > 3 ) {
+		if( logs[1].file.lastModified() < System.currentTimeMillis() - logTime ) {
 			getMergeLock();
 			new Thread(mergeLogs).start();
 //			mergeLogs.run();
@@ -431,86 +449,81 @@
 	}
 
 	// return whether stopped at tag
-	public synchronized boolean playLogs(String upToTag) throws IOException {
-		return playLogs( logReaders(logs), indexWriter, upToTag );
+	public synchronized void playLogs(GoodWriter writer) throws IOException {
+		if( writer == null )
+			writer = indexWriter;
+		playLogs( logReaders(logs), writer );
 	}
 
-	private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException {
-		List<LogInputStream> logReaders = new ArrayList<LogInputStream>();
-		for( LogFile log : logs ) {
-			logReaders.add( log.input() );
+	private static LogInputStream[] logReaders(LogFile[] logs) throws IOException {
+		LogInputStream[] logReaders = new LogInputStream[logs.length];
+		for( int i=0; i<logs.length; i++ ) {
+			logReaders[i] = logs[i].input();
 		}
 		return logReaders;
 	}
 
-	private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag)
+	private static void playLogs(LogInputStream[] logReaders,GoodWriter indexWriter)
 		throws IOException
 	{
 		if( numDocs(indexWriter) != 0 )
 			throw new RuntimeException ("not empty");
-		boolean rtn = false;
 		for( LogInputStream reader : logReaders ) {
-			if( playLog(reader,indexWriter,upToTag) ) {
-				rtn = true;
-				break;
-			}
+			playLog(reader,indexWriter);
 		}
 		indexWriter.commit();
-		return rtn;
 	}
 
-	private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
+	private static int numDocs(GoodWriter indexWriter) throws IOException {
 		IndexReader reader = indexWriter.openReader();
 		int n = reader.numDocs();
 		reader.close();
 		return n;
 	}
 
-	private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag)
+	private static void playLog(LogInputStream in,GoodWriter indexWriter)
 		throws IOException
 	{
-		boolean rtn = false;
 		while( in.available() > 0 ) {
-			if( playOp(in,indexWriter,upToTag) ) {
-				rtn = true;
-				break;
-			}
+			playOp(in,indexWriter);
 		}
 		in.close();
-		return rtn;
 	}
 
-	private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException {
+	private static void playOp(LogInputStream in,GoodWriter indexWriter)
+		throws IOException
+	{
 		in.readLong();  // time
 		int op = in.readByte();
 		switch(op) {
 		case OP_DELETE_ALL:
 			indexWriter.deleteAll();
-			return false;
+			return;
 		case OP_DELETE_DOCUMENTS:
 			{
 				Query query = in.readQuery();
 				//System.out.println("OP_DELETE_DOCUMENTS "+query);
 				indexWriter.deleteDocuments(query);
-				return false;
+				return;
 			}
 		case OP_ADD_DOCUMENT:
 			{
 				Map storedFields = in.readMap();
 				indexWriter.addDocument(storedFields);
-				return false;
+				return;
 			}
 		case OP_UPDATE_DOCUMENT:
 			{
 				String keyFieldName = in.readUTF();
 				Map storedFields = in.readMap();
 				indexWriter.updateDocument(keyFieldName,storedFields);
-				return false;
+				return;
 			}
 		case OP_TAG:
 			{
 				String tag = in.readUTF();
-				return tag.equals(upToTag);
+				indexWriter.tag(tag);
+				return;
 			}
 		default:
 			throw new RuntimeException("invalid op "+op);