Mercurial Hosting > luan
diff src/luan/modules/Rpc.luan @ 1120:e8fc6712b468
luan Rpc uses luan.lib.rpc
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Mon, 07 Aug 2017 23:50:52 -0600 |
parents | bae2d0c2576c |
children | ba4daf107e07 |
line wrap: on
line diff
--- a/src/luan/modules/Rpc.luan Mon Aug 07 12:35:45 2017 -0600 +++ b/src/luan/modules/Rpc.luan Mon Aug 07 23:50:52 2017 -0600 @@ -1,12 +1,27 @@ java() -local RpcLuan = require "java:luan.modules.RpcLuan" +local RpcClient = require "java:luan.lib.rpc.RpcClient" +local RpcServer = require "java:luan.lib.rpc.RpcServer" +local RpcCall = require "java:luan.lib.rpc.RpcCall" +local RpcResult = require "java:luan.lib.rpc.RpcResult" +local RpcException = require "java:luan.lib.rpc.RpcException" +local JavaRpc = require "java:luan.lib.rpc.Rpc" +local JavaLuan = require "java:luan.Luan" +local JavaUtils = require "java:luan.modules.Utils" +local IoLuan = require "java:luan.modules.IoLuan" +local ByteArrayInputStream = require "java:java.io.ByteArrayInputStream" local Luan = require "luan:Luan.luan" local error = Luan.error local set_metatable = Luan.set_metatable or error() local try = Luan.try or error() +local ipairs = Luan.ipairs or error() +local assert_table = Luan.assert_table or error() +local type = Luan.type or error() local Io = require "luan:Io.luan" local Thread = require "luan:Thread.luan" -local Logging = require "luan:logging/Logging.luan" -- external dependency +local Table = require "luan:Table.luan" +local unpack = Table.unpack or error() +require "luan:logging/init.luan" -- initialize logging +local Logging = require "luan:logging/Logging.luan" local logger = Logging.logger "Rpc" @@ -14,55 +29,180 @@ Rpc.port = 9101 -Rpc.call = RpcLuan.call -- Rpc.call(socket,fn_name,...) +local function java_args(list) + for i,v in ipairs(list) do + list[i] = JavaLuan.toJava(v) + end + return unpack(list) +end + +local function luan_args(list,binary_in) + list = assert_table(list) + for i,v in ipairs(list) do + list[i] = JavaLuan.toLuan(v) + end + if binary_in ~= nil then + local i_in = list[#list] + list[#list] = nil + local type = list[i_in] + if type == "binary" then + list[i_in] = JavaUtils.readAll(binary_in) + elseif type == "input" then + list[i_in] = IoLuan.LuanInput.new(binary_in).table() + else + error(type) + end + end + return unpack(list) +end + +local function encode_binary(args) + local binary_in, len_in, i_in + for i,v in ipairs(args) do + if type(v) == "binary" then + binary_in==nil or error "can't have multiple binary args" + i_in = i + binary_in = ByteArrayInputStream.new(v) + len_in = #v + args[i] = "binary" + elseif type(v) == "table" and v.java ~= nil and v.java.instanceof(IoLuan.LuanFile) then + binary_in==nil or error "can't have multiple binary args" + i_in = i + binary_in = v.java.inputStream() + len_in = v.length() + args[i] = "input" + end + end + args[#args+1] = i_in + return binary_in, len_in +end + +function Rpc.caller(socket) + local java_socket = socket.java.socket + local client = RpcClient.new(java_socket) + return function(fn_name,...) + local args = {...} + local binary_in, len_in = encode_binary(args) + local call + if binary_in == nil then + call = RpcCall.new(fn_name,java_args(args)) + else + call = RpcCall.new(binary_in,len_in,fn_name,java_args(args)) + end + client.write(call) + if fn_name == "close" then + client.close() + return + end + return try { + function() + local result = client.read() + return luan_args(result.returnValues,result["in"]) + end + catch = function(e) + local cause = e.java.getCause() + if cause ~= nil and cause.instanceof(RpcException) and cause.getMessage() == "luan" then + error(cause.values.get(0)) + else + e.throw() + end + end + } + end_function +end_function Rpc.functions = {} -function Rpc.respond(socket,fns) - RpcLuan.respond( socket, fns or Rpc.functions ) -end +function Rpc.responder(socket,fns) + fns = fns or Rpc.functions + local java_socket = socket.java.socket + local server = RpcServer.new(java_socket) + local responder = {} + function responder.is_closed() + return server.isClosed() + end_function + function responder.respond() + local call = server.read() + if call==nil then return end + local cmd = call.cmd + if cmd == "close" then + server.close() + return + end_if + local fn = fns[cmd] + if fn == nil then + server.write(JavaRpc.COMMAND_NOT_FOUND) + return + end_if + local rtn = try { + function() + return {fn(luan_args(call.args,call["in"]))} + end + catch = function(e) + logger.warn(e) + local ex = RpcException.new("luan",e.get_message()) + server.write(ex) + return nil + end + } + if rtn==nil then return end + local binary_in, len_in = encode_binary(rtn) + local result + if binary_in == nil then + result = RpcResult.new(java_args(rtn)) + else + result = RpcResult.new(binary_in,len_in,java_args(rtn)) + end + server.write(result) + end + return responder +end_function function Rpc.remote_socket(socket_uri) + local socket = Io.uri(socket_uri) + local call = Rpc.caller(socket) local mt = {} function mt.__index(_,key) return function(...) - local socket = Io.uri(socket_uri) - return Rpc.call(socket,key,...) + return call(key,...) end end local t = {} set_metatable(t,mt) return t -end +end_function function Rpc.remote(domain) local socket = "socket:" .. domain .. ":" .. Rpc.port return Rpc.remote_socket(socket) -end +end_function function Rpc.serve(port,fns) - local server = Io.socket_server(port or Rpc.port) + local socket_server = Io.socket_server(port or Rpc.port) while true do try { function() - local socket = server() - local function respond() + local socket = socket_server() + local function server() try { function() - Rpc.respond(socket,fns) + local responder = Rpc.responder(socket) + while not responder.is_closed() do + responder.respond() + end end catch = function(e) logger.error(e) end } end - Thread.fork(respond) + Thread.fork(server) end catch = function(e) logger.error(e) end } - end -end + end_while +end_function return Rpc