comparison src/luan/modules/Rpc.luan @ 1120:e8fc6712b468

luan Rpc uses luan.lib.rpc
author Franklin Schmidt <fschmidt@gmail.com>
date Mon, 07 Aug 2017 23:50:52 -0600
parents bae2d0c2576c
children ba4daf107e07
comparison
equal deleted inserted replaced
1119:87c674f3f6b7 1120:e8fc6712b468
1 java() 1 java()
2 local RpcLuan = require "java:luan.modules.RpcLuan" 2 local RpcClient = require "java:luan.lib.rpc.RpcClient"
3 local RpcServer = require "java:luan.lib.rpc.RpcServer"
4 local RpcCall = require "java:luan.lib.rpc.RpcCall"
5 local RpcResult = require "java:luan.lib.rpc.RpcResult"
6 local RpcException = require "java:luan.lib.rpc.RpcException"
7 local JavaRpc = require "java:luan.lib.rpc.Rpc"
8 local JavaLuan = require "java:luan.Luan"
9 local JavaUtils = require "java:luan.modules.Utils"
10 local IoLuan = require "java:luan.modules.IoLuan"
11 local ByteArrayInputStream = require "java:java.io.ByteArrayInputStream"
3 local Luan = require "luan:Luan.luan" 12 local Luan = require "luan:Luan.luan"
4 local error = Luan.error 13 local error = Luan.error
5 local set_metatable = Luan.set_metatable or error() 14 local set_metatable = Luan.set_metatable or error()
6 local try = Luan.try or error() 15 local try = Luan.try or error()
16 local ipairs = Luan.ipairs or error()
17 local assert_table = Luan.assert_table or error()
18 local type = Luan.type or error()
7 local Io = require "luan:Io.luan" 19 local Io = require "luan:Io.luan"
8 local Thread = require "luan:Thread.luan" 20 local Thread = require "luan:Thread.luan"
9 local Logging = require "luan:logging/Logging.luan" -- external dependency 21 local Table = require "luan:Table.luan"
22 local unpack = Table.unpack or error()
23 require "luan:logging/init.luan" -- initialize logging
24 local Logging = require "luan:logging/Logging.luan"
10 local logger = Logging.logger "Rpc" 25 local logger = Logging.logger "Rpc"
11 26
12 27
13 local Rpc = {} 28 local Rpc = {}
14 29
15 Rpc.port = 9101 30 Rpc.port = 9101
16 31
17 Rpc.call = RpcLuan.call -- Rpc.call(socket,fn_name,...) 32 local function java_args(list)
33 for i,v in ipairs(list) do
34 list[i] = JavaLuan.toJava(v)
35 end
36 return unpack(list)
37 end
38
39 local function luan_args(list,binary_in)
40 list = assert_table(list)
41 for i,v in ipairs(list) do
42 list[i] = JavaLuan.toLuan(v)
43 end
44 if binary_in ~= nil then
45 local i_in = list[#list]
46 list[#list] = nil
47 local type = list[i_in]
48 if type == "binary" then
49 list[i_in] = JavaUtils.readAll(binary_in)
50 elseif type == "input" then
51 list[i_in] = IoLuan.LuanInput.new(binary_in).table()
52 else
53 error(type)
54 end
55 end
56 return unpack(list)
57 end
58
59 local function encode_binary(args)
60 local binary_in, len_in, i_in
61 for i,v in ipairs(args) do
62 if type(v) == "binary" then
63 binary_in==nil or error "can't have multiple binary args"
64 i_in = i
65 binary_in = ByteArrayInputStream.new(v)
66 len_in = #v
67 args[i] = "binary"
68 elseif type(v) == "table" and v.java ~= nil and v.java.instanceof(IoLuan.LuanFile) then
69 binary_in==nil or error "can't have multiple binary args"
70 i_in = i
71 binary_in = v.java.inputStream()
72 len_in = v.length()
73 args[i] = "input"
74 end
75 end
76 args[#args+1] = i_in
77 return binary_in, len_in
78 end
79
80 function Rpc.caller(socket)
81 local java_socket = socket.java.socket
82 local client = RpcClient.new(java_socket)
83 return function(fn_name,...)
84 local args = {...}
85 local binary_in, len_in = encode_binary(args)
86 local call
87 if binary_in == nil then
88 call = RpcCall.new(fn_name,java_args(args))
89 else
90 call = RpcCall.new(binary_in,len_in,fn_name,java_args(args))
91 end
92 client.write(call)
93 if fn_name == "close" then
94 client.close()
95 return
96 end
97 return try {
98 function()
99 local result = client.read()
100 return luan_args(result.returnValues,result["in"])
101 end
102 catch = function(e)
103 local cause = e.java.getCause()
104 if cause ~= nil and cause.instanceof(RpcException) and cause.getMessage() == "luan" then
105 error(cause.values.get(0))
106 else
107 e.throw()
108 end
109 end
110 }
111 end_function
112 end_function
18 113
19 Rpc.functions = {} 114 Rpc.functions = {}
20 115
21 function Rpc.respond(socket,fns) 116 function Rpc.responder(socket,fns)
22 RpcLuan.respond( socket, fns or Rpc.functions ) 117 fns = fns or Rpc.functions
23 end 118 local java_socket = socket.java.socket
119 local server = RpcServer.new(java_socket)
120 local responder = {}
121 function responder.is_closed()
122 return server.isClosed()
123 end_function
124 function responder.respond()
125 local call = server.read()
126 if call==nil then return end
127 local cmd = call.cmd
128 if cmd == "close" then
129 server.close()
130 return
131 end_if
132 local fn = fns[cmd]
133 if fn == nil then
134 server.write(JavaRpc.COMMAND_NOT_FOUND)
135 return
136 end_if
137 local rtn = try {
138 function()
139 return {fn(luan_args(call.args,call["in"]))}
140 end
141 catch = function(e)
142 logger.warn(e)
143 local ex = RpcException.new("luan",e.get_message())
144 server.write(ex)
145 return nil
146 end
147 }
148 if rtn==nil then return end
149 local binary_in, len_in = encode_binary(rtn)
150 local result
151 if binary_in == nil then
152 result = RpcResult.new(java_args(rtn))
153 else
154 result = RpcResult.new(binary_in,len_in,java_args(rtn))
155 end
156 server.write(result)
157 end
158 return responder
159 end_function
24 160
25 function Rpc.remote_socket(socket_uri) 161 function Rpc.remote_socket(socket_uri)
162 local socket = Io.uri(socket_uri)
163 local call = Rpc.caller(socket)
26 local mt = {} 164 local mt = {}
27 function mt.__index(_,key) 165 function mt.__index(_,key)
28 return function(...) 166 return function(...)
29 local socket = Io.uri(socket_uri) 167 return call(key,...)
30 return Rpc.call(socket,key,...)
31 end 168 end
32 end 169 end
33 local t = {} 170 local t = {}
34 set_metatable(t,mt) 171 set_metatable(t,mt)
35 return t 172 return t
36 end 173 end_function
37 174
38 function Rpc.remote(domain) 175 function Rpc.remote(domain)
39 local socket = "socket:" .. domain .. ":" .. Rpc.port 176 local socket = "socket:" .. domain .. ":" .. Rpc.port
40 return Rpc.remote_socket(socket) 177 return Rpc.remote_socket(socket)
41 end 178 end_function
42 179
43 function Rpc.serve(port,fns) 180 function Rpc.serve(port,fns)
44 local server = Io.socket_server(port or Rpc.port) 181 local socket_server = Io.socket_server(port or Rpc.port)
45 while true do 182 while true do
46 try { 183 try {
47 function() 184 function()
48 local socket = server() 185 local socket = socket_server()
49 local function respond() 186 local function server()
50 try { 187 try {
51 function() 188 function()
52 Rpc.respond(socket,fns) 189 local responder = Rpc.responder(socket)
190 while not responder.is_closed() do
191 responder.respond()
192 end
53 end 193 end
54 catch = function(e) 194 catch = function(e)
55 logger.error(e) 195 logger.error(e)
56 end 196 end
57 } 197 }
58 end 198 end
59 Thread.fork(respond) 199 Thread.fork(server)
60 end 200 end
61 catch = function(e) 201 catch = function(e)
62 logger.error(e) 202 logger.error(e)
63 end 203 end
64 } 204 }
65 end 205 end_while
66 end 206 end_function
67 207
68 return Rpc 208 return Rpc