view src/luan/modules/ThreadLuan.java @ 1599:f2a663a4ba9e

web logging
author Franklin Schmidt <fschmidt@gmail.com>
date Mon, 05 Apr 2021 00:13:05 -0600
parents c922446f53aa
children 7c7f28c724e8
line wrap: on
line source

package luan.modules;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import goodjava.util.WeakCacheMap;
import luan.Luan;
import luan.LuanFunction;
import luan.LuanTable;
import luan.LuanException;
import luan.LuanMutable;
import luan.modules.logging.LuanLogger;
import goodjava.logging.Logger;
import goodjava.logging.LoggerFactory;


public final class ThreadLuan {
	// Logger for this class; used by cancel() to report futures that could not be cancelled.
	private static final Logger logger = LoggerFactory.getLogger(ThreadLuan.class);

	// Cached thread pool on which fork() runs its tasks.
	private static final Executor exec = Executors.newCachedThreadPool();
	// Scheduler (core pool size 1) used by scheduleFn(); public, so it is also reachable from outside this class.
	public static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

	/**
	 * Wraps a Luan function in a Runnable suitable for an executor.
	 *
	 * When run, the task calls LuanLogger.startThreadLogging(luan), invokes
	 * {@code fn} with {@code luan} and no arguments, and finally calls
	 * LuanLogger.endThreadLogging() whether or not the call succeeded.
	 * A LuanException thrown by {@code fn} is not propagated; its stack trace
	 * is printed with printStackTrace().  Other exception types are not caught here.
	 *
	 * @param luan  the Luan instance the function is called with
	 * @param fn    the function to call
	 * @return a Runnable whose run() method is synchronized on the Runnable itself
	 */
	private static Runnable runnable(final Luan luan,final LuanFunction fn) {
		final Runnable task = new Runnable() {
			@Override
			public synchronized void run() {
				LuanLogger.startThreadLogging(luan);
				try {
					fn.call(luan);
				} catch(LuanException e) {
					e.printStackTrace();
				} finally {
					LuanLogger.endThreadLogging();
				}
			}
		};
		return task;
	}

	/**
	 * Runs {@code fn} asynchronously on the class's cached thread pool.
	 *
	 * A new Luan is constructed from the caller's {@code luan}, {@code fn} is
	 * passed to LuanMutable.makeImmutable, and the resulting task is handed to
	 * the executor.  This method does not wait for the task to run.
	 *
	 * @param luan  the caller's Luan, used only to construct the Luan the task runs in
	 * @param fn    the function to run
	 * @throws LuanException declared by the signature (presumably raised by makeImmutable — confirm)
	 */
	public static void fork(Luan luan,LuanFunction fn) throws LuanException {
		final Luan forked = new Luan(luan);
		LuanMutable.makeImmutable(fn);
		final Runnable task = runnable(forked,fn);
		exec.execute(task);
	}

	// Futures of scheduled tasks that were given an "id" option, keyed by that id; read and written by scheduleFn().
	// NOTE(review): WeakCacheMap presumably lets entries be dropped by the garbage collector — confirm against goodjava.util.WeakCacheMap.
	private static final Map<String,ScheduledFuture> scheduleds = new WeakCacheMap<String,ScheduledFuture>();

	/**
	 * Cancels a scheduled future without interrupting a run that is in progress,
	 * and logs an error if the future does not report itself as cancelled afterwards.
	 *
	 * @param sf   the future to cancel
	 * @param src  short description of who requested the cancel; used as the prefix of the log message
	 */
	private static void cancel(ScheduledFuture sf,String src) {
		final boolean result = sf.cancel(false);
		if( sf.isCancelled() )
			return;
		// cancellation did not take effect; record the future's state for diagnosis
		logger.error(src+" cancel="+result+" isCancelled="+sf.isCancelled()+" isDone="+sf.isDone()+" "+sf);
	}

	/**
	 * Schedules the function produced by {@code initFn}.
	 *
	 * A new Luan is constructed from {@code luan}; {@code initFn} is passed to
	 * LuanMutable.makeImmutable and then called with that new Luan.  Its result
	 * is cast to LuanFunction and scheduled via scheduleFn with the given options.
	 *
	 * @param luan     the caller's Luan
	 * @param initFn   function whose return value is the function to schedule
	 * @param options  scheduling options, interpreted by scheduleFn
	 * @throws LuanException propagated from the calls made here (e.g. initFn or scheduleFn)
	 */
	public static synchronized void schedule_closure(Luan luan,LuanFunction initFn,LuanTable options)
		throws LuanException
	{
		final Luan taskLuan = new Luan(luan);
		LuanMutable.makeImmutable(initFn);
		final Object produced = initFn.call(taskLuan);
		scheduleFn(luan,taskLuan,(LuanFunction)produced,options);
	}

	/**
	 * Schedules {@code fn} according to {@code options}.
	 *
	 * A new Luan is constructed from {@code luan} for the task to run in,
	 * {@code fn} is passed to LuanMutable.makeImmutable, and the work is
	 * delegated to scheduleFn.
	 *
	 * @param luan     the caller's Luan
	 * @param fn       the function to schedule
	 * @param options  scheduling options, interpreted by scheduleFn
	 * @throws LuanException propagated from the calls made here (e.g. scheduleFn)
	 */
	public static synchronized void schedule(Luan luan,LuanFunction fn,LuanTable options)
		throws LuanException
	{
		final Luan taskLuan = new Luan(luan);
		LuanMutable.makeImmutable(fn);
		scheduleFn(luan,taskLuan,fn,options);
	}

	/**
	 * Schedules {@code fn} to run in {@code newLuan} on the shared scheduler.
	 *
	 * Recognized options (all times in milliseconds; any leftover key is
	 * handed to Utils.checkEmpty):
	 * <ul>
	 * <li>{@code delay} - initial delay before the first run.  For repeating
	 *     tasks it defaults to the repeat interval; otherwise to 0.</li>
	 * <li>{@code repeating_delay} - repeat with a fixed delay between the end of
	 *     one run and the start of the next.</li>
	 * <li>{@code repeating_rate} - repeat at a fixed rate, measured between the
	 *     starts of successive runs.</li>
	 * <li>{@code dont_gc} - unless true, an object whose finalizer cancels the
	 *     task is stored in {@code luan.registry()}, so the task is cancelled
	 *     when that object is garbage-collected.</li>
	 * <li>{@code id} - if given, a task previously registered under the same id
	 *     is cancelled first and the new task is registered under it.</li>
	 * </ul>
	 * A task with neither a delay nor a repeat option is submitted for immediate
	 * execution and is neither tied to garbage collection nor registered under its id.
	 *
	 * @param luan     the caller's Luan, whose registry holds the cancel-on-gc object
	 * @param newLuan  the Luan the task runs in
	 * @param fn       the function to run
	 * @param options  scheduling options as described above
	 * @throws LuanException if both repeating_delay and repeating_rate are given,
	 *         or propagated from the Utils option helpers
	 */
	private static synchronized void scheduleFn(Luan luan,final Luan newLuan,LuanFunction fn,LuanTable options)
		throws LuanException
	{
		// operate on a new table built from the caller's, since options are removed as they are read
		options = new LuanTable(options);
		Number delay = Utils.removeNumber(options,"delay");
		Number repeatingDelay = Utils.removeNumber(options,"repeating_delay");
		Number repeatingRate = Utils.removeNumber(options,"repeating_rate");
		Boolean dontGc = Utils.removeBoolean(options,"dont_gc");
		String id = Utils.removeString(options,"id");
		if( repeatingDelay!=null && repeatingRate!=null )
			throw new LuanException("can't define both repeating_delay and repeating_rate");
		Utils.checkEmpty(options);
		if( id != null ) {
			// replace any task already scheduled under this id
			ScheduledFuture<?> previous = scheduleds.remove(id);
			if( previous != null )
				cancel(previous,"id "+id);
		}
		final Runnable r = runnable(newLuan,fn);
		final ScheduledFuture<?> sf;
		if( repeatingDelay != null ) {
			if( delay==null )
				delay = repeatingDelay;
			sf = scheduler.scheduleWithFixedDelay(r,delay.longValue(),repeatingDelay.longValue(),TimeUnit.MILLISECONDS);
		} else if( repeatingRate != null ) {
			if( delay==null )
				delay = repeatingRate;
			// fixed rate, not fixed delay: the period is measured between the starts of successive runs
			sf = scheduler.scheduleAtFixedRate(r,delay.longValue(),repeatingRate.longValue(),TimeUnit.MILLISECONDS);
		} else if( delay != null ) {
			sf = scheduler.schedule(r,delay.longValue(),TimeUnit.MILLISECONDS);
		} else {
			// plain one-shot task: run as soon as possible, nothing to track
			scheduler.schedule(r,0L,TimeUnit.MILLISECONDS);
			return;
		}
		if( !Boolean.TRUE.equals(dontGc) ) {
			Object c = new Object() {
				protected void finalize() throws Throwable {
					cancel(sf,"gc");
				}
			};
			luan.registry().put(c,c);  // cancel on gc
		}
		if( id != null )
			scheduleds.put(id,sf);
	}


	/**
	 * Puts the calling thread to sleep.
	 *
	 * @param millis  how long to sleep, in milliseconds
	 * @throws InterruptedException if the thread is interrupted while sleeping
	 */
	public static void sleep(long millis) throws InterruptedException {
		// straight delegate to the JDK
		Thread.sleep( millis );
	}


	/**
	 * A table of Luan functions living in its own Luan instance, callable by name.
	 *
	 * The table is produced once, in the constructor, by calling an init function
	 * with a freshly constructed Luan.  Calls go through the synchronized
	 * {@link #call} method, so at most one call per Callable runs at a time.
	 * The {@code expires} timestamp is maintained by the enclosing class.
	 */
	public static final class Callable {
		// expiry time in epoch milliseconds; set by globalCallable() and read by sweep()
		private long expires;
		private final Luan luan = new Luan();
		private final LuanTable fns;

		/**
		 * @param initFn  function that, called with this object's Luan, returns the table of functions
		 * @throws LuanException propagated from makeImmutable or from initFn
		 */
		Callable(LuanFunction initFn) throws LuanException {
			LuanMutable.makeImmutable(initFn);
			final Object table = initFn.call(luan);
			this.fns = (LuanTable)table;
		}

		/**
		 * Calls the function stored under {@code fnName} with {@code args}, using
		 * this object's own Luan.  Both the arguments and the result are passed to
		 * LuanMutable.makeImmutable.
		 *
		 * @param callerLuan  not used by this method
		 * @param fnName      key of the function in the table
		 * @param args        arguments for the function
		 * @return whatever the function returns
		 * @throws LuanException if no value is stored under fnName, if the value is
		 *         not a function, or if the called function throws
		 */
		public synchronized Object call(Luan callerLuan,String fnName,Object... args) throws LuanException {
			LuanMutable.makeImmutable(args);
			final Object member = fns.get(luan,fnName);
			if( member instanceof LuanFunction ) {
				final LuanFunction target = (LuanFunction)member;
				final Object result = target.call(luan,args);
				LuanMutable.makeImmutable(result);
				return result;
			}
			// not callable: distinguish "missing" from "wrong type"
			final String problem = member == null
				? "function '"+fnName+"' not found in global_callable"
				: "value of '"+fnName+"' not a function in global_callable";
			throw new LuanException(problem);
		}
	}

	// Registry of global callables by name; accessed by sweep(), globalCallable() and removeGlobalCallable().
	private static Map<String,Callable> callableMap = new HashMap<String,Callable>();

	/**
	 * Removes every entry of callableMap whose expiry time is earlier than the
	 * current time.  Does no locking of its own; its caller in this file,
	 * globalCallable(), is synchronized.
	 */
	private static void sweep() {
		final long cutoff = System.currentTimeMillis();
		final Iterator<Callable> entries = callableMap.values().iterator();
		while( entries.hasNext() ) {
			if( entries.next().expires < cutoff )
				entries.remove();
		}
	}

	/**
	 * Returns the global callable registered under {@code name}, creating it
	 * from {@code initFn} if there is none.  Before a new callable is created,
	 * expired entries are swept out.  Every call, whether it found or created
	 * the callable, resets its expiry to now plus {@code timeout}.
	 *
	 * @param name     registry key
	 * @param initFn   used only when a new callable has to be created
	 * @param timeout  lifetime in milliseconds, counted from this call
	 * @return the existing or newly created callable
	 * @throws LuanException if constructing a new callable fails
	 */
	public static synchronized Callable globalCallable(String name,LuanFunction initFn,long timeout) throws LuanException {
		Callable entry = callableMap.get(name);
		if( entry == null ) {
			sweep();
			entry = new Callable(initFn);
			callableMap.put(name,entry);
		}
		final long now = System.currentTimeMillis();
		entry.expires = now + timeout;
		return entry;
	}

	/**
	 * Removes the global callable registered under {@code name}, if any.
	 *
	 * @param name  registry key
	 */
	public static synchronized void removeGlobalCallable(String name) {
		// the removed value, if any, is intentionally discarded
		callableMap.remove( name );
	}


	/**
	 * Calls {@code fn} while holding {@code lock}.
	 *
	 * Waits up to {@code timeout} milliseconds for the lock.  Once acquired,
	 * the lock is released after the call whether it returns or throws.
	 *
	 * @param luan     the Luan passed to fn
	 * @param lock     the lock to hold during the call
	 * @param timeout  maximum time to wait for the lock, in milliseconds
	 * @param fn       the function to call
	 * @param args     arguments for fn
	 * @return whatever fn returns
	 * @throws LuanException if the lock is not acquired in time, or thrown by fn
	 * @throws InterruptedException if interrupted while waiting for the lock
	 */
	public static Object runInLock(Luan luan,Lock lock,long timeout,LuanFunction fn,Object... args)
		throws LuanException, InterruptedException
	{
		final boolean acquired = lock.tryLock(timeout,TimeUnit.MILLISECONDS);
		if( !acquired )
			throw new LuanException("failed to acquire lock");
		try {
			return fn.call(luan,args);
		} finally {
			lock.unlock();
		}
	}

	// Named locks handed out by getLock(), keyed by the caller-supplied key.
	// NOTE(review): WeakCacheMap presumably lets unused locks be garbage-collected — confirm against goodjava.util.WeakCacheMap.
	private static final Map<String,Lock> locks = new WeakCacheMap<String,Lock>();

	/**
	 * Returns the lock registered under {@code key}, creating and registering
	 * a new ReentrantLock if the map currently holds none for that key.
	 *
	 * @param key  name of the lock
	 * @return the lock for key
	 */
	public static synchronized Lock getLock(String key) {
		final Lock existing = locks.get(key);
		if( existing != null )
			return existing;
		final Lock created = new ReentrantLock();
		locks.put(key,created);
		return created;
	}

}