comparison src/luan/modules/lucene/LuceneIndex.java @ 1391:94f48cc76de8

add lucene check
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 05 Sep 2019 01:29:57 -0600
parents 179c4882c6b6
children 002152af497a
comparison
equal deleted inserted replaced
1390:179c4882c6b6 1391:94f48cc76de8
3 import java.io.Closeable; 3 import java.io.Closeable;
4 import java.io.File; 4 import java.io.File;
5 import java.io.FileOutputStream; 5 import java.io.FileOutputStream;
6 import java.io.FileInputStream; 6 import java.io.FileInputStream;
7 import java.io.IOException; 7 import java.io.IOException;
8 import java.sql.SQLException;
8 import java.util.Arrays; 9 import java.util.Arrays;
9 import java.util.Iterator; 10 import java.util.Iterator;
10 import java.util.Map; 11 import java.util.Map;
11 import java.util.HashMap; 12 import java.util.HashMap;
12 import java.util.List; 13 import java.util.List;
37 import org.apache.lucene.index.DirectoryReader; 38 import org.apache.lucene.index.DirectoryReader;
38 import org.apache.lucene.index.Term; 39 import org.apache.lucene.index.Term;
39 import org.apache.lucene.index.SnapshotDeletionPolicy; 40 import org.apache.lucene.index.SnapshotDeletionPolicy;
40 import org.apache.lucene.index.IndexCommit; 41 import org.apache.lucene.index.IndexCommit;
41 import org.apache.lucene.index.AtomicReaderContext; 42 import org.apache.lucene.index.AtomicReaderContext;
43 import org.apache.lucene.index.CheckIndex;
42 import org.apache.lucene.store.Directory; 44 import org.apache.lucene.store.Directory;
43 import org.apache.lucene.store.FSDirectory; 45 import org.apache.lucene.store.FSDirectory;
44 import org.apache.lucene.util.Version; 46 import org.apache.lucene.util.Version;
45 import org.apache.lucene.util.BytesRef; 47 import org.apache.lucene.util.BytesRef;
46 import org.apache.lucene.util.NumericUtils; 48 import org.apache.lucene.util.NumericUtils;
74 import luan.Luan; 76 import luan.Luan;
75 import luan.LuanTable; 77 import luan.LuanTable;
76 import luan.LuanFunction; 78 import luan.LuanFunction;
77 import luan.LuanException; 79 import luan.LuanException;
78 import luan.LuanRuntimeException; 80 import luan.LuanRuntimeException;
81 import luan.modules.parsers.LuanToString;
79 import luan.lib.logging.Logger; 82 import luan.lib.logging.Logger;
80 import luan.lib.logging.LoggerFactory; 83 import luan.lib.logging.LoggerFactory;
81 84
82 85
83 public final class LuceneIndex { 86 public final class LuceneIndex {
145 private IndexSearcher searcher; 148 private IndexSearcher searcher;
146 private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>(); 149 private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>();
147 private final MultiFieldParser mfp; 150 private final MultiFieldParser mfp;
148 private final Analyzer analyzer; 151 private final Analyzer analyzer;
149 152
150 private File fileDir; 153 private FSDirectory fsDir;
151 private int writeCount; 154 private int writeCount;
152 private AtomicInteger writeCounter = new AtomicInteger(); 155 private AtomicInteger writeCounter = new AtomicInteger();
153 156
154 private Set<String> indexOnly = new HashSet<String>(); 157 private Set<String> indexOnly = new HashSet<String>();
155 158
191 194
192 public boolean reopen() throws IOException { 195 public boolean reopen() throws IOException {
193 IndexWriterConfig conf = new IndexWriterConfig(version,analyzer); 196 IndexWriterConfig conf = new IndexWriterConfig(version,analyzer);
194 snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()); 197 snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy());
195 conf.setIndexDeletionPolicy(snapshotDeletionPolicy); 198 conf.setIndexDeletionPolicy(snapshotDeletionPolicy);
196 FSDirectory dir = FSDirectory.open(indexDir); 199 fsDir = FSDirectory.open(indexDir);
197 fileDir = dir.getDirectory(); 200 boolean wasCreated = !fsDir.getDirectory().exists();
198 boolean wasCreated = !fileDir.exists(); 201 writer = new IndexWriter(fsDir,conf);
199 writer = new IndexWriter(dir,conf);
200 writer.commit(); // commit index creation 202 writer.commit(); // commit index creation
201 reader = DirectoryReader.open(dir); 203 reader = DirectoryReader.open(fsDir);
202 searcher = new IndexSearcher(reader); 204 searcher = new IndexSearcher(reader);
203 initId(); 205 initId();
204 return wasCreated; 206 return wasCreated;
205 } 207 }
206 208
233 throws IOException 235 throws IOException
234 { 236 {
235 if( postgresBackup != null ) { 237 if( postgresBackup != null ) {
236 final List<Long> ids = new ArrayList<Long>(); 238 final List<Long> ids = new ArrayList<Long>();
237 IndexSearcher searcher = openSearcher(); 239 IndexSearcher searcher = openSearcher();
238 MyCollector col = new MyCollector() { 240 try {
239 @Override public void collect(int iDoc) throws IOException { 241 MyCollector col = new MyCollector() {
240 Document doc = searcher.doc( docBase + iDoc ); 242 @Override public void collect(int iDoc) throws IOException {
241 Long id = (Long)doc.getField("id").numericValue(); 243 Document doc = searcher.doc( docBase + iDoc );
242 ids.add(id); 244 Long id = (Long)doc.getField("id").numericValue();
243 } 245 ids.add(id);
244 }; 246 }
245 searcher.search(query,col); 247 };
248 searcher.search(query,col);
249 } finally {
250 close(searcher);
251 }
246 postgresBackup.begin(); 252 postgresBackup.begin();
247 for( Long id : ids ) { 253 for( Long id : ids ) {
248 postgresBackup.delete(id); 254 postgresBackup.delete(id);
249 } 255 }
250 postgresBackup.commit(); 256 postgresBackup.commit();
290 writeLock.lock(); 296 writeLock.lock();
291 try { 297 try {
292 if( id == null ) { 298 if( id == null ) {
293 id = nextId(); 299 id = nextId();
294 doc.put("id",id); 300 doc.put("id",id);
301 if( postgresBackup != null )
302 postgresBackup.add(doc);
295 writer.addDocument(toLucene(doc,boosts)); 303 writer.addDocument(toLucene(doc,boosts));
304 } else {
296 if( postgresBackup != null ) 305 if( postgresBackup != null )
297 postgresBackup.add(id,doc); 306 postgresBackup.update(doc);
298 } else {
299 writer.updateDocument( term("id",id), toLucene(doc,boosts) ); 307 writer.updateDocument( term("id",id), toLucene(doc,boosts) );
300 if( postgresBackup != null )
301 postgresBackup.update(id,doc);
302 } 308 }
303 if(commit) writer.commit(); 309 if(commit) writer.commit();
304 } finally { 310 } finally {
305 wrote(); 311 wrote();
306 writeLock.unlock(); 312 writeLock.unlock();
411 } 417 }
412 418
413 public Object snapshot(LuanFunction fn) throws LuanException, IOException { 419 public Object snapshot(LuanFunction fn) throws LuanException, IOException {
414 IndexCommit ic = snapshotDeletionPolicy.snapshot(); 420 IndexCommit ic = snapshotDeletionPolicy.snapshot();
415 try { 421 try {
416 String dir = fileDir.toString(); 422 String dir = fsDir.getDirectory().toString();
417 LuanTable fileNames = new LuanTable(fn.luan(),new ArrayList(ic.getFileNames())); 423 LuanTable fileNames = new LuanTable(fn.luan(),new ArrayList(ic.getFileNames()));
418 return fn.call(dir,fileNames); 424 return fn.call(dir,fileNames);
419 } finally { 425 } finally {
420 snapshotDeletionPolicy.release(ic); 426 snapshotDeletionPolicy.release(ic);
421 } 427 }
801 807
802 public void rebuild_postgres_backup(LuanFunction completer) 808 public void rebuild_postgres_backup(LuanFunction completer)
803 throws IOException, LuanException 809 throws IOException, LuanException
804 { 810 {
805 writeLock.lock(); 811 writeLock.lock();
812 IndexSearcher searcher = openSearcher();
806 boolean ok = false; 813 boolean ok = false;
807 try { 814 try {
808 postgresBackup.begin(); 815 postgresBackup.begin();
809 postgresBackup.deleteAll(); 816 postgresBackup.deleteAll();
810 Query query = new PrefixQuery(new Term("id")); 817 Query query = new PrefixQuery(new Term("id"));
811 IndexSearcher searcher = openSearcher();
812 MyCollector col = new MyCollector() { 818 MyCollector col = new MyCollector() {
813 @Override public void collect(int iDoc) throws IOException { 819 @Override public void collect(int iDoc) throws IOException {
814 try { 820 try {
815 Document doc = searcher.doc( docBase + iDoc ); 821 Document doc = searcher.doc( docBase + iDoc );
816 LuanTable tbl = toTable(completer.luan(),doc); 822 LuanTable tbl = toTable(completer.luan(),doc);
817 tbl = (LuanTable)completer.call(tbl); 823 tbl = (LuanTable)completer.call(tbl);
818 Long id = (Long)tbl.get("id"); 824 postgresBackup.add(tbl);
819 //logger.info("id = "+id);
820 postgresBackup.add(id,tbl);
821 } catch(LuanException e) { 825 } catch(LuanException e) {
822 throw new LuanRuntimeException(e); 826 throw new LuanRuntimeException(e);
823 } 827 }
824 } 828 }
825 }; 829 };
829 throw (LuanException)e.getCause(); 833 throw (LuanException)e.getCause();
830 } 834 }
831 ok = true; 835 ok = true;
832 postgresBackup.commit(); 836 postgresBackup.commit();
833 } finally { 837 } finally {
838 close(searcher);
834 if( !ok ) 839 if( !ok )
835 postgresBackup.rollback(); 840 postgresBackup.rollback();
836 writeLock.unlock(); 841 writeLock.unlock();
837 } 842 }
838 } 843 }
867 throws LuanException, IOException 872 throws LuanException, IOException
868 { 873 {
869 writer.addDocument(toLucene(doc,null)); 874 writer.addDocument(toLucene(doc,null));
870 } 875 }
871 876
877 public void check(LuanFunction completer) throws IOException, SQLException, LuanException {
878 logger.info("start check");
879 CheckIndex.Status status = new CheckIndex(fsDir).checkIndex();
880 if( !status.clean )
881 logger.error("index not clean");
882 if( postgresBackup != null )
883 checkPostgres(completer);
884 logger.info("end check");
885 }
886
887 private void checkPostgres(LuanFunction completer) throws IOException, SQLException, LuanException {
888 final PostgresBackup.Checker postgresChecker;
889 final IndexSearcher searcher;
890 writeLock.lock();
891 try {
892 postgresChecker = postgresBackup.newChecker();
893 searcher = openSearcher();
894 } finally {
895 writeLock.unlock();
896 }
897 try {
898 final List<Long> idsLucene = new ArrayList<Long>();
899 Query query = new PrefixQuery(new Term("id"));
900 MyCollector col = new MyCollector() {
901 @Override public void collect(int iDoc) throws IOException {
902 Document doc = searcher.doc( docBase + iDoc );
903 Long id = (Long)doc.getField("id").numericValue();
904 idsLucene.add(id);
905 }
906 };
907 searcher.search(query,col);
908 Collections.sort(idsLucene);
909 final List<Long> idsPostgres = postgresChecker.getIds();
910 final int nLucene = idsLucene.size();
911 final int nPostgres = idsPostgres.size();
912 int iLucene = 0;
913 int iPostgres = 0;
914 LuanToString lts = new LuanToString();
915 lts.strict = true;
916 lts.numberTypes = true;
917 while( iLucene < nLucene && iPostgres < nPostgres ) {
918 long idLucene = idsLucene.get(iLucene);
919 long idPostgres = idsPostgres.get(iPostgres);
920 if( idLucene < idPostgres ) {
921 iLucene++;
922 logger.error("id "+idLucene+" found in lucene but not postgres");
923 } else if( idLucene > idPostgres ) {
924 iPostgres++;
925 logger.error("id "+idPostgres+" found in postgres but not lucene");
926 } else { // ==
927 LuanTable docPostgres = postgresChecker.getDoc(idPostgres);
928 TopDocs td = searcher.search(new TermQuery(term("id",idLucene)),1);
929 if( td.totalHits != 1 ) throw new RuntimeException();
930 Document doc = searcher.doc( td.scoreDocs[0].doc );
931 LuanTable docLucene = toTable(completer.luan(),doc);
932 docLucene = (LuanTable)completer.call(docLucene);
933 if( !equal(docPostgres,docLucene) ) {
934 logger.error("id "+idLucene+" not equal");
935 logger.error("lucene = "+lts.toString(docLucene));
936 logger.error("postgres = "+lts.toString(docPostgres));
937 }
938 iLucene++;
939 iPostgres++;
940 }
941 }
942 while( iLucene < nLucene ) {
943 long idLucene = idsLucene.get(iLucene++);
944 logger.error("id "+idLucene+" found in lucene but not postgres");
945 }
946 while( iPostgres < nPostgres ) {
947 long idPostgres = idsPostgres.get(iPostgres++);
948 logger.error("id "+idPostgres+" found in postgres but not lucene");
949 }
950 } finally {
951 close(searcher);
952 postgresChecker.close();
953 }
954 }
955
956 private boolean equal(LuanTable t1,LuanTable t2) throws LuanException {
957 return t1.asMap().equals(t2.asMap());
958 }
959
872 } 960 }