diff src/goodjava/lucene/backup/BackupIndexWriter.java @ 1499:22e15cf73040

lucene.backup
author Franklin Schmidt <fschmidt@gmail.com>
date Sat, 09 May 2020 23:14:13 -0600
parents af55cfad6e12
children f01abd6d5858
line wrap: on
line diff
--- a/src/goodjava/lucene/backup/BackupIndexWriter.java	Fri May 08 18:07:14 2020 -0600
+++ b/src/goodjava/lucene/backup/BackupIndexWriter.java	Sat May 09 23:14:13 2020 -0600
@@ -1,21 +1,39 @@
 package goodjava.lucene.backup;
 
 import java.io.File;
+import java.io.InputStream;
 import java.io.IOException;
+import java.net.Socket;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Arrays;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.SSLSocket;
 import goodjava.io.IoUtils;
+import goodjava.rpc.RpcClient;
+import goodjava.rpc.RpcCall;
+import goodjava.rpc.RpcResult;
+import goodjava.rpc.RpcException;
 import goodjava.lucene.api.LuceneIndexWriter;
+import goodjava.logging.Logger;
+import goodjava.logging.LoggerFactory;
 import goodjava.lucene.logging.LoggingIndexWriter;
 import goodjava.lucene.logging.LogFile;
 
 
 public class BackupIndexWriter extends LoggingIndexWriter {
+	private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class);
+	public static String[] backupDomains;
 	private final String name;
 	private final File dir;
+	private boolean isSyncPending = false;
 
 	public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException {
 		super(indexWriter,logDir);
+		if( backupDomains == null )
+			throw new RuntimeException("must set backupDomains");
 		this.name = name;
 		File f = new File(System.getProperty("java.io.tmpdir"));
 		dir = new File(f,"goodjava.lucene/"+name);
@@ -24,19 +42,92 @@
 
 	public synchronized void commit() throws IOException {
 		super.commit();
-		sync();
-	}
-
-	private void sync() throws IOException {
-		for( File f : dir.listFiles() ) {
-			IoUtils.delete(f);
-		}
-		List<LogFile> logs = new ArrayList<LogFile>();
-		for( LogFile log : this.logs ) {
-			File f = new File(dir,log.file.getName());
-			IoUtils.link(log.file,f);
-			logs.add( new LogFile(f) );
+		//sync();
+		if( !isSyncPending ) {
+			new Thread(sync).start();
+			isSyncPending = true;
 		}
 	}
 
+	public void runSync() {
+		sync.run();
+	}
+
+	private final Runnable sync = new Runnable() {
+		public synchronized void run() {
+			try {
+				sync();
+			} catch(IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	};
+
+	private void sync() throws IOException {
+		List<LogFile> logs = new ArrayList<LogFile>();
+		synchronized(this) {
+			isSyncPending = false;
+			for( File f : dir.listFiles() ) {
+				IoUtils.delete(f);
+			}
+			for( LogFile log : this.logs ) {
+				File f = new File(dir,log.file.getName());
+				IoUtils.link(log.file,f);
+				logs.add( new LogFile(f) );
+			}
+		}
+		List logInfo = new ArrayList();
+		Map<String,LogFile> logMap = new HashMap<String,LogFile>();
+		for( LogFile log : logs ) {
+			Map fileInfo = new HashMap();
+			fileInfo.put("name",log.file.getName());
+			fileInfo.put("end",log.end());
+			logInfo.add(fileInfo);
+			logMap.put(log.file.getName(),log);
+		}
+		for( String backupDomain : backupDomains ) {
+			RpcClient rpc = rpcClient(backupDomain);
+			RpcCall call = new RpcCall("check",name,logInfo);
+			try {
+				while(true) {
+					rpc.write(call);
+					RpcResult result = rpc.read();
+					logger.info(Arrays.asList(result.returnValues).toString());
+					String status = (String)result.returnValues[0];
+					if( status.equals("ok") ) {
+						break;
+					} else if( status.equals("missing") ) {
+						String fileName = (String)result.returnValues[1];
+						LogFile log = logMap.get(fileName);
+						long len = log.end() - 8;
+						InputStream in = log.input();
+						call = new RpcCall(in,len,"add",name,logInfo,fileName);
+					} else if( status.equals("incomplete") ) {
+						String fileName = (String)result.returnValues[1];
+						long logEnd = (Long)result.returnValues[2];
+						LogFile log = logMap.get(fileName);
+						long len = log.end() - logEnd;
+						InputStream in = log.input();
+						in.skip(logEnd-8);
+						call = new RpcCall(in,len,"append",name,logInfo,fileName);
+					} else
+						throw new RuntimeException("status "+status);
+				}
+			} catch(RpcException e) {
+				logger.warn("",e);
+			}
+			rpc.close();
+		}
+	}
+
+	static RpcClient rpcClient(String backupDomain) throws IOException {
+		Socket socket;
+		if( BackupServer.cipherSuites == null ) {
+			socket = new Socket(backupDomain,BackupServer.port);
+		} else {
+			socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port);
+			((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites);
+		}
+		return new RpcClient(socket);
+	}
 }