Mercurial Hosting > luan
comparison src/luan/modules/Rpc.luan @ 1120:e8fc6712b468
luan Rpc uses luan.lib.rpc
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Mon, 07 Aug 2017 23:50:52 -0600 |
parents | bae2d0c2576c |
children | ba4daf107e07 |
comparison
equal
deleted
inserted
replaced
1119:87c674f3f6b7 | 1120:e8fc6712b468 |
---|---|
1 java() | 1 java() |
2 local RpcLuan = require "java:luan.modules.RpcLuan" | 2 local RpcClient = require "java:luan.lib.rpc.RpcClient" |
3 local RpcServer = require "java:luan.lib.rpc.RpcServer" | |
4 local RpcCall = require "java:luan.lib.rpc.RpcCall" | |
5 local RpcResult = require "java:luan.lib.rpc.RpcResult" | |
6 local RpcException = require "java:luan.lib.rpc.RpcException" | |
7 local JavaRpc = require "java:luan.lib.rpc.Rpc" | |
8 local JavaLuan = require "java:luan.Luan" | |
9 local JavaUtils = require "java:luan.modules.Utils" | |
10 local IoLuan = require "java:luan.modules.IoLuan" | |
11 local ByteArrayInputStream = require "java:java.io.ByteArrayInputStream" | |
3 local Luan = require "luan:Luan.luan" | 12 local Luan = require "luan:Luan.luan" |
4 local error = Luan.error | 13 local error = Luan.error |
5 local set_metatable = Luan.set_metatable or error() | 14 local set_metatable = Luan.set_metatable or error() |
6 local try = Luan.try or error() | 15 local try = Luan.try or error() |
16 local ipairs = Luan.ipairs or error() | |
17 local assert_table = Luan.assert_table or error() | |
18 local type = Luan.type or error() | |
7 local Io = require "luan:Io.luan" | 19 local Io = require "luan:Io.luan" |
8 local Thread = require "luan:Thread.luan" | 20 local Thread = require "luan:Thread.luan" |
9 local Logging = require "luan:logging/Logging.luan" -- external dependency | 21 local Table = require "luan:Table.luan" |
22 local unpack = Table.unpack or error() | |
23 require "luan:logging/init.luan" -- initialize logging | |
24 local Logging = require "luan:logging/Logging.luan" | |
10 local logger = Logging.logger "Rpc" | 25 local logger = Logging.logger "Rpc" |
11 | 26 |
12 | 27 |
13 local Rpc = {} | 28 local Rpc = {} |
14 | 29 |
15 Rpc.port = 9101 | 30 Rpc.port = 9101 |
16 | 31 |
17 Rpc.call = RpcLuan.call -- Rpc.call(socket,fn_name,...) | 32 local function java_args(list) |
33 for i,v in ipairs(list) do | |
34 list[i] = JavaLuan.toJava(v) | |
35 end | |
36 return unpack(list) | |
37 end | |
38 | |
39 local function luan_args(list,binary_in) | |
40 list = assert_table(list) | |
41 for i,v in ipairs(list) do | |
42 list[i] = JavaLuan.toLuan(v) | |
43 end | |
44 if binary_in ~= nil then | |
45 local i_in = list[#list] | |
46 list[#list] = nil | |
47 local type = list[i_in] | |
48 if type == "binary" then | |
49 list[i_in] = JavaUtils.readAll(binary_in) | |
50 elseif type == "input" then | |
51 list[i_in] = IoLuan.LuanInput.new(binary_in).table() | |
52 else | |
53 error(type) | |
54 end | |
55 end | |
56 return unpack(list) | |
57 end | |
58 | |
59 local function encode_binary(args) | |
60 local binary_in, len_in, i_in | |
61 for i,v in ipairs(args) do | |
62 if type(v) == "binary" then | |
63 binary_in==nil or error "can't have multiple binary args" | |
64 i_in = i | |
65 binary_in = ByteArrayInputStream.new(v) | |
66 len_in = #v | |
67 args[i] = "binary" | |
68 elseif type(v) == "table" and v.java ~= nil and v.java.instanceof(IoLuan.LuanFile) then | |
69 binary_in==nil or error "can't have multiple binary args" | |
70 i_in = i | |
71 binary_in = v.java.inputStream() | |
72 len_in = v.length() | |
73 args[i] = "input" | |
74 end | |
75 end | |
76 args[#args+1] = i_in | |
77 return binary_in, len_in | |
78 end | |
79 | |
80 function Rpc.caller(socket) | |
81 local java_socket = socket.java.socket | |
82 local client = RpcClient.new(java_socket) | |
83 return function(fn_name,...) | |
84 local args = {...} | |
85 local binary_in, len_in = encode_binary(args) | |
86 local call | |
87 if binary_in == nil then | |
88 call = RpcCall.new(fn_name,java_args(args)) | |
89 else | |
90 call = RpcCall.new(binary_in,len_in,fn_name,java_args(args)) | |
91 end | |
92 client.write(call) | |
93 if fn_name == "close" then | |
94 client.close() | |
95 return | |
96 end | |
97 return try { | |
98 function() | |
99 local result = client.read() | |
100 return luan_args(result.returnValues,result["in"]) | |
101 end | |
102 catch = function(e) | |
103 local cause = e.java.getCause() | |
104 if cause ~= nil and cause.instanceof(RpcException) and cause.getMessage() == "luan" then | |
105 error(cause.values.get(0)) | |
106 else | |
107 e.throw() | |
108 end | |
109 end | |
110 } | |
111 end_function | |
112 end_function | |
18 | 113 |
19 Rpc.functions = {} | 114 Rpc.functions = {} |
20 | 115 |
21 function Rpc.respond(socket,fns) | 116 function Rpc.responder(socket,fns) |
22 RpcLuan.respond( socket, fns or Rpc.functions ) | 117 fns = fns or Rpc.functions |
23 end | 118 local java_socket = socket.java.socket |
119 local server = RpcServer.new(java_socket) | |
120 local responder = {} | |
121 function responder.is_closed() | |
122 return server.isClosed() | |
123 end_function | |
124 function responder.respond() | |
125 local call = server.read() | |
126 if call==nil then return end | |
127 local cmd = call.cmd | |
128 if cmd == "close" then | |
129 server.close() | |
130 return | |
131 end_if | |
132 local fn = fns[cmd] | |
133 if fn == nil then | |
134 server.write(JavaRpc.COMMAND_NOT_FOUND) | |
135 return | |
136 end_if | |
137 local rtn = try { | |
138 function() | |
139 return {fn(luan_args(call.args,call["in"]))} | |
140 end | |
141 catch = function(e) | |
142 logger.warn(e) | |
143 local ex = RpcException.new("luan",e.get_message()) | |
144 server.write(ex) | |
145 return nil | |
146 end | |
147 } | |
148 if rtn==nil then return end | |
149 local binary_in, len_in = encode_binary(rtn) | |
150 local result | |
151 if binary_in == nil then | |
152 result = RpcResult.new(java_args(rtn)) | |
153 else | |
154 result = RpcResult.new(binary_in,len_in,java_args(rtn)) | |
155 end | |
156 server.write(result) | |
157 end | |
158 return responder | |
159 end_function | |
24 | 160 |
25 function Rpc.remote_socket(socket_uri) | 161 function Rpc.remote_socket(socket_uri) |
162 local socket = Io.uri(socket_uri) | |
163 local call = Rpc.caller(socket) | |
26 local mt = {} | 164 local mt = {} |
27 function mt.__index(_,key) | 165 function mt.__index(_,key) |
28 return function(...) | 166 return function(...) |
29 local socket = Io.uri(socket_uri) | 167 return call(key,...) |
30 return Rpc.call(socket,key,...) | |
31 end | 168 end |
32 end | 169 end |
33 local t = {} | 170 local t = {} |
34 set_metatable(t,mt) | 171 set_metatable(t,mt) |
35 return t | 172 return t |
36 end | 173 end_function |
37 | 174 |
38 function Rpc.remote(domain) | 175 function Rpc.remote(domain) |
39 local socket = "socket:" .. domain .. ":" .. Rpc.port | 176 local socket = "socket:" .. domain .. ":" .. Rpc.port |
40 return Rpc.remote_socket(socket) | 177 return Rpc.remote_socket(socket) |
41 end | 178 end_function |
42 | 179 |
43 function Rpc.serve(port,fns) | 180 function Rpc.serve(port,fns) |
44 local server = Io.socket_server(port or Rpc.port) | 181 local socket_server = Io.socket_server(port or Rpc.port) |
45 while true do | 182 while true do |
46 try { | 183 try { |
47 function() | 184 function() |
48 local socket = server() | 185 local socket = socket_server() |
49 local function respond() | 186 local function server() |
50 try { | 187 try { |
51 function() | 188 function() |
52 Rpc.respond(socket,fns) | 189 local responder = Rpc.responder(socket) |
190 while not responder.is_closed() do | |
191 responder.respond() | |
192 end | |
53 end | 193 end |
54 catch = function(e) | 194 catch = function(e) |
55 logger.error(e) | 195 logger.error(e) |
56 end | 196 end |
57 } | 197 } |
58 end | 198 end |
59 Thread.fork(respond) | 199 Thread.fork(server) |
60 end | 200 end |
61 catch = function(e) | 201 catch = function(e) |
62 logger.error(e) | 202 logger.error(e) |
63 end | 203 end |
64 } | 204 } |
65 end | 205 end_while |
66 end | 206 end_function |
67 | 207 |
68 return Rpc | 208 return Rpc |