changeset 1548:736ec76bbf42

lucene log work
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 27 Sep 2020 22:07:18 -0600
parents f24a9ba7551e
children 41c32da4cbd1
files src/goodjava/io/IoUtils.java src/goodjava/lucene/api/GoodIndexWriter.java src/goodjava/lucene/api/GoodWriter.java src/goodjava/lucene/backup/Backup.java src/goodjava/lucene/backup/BackupIndexWriter.java src/goodjava/lucene/logging/FilterGoodWriter.java src/goodjava/lucene/logging/LoggingIndexWriter.java src/luan/modules/lucene/Lucene.luan src/luan/modules/lucene/LuceneIndex.java
diffstat 9 files changed, 169 insertions(+), 82 deletions(-) [+]
line wrap: on
line diff
--- a/src/goodjava/io/IoUtils.java	Thu Sep 24 15:33:56 2020 -0600
+++ b/src/goodjava/io/IoUtils.java	Sun Sep 27 22:07:18 2020 -0600
@@ -9,6 +9,7 @@
 import java.io.StringWriter;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.attribute.FileTime;
 import java.security.Security;
 import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.SSLServerSocketFactory;
@@ -54,6 +55,10 @@
 		Files.createSymbolicLink( link.toPath(), existing.toPath() );
 	}
 
+	public static long getCreationTime(File f) throws IOException {  // Returns the "creationTime" file attribute of f as epoch milliseconds.
+		return ((FileTime)Files.getAttribute(f.toPath(),"creationTime")).toMillis();  // the attribute value is cast to FileTime before conversion
+	}
+
 	public static void copyAll(InputStream in,OutputStream out)
 		throws IOException
 	{
--- a/src/goodjava/lucene/api/GoodIndexWriter.java	Thu Sep 24 15:33:56 2020 -0600
+++ b/src/goodjava/lucene/api/GoodIndexWriter.java	Sun Sep 27 22:07:18 2020 -0600
@@ -1,20 +1,13 @@
 package goodjava.lucene.api;
 
 import java.io.IOException;
-import java.util.Map;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.search.Query;
 
 
-public interface GoodIndexWriter {
+public interface GoodIndexWriter extends GoodWriter {
 	public void close() throws IOException;
-	public void commit() throws IOException;
 	public void rollback() throws IOException;
-	public void deleteAll() throws IOException;
-	public void deleteDocuments(Query query) throws IOException;
-	public void addDocument(Map<String,Object> storedFields) throws IOException;
-	public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException;
 	public void reindexDocuments(String keyFieldName,Query query) throws IOException;
-	public void tag(String tag) throws IOException;
 	public IndexWriter getLuceneIndexWriter();
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/lucene/api/GoodWriter.java	Sun Sep 27 22:07:18 2020 -0600
@@ -0,0 +1,17 @@
+package goodjava.lucene.api;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Query;
+
+
+public interface GoodWriter {  // Writer operations targeted by LoggingIndexWriter's log replay; GoodIndexWriter extends this interface.
+	public IndexReader openReader() throws IOException;  // returns an IndexReader; LoggingIndexWriter.numDocs closes it after use, so the caller appears to own it
+	public void commit() throws IOException;  // called once by log replay after all logs have been played
+	public void deleteAll() throws IOException;  // invoked when replay reads an OP_DELETE_ALL entry
+	public void deleteDocuments(Query query) throws IOException;  // invoked when replay reads an OP_DELETE_DOCUMENTS entry, with the query read from the log
+	public void addDocument(Map<String,Object> storedFields) throws IOException;  // invoked when replay reads an OP_ADD_DOCUMENT entry, with the map read from the log
+	public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException;  // invoked when replay reads an OP_UPDATE_DOCUMENT entry (key field name, then map)
+	public void tag(String tag) throws IOException;  // invoked when replay reads an OP_TAG entry, with the tag string read from the log
+}
--- a/src/goodjava/lucene/backup/Backup.java	Thu Sep 24 15:33:56 2020 -0600
+++ b/src/goodjava/lucene/backup/Backup.java	Sun Sep 27 22:07:18 2020 -0600
@@ -105,16 +105,16 @@
 		}
 		if( call.cmd.equals("add") ) {
 			boolean complete = true;
-			List<LogFile> logs = new ArrayList<LogFile>();
-			for( Object obj : logInfo ) {
-				Map fileInfo = (Map)obj;
+			final LogFile[] logs = new LogFile[logInfo.size()];
+			for( int i=0; i<logs.length; i++ ) {
+				Map fileInfo = (Map)logInfo.get(i);
 				String name = (String)fileInfo.get("name");
 				File f = new File(dir,name);
 				if( !f.exists() ) {
 					complete = false;
 					break;
 				}
-				logs.add( new LogFile(f) );
+				logs[i] = new LogFile(f);
 			}
 			if( complete ) {
 				LoggingIndexWriter.writeIndex(logs,index);
--- a/src/goodjava/lucene/backup/BackupIndexWriter.java	Thu Sep 24 15:33:56 2020 -0600
+++ b/src/goodjava/lucene/backup/BackupIndexWriter.java	Sun Sep 27 22:07:18 2020 -0600
@@ -33,8 +33,10 @@
 	private boolean isSyncPending = false;
 	private final ExecutorService exec = Executors.newSingleThreadExecutor();
 
-	public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name,String password) throws IOException {
-		super(indexWriter,logDir);
+	public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime,String name,String password)
+		throws IOException
+	{
+		super(indexWriter,logDir,logTime);
 		if( backupDomains == null )
 			throw new RuntimeException("must set backupDomains");
 		this.name = name;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/lucene/logging/FilterGoodWriter.java	Sun Sep 27 22:07:18 2020 -0600
@@ -0,0 +1,54 @@
+package goodjava.lucene.logging;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.index.IndexReader;
+import goodjava.lucene.api.GoodWriter;
+
+
+public class FilterGoodWriter implements GoodWriter {  // GoodWriter wrapper that forwards write operations to another GoodWriter only while isActive is true.
+	protected final GoodWriter writer;  // the wrapped writer that receives forwarded calls
+	protected boolean isActive = true;  // gate for write operations; never changed in this class — presumably toggled by subclasses (TODO confirm)
+
+	protected FilterGoodWriter(GoodWriter writer) {  // protected: only subclasses (or same-package code) can construct this
+		this.writer = writer;
+	}
+
+	public IndexReader openReader() throws IOException {  // always delegates; not gated by isActive
+		return writer.openReader();
+	}
+
+	public void commit() throws IOException {  // forwards to the wrapped writer unless inactive
+		if( !isActive )
+			return;  // inactive: silently ignore
+		writer.commit();
+	}
+
+	public void deleteAll() throws IOException {  // forwards to the wrapped writer unless inactive
+		if( !isActive )
+			return;  // inactive: silently ignore
+		writer.deleteAll();
+	}
+
+	public void deleteDocuments(Query query) throws IOException {  // forwards to the wrapped writer unless inactive
+		if( !isActive )
+			return;  // inactive: silently ignore
+		writer.deleteDocuments(query);
+	}
+
+	public void addDocument(Map<String,Object> storedFields) throws IOException {  // forwards to the wrapped writer unless inactive
+		if( !isActive )
+			return;  // inactive: silently ignore
+		writer.addDocument(storedFields);
+	}
+
+	public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {  // forwards to the wrapped writer unless inactive
+		if( !isActive )
+			return;  // inactive: silently ignore
+		writer.updateDocument(keyFieldName,storedFields);
+	}
+
+	public void tag(String tag) throws IOException {}  // no-op: tags are NOT forwarded to the wrapped writer; NOTE(review): looks like a hook for subclasses to override — confirm
+
+}
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java	Thu Sep 24 15:33:56 2020 -0600
+++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java	Sun Sep 27 22:07:18 2020 -0600
@@ -10,8 +10,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
-import java.util.List;
-import java.util.ArrayList;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.lucene.document.Document;
@@ -29,6 +27,7 @@
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import goodjava.io.IoUtils;
+import goodjava.lucene.api.GoodWriter;
 import goodjava.lucene.api.GoodIndexWriter;
 import goodjava.lucene.api.LuceneIndexWriter;
 import goodjava.lucene.api.GoodCollector;
@@ -39,7 +38,7 @@
 
 public class LoggingIndexWriter implements GoodIndexWriter {
 	private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);
-	private static final int version = 1;
+	private static final int version = 2;
 	private static final int OP_DELETE_ALL = 1;
 	private static final int OP_DELETE_DOCUMENTS = 2;
 	private static final int OP_ADD_DOCUMENT = 3;
@@ -50,14 +49,18 @@
 	public final LuceneIndexWriter indexWriter;
 	public boolean wasCreated;
 	private final File logDir;
-	protected final List<LogFile> logs = new ArrayList<LogFile>();
+	private final long logTime;
+	protected final LogFile[] logs = new LogFile[3];
 	private LogOutputStream log;
 	private final File index;
 	private final SemaphoreLock mergeLock = new SemaphoreLock();
 
-	public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException {
+	public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime)
+		throws IOException
+	{
 		this.indexWriter = indexWriter;
 		this.logDir = logDir;
+		this.logTime = logTime;
 		IoUtils.mkdirs(logDir);
 		if( !logDir.isDirectory() )
 			throw new RuntimeException();
@@ -66,10 +69,9 @@
 			DataInputStream dis = new DataInputStream(new FileInputStream(index));
 			try {
 				if( dis.readInt() == version ) {
-					final int n = dis.readInt();
-					for( int i=0; i<n; i++ ) {
+					for( int i=0; i<logs.length; i++ ) {
 						File file = new File( logDir, dis.readUTF() );
-						logs.add( new LogFile(file) );
+						logs[i] = new LogFile(file);
 					}
 					deleteUnusedFiles();
 					setLog();
@@ -84,6 +86,10 @@
 		wasCreated = true;
 	}
 
+	public IndexReader openReader() throws IOException {  // GoodWriter.openReader: delegates to the wrapped LuceneIndexWriter
+		return indexWriter.openReader();
+	}
+
 	public IndexWriter getLuceneIndexWriter() {
 		return indexWriter.getLuceneIndexWriter();
 	}
@@ -91,7 +97,7 @@
 	private void setLog() throws IOException {
 		if( log != null )
 			log.close();
-		log = logs.get(logs.size()-1).output();
+		log = logs[2].output();
 	}
 /*
 	public synchronized boolean isMerging() {
@@ -118,18 +124,29 @@
 
 	private void newLogs2() throws IOException {
 		logger.info("building new logs");
-		logs.clear();
-		for( int i=0; i<2; i++ ) {
-			logs.add( newLogFile() );
+		for( int i=0; i<logs.length; i++ ) {
+			logs[i] = newLogFile();
 		}
-		logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
+		LogOutputStream log = logs[0].output();
+		logLucene( System.currentTimeMillis(), log, indexWriter );
+		log.close();
 		writeIndex();
 		setLog();
 		logger.info("done building new logs");
 	}
 
-	private static void logLucene(long time,LogFile logLucene,LuceneIndexWriter indexWriter) throws IOException {
-		LogOutputStream log = logLucene.output();
+	public synchronized void logLucene()  // Runs the static logLucene overload against the currently open log stream, stamped with the current time.
+		throws IOException
+	{
+		// NOTE(review): open question left by the author — should log.rollback() be called before writing?
+		logLucene( System.currentTimeMillis(), log, indexWriter );  // the log stream is not closed here; it stays the active log
+	}
+
+	private static void logLucene(long time,LogOutputStream log,LuceneIndexWriter indexWriter)
+		throws IOException
+	{
+		log.writeLong(time);
+		log.writeByte(OP_DELETE_ALL);
 		IndexReader reader = indexWriter.openReader();
 		final IndexSearcher searcher = new IndexSearcher(reader);
 		Query query = new MatchAllDocsQuery();
@@ -144,7 +161,6 @@
 		});
 		reader.close();
 		log.commit();
-		log.close();
 	}
 
 	private LogFile newLogFile() throws IOException {
@@ -159,7 +175,7 @@
 		deleteUnusedFiles(logs,index);
 	}
 
-	private static void deleteUnusedFiles(List<LogFile> logs,File index) throws IOException {
+	private static void deleteUnusedFiles(LogFile[] logs,File index) throws IOException {
 		Set<String> used = new HashSet<String>();
 		used.add( index.getName() );
 		for( LogFile lf : logs ) {
@@ -176,11 +192,12 @@
 		writeIndex(logs,index);
 	}
 
-	public static void writeIndex(List<LogFile> logs,File index) throws IOException {
+	public static void writeIndex(LogFile[] logs,File index) throws IOException {
+		if( logs.length != 3 )
+			throw new RuntimeException();
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		DataOutputStream dos = new DataOutputStream(baos);
 		dos.writeInt(version);
-		dos.writeInt(logs.size());
 		for( LogFile lf : logs ) {
 			String fileName = lf.file.getName();
 			dos.writeUTF(fileName);
@@ -194,28 +211,34 @@
 	}
 
 	private void mergeLogs() throws IOException {
-		//logger.info("merge");
-		if( logs.size() <= 3 )
+		logger.info("merge");
+		if( !mergeLock.isLocked() ) {
+			logger.error("merge without lock");
 			return;
-		LogFile first = logs.get(0);
-		LogFile second = logs.get(1);
+		}
+		LogFile first = logs[0];
+		LogFile second = logs[1];
 		long lastTime = second.file.lastModified();
 		File dirFile = new File(logDir,"merge");
 		if( dirFile.exists() )
 			throw new RuntimeException();
 		Directory dir = FSDirectory.open(dirFile);
 		LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
-		playLog( first.input(), mergeWriter, null );
-		playLog( second.input(), mergeWriter, null );
+		playLog( first.input(), mergeWriter );
+		playLog( second.input(), mergeWriter );
 		mergeWriter.commit();
 		LogFile merge = newLogFile();
-		logLucene( lastTime, merge, mergeWriter );
+		LogOutputStream log = merge.output();
+		logLucene( lastTime, log, mergeWriter );
+		log.close();
 		mergeWriter.close();
 		synchronized(this) {
 			//check();
-			logs.remove(0);
-			logs.set(0,merge);
+			logs[0] = merge;
+			logs[1] = logs[2];
+			logs[2] = newLogFile();
 			writeIndex();
+			setLog();
 			//check(null);
 		}
 	}
@@ -265,7 +288,7 @@
 	protected boolean doCheck(SortField sortField) throws IOException {
 		boolean ok = true;
 		IndexReader indexReader;
-		List<LogInputStream> logReaders;
+		LogInputStream[] logReaders;
 		synchronized(this) {
 			indexReader = indexWriter.openReader();
 			logReaders = logReaders(logs);
@@ -277,7 +300,7 @@
 			IoUtils.deleteRecursively(dirFile);
 			Directory dir = FSDirectory.open(dirFile);
 			LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
-			playLogs(logReaders,checkWriter,null);
+			playLogs(logReaders,checkWriter);
 			//logger.info("check lucene");
 			IndexReader checkReader = checkWriter.openReader();
 			int nCheck = checkReader.numDocs();
@@ -375,12 +398,7 @@
 		log.commit();
 		if( mergeLock.isLocked() )
 			return;
-		if( log.logFile.end() > logs.get(0).end() ) {
-			logs.add( newLogFile() );
-			writeIndex();
-			setLog();
-		}
-		if( logs.size() > 3 ) {
+		if( logs[1].file.lastModified() < System.currentTimeMillis() - logTime ) {
 			getMergeLock();
 			new Thread(mergeLogs).start();
 //			mergeLogs.run();
@@ -431,86 +449,81 @@
 	}
 
 	// replays all current logs into the given writer (into indexWriter when writer is null); returns nothing
-	public synchronized boolean playLogs(String upToTag) throws IOException {
-		return playLogs( logReaders(logs), indexWriter, upToTag );
+	public synchronized void playLogs(GoodWriter writer) throws IOException {
+		if( writer == null )
+			writer = indexWriter;
+		playLogs( logReaders(logs), writer );
 	}
 
-	private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException {
-		List<LogInputStream> logReaders = new ArrayList<LogInputStream>();
-		for( LogFile log : logs ) {
-			logReaders.add( log.input() );
+	private static LogInputStream[] logReaders(LogFile[] logs) throws IOException {
+		LogInputStream[] logReaders = new LogInputStream[logs.length];
+		for( int i=0; i<logs.length; i++ ) {
+			logReaders[i] = logs[i].input();
 		}
 		return logReaders;
 	}
 
-	private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag)
+	private static void playLogs(LogInputStream[] logReaders,GoodWriter indexWriter)
 		throws IOException
 	{
 		if( numDocs(indexWriter) != 0 )
 			throw new RuntimeException ("not empty");
-		boolean rtn = false;
 		for( LogInputStream reader : logReaders ) {
-			if( playLog(reader,indexWriter,upToTag) ) {
-				rtn = true;
-				break;
-			}
+			playLog(reader,indexWriter);
 		}
 		indexWriter.commit();
-		return rtn;
 	}
 
-	private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
+	private static int numDocs(GoodWriter indexWriter) throws IOException {
 		IndexReader reader = indexWriter.openReader();
 		int n = reader.numDocs();
 		reader.close();
 		return n;
 	}
 
-	private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag)
+	private static void playLog(LogInputStream in,GoodWriter indexWriter)
 		throws IOException
 	{
-		boolean rtn = false;
 		while( in.available() > 0 ) {
-			if( playOp(in,indexWriter,upToTag) ) {
-				rtn = true;
-				break;
-			}
+			playOp(in,indexWriter);
 		}
 		in.close();
-		return rtn;
 	}
 
-	private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException {
+	private static void playOp(LogInputStream in,GoodWriter indexWriter)
+		throws IOException
+	{
 		in.readLong();  // time
 		int op = in.readByte();
 		switch(op) {
 		case OP_DELETE_ALL:
 			indexWriter.deleteAll();
-			return false;
+			return;
 		case OP_DELETE_DOCUMENTS:
 			{
 				Query query = in.readQuery();
 				//System.out.println("OP_DELETE_DOCUMENTS "+query);
 				indexWriter.deleteDocuments(query);
-				return false;
+				return;
 			}
 		case OP_ADD_DOCUMENT:
 			{
 				Map storedFields = in.readMap();
 				indexWriter.addDocument(storedFields);
-				return false;
+				return;
 			}
 		case OP_UPDATE_DOCUMENT:
 			{
 				String keyFieldName = in.readUTF();
 				Map storedFields = in.readMap();
 				indexWriter.updateDocument(keyFieldName,storedFields);
-				return false;
+				return;
 			}
 		case OP_TAG:
 			{
 				String tag = in.readUTF();
-				return tag.equals(upToTag);
+				indexWriter.tag(tag);
+				return;
 			}
 		default:
 			throw new RuntimeException("invalid op "+op);
--- a/src/luan/modules/lucene/Lucene.luan	Thu Sep 24 15:33:56 2020 -0600
+++ b/src/luan/modules/lucene/Lucene.luan	Sun Sep 27 22:07:18 2020 -0600
@@ -9,6 +9,7 @@
 local Html = require "luan:Html.luan"
 local Number = require "luan:Number.luan"
 local integer = Number.integer or error()
+local Time = require "luan:Time.luan"
 local Io = require "luan:Io.luan"
 local uri = Io.uri or error()
 local String = require "luan:String.luan"
@@ -46,6 +47,7 @@
 	index_dir = get_file(index_dir)
 	options = options or {}
 	options.log_dir = options.log_dir and get_file(options.log_dir)
+	options.log_time = options.log_time or Time.period{days=30}
 	local java_index = LuceneIndex.getLuceneIndex(index_dir,options)
 	index.java = java_index
 
--- a/src/luan/modules/lucene/LuceneIndex.java	Thu Sep 24 15:33:56 2020 -0600
+++ b/src/luan/modules/lucene/LuceneIndex.java	Sun Sep 27 22:07:18 2020 -0600
@@ -144,6 +144,7 @@
 	private final PostgresBackup postgresBackup;
 	private boolean wasCreated;
 	private final File logDir;
+	private final long logTime;
 
 	private LuceneIndex(Luan luan,File indexDir,LuanTable options)
 		throws LuanException, IOException, ClassNotFoundException, SQLException
@@ -156,6 +157,7 @@
 		LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec");
 		LuanFunction supplementer = Utils.removeFunction(options,"supplementer");
 		logDir = (File)options.remove("log_dir");
+		logTime = (Long)options.remove("log_time");
 		Utils.checkEmpty(options);
 
 		mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields);
@@ -191,7 +193,7 @@
 		boolean wasCreated = !fsDir.getDirectory().exists();
 		writer = new LuceneIndexWriter(fsDir,config);
 		if( logDir != null )
-			writer = new LoggingIndexWriter((LuceneIndexWriter)writer,logDir);
+			writer = new LoggingIndexWriter((LuceneIndexWriter)writer,logDir,logTime);
 		reader = DirectoryReader.open(fsDir);
 		searcher = new IndexSearcher(reader);
 		initId();
@@ -741,11 +743,11 @@
 		writeLock.lock();
 		boolean ok = false;
 		try {
-			IndexWriter iw = writer.getLuceneIndexWriter();
-			iw.deleteAll();
+			writer.tag("restore_from_postgres");
+			writer.deleteAll();
 			postgresBackup.restoreLucene(this);
 			ok = true;
-			iw.commit();
+			writer.commit();
 			wrote();
 			ensure_open();  // refresh searcher
 			initId();
@@ -791,7 +793,6 @@
 			iw.deleteAll();
 			loggingWriter.playLogs(null);
 			ok = true;
-			iw.commit();
 			wrote();
 			ensure_open();  // refresh searcher
 			initId();