001package co.codewizards.cloudstore.ls.rest.server;
002
003import static co.codewizards.cloudstore.core.util.Util.*;
004import static java.util.Objects.*;
005
006import java.lang.reflect.Proxy;
007import java.util.ArrayList;
008import java.util.HashMap;
009import java.util.HashSet;
010import java.util.LinkedList;
011import java.util.List;
012import java.util.Map;
013import java.util.Set;
014
015import co.codewizards.cloudstore.core.Uid;
016import co.codewizards.cloudstore.core.dto.Error;
017import co.codewizards.cloudstore.core.dto.RemoteException;
018import co.codewizards.cloudstore.core.dto.RemoteExceptionUtil;
019import co.codewizards.cloudstore.core.io.TimeoutException;
020import co.codewizards.cloudstore.core.util.ExceptionUtil;
021import co.codewizards.cloudstore.ls.core.dto.ErrorResponse;
022import co.codewizards.cloudstore.ls.core.dto.InverseServiceRequest;
023import co.codewizards.cloudstore.ls.core.dto.InverseServiceResponse;
024import co.codewizards.cloudstore.ls.core.dto.NullResponse;
025import co.codewizards.cloudstore.ls.core.invoke.ClassInfo;
026import co.codewizards.cloudstore.ls.core.invoke.ClassInfoMap;
027import co.codewizards.cloudstore.ls.core.invoke.ClassManager;
028import co.codewizards.cloudstore.ls.core.invoke.DelayedMethodInvocationResponse;
029import co.codewizards.cloudstore.ls.core.invoke.IncDecRefCountQueue;
030import co.codewizards.cloudstore.ls.core.invoke.InverseMethodInvocationRequest;
031import co.codewizards.cloudstore.ls.core.invoke.InverseMethodInvocationResponse;
032import co.codewizards.cloudstore.ls.core.invoke.Invoker;
033import co.codewizards.cloudstore.ls.core.invoke.MethodInvocationRequest;
034import co.codewizards.cloudstore.ls.core.invoke.MethodInvocationResponse;
035import co.codewizards.cloudstore.ls.core.invoke.ObjectManager;
036import co.codewizards.cloudstore.ls.core.invoke.ObjectRef;
037import co.codewizards.cloudstore.ls.core.invoke.RemoteObjectProxy;
038import co.codewizards.cloudstore.ls.core.invoke.RemoteObjectProxyFactory;
039import co.codewizards.cloudstore.ls.core.invoke.RemoteObjectProxyInvocationHandler;
040
041public class InverseInvoker implements Invoker {
042        /**
043         * Timeout (in milliseconds) before sending an empty HTTP response to the polling client. The client does
044         * <i>long polling</i> in order to allow for
045         * {@linkplain #performInverseServiceRequest(InverseServiceRequest) inverse service invocations}.
046         * <p>
047         * This timeout must be (significantly) shorter than {@link ObjectManager#EVICT_UNUSED_OBJECT_MANAGER_TIMEOUT_MS} to make sure, the
048         * {@linkplain #pollInverseServiceRequest() polling} serves additionally as a keep-alive for
049         * the server-side {@code ObjectManager}.
050         */
051        private static final long POLL_INVERSE_SERVICE_REQUEST_TIMEOUT_MS = 15L * 1000L; // 15 seconds
052
053        /**
054         * Timeout for {@link #performInverseServiceRequest(InverseServiceRequest)}.
055         * <p>
056         * If an inverse service-request does not receive a response within this timeout, a {@link TimeoutException} is thrown.
057         * <p>
058         * Please note, that the {@code invoke*} methods (e.g. {@link #invoke(Object, String, Object...)} or
059         * {@link #invokeConstructor(Class, Object...)}) can take much longer, because the other side will return
060         * a {@link DelayedMethodInvocationResponse} after a much shorter timeout (a few dozen seconds). This allows
061         * the actual method to be invoked to take how long it wants (unlimited!) while at the same time detecting very
062         * quickly, if the other side is dead (this timeout).
063         */
064        private static final long PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS = 3L * 60L * 1000L; // 3 minutes is more than enough, because we have DelayedMethodInvocationResponse
065
066        private final IncDecRefCountQueue incDecRefCountQueue = new IncDecRefCountQueue(this);
067        private final ObjectManager objectManager;
068        private final LinkedList<InverseServiceRequest> inverseServiceRequests = new LinkedList<>();
069        private final Set<Uid> requestIdsWaitingForResponse = new HashSet<Uid>(); // synchronized by: requestId2InverseServiceResponse
070        private final Map<Uid, InverseServiceResponse> requestId2InverseServiceResponse = new HashMap<Uid, InverseServiceResponse>();
071        private final ClassInfoMap classInfoMap = new ClassInfoMap();
072
073        private volatile boolean diedOfTimeout;
074
075        public static InverseInvoker getInverseInvoker(final ObjectManager objectManager) {
076                requireNonNull(objectManager, "objectManager");
077
078                synchronized (objectManager) {
079                        InverseInvoker inverseInvoker = (InverseInvoker) objectManager.getContextObject(InverseInvoker.class.getName());
080                        if (inverseInvoker == null) {
081                                inverseInvoker = new InverseInvoker(objectManager);
082                                objectManager.putContextObject(InverseInvoker.class.getName(), inverseInvoker);
083                        }
084                        return inverseInvoker;
085                }
086        }
087
088        private InverseInvoker(final ObjectManager objectManager) {
089                this.objectManager = requireNonNull(objectManager, "objectManager");
090        }
091
092        @Override
093        public ObjectManager getObjectManager() {
094                return objectManager;
095        }
096
097        @Override
098        public <T> T invokeStatic(final Class<?> clazz, final String methodName, final Object ... arguments) {
099                requireNonNull(clazz, "clazz");
100                requireNonNull(methodName, "methodName");
101                return invokeStatic(clazz.getName(), methodName, (String[]) null, arguments);
102        }
103
104        @Override
105        public <T> T invokeStatic(final String className, final String methodName, final Object ... arguments) {
106                requireNonNull(className, "className");
107                requireNonNull(methodName, "methodName");
108                return invokeStatic(className, methodName, (String[]) null, arguments);
109        }
110
111        @Override
112        public <T> T invokeStatic(final Class<?> clazz, final String methodName, final Class<?>[] argumentTypes, final Object ... arguments) {
113                requireNonNull(clazz, "clazz");
114                requireNonNull(methodName, "methodName");
115                return invokeStatic(clazz.getName(), methodName, toClassNames(argumentTypes), arguments);
116        }
117
118        @Override
119        public <T> T invokeStatic(final String className, final String methodName, final String[] argumentTypeNames, final Object ... arguments) {
120                requireNonNull(className, "className");
121                requireNonNull(methodName, "methodName");
122
123                final MethodInvocationRequest methodInvocationRequest = MethodInvocationRequest.forStaticInvocation(
124                                className, methodName, argumentTypeNames, arguments);
125
126                return invoke(methodInvocationRequest);
127        }
128
129        @Override
130        public <T> T invokeConstructor(final Class<T> clazz, final Object ... arguments) {
131                requireNonNull(clazz, "clazz");
132                return invokeConstructor(clazz.getName(), (String[]) null, arguments);
133        }
134
135        @Override
136        public <T> T invokeConstructor(final String className, final Object ... arguments) {
137                requireNonNull(className, "className");
138                return invokeConstructor(className, (String[]) null, arguments);
139        }
140
141        @Override
142        public <T> T invokeConstructor(final Class<T> clazz, final Class<?>[] argumentTypes, final Object ... arguments) {
143                requireNonNull(clazz, "clazz");
144                return invokeConstructor(clazz.getName(), toClassNames(argumentTypes), arguments);
145        }
146
147        @Override
148        public <T> T invokeConstructor(final String className, final String[] argumentTypeNames, final Object ... arguments) {
149                requireNonNull(className, "className");
150
151                final MethodInvocationRequest methodInvocationRequest = MethodInvocationRequest.forConstructorInvocation(
152                                className, argumentTypeNames, arguments);
153
154                return invoke(methodInvocationRequest);
155        }
156
157        @Override
158        public <T> T invoke(final Object object, final String methodName, final Object ... arguments) {
159                requireNonNull(object, "object");
160                requireNonNull(methodName, "methodName");
161
162                if (!(object instanceof RemoteObjectProxy))
163                        throw new IllegalArgumentException("object is not an instance of RemoteObjectProxy!");
164
165                return invoke(object, methodName, (Class<?>[]) null, arguments);
166        }
167
168        @Override
169        public <T> T invoke(final Object object, final String methodName, final Class<?>[] argumentTypes, final Object... arguments) {
170                requireNonNull(object, "object");
171                requireNonNull(methodName, "methodName");
172                return invoke(object, methodName, toClassNames(argumentTypes), arguments);
173        }
174
175        @Override
176        public <T> T invoke(final Object object, final String methodName, final String[] argumentTypeNames, final Object... arguments) {
177                requireNonNull(object, "object");
178                requireNonNull(methodName, "methodName");
179
180                final MethodInvocationRequest methodInvocationRequest = MethodInvocationRequest.forObjectInvocation(
181                                object, methodName, argumentTypeNames, arguments);
182
183                return invoke(methodInvocationRequest);
184        }
185
186        private <T> T invoke(final MethodInvocationRequest methodInvocationRequest) {
187                requireNonNull(methodInvocationRequest, "methodInvocationRequest");
188
189                InverseMethodInvocationResponse inverseMethodInvocationResponse = performInverseServiceRequest(
190                                new InverseMethodInvocationRequest(methodInvocationRequest));
191
192                requireNonNull(inverseMethodInvocationResponse, "inverseMethodInvocationResponse");
193
194                MethodInvocationResponse methodInvocationResponse = inverseMethodInvocationResponse.getMethodInvocationResponse();
195
196                while (methodInvocationResponse instanceof DelayedMethodInvocationResponse) {
197                        final DelayedMethodInvocationResponse dmir = (DelayedMethodInvocationResponse) methodInvocationResponse;
198                        final Uid delayedResponseId = dmir.getDelayedResponseId();
199
200                        inverseMethodInvocationResponse = performInverseServiceRequest(
201                                        new InverseMethodInvocationRequest(delayedResponseId));
202
203                        requireNonNull(inverseMethodInvocationResponse, "inverseMethodInvocationResponse");
204
205                        methodInvocationResponse = inverseMethodInvocationResponse.getMethodInvocationResponse();
206                }
207
208                final Object result = methodInvocationResponse.getResult();
209                return cast(result);
210        }
211
212        @Override
213        public void incRefCount(final ObjectRef objectRef, final Uid refId) {
214                incDecRefCountQueue.incRefCount(objectRef, refId);
215        }
216
217        @Override
218        public void decRefCount(final ObjectRef objectRef, final Uid refId) {
219                incDecRefCountQueue.decRefCount(objectRef, refId);
220        }
221
222        private String[] toClassNames(Class<?> ... classes) {
223                final String[] classNames;
224                if (classes == null)
225                        classNames = null;
226                else {
227                        classNames = new String[classes.length];
228                        for (int i = 0; i < classes.length; i++)
229                                classNames[i] = classes[i].getName();
230                }
231                return classNames;
232        }
233
234        public Object getRemoteObjectProxyOrCreate(ObjectRef objectRef) {
235                return objectManager.getRemoteObjectProxyManager().getRemoteObjectProxyOrCreate(objectRef, new RemoteObjectProxyFactory() {
236                        @Override
237                        public RemoteObjectProxy createRemoteObjectProxy(ObjectRef objectRef) {
238                                return _createRemoteObjectProxy(objectRef);
239                        }
240                });
241        }
242
243        private RemoteObjectProxy _createRemoteObjectProxy(final ObjectRef objectRef) {
244                final Class<?>[] interfaces = getInterfaces(objectRef);
245
246                final ClassLoader classLoader = this.getClass().getClassLoader();
247                return (RemoteObjectProxy) Proxy.newProxyInstance(classLoader, interfaces,
248                                new RemoteObjectProxyInvocationHandler(this, objectRef));
249        }
250
251        private Class<?>[] getInterfaces(final ObjectRef objectRef) {
252                ClassInfo classInfo = classInfoMap.getClassInfo(objectRef.getClassId());
253                if (classInfo == null) {
254                        classInfo = objectRef.getClassInfo();
255                        if (classInfo == null)
256                                throw new IllegalStateException("There is no ClassInfo in the ClassInfoMap and neither in the ObjectRef! " + objectRef);
257
258                        classInfoMap.putClassInfo(classInfo);
259                        objectRef.setClassInfo(null);
260                }
261
262                final ClassManager classManager = objectManager.getClassManager();
263                final Set<String> interfaceNames = classInfo.getInterfaceNames();
264                final List<Class<?>> interfaces = new ArrayList<>(interfaceNames.size() + 1);
265                for (final String interfaceName : interfaceNames) {
266                        Class<?> iface = null;
267                        try {
268                                iface = classManager.getClassOrFail(interfaceName);
269                        } catch (RuntimeException x) {
270                                if (ExceptionUtil.getCause(x, ClassNotFoundException.class) == null)
271                                        throw x;
272                        }
273                        if (iface != null)
274                                interfaces.add(iface);
275                }
276                interfaces.add(RemoteObjectProxy.class);
277                return interfaces.toArray(new Class<?>[interfaces.size()]);
278        }
279
280        /**
281         * Invokes a service on the client-side.
282         * <p>
283         * Normally, a client initiates a request-response-cycle by sending a request to a server-side service and waiting
284         * for the response. In order to notify client-side listeners, we need the inverse, though: the server must invoke
285         * a service on the client-side. This can be easily done by sending an implementation of {@code InverseServiceRequest}
286         * to a {@code InverseServiceRequestHandler} implementation on the client-side using this method.
287         *
288         * @param request the request to be processed on the client-side. Must not be <code>null</code>.
289         * @return the response created and sent back by the client-side {@code InverseServiceRequestHandler}.
290         * @throws TimeoutException if this method did not receive a response within the timeout
291         * {@link #PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS}.
292         */
293        public <T extends InverseServiceResponse> T performInverseServiceRequest(final InverseServiceRequest request) throws TimeoutException {
294                requireNonNull(request, "request");
295
296                if (diedOfTimeout)
297                        throw new IllegalStateException(String.format("InverseInvoker[%s] died of timeout, already!", objectManager.getClientId()));
298
299                final Uid requestId = request.getRequestId();
300                requireNonNull(requestId, "request.requestId");
301
302                synchronized (requestId2InverseServiceResponse) {
303                        if (!requestIdsWaitingForResponse.add(requestId))
304                                throw new IllegalStateException("requestId already queued: " + requestId);
305                }
306                try {
307                        synchronized (inverseServiceRequests) {
308                                inverseServiceRequests.add(request);
309                                inverseServiceRequests.notify();
310                        }
311
312                        // The request is pushed, hence from now on, we wait for the response until the timeout in PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS.
313                        final long startTimestamp = System.currentTimeMillis();
314
315                        synchronized (requestId2InverseServiceResponse) {
316                                boolean first = true;
317                                while (first || System.currentTimeMillis() - startTimestamp < PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS) {
318                                        if (first)
319                                                first = false;
320                                        else {
321                                                final long timeSpentTillNowMillis = System.currentTimeMillis() - startTimestamp;
322                                                final long waitTimeout = PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS - timeSpentTillNowMillis;
323                                                if (waitTimeout > 0) {
324                                                        try {
325                                                                requestId2InverseServiceResponse.wait(waitTimeout);
326                                                        } catch (InterruptedException e) {
327                                                                doNothing();
328                                                        }
329                                                }
330                                        }
331
332                                        final InverseServiceResponse response = requestId2InverseServiceResponse.remove(requestId);
333                                        if (response != null) {
334                                                if (response instanceof NullResponse)
335                                                        return null;
336                                                else if (response instanceof ErrorResponse) {
337                                                        final Error error = ((ErrorResponse) response).getError();
338                                                        RemoteExceptionUtil.throwOriginalExceptionIfPossible(error);
339                                                        throw new RemoteException(error);
340                                                }
341                                                else {
342                                                        @SuppressWarnings("unchecked")
343                                                        final T t = (T) response;
344                                                        return t;
345                                                }
346                                        }
347                                }
348                        }
349                } finally {
350                        boolean requestWasStillWaiting;
351                        // in case, it was not yet polled, we make sure garbage does not pile up.
352                        synchronized (requestId2InverseServiceResponse) {
353                                requestWasStillWaiting = requestIdsWaitingForResponse.remove(requestId);
354
355                                // Make sure, no garbage is left over by removing this together with the requestId from requestIdsWaitingForResponse.
356                                requestId2InverseServiceResponse.remove(requestId);
357                        }
358
359                        if (requestWasStillWaiting) {
360                                synchronized (inverseServiceRequests) {
361                                        inverseServiceRequests.remove(request);
362                                }
363                        }
364                }
365
366                if (request.isTimeoutDeadly())
367                        diedOfTimeout = true;
368
369                throw new TimeoutException(String.format("InverseInvoker[%s] encountered timeout while waiting for response matching requestId=%s! diedOfTimeout=%s",
370                                objectManager.getClientId(), requestId, diedOfTimeout));
371        }
372
373        public InverseServiceRequest pollInverseServiceRequest() {
374                final long startTimestamp = System.currentTimeMillis();
375
376                synchronized (inverseServiceRequests) {
377                        boolean first = true;
378                        while (first || System.currentTimeMillis() - startTimestamp < POLL_INVERSE_SERVICE_REQUEST_TIMEOUT_MS) {
379                                if (first)
380                                        first = false;
381                                else {
382                                        final long timeSpentTillNowMillis = System.currentTimeMillis() - startTimestamp;
383                                        final long waitTimeout = POLL_INVERSE_SERVICE_REQUEST_TIMEOUT_MS - timeSpentTillNowMillis;
384                                        if (waitTimeout > 0) {
385                                                try {
386                                                        inverseServiceRequests.wait(waitTimeout);
387                                                } catch (InterruptedException e) {
388                                                        doNothing();
389                                                }
390                                        }
391                                }
392
393                                final InverseServiceRequest request = inverseServiceRequests.poll();
394                                if (request != null)
395                                        return request;
396                        };
397                }
398                return null;
399        }
400
401        public void pushInverseServiceResponse(final InverseServiceResponse response) {
402                requireNonNull(response, "response");
403
404                final Uid requestId = response.getRequestId();
405                requireNonNull(requestId, "response.requestId");
406
407                synchronized (requestId2InverseServiceResponse) {
408                        if (!requestIdsWaitingForResponse.contains(requestId))
409                                throw new IllegalArgumentException(String.format("response.requestId=%s does not match any waiting request!", requestId));
410
411                        requestId2InverseServiceResponse.put(requestId, response);
412                        requestId2InverseServiceResponse.notifyAll();
413                }
414        }
415
416        @Override
417        public ClassInfoMap getClassInfoMap() {
418                return classInfoMap;
419        }
420}