comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1548:736ec76bbf42

lucene log work
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 27 Sep 2020 22:07:18 -0600
parents 35601f15ecc3
children 41c32da4cbd1
comparison
equal deleted inserted replaced
1547:f24a9ba7551e 1548:736ec76bbf42
8 import java.io.FileInputStream; 8 import java.io.FileInputStream;
9 import java.io.IOException; 9 import java.io.IOException;
10 import java.util.Map; 10 import java.util.Map;
11 import java.util.Set; 11 import java.util.Set;
12 import java.util.HashSet; 12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.ArrayList;
15 import java.util.Random; 13 import java.util.Random;
16 import java.util.concurrent.TimeUnit; 14 import java.util.concurrent.TimeUnit;
17 import org.apache.lucene.document.Document; 15 import org.apache.lucene.document.Document;
18 import org.apache.lucene.index.DirectoryReader; 16 import org.apache.lucene.index.DirectoryReader;
19 import org.apache.lucene.index.IndexReader; 17 import org.apache.lucene.index.IndexReader;
27 import org.apache.lucene.search.SortField; 25 import org.apache.lucene.search.SortField;
28 import org.apache.lucene.search.Sort; 26 import org.apache.lucene.search.Sort;
29 import org.apache.lucene.store.Directory; 27 import org.apache.lucene.store.Directory;
30 import org.apache.lucene.store.FSDirectory; 28 import org.apache.lucene.store.FSDirectory;
31 import goodjava.io.IoUtils; 29 import goodjava.io.IoUtils;
30 import goodjava.lucene.api.GoodWriter;
32 import goodjava.lucene.api.GoodIndexWriter; 31 import goodjava.lucene.api.GoodIndexWriter;
33 import goodjava.lucene.api.LuceneIndexWriter; 32 import goodjava.lucene.api.LuceneIndexWriter;
34 import goodjava.lucene.api.GoodCollector; 33 import goodjava.lucene.api.GoodCollector;
35 import goodjava.lucene.api.LuceneUtils; 34 import goodjava.lucene.api.LuceneUtils;
36 import goodjava.logging.Logger; 35 import goodjava.logging.Logger;
37 import goodjava.logging.LoggerFactory; 36 import goodjava.logging.LoggerFactory;
38 37
39 38
40 public class LoggingIndexWriter implements GoodIndexWriter { 39 public class LoggingIndexWriter implements GoodIndexWriter {
41 private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class); 40 private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);
42 private static final int version = 1; 41 private static final int version = 2;
43 private static final int OP_DELETE_ALL = 1; 42 private static final int OP_DELETE_ALL = 1;
44 private static final int OP_DELETE_DOCUMENTS = 2; 43 private static final int OP_DELETE_DOCUMENTS = 2;
45 private static final int OP_ADD_DOCUMENT = 3; 44 private static final int OP_ADD_DOCUMENT = 3;
46 private static final int OP_UPDATE_DOCUMENT = 4; 45 private static final int OP_UPDATE_DOCUMENT = 4;
47 private static final int OP_TAG = 5; 46 private static final int OP_TAG = 5;
48 private static final Random rnd = new Random(); 47 private static final Random rnd = new Random();
49 48
50 public final LuceneIndexWriter indexWriter; 49 public final LuceneIndexWriter indexWriter;
51 public boolean wasCreated; 50 public boolean wasCreated;
52 private final File logDir; 51 private final File logDir;
53 protected final List<LogFile> logs = new ArrayList<LogFile>(); 52 private final long logTime;
53 protected final LogFile[] logs = new LogFile[3];
54 private LogOutputStream log; 54 private LogOutputStream log;
55 private final File index; 55 private final File index;
56 private final SemaphoreLock mergeLock = new SemaphoreLock(); 56 private final SemaphoreLock mergeLock = new SemaphoreLock();
57 57
58 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException { 58 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime)
59 throws IOException
60 {
59 this.indexWriter = indexWriter; 61 this.indexWriter = indexWriter;
60 this.logDir = logDir; 62 this.logDir = logDir;
63 this.logTime = logTime;
61 IoUtils.mkdirs(logDir); 64 IoUtils.mkdirs(logDir);
62 if( !logDir.isDirectory() ) 65 if( !logDir.isDirectory() )
63 throw new RuntimeException(); 66 throw new RuntimeException();
64 index = new File(logDir,"index"); 67 index = new File(logDir,"index");
65 if( index.exists() ) { 68 if( index.exists() ) {
66 DataInputStream dis = new DataInputStream(new FileInputStream(index)); 69 DataInputStream dis = new DataInputStream(new FileInputStream(index));
67 try { 70 try {
68 if( dis.readInt() == version ) { 71 if( dis.readInt() == version ) {
69 final int n = dis.readInt(); 72 for( int i=0; i<logs.length; i++ ) {
70 for( int i=0; i<n; i++ ) {
71 File file = new File( logDir, dis.readUTF() ); 73 File file = new File( logDir, dis.readUTF() );
72 logs.add( new LogFile(file) ); 74 logs[i] = new LogFile(file);
73 } 75 }
74 deleteUnusedFiles(); 76 deleteUnusedFiles();
75 setLog(); 77 setLog();
76 wasCreated = false; 78 wasCreated = false;
77 return; 79 return;
82 } 84 }
83 newLogs(); 85 newLogs();
84 wasCreated = true; 86 wasCreated = true;
85 } 87 }
86 88
89 public IndexReader openReader() throws IOException {
90 return indexWriter.openReader();
91 }
92
87 public IndexWriter getLuceneIndexWriter() { 93 public IndexWriter getLuceneIndexWriter() {
88 return indexWriter.getLuceneIndexWriter(); 94 return indexWriter.getLuceneIndexWriter();
89 } 95 }
90 96
91 private void setLog() throws IOException { 97 private void setLog() throws IOException {
92 if( log != null ) 98 if( log != null )
93 log.close(); 99 log.close();
94 log = logs.get(logs.size()-1).output(); 100 log = logs[2].output();
95 } 101 }
96 /* 102 /*
97 public synchronized boolean isMerging() { 103 public synchronized boolean isMerging() {
98 return mergeLock.isLocked(); 104 return mergeLock.isLocked();
99 } 105 }
116 } 122 }
117 } 123 }
118 124
119 private void newLogs2() throws IOException { 125 private void newLogs2() throws IOException {
120 logger.info("building new logs"); 126 logger.info("building new logs");
121 logs.clear(); 127 for( int i=0; i<logs.length; i++ ) {
122 for( int i=0; i<2; i++ ) { 128 logs[i] = newLogFile();
123 logs.add( newLogFile() ); 129 }
124 } 130 LogOutputStream log = logs[0].output();
125 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); 131 logLucene( System.currentTimeMillis(), log, indexWriter );
132 log.close();
126 writeIndex(); 133 writeIndex();
127 setLog(); 134 setLog();
128 logger.info("done building new logs"); 135 logger.info("done building new logs");
129 } 136 }
130 137
131 private static void logLucene(long time,LogFile logLucene,LuceneIndexWriter indexWriter) throws IOException { 138 public synchronized void logLucene()
132 LogOutputStream log = logLucene.output(); 139 throws IOException
140 {
141 //log.rollback(); ?
142 logLucene( System.currentTimeMillis(), log, indexWriter );
143 }
144
145 private static void logLucene(long time,LogOutputStream log,LuceneIndexWriter indexWriter)
146 throws IOException
147 {
148 log.writeLong(time);
149 log.writeByte(OP_DELETE_ALL);
133 IndexReader reader = indexWriter.openReader(); 150 IndexReader reader = indexWriter.openReader();
134 final IndexSearcher searcher = new IndexSearcher(reader); 151 final IndexSearcher searcher = new IndexSearcher(reader);
135 Query query = new MatchAllDocsQuery(); 152 Query query = new MatchAllDocsQuery();
136 searcher.search( query, new GoodCollector(){ 153 searcher.search( query, new GoodCollector(){
137 public void collectDoc(int iDoc) throws IOException { 154 public void collectDoc(int iDoc) throws IOException {
142 log.writeMap(storedFields); 159 log.writeMap(storedFields);
143 } 160 }
144 }); 161 });
145 reader.close(); 162 reader.close();
146 log.commit(); 163 log.commit();
147 log.close();
148 } 164 }
149 165
150 private LogFile newLogFile() throws IOException { 166 private LogFile newLogFile() throws IOException {
151 File file; 167 File file;
152 do { 168 do {
157 173
158 private void deleteUnusedFiles() throws IOException { 174 private void deleteUnusedFiles() throws IOException {
159 deleteUnusedFiles(logs,index); 175 deleteUnusedFiles(logs,index);
160 } 176 }
161 177
162 private static void deleteUnusedFiles(List<LogFile> logs,File index) throws IOException { 178 private static void deleteUnusedFiles(LogFile[] logs,File index) throws IOException {
163 Set<String> used = new HashSet<String>(); 179 Set<String> used = new HashSet<String>();
164 used.add( index.getName() ); 180 used.add( index.getName() );
165 for( LogFile lf : logs ) { 181 for( LogFile lf : logs ) {
166 used.add( lf.file.getName() ); 182 used.add( lf.file.getName() );
167 } 183 }
174 190
175 private void writeIndex() throws IOException { 191 private void writeIndex() throws IOException {
176 writeIndex(logs,index); 192 writeIndex(logs,index);
177 } 193 }
178 194
179 public static void writeIndex(List<LogFile> logs,File index) throws IOException { 195 public static void writeIndex(LogFile[] logs,File index) throws IOException {
196 if( logs.length != 3 )
197 throw new RuntimeException();
180 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 198 ByteArrayOutputStream baos = new ByteArrayOutputStream();
181 DataOutputStream dos = new DataOutputStream(baos); 199 DataOutputStream dos = new DataOutputStream(baos);
182 dos.writeInt(version); 200 dos.writeInt(version);
183 dos.writeInt(logs.size());
184 for( LogFile lf : logs ) { 201 for( LogFile lf : logs ) {
185 String fileName = lf.file.getName(); 202 String fileName = lf.file.getName();
186 dos.writeUTF(fileName); 203 dos.writeUTF(fileName);
187 } 204 }
188 dos.close(); 205 dos.close();
192 deleteUnusedFiles(logs,index); 209 deleteUnusedFiles(logs,index);
193 //logger.info("writeIndex "+logs.toString()); 210 //logger.info("writeIndex "+logs.toString());
194 } 211 }
195 212
196 private void mergeLogs() throws IOException { 213 private void mergeLogs() throws IOException {
197 //logger.info("merge"); 214 logger.info("merge");
198 if( logs.size() <= 3 ) 215 if( !mergeLock.isLocked() ) {
216 logger.error("merge without lock");
199 return; 217 return;
200 LogFile first = logs.get(0); 218 }
201 LogFile second = logs.get(1); 219 LogFile first = logs[0];
220 LogFile second = logs[1];
202 long lastTime = second.file.lastModified(); 221 long lastTime = second.file.lastModified();
203 File dirFile = new File(logDir,"merge"); 222 File dirFile = new File(logDir,"merge");
204 if( dirFile.exists() ) 223 if( dirFile.exists() )
205 throw new RuntimeException(); 224 throw new RuntimeException();
206 Directory dir = FSDirectory.open(dirFile); 225 Directory dir = FSDirectory.open(dirFile);
207 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); 226 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
208 playLog( first.input(), mergeWriter, null ); 227 playLog( first.input(), mergeWriter );
209 playLog( second.input(), mergeWriter, null ); 228 playLog( second.input(), mergeWriter );
210 mergeWriter.commit(); 229 mergeWriter.commit();
211 LogFile merge = newLogFile(); 230 LogFile merge = newLogFile();
212 logLucene( lastTime, merge, mergeWriter ); 231 LogOutputStream log = merge.output();
232 logLucene( lastTime, log, mergeWriter );
233 log.close();
213 mergeWriter.close(); 234 mergeWriter.close();
214 synchronized(this) { 235 synchronized(this) {
215 //check(); 236 //check();
216 logs.remove(0); 237 logs[0] = merge;
217 logs.set(0,merge); 238 logs[1] = logs[2];
239 logs[2] = newLogFile();
218 writeIndex(); 240 writeIndex();
241 setLog();
219 //check(null); 242 //check(null);
220 } 243 }
221 } 244 }
222 private final Runnable mergeLogs = new Runnable() { public void run() { 245 private final Runnable mergeLogs = new Runnable() { public void run() {
223 try { 246 try {
263 } 286 }
264 287
265 protected boolean doCheck(SortField sortField) throws IOException { 288 protected boolean doCheck(SortField sortField) throws IOException {
266 boolean ok = true; 289 boolean ok = true;
267 IndexReader indexReader; 290 IndexReader indexReader;
268 List<LogInputStream> logReaders; 291 LogInputStream[] logReaders;
269 synchronized(this) { 292 synchronized(this) {
270 indexReader = indexWriter.openReader(); 293 indexReader = indexWriter.openReader();
271 logReaders = logReaders(logs); 294 logReaders = logReaders(logs);
272 } 295 }
273 try { 296 try {
275 indexWriter.check(); 298 indexWriter.check();
276 File dirFile = new File(logDir,"check"); 299 File dirFile = new File(logDir,"check");
277 IoUtils.deleteRecursively(dirFile); 300 IoUtils.deleteRecursively(dirFile);
278 Directory dir = FSDirectory.open(dirFile); 301 Directory dir = FSDirectory.open(dirFile);
279 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); 302 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
280 playLogs(logReaders,checkWriter,null); 303 playLogs(logReaders,checkWriter);
281 //logger.info("check lucene"); 304 //logger.info("check lucene");
282 IndexReader checkReader = checkWriter.openReader(); 305 IndexReader checkReader = checkWriter.openReader();
283 int nCheck = checkReader.numDocs(); 306 int nCheck = checkReader.numDocs();
284 int nOrig = indexReader.numDocs(); 307 int nOrig = indexReader.numDocs();
285 if( nCheck != nOrig ) { 308 if( nCheck != nOrig ) {
373 public synchronized void commit() throws IOException { 396 public synchronized void commit() throws IOException {
374 indexWriter.commit(); 397 indexWriter.commit();
375 log.commit(); 398 log.commit();
376 if( mergeLock.isLocked() ) 399 if( mergeLock.isLocked() )
377 return; 400 return;
378 if( log.logFile.end() > logs.get(0).end() ) { 401 if( logs[1].file.lastModified() < System.currentTimeMillis() - logTime ) {
379 logs.add( newLogFile() );
380 writeIndex();
381 setLog();
382 }
383 if( logs.size() > 3 ) {
384 getMergeLock(); 402 getMergeLock();
385 new Thread(mergeLogs).start(); 403 new Thread(mergeLogs).start();
386 // mergeLogs.run(); 404 // mergeLogs.run();
387 } 405 }
388 } 406 }
429 log.writeLong(System.currentTimeMillis()); 447 log.writeLong(System.currentTimeMillis());
430 log.writeByte(op); 448 log.writeByte(op);
431 } 449 }
432 450
433 // return whether stopped at tag 451 // return whether stopped at tag
434 public synchronized boolean playLogs(String upToTag) throws IOException { 452 public synchronized void playLogs(GoodWriter writer) throws IOException {
435 return playLogs( logReaders(logs), indexWriter, upToTag ); 453 if( writer == null )
436 } 454 writer = indexWriter;
437 455 playLogs( logReaders(logs), writer );
438 private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException { 456 }
439 List<LogInputStream> logReaders = new ArrayList<LogInputStream>(); 457
440 for( LogFile log : logs ) { 458 private static LogInputStream[] logReaders(LogFile[] logs) throws IOException {
441 logReaders.add( log.input() ); 459 LogInputStream[] logReaders = new LogInputStream[logs.length];
460 for( int i=0; i<logs.length; i++ ) {
461 logReaders[i] = logs[i].input();
442 } 462 }
443 return logReaders; 463 return logReaders;
444 } 464 }
445 465
446 private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag) 466 private static void playLogs(LogInputStream[] logReaders,GoodWriter indexWriter)
447 throws IOException 467 throws IOException
448 { 468 {
449 if( numDocs(indexWriter) != 0 ) 469 if( numDocs(indexWriter) != 0 )
450 throw new RuntimeException ("not empty"); 470 throw new RuntimeException ("not empty");
451 boolean rtn = false;
452 for( LogInputStream reader : logReaders ) { 471 for( LogInputStream reader : logReaders ) {
453 if( playLog(reader,indexWriter,upToTag) ) { 472 playLog(reader,indexWriter);
454 rtn = true;
455 break;
456 }
457 } 473 }
458 indexWriter.commit(); 474 indexWriter.commit();
459 return rtn; 475 }
460 } 476
461 477 private static int numDocs(GoodWriter indexWriter) throws IOException {
462 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
463 IndexReader reader = indexWriter.openReader(); 478 IndexReader reader = indexWriter.openReader();
464 int n = reader.numDocs(); 479 int n = reader.numDocs();
465 reader.close(); 480 reader.close();
466 return n; 481 return n;
467 } 482 }
468 483
469 private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) 484 private static void playLog(LogInputStream in,GoodWriter indexWriter)
470 throws IOException 485 throws IOException
471 { 486 {
472 boolean rtn = false;
473 while( in.available() > 0 ) { 487 while( in.available() > 0 ) {
474 if( playOp(in,indexWriter,upToTag) ) { 488 playOp(in,indexWriter);
475 rtn = true;
476 break;
477 }
478 } 489 }
479 in.close(); 490 in.close();
480 return rtn; 491 }
481 } 492
482 493 private static void playOp(LogInputStream in,GoodWriter indexWriter)
483 private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException { 494 throws IOException
495 {
484 in.readLong(); // time 496 in.readLong(); // time
485 int op = in.readByte(); 497 int op = in.readByte();
486 switch(op) { 498 switch(op) {
487 case OP_DELETE_ALL: 499 case OP_DELETE_ALL:
488 indexWriter.deleteAll(); 500 indexWriter.deleteAll();
489 return false; 501 return;
490 case OP_DELETE_DOCUMENTS: 502 case OP_DELETE_DOCUMENTS:
491 { 503 {
492 Query query = in.readQuery(); 504 Query query = in.readQuery();
493 //System.out.println("OP_DELETE_DOCUMENTS "+query); 505 //System.out.println("OP_DELETE_DOCUMENTS "+query);
494 indexWriter.deleteDocuments(query); 506 indexWriter.deleteDocuments(query);
495 return false; 507 return;
496 } 508 }
497 case OP_ADD_DOCUMENT: 509 case OP_ADD_DOCUMENT:
498 { 510 {
499 Map storedFields = in.readMap(); 511 Map storedFields = in.readMap();
500 indexWriter.addDocument(storedFields); 512 indexWriter.addDocument(storedFields);
501 return false; 513 return;
502 } 514 }
503 case OP_UPDATE_DOCUMENT: 515 case OP_UPDATE_DOCUMENT:
504 { 516 {
505 String keyFieldName = in.readUTF(); 517 String keyFieldName = in.readUTF();
506 Map storedFields = in.readMap(); 518 Map storedFields = in.readMap();
507 indexWriter.updateDocument(keyFieldName,storedFields); 519 indexWriter.updateDocument(keyFieldName,storedFields);
508 return false; 520 return;
509 } 521 }
510 case OP_TAG: 522 case OP_TAG:
511 { 523 {
512 String tag = in.readUTF(); 524 String tag = in.readUTF();
513 return tag.equals(upToTag); 525 indexWriter.tag(tag);
526 return;
514 } 527 }
515 default: 528 default:
516 throw new RuntimeException("invalid op "+op); 529 throw new RuntimeException("invalid op "+op);
517 } 530 }
518 } 531 }