Mercurial Hosting > luan
view core/src/luan/modules/RpcLuan.java @ 744:4b8695f1cfc4
add rpc IO type
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Wed, 13 Jul 2016 20:39:08 -0600 |
parents | 5578541125ea |
children | 1a101ac9ea46 |
line wrap: on
line source
package luan.modules;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.EOFException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.IdentityHashMap;
import java.util.Collections;
import java.util.Map;
import luan.Luan;
import luan.LuanState;
import luan.LuanTable;
import luan.LuanFunction;
import luan.LuanException;
import luan.LuanMethod;


/**
 * Simple request/response RPC over a socket.
 *
 * <p>Wire format, as implemented below:
 * <ul>
 * <li>int: 4 bytes, big-endian.</li>
 * <li>string: int byte-length followed by that many UTF-8 bytes.</li>
 * <li>boolean: the string "true" or "false".</li>
 * <li>object list: int count, then for each object a one-byte type tag
 *     ({@code NIL}..{@code IO}) followed by the tag-specific payload.</li>
 * <li>request: function-name string, then the argument object list.</li>
 * <li>response: boolean ok; if true an object list of results, otherwise an
 *     error-message string.</li>
 * </ul>
 * At most one object in a list may be an IO value; its bytes are streamed
 * after all other objects and run to the end of the stream.
 */
public final class RpcLuan {
    // One-byte type tags written in front of every serialized object.
    private static final int NIL = 0;
    private static final int STRING = 1;
    private static final int BOOLEAN = 2;
    private static final int NUMBER = 3;
    private static final int BINARY = 4;
    private static final int TABLE = 5;
    private static final int IO = 6;

    /**
     * Sends a call to the peer on the given socket and returns its results.
     *
     * @param luan      the Luan state used for pickling tables and evaluating table results
     * @param socketTbl a table whose raw "java" entry is an {@code IoLuan.LuanSocket}
     * @param fnName    name of the remote function
     * @param args      arguments to serialize; at most one may be an IO table
     * @return the deserialized results
     * @throws LuanException if the peer reports failure (message taken from the response)
     *                       or a value cannot be (de)serialized
     * @throws IOException   on socket errors or a truncated response
     */
    @LuanMethod public static Object[] call(LuanState luan,LuanTable socketTbl,String fnName,Object... args)
        throws LuanException, IOException
    {
        IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
        Socket socket = luanSocket.socket;
        Close close = new Close();
        boolean done = false;
        try {
            // Streams are opened inside the try so a failure here still closes the socket.
            InputStream in = new BufferedInputStream(socket.getInputStream());
            OutputStream out = new BufferedOutputStream(socket.getOutputStream());
            writeString(out,fnName);
            writeObjs(out,luan,args);
            out.flush();
            // Half-close: marks the end of the request (and of any streamed IO argument).
            socket.shutdownOutput();
            boolean ok = readBoolean(in);
            if( !ok ) {
                String msg = readString(in);
                throw new LuanException(msg);
            }
            Object[] rtn = readObjs(in,luan,close);
            done = true;
            return rtn;
        } finally {
            // close.b is cleared when a returned IO value has taken over responsibility
            // for closing the stream. On any failure the partially built result is
            // discarded, so the socket must be closed here regardless of close.b.
            if( !done || close.b ) {
                socket.close();
            }
        }
    }

    /**
     * Reads one request from the socket, invokes the named function from {@code fns}
     * and writes the response. The socket is always closed before returning.
     *
     * <p>A {@link LuanException} raised while reading the request or running the
     * function is reported to the peer as a failure response rather than thrown.
     *
     * @param luan      the Luan state used to run the function
     * @param socketTbl a table whose raw "java" entry is an {@code IoLuan.LuanSocket}
     * @param fns       table mapping function names to functions
     * @throws IOException   on socket errors or a truncated request
     * @throws LuanException if the successful result cannot be serialized
     */
    public static void respond(LuanState luan,LuanTable socketTbl,LuanTable fns)
        throws IOException, LuanException
    {
        IoLuan.LuanSocket luanSocket = (IoLuan.LuanSocket)socketTbl.rawGet("java");
        Socket socket = luanSocket.socket;
        try {
            InputStream in = new BufferedInputStream(socket.getInputStream());
            OutputStream out = new BufferedOutputStream(socket.getOutputStream());
            try {
                Object[] rtn;
                try {
                    String fnName = readString(in);
                    Object[] args = readObjs(in,luan,null);
                    Object fn = fns.get(luan,fnName);
                    // Covers both a missing entry and an entry that is not callable;
                    // the name comes from the peer, so neither should crash the responder.
                    if( !(fn instanceof LuanFunction) )
                        throw new LuanException( "function not found: " + fnName );
                    rtn = Luan.array(((LuanFunction)fn).call(luan,args));
                } catch(LuanException e) {
                    writeBoolean(out,false);
                    writeString(out,e.getFullMessage());
                    return;
                }
                writeBoolean(out,true);
                writeObjs(out,luan,rtn);
            } finally {
                out.flush();
            }
        } finally {
            // Separate finally so the socket is closed even when the flush above fails.
            socket.close();
        }
    }

    /**
     * Writes an object list. An IO table (a table whose raw "java" entry is an
     * {@code IoLuan.LuanIn}) is written as just the IO tag; its content is copied
     * to {@code out} after all objects have been written.
     *
     * @throws LuanException if more than one IO value is present or a value has an unsupported type
     */
    private static void writeObjs(OutputStream out,LuanState luan,Object[] a)
        throws IOException, LuanException
    {
        IoLuan.LuanIn luanIn = null;
        writeInt(out,a.length);
        for( Object obj : a ) {
            if( obj instanceof LuanTable ) {
                LuanTable tbl = (LuanTable)obj;
                Object java = tbl.rawGet("java");
                if( java instanceof IoLuan.LuanIn ) {
                    if( luanIn != null )
                        throw new LuanException("can't have multiple IO params");
                    luanIn = (IoLuan.LuanIn)java;
                    out.write(IO);
                    continue;
                }
            }
            writeObj(out,luan,obj);
        }
        if( luanIn != null ) {
            InputStream in = luanIn.inputStream();
            Utils.copyAll(in,out);
        }
    }

    /**
     * Reads an object list written by {@link #writeObjs}.
     *
     * @param close shared flag passed on to any IO value that is read; may be null
     */
    private static Object[] readObjs(InputStream in,LuanState luan,Close close)
        throws IOException, LuanException
    {
        int n = readInt(in);
        Object[] rtn = new Object[n];
        for( int i=0; i<n; i++ ) {
            rtn[i] = readObj(in,luan,close);
        }
        return rtn;
    }

    /**
     * Writes one object as a type tag followed by its payload.
     * Numbers are sent as their {@code toString()} text; tables are sent as Luan source text.
     *
     * @throws LuanException if the object's type is not supported
     */
    private static void writeObj(OutputStream out,LuanState luan,Object obj)
        throws IOException, LuanException
    {
        if( obj == null ) {
            out.write(NIL);
        }
        else if( obj instanceof String ) {
            out.write(STRING);
            writeString(out,(String)obj);
        }
        else if( obj instanceof Boolean ) {
            out.write(BOOLEAN);
            writeBoolean(out,(Boolean)obj);
        }
        else if( obj instanceof Number ) {
            out.write(NUMBER);
            writeString(out,obj.toString());
        }
        else if( obj instanceof byte[] ) {
            byte[] a = (byte[])obj;
            out.write(BINARY);
            writeInt(out,a.length);
            out.write(a);
        }
        else if( obj instanceof LuanTable ) {
            out.write(TABLE);
            String s = pickle( luan, obj, Collections.newSetFromMap(new IdentityHashMap<LuanTable,Boolean>()) );
            writeString(out,s);
        }
        else
            throw new LuanException( "invalid type: " + obj.getClass() );
    }

    /**
     * Reads one tagged object. Numbers always come back as {@link Double}.
     * An IO tag yields a table wrapping the remainder of {@code in}.
     *
     * @throws LuanException if the tag is not recognized (including end of stream, tag -1)
     */
    private static Object readObj(InputStream in,LuanState luan,Close close)
        throws IOException, LuanException
    {
        int type = in.read();
        switch(type) {
        case NIL:
            return null;
        case STRING:
            return readString(in);
        case BOOLEAN:
            return readBoolean(in);
        case NUMBER:
            return Double.valueOf(readString(in));
        case BINARY:
            return readBinary(in,readInt(in));
        case TABLE: {
            String s = readString(in);
            // SECURITY NOTE(review): this compiles and runs Luan source received from
            // the peer. Only safe when the peer is trusted; consider a data-only parser.
            LuanFunction fn = Luan.load("return "+s,"rpc-reader");
            return fn.call(luan);
        }
        case IO:
            return new LuanInputStream(in,close).table();
        default:
            throw new LuanException( "invalid type: " + type );
        }
    }

    /** Reads a boolean encoded as the string "true" or "false". */
    private static Boolean readBoolean(InputStream in) throws IOException {
        return Boolean.valueOf(readString(in));
    }

    /** Reads a length-prefixed UTF-8 string. */
    private static String readString(InputStream in) throws IOException {
        int len = readInt(in);
        byte[] a = readBinary(in,len);
        return new String(a,StandardCharsets.UTF_8);
    }

    /**
     * Reads a 4-byte big-endian int.
     *
     * @throws EOFException if the stream ends before four bytes are read
     */
    private static int readInt(InputStream in) throws IOException {
        int ch1 = in.read();
        int ch2 = in.read();
        int ch3 = in.read();
        int ch4 = in.read();
        // read() returns -1 at end of stream; OR-ing makes any -1 show up as negative.
        if ((ch1 | ch2 | ch3 | ch4) < 0)
            throw new EOFException();
        return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
    }

    /**
     * Reads exactly {@code size} bytes.
     *
     * @throws IOException  if {@code size} is negative (corrupt or hostile length prefix)
     * @throws EOFException if the stream ends early
     */
    private static byte[] readBinary(InputStream in,int size) throws IOException {
        if( size < 0 )
            throw new IOException( "invalid length: " + size );
        byte[] a = new byte[size];
        int i = 0;
        while( i < size ) {
            int n = in.read(a,i,size-i);
            if( n == -1 )
                throw new EOFException();
            i += n;
        }
        return a;
    }

    /** Writes a boolean as the string "true" or "false". */
    private static void writeBoolean(OutputStream out,Boolean b) throws IOException {
        writeString(out,b.toString());
    }

    /** Writes a string as an int byte-length followed by its UTF-8 bytes. */
    private static void writeString(OutputStream out,String s) throws IOException {
        byte[] a = s.getBytes(StandardCharsets.UTF_8);
        writeInt(out,a.length);
        out.write(a);
    }

    /** Writes a 4-byte big-endian int. */
    private static void writeInt(OutputStream out,int v) throws IOException {
        out.write((v >>> 24) & 0xFF);
        out.write((v >>> 16) & 0xFF);
        out.write((v >>>  8) & 0xFF);
        out.write((v >>>  0) & 0xFF);
    }

    /**
     * Converts a value to Luan source text that evaluates back to it.
     * Supports nil, booleans, numbers, strings and tables of those.
     *
     * @param set tables currently being serialized on the recursion path (identity-based);
     *            used to detect cycles
     * @throws LuanException on a circular table reference or an unsupported type
     */
    private static String pickle(LuanState luan,Object obj,Set<LuanTable> set) throws LuanException {
        if( obj == null )
            return "nil";
        if( obj instanceof Boolean )
            return obj.toString();
        if( obj instanceof Number )
            return Luan.toString((Number)obj);
        if( obj instanceof String )
            return "\"" + Luan.stringEncode((String)obj) + "\"";
        if( obj instanceof LuanTable ) {
            LuanTable tbl = (LuanTable)obj;
            if( !set.add(tbl) ) {
                throw new LuanException( "circular reference in table" );
            }
            try {
                StringBuilder sb = new StringBuilder();
                sb.append( "{" );
                for( Map.Entry<Object,Object> entry : tbl.iterable(luan) ) {
                    sb.append( "[" );
                    sb.append( pickle(luan,entry.getKey(),set) );
                    sb.append( "]=" );
                    sb.append( pickle(luan,entry.getValue(),set) );
                    sb.append( ", " );
                }
                sb.append( "}" );
                return sb.toString();
            } finally {
                // Leave the path again so a table that is merely referenced more than
                // once (shared, but not cyclic) is not misreported as circular.
                set.remove(tbl);
            }
        }
        throw new LuanException( "invalid type: " + obj.getClass() );
    }


    /**
     * Mutable flag shared between {@link #call} and the IO value it may return:
     * true while {@code call} is still responsible for closing the socket.
     */
    private static class Close {
        boolean b = true;
    }

    /**
     * IO value backed by the remainder of the RPC input stream.
     * If it was handed a {@link Close} whose flag was still set, it clears the flag
     * and closes the stream itself after {@code read_text} / {@code read_binary}.
     */
    private static class LuanInputStream extends IoLuan.LuanIn {
        private final InputStream in;
        private final boolean ownsStream;

        public LuanInputStream(InputStream in,Close close) {
            this.in = in;
            this.ownsStream = close!=null && close.b;
            if( this.ownsStream )
                close.b = false;
        }

        /** Closes the underlying stream if this object took over that responsibility. */
        private void closeIfOwner() throws IOException {
            if( ownsStream )
                in.close();
        }

        @Override public InputStream inputStream() {
            return in;
        }

        @Override public String to_string() {
            return "<input_stream>";
        }

        @Override public String to_uri_string() {
            throw new UnsupportedOperationException();
        }

        @Override public String read_text() throws IOException {
            try {
                // NOTE(review): decodes with the platform default charset, while the
                // protocol's own strings are UTF-8 - confirm which is intended.
                return Utils.readAll(new InputStreamReader(in));
            } finally {
                // Close even when reading fails, otherwise the stream is leaked.
                closeIfOwner();
            }
        }

        @Override public byte[] read_binary() throws IOException {
            try {
                return Utils.readAll(in);
            } finally {
                closeIfOwner();
            }
        }

        @Override public boolean exists() {
            return true;
        }
    }

}