diff src/luan/modules/Rpc.luan @ 1120:e8fc6712b468

luan Rpc uses luan.lib.rpc
author Franklin Schmidt <fschmidt@gmail.com>
date Mon, 07 Aug 2017 23:50:52 -0600
parents bae2d0c2576c
children ba4daf107e07
line wrap: on
line diff
--- a/src/luan/modules/Rpc.luan	Mon Aug 07 12:35:45 2017 -0600
+++ b/src/luan/modules/Rpc.luan	Mon Aug 07 23:50:52 2017 -0600
@@ -1,12 +1,27 @@
 java()
-local RpcLuan = require "java:luan.modules.RpcLuan"
+local RpcClient = require "java:luan.lib.rpc.RpcClient"
+local RpcServer = require "java:luan.lib.rpc.RpcServer"
+local RpcCall = require "java:luan.lib.rpc.RpcCall"
+local RpcResult = require "java:luan.lib.rpc.RpcResult"
+local RpcException = require "java:luan.lib.rpc.RpcException"
+local JavaRpc = require "java:luan.lib.rpc.Rpc"
+local JavaLuan = require "java:luan.Luan"
+local JavaUtils = require "java:luan.modules.Utils"
+local IoLuan = require "java:luan.modules.IoLuan"
+local ByteArrayInputStream = require "java:java.io.ByteArrayInputStream"
 local Luan = require "luan:Luan.luan"
 local error = Luan.error
 local set_metatable = Luan.set_metatable or error()
 local try = Luan.try or error()
+local ipairs = Luan.ipairs or error()
+local assert_table = Luan.assert_table or error()
+local type = Luan.type or error()
 local Io = require "luan:Io.luan"
 local Thread = require "luan:Thread.luan"
-local Logging = require "luan:logging/Logging.luan"  -- external dependency
+local Table = require "luan:Table.luan"
+local unpack = Table.unpack or error()
+require "luan:logging/init.luan"  -- initialize logging
+local Logging = require "luan:logging/Logging.luan"
 local logger = Logging.logger "Rpc"
 
 
@@ -14,55 +29,180 @@
 
 Rpc.port = 9101
 
-Rpc.call = RpcLuan.call  -- Rpc.call(socket,fn_name,...)
+local function java_args(list)
+	for i,v in ipairs(list) do
+		list[i] = JavaLuan.toJava(v)
+	end
+	return unpack(list)
+end
+
+local function luan_args(list,binary_in)
+	list = assert_table(list)
+	for i,v in ipairs(list) do
+		list[i] = JavaLuan.toLuan(v)
+	end
+	if binary_in ~= nil then
+		local i_in = list[#list]
+		list[#list] = nil
+		local type = list[i_in]
+		if type == "binary" then
+			list[i_in] = JavaUtils.readAll(binary_in)
+		elseif type == "input" then
+			list[i_in] = IoLuan.LuanInput.new(binary_in).table()
+		else
+			error(type)
+		end
+	end
+	return unpack(list)
+end
+
+local function encode_binary(args)
+	local binary_in, len_in, i_in
+	for i,v in ipairs(args) do
+		if type(v) == "binary" then
+			binary_in==nil or error "can't have multiple binary args"
+			i_in = i
+			binary_in = ByteArrayInputStream.new(v)
+			len_in = #v
+			args[i] = "binary"
+		elseif type(v) == "table" and v.java ~= nil and v.java.instanceof(IoLuan.LuanFile) then
+			binary_in==nil or error "can't have multiple binary args"
+			i_in = i
+			binary_in = v.java.inputStream()
+			len_in = v.length()
+			args[i] = "input"
+		end
+	end
+	args[#args+1] = i_in
+	return binary_in, len_in
+end
+
+function Rpc.caller(socket)
+	local java_socket = socket.java.socket
+	local client = RpcClient.new(java_socket)
+	return function(fn_name,...)
+		local args = {...}
+		local binary_in, len_in = encode_binary(args)
+		local call
+		if binary_in == nil then
+			call = RpcCall.new(fn_name,java_args(args))
+		else
+			call = RpcCall.new(binary_in,len_in,fn_name,java_args(args))
+		end
+		client.write(call)
+		if fn_name == "close" then
+			client.close()
+			return
+		end
+		return try {
+			function()
+				local result = client.read()
+				return luan_args(result.returnValues,result["in"])
+			end
+			catch = function(e)
+				local cause = e.java.getCause()
+				if cause ~= nil and cause.instanceof(RpcException) and cause.getMessage() == "luan" then
+					error(cause.values.get(0))
+				else
+					e.throw()
+				end
+			end
+		}
+	end_function
+end_function
 
 Rpc.functions = {}
 
-function Rpc.respond(socket,fns)
-	RpcLuan.respond( socket, fns or Rpc.functions )
-end
+function Rpc.responder(socket,fns)
+	fns = fns or Rpc.functions
+	local java_socket = socket.java.socket
+	local server = RpcServer.new(java_socket)
+	local responder = {}
+	function responder.is_closed()
+		return server.isClosed()
+	end_function
+	function responder.respond()
+		local call = server.read()
+		if call==nil then return end
+		local cmd = call.cmd
+		if cmd == "close" then
+			server.close()
+			return
+		end_if
+		local fn = fns[cmd]
+		if fn == nil then
+			server.write(JavaRpc.COMMAND_NOT_FOUND)
+			return
+		end_if
+		local rtn = try {
+			function()
+				return {fn(luan_args(call.args,call["in"]))}
+			end
+			catch = function(e)
+				logger.warn(e)
+				local ex = RpcException.new("luan",e.get_message())
+				server.write(ex)
+				return nil
+			end
+		}
+		if rtn==nil then return end
+		local binary_in, len_in = encode_binary(rtn)
+		local result
+		if binary_in == nil then
+			result = RpcResult.new(java_args(rtn))
+		else
+			result = RpcResult.new(binary_in,len_in,java_args(rtn))
+		end
+		server.write(result)
+	end
+	return responder
+end_function
 
 function Rpc.remote_socket(socket_uri)
+	local socket = Io.uri(socket_uri)
+	local call = Rpc.caller(socket)
 	local mt = {}
 	function mt.__index(_,key)
 		return function(...)
-			local socket = Io.uri(socket_uri)
-			return Rpc.call(socket,key,...)
+			return call(key,...)
 		end
 	end
 	local t = {}
 	set_metatable(t,mt)
 	return t
-end
+end_function
 
 function Rpc.remote(domain)
 	local socket = "socket:" .. domain .. ":" .. Rpc.port
 	return Rpc.remote_socket(socket)
-end
+end_function
 
 function Rpc.serve(port,fns)
-	local server = Io.socket_server(port or Rpc.port)
+	local socket_server = Io.socket_server(port or Rpc.port)
 	while true do
 		try {
 			function()
-				local socket = server()
-				local function respond()
+				local socket = socket_server()
+				local function server()
 					try {
 						function()
-							Rpc.respond(socket,fns)
+							local responder = Rpc.responder(socket)
+							while not responder.is_closed() do
+								responder.respond()
+							end
 						end
 						catch = function(e)
 							logger.error(e)
 						end
 					}
 				end
-				Thread.fork(respond)
+				Thread.fork(server)
 			end
 			catch = function(e)
 				logger.error(e)
 			end
 		}
-	end
-end
+	end_while
+end_function
 
 return Rpc