changeset 1538:634f6765830e

use goodjava/lucene/logging
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 07 Aug 2020 21:42:16 -0600
parents f7649ad6e3e7
children c27dc6af87ca
files src/goodjava/lucene/backup/BackupIndexWriter.java src/goodjava/lucene/logging/LoggingIndexWriter.java src/goodjava/lucene/logging/SemaphoreLock.java src/luan/modules/lucene/Lucene.luan src/luan/modules/lucene/LuceneIndex.java
diffstat 5 files changed, 101 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
diff -r f7649ad6e3e7 -r 634f6765830e src/goodjava/lucene/backup/BackupIndexWriter.java
--- a/src/goodjava/lucene/backup/BackupIndexWriter.java	Fri Aug 07 13:38:25 2020 -0600
+++ b/src/goodjava/lucene/backup/BackupIndexWriter.java	Fri Aug 07 21:42:16 2020 -0600
@@ -58,9 +58,11 @@
 		}
 	}
 
-	protected void doCheck(SortField sortField) throws IOException {
-		super.doCheck(sortField);
-		runSyncWithChecksum();
+	protected boolean doCheck(SortField sortField) throws IOException {
+		boolean ok = super.doCheck(sortField);
+		if( ok )
+			runSyncWithChecksum();
+		return ok;
 	}
 
 	public void runSync() {
diff -r f7649ad6e3e7 -r 634f6765830e src/goodjava/lucene/logging/LoggingIndexWriter.java
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java	Fri Aug 07 13:38:25 2020 -0600
+++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java	Fri Aug 07 21:42:16 2020 -0600
@@ -13,6 +13,7 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
@@ -50,7 +51,7 @@
 	protected final List<LogFile> logs = new ArrayList<LogFile>();
 	private LogOutputStream log;
 	private final File index;
-	private boolean isMerging = false;
+	private final SemaphoreLock mergeLock = new SemaphoreLock();
 
 	public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException {
 		this.indexWriter = indexWriter;
@@ -92,18 +93,30 @@
 			log.close();
 		log = logs.get(logs.size()-1).output();
 	}
-
+/*
 	public synchronized boolean isMerging() {
-		return isMerging;
+		return mergeLock.isLocked();
 	}
-
-	private synchronized void isNotMerging() {
-		isMerging = false;
+*/
+	private void getMergeLock() {
+		try {
+			if( !mergeLock.tryLock(1,TimeUnit.MINUTES) )
+				throw new RuntimeException("failed to acquire lock");
+		} catch(InterruptedException e) {
+			throw new RuntimeException(e);
+		}
 	}
 
 	public synchronized void newLogs() throws IOException {
-		if( isMerging )
-			throw new RuntimeException("merging");
+		getMergeLock();
+		try {
+			newLogs2();
+		} finally {
+			mergeLock.unlock();
+		}
+	}
+
+	private void newLogs2() throws IOException {
 		logger.info("building new logs");
 		logs.clear();
 		for( int i=0; i<2; i++ ) {
@@ -182,6 +195,8 @@
 
 	private void mergeLogs() throws IOException {
 		//logger.info("merge");
+		if( logs.size() <= 3 )
+			return;
 		LogFile first = logs.get(0);
 		LogFile second = logs.get(1);
 		long lastTime = second.file.lastModified();
@@ -210,7 +225,7 @@
 		} catch(IOException e) {
 			throw new RuntimeException(e);
 		} finally {
-			isNotMerging();
+			mergeLock.unlock();
 		}
 	} };
 
@@ -236,18 +251,19 @@
 
 	private volatile boolean isChecking = false;
 
-	public void check(SortField sortField) throws IOException {
+	public boolean check(SortField sortField) throws IOException {
 		if( isChecking )
 			throw new RuntimeException("another check is running");
 		isChecking = true;
 		try {
-			doCheck(sortField);
+			return doCheck(sortField);
 		} finally {
 			isChecking = false;
 		}
 	}
 
-	protected void doCheck(SortField sortField) throws IOException {
+	protected boolean doCheck(SortField sortField) throws IOException {
+		boolean ok = true;
 		IndexReader indexReader;
 		List<LogInputStream> logReaders;
 		synchronized(this) {
@@ -255,24 +271,26 @@
 			logReaders = logReaders(logs);
 		}
 		try {
-			logger.info("check start");
+			//logger.info("check start");
 			indexWriter.check();
 			File dirFile = new File(logDir,"check");
 			IoUtils.deleteRecursively(dirFile);
 			Directory dir = FSDirectory.open(dirFile);
 			LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
 			playLogs(logReaders,checkWriter);
-			logger.info("check lucene");
+			//logger.info("check lucene");
 			IndexReader checkReader = checkWriter.openReader();
 			if( sortField == null ) {
 				int nCheck = checkReader.numDocs();
 				int nOrig = indexReader.numDocs();
 				if( nCheck != nOrig ) {
 					logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck);
+					ok = false;
 				}
-				logger.info("numDocs="+nOrig);
+				//logger.info("numDocs="+nOrig);
 				if( hash(indexReader) != hash(checkReader) ) {
 					logger.error("hash mismatch");
+					ok = false;
 				}
 			} else {
 				Sort sort = new Sort(sortField);
@@ -291,25 +309,30 @@
 							logger.error(sortFieldName+" "+origFld+" not equal");
 							logger.error("lucene = "+origFields);
 							logger.error("logs = "+checkFields);
+							ok = false;
 						}
 						origFields = LuceneUtils.toMap(origIter.next());
 						checkFields = LuceneUtils.toMap(checkIter.next());
 					} else if( cmp < 0 ) {
 						logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
+						ok = false;
 						origFields = LuceneUtils.toMap(origIter.next());
 					} else {  // >
 						logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
+						ok = false;
 						checkFields = LuceneUtils.toMap(checkIter.next());
 					}
 				}
 				while( origFields!=null ) {
 					Comparable origFld = (Comparable)origFields.get(sortFieldName);
 					logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
+					ok = false;
 					origFields = LuceneUtils.toMap(origIter.next());
 				}
 				while( checkFields!=null ) {
 					Comparable checkFld = (Comparable)checkFields.get(sortFieldName);
 					logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
+					ok = false;
 					checkFields = LuceneUtils.toMap(checkIter.next());
 				}
 				//logger.info("check done");
@@ -317,10 +340,11 @@
 			checkReader.close();
 			checkWriter.close();
 			IoUtils.deleteRecursively(dirFile);
-			logger.info("check done");
+			//logger.info("check done");
 		} finally {
 			indexReader.close();
 		}
+		return ok;
 	}
 
 	private static abstract class HashCollector extends GoodCollector {
@@ -350,7 +374,7 @@
 	public synchronized void commit() throws IOException {
 		indexWriter.commit();
 		log.commit();
-		if( isMerging )
+		if( mergeLock.isLocked() )
 			return;
 		if( log.logFile.end() > logs.get(0).end() ) {
 			logs.add( newLogFile() );
@@ -358,7 +382,7 @@
 			setLog();
 		}
 		if( logs.size() > 3 ) {
-			isMerging = true;
+			getMergeLock();
 			new Thread(mergeLogs).start();
 //			mergeLogs.run();
 		}
diff -r f7649ad6e3e7 -r 634f6765830e src/goodjava/lucene/logging/SemaphoreLock.java
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/lucene/logging/SemaphoreLock.java	Fri Aug 07 21:42:16 2020 -0600
@@ -0,0 +1,21 @@
+package goodjava.lucene.logging;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+
+public final class SemaphoreLock {
+	private final Semaphore semaphore = new Semaphore(1);
+
+	public void unlock() {
+		semaphore.release();
+	}
+
+	public boolean isLocked() {
+		return semaphore.availablePermits() == 0;
+	}
+
+	public boolean tryLock(long time,TimeUnit unit) throws InterruptedException {
+		return semaphore.tryAcquire(time,unit);
+	}
+}
diff -r f7649ad6e3e7 -r 634f6765830e src/luan/modules/lucene/Lucene.luan
--- a/src/luan/modules/lucene/Lucene.luan	Fri Aug 07 13:38:25 2020 -0600
+++ b/src/luan/modules/lucene/Lucene.luan	Fri Aug 07 21:42:16 2020 -0600
@@ -34,13 +34,19 @@
 
 Lucene.quote = GoodQueryParser.quote
 
+local function get_file(f,name)
+	type(f)=="table" or error(name.." must be table")
+	f.to_uri_string and matches(f.to_uri_string(),"^file:") or error(name.." must be file")
+	return f.java.file or error()
+end
+
 function Lucene.index(index_dir,options)
-	type(index_dir)=="table" or error "index_dir must be table"
-	index_dir.to_uri_string and matches(index_dir.to_uri_string(),"^file:") or error "must be file"
-	options = options or {}
 	local index = {}
 	index.dir = index_dir
-	local java_index = LuceneIndex.getLuceneIndex(index_dir.java.file,options)
+	index_dir = get_file(index_dir)
+	options = options or {}
+	options.log_dir = options.log_dir and get_file(options.log_dir)
+	local java_index = LuceneIndex.getLuceneIndex(index_dir,options)
 	index.java = java_index
 
 	index.indexed_fields = {}
@@ -69,6 +75,8 @@
 	index.count_tokens = java_index.count_tokens
 	--index.close = java_index.close
 
+	index.rebuild_log = java_index.rebuild_log
+
 	index.has_postgres_backup = java_index.hasPostgresBackup()
 	index.rebuild_postgres_backup = java_index.rebuild_postgres_backup
 	index.restore_from_postgres = java_index.restore_from_postgres
diff -r f7649ad6e3e7 -r 634f6765830e src/luan/modules/lucene/LuceneIndex.java
--- a/src/luan/modules/lucene/LuceneIndex.java	Fri Aug 07 13:38:25 2020 -0600
+++ b/src/luan/modules/lucene/LuceneIndex.java	Fri Aug 07 21:42:16 2020 -0600
@@ -78,6 +78,7 @@
 import goodjava.lucene.api.LuceneIndexWriter;
 import goodjava.lucene.api.GoodIndexWriterConfig;
 import goodjava.lucene.api.LuceneUtils;
+import goodjava.lucene.logging.LoggingIndexWriter;
 import goodjava.parser.ParseException;
 import luan.modules.Utils;
 import luan.Luan;
@@ -121,6 +122,7 @@
 	public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer());
 	public static final StringFieldParser LOWERCASE_FIELD_PARSER = new StringFieldParser(new LowercaseAnalyzer(luceneVersion));
 	public static final StringFieldParser ENGLISH_FIELD_PARSER = new StringFieldParser(new EnglishAnalyzer(luceneVersion));
+	private static final SortField ID_SORT = new SortField("id",SortField.Type.LONG);
 
 	private final Object version;
 
@@ -140,6 +142,7 @@
 
 	private final PostgresBackup postgresBackup;
 	private boolean wasCreated;
+	private final File logDir;
 
 	private LuceneIndex(Luan luan,File indexDir,LuanTable options)
 		throws LuanException, IOException, ClassNotFoundException, SQLException
@@ -151,6 +154,7 @@
 		String[] defaultFields = defaultFieldsTbl==null ? null : (String[])defaultFieldsTbl.asList().toArray(new String[0]);
 		LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec");
 		LuanFunction supplementer = Utils.removeFunction(options,"supplementer");
+		logDir = (File)options.remove("log_dir");
 		Utils.checkEmpty(options);
 
 		mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields);
@@ -185,6 +189,8 @@
 		fsDir = FSDirectory.open(indexDir);
 		boolean wasCreated = !fsDir.getDirectory().exists();
 		writer = new LuceneIndexWriter(fsDir,config);
+		if( logDir != null )
+			writer = new LoggingIndexWriter((LuceneIndexWriter)writer,logDir);
 		reader = DirectoryReader.open(fsDir);
 		searcher = new IndexSearcher(reader);
 		initId();
@@ -781,15 +787,28 @@
 		CheckIndex.Status status = new CheckIndex(fsDir).checkIndex();
 		if( !status.clean )
 			logger.error("index not clean");
-		if( hasPostgres )
+		if( writer instanceof LoggingIndexWriter ) {
+			LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
+			logger.info("log check");
+			boolean ok = loggingWriter.check(ID_SORT);
+		}
+		if( hasPostgres ) {
+			logger.info("postgres check");
 			checkPostgres(luan);
+		}
 		logger.info("end check");
 	}
 
+	public void rebuild_log() throws IOException {
+		logger.info("start rebuild_log");
+		LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
+		loggingWriter.newLogs();
+		logger.info("end rebuild_log");
+	}
+
 	private void checkPostgres(Luan luan)
 		throws IOException, SQLException, LuanException, ParseException
 	{
-		//logger.info("start postgres check");
 		final PostgresBackup.Checker postgresChecker = postgresBackup.newChecker();
 		final IndexSearcher searcher = openSearcher();
 		try {