001package co.codewizards.cloudstore.ls.rest.server; 002 003import static co.codewizards.cloudstore.core.util.Util.*; 004import static java.util.Objects.*; 005 006import java.lang.reflect.Proxy; 007import java.util.ArrayList; 008import java.util.HashMap; 009import java.util.HashSet; 010import java.util.LinkedList; 011import java.util.List; 012import java.util.Map; 013import java.util.Set; 014 015import co.codewizards.cloudstore.core.Uid; 016import co.codewizards.cloudstore.core.dto.Error; 017import co.codewizards.cloudstore.core.dto.RemoteException; 018import co.codewizards.cloudstore.core.dto.RemoteExceptionUtil; 019import co.codewizards.cloudstore.core.io.TimeoutException; 020import co.codewizards.cloudstore.core.util.ExceptionUtil; 021import co.codewizards.cloudstore.ls.core.dto.ErrorResponse; 022import co.codewizards.cloudstore.ls.core.dto.InverseServiceRequest; 023import co.codewizards.cloudstore.ls.core.dto.InverseServiceResponse; 024import co.codewizards.cloudstore.ls.core.dto.NullResponse; 025import co.codewizards.cloudstore.ls.core.invoke.ClassInfo; 026import co.codewizards.cloudstore.ls.core.invoke.ClassInfoMap; 027import co.codewizards.cloudstore.ls.core.invoke.ClassManager; 028import co.codewizards.cloudstore.ls.core.invoke.DelayedMethodInvocationResponse; 029import co.codewizards.cloudstore.ls.core.invoke.IncDecRefCountQueue; 030import co.codewizards.cloudstore.ls.core.invoke.InverseMethodInvocationRequest; 031import co.codewizards.cloudstore.ls.core.invoke.InverseMethodInvocationResponse; 032import co.codewizards.cloudstore.ls.core.invoke.Invoker; 033import co.codewizards.cloudstore.ls.core.invoke.MethodInvocationRequest; 034import co.codewizards.cloudstore.ls.core.invoke.MethodInvocationResponse; 035import co.codewizards.cloudstore.ls.core.invoke.ObjectManager; 036import co.codewizards.cloudstore.ls.core.invoke.ObjectRef; 037import co.codewizards.cloudstore.ls.core.invoke.RemoteObjectProxy; 038import co.codewizards.cloudstore.ls.core.invoke.RemoteObjectProxyFactory; 039import co.codewizards.cloudstore.ls.core.invoke.RemoteObjectProxyInvocationHandler; 040 041public class InverseInvoker implements Invoker { 042 /** 043 * Timeout (in milliseconds) before sending an empty HTTP response to the polling client. The client does 044 * <i>long polling</i> in order to allow for 045 * {@linkplain #performInverseServiceRequest(InverseServiceRequest) inverse service invocations}. 046 * <p> 047 * This timeout must be (significantly) shorter than {@link ObjectManager#EVICT_UNUSED_OBJECT_MANAGER_TIMEOUT_MS} to make sure, the 048 * {@linkplain #pollInverseServiceRequest() polling} serves additionally as a keep-alive for 049 * the server-side {@code ObjectManager}. 050 */ 051 private static final long POLL_INVERSE_SERVICE_REQUEST_TIMEOUT_MS = 15L * 1000L; // 15 seconds 052 053 /** 054 * Timeout for {@link #performInverseServiceRequest(InverseServiceRequest)}. 055 * <p> 056 * If an inverse service-request does not receive a response within this timeout, a {@link TimeoutException} is thrown. 057 * <p> 058 * Please note, that the {@code invoke*} methods (e.g. {@link #invoke(Object, String, Object...)} or 059 * {@link #invokeConstructor(Class, Object...)}) can take much longer, because the other side will return 060 * a {@link DelayedMethodInvocationResponse} after a much shorter timeout (a few dozen seconds). This allows 061 * the actual method to be invoked to take how long it wants (unlimited!) while at the same time detecting very 062 * quickly, if the other side is dead (this timeout). 063 */ 064 private static final long PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS = 3L * 60L * 1000L; // 3 minutes is more than enough, because we have DelayedMethodInvocationResponse 065 066 private final IncDecRefCountQueue incDecRefCountQueue = new IncDecRefCountQueue(this); 067 private final ObjectManager objectManager; 068 private final LinkedList<InverseServiceRequest> inverseServiceRequests = new LinkedList<>(); 069 private final Set<Uid> requestIdsWaitingForResponse = new HashSet<Uid>(); // synchronized by: requestId2InverseServiceResponse 070 private final Map<Uid, InverseServiceResponse> requestId2InverseServiceResponse = new HashMap<Uid, InverseServiceResponse>(); 071 private final ClassInfoMap classInfoMap = new ClassInfoMap(); 072 073 private volatile boolean diedOfTimeout; 074 075 public static InverseInvoker getInverseInvoker(final ObjectManager objectManager) { 076 requireNonNull(objectManager, "objectManager"); 077 078 synchronized (objectManager) { 079 InverseInvoker inverseInvoker = (InverseInvoker) objectManager.getContextObject(InverseInvoker.class.getName()); 080 if (inverseInvoker == null) { 081 inverseInvoker = new InverseInvoker(objectManager); 082 objectManager.putContextObject(InverseInvoker.class.getName(), inverseInvoker); 083 } 084 return inverseInvoker; 085 } 086 } 087 088 private InverseInvoker(final ObjectManager objectManager) { 089 this.objectManager = requireNonNull(objectManager, "objectManager"); 090 } 091 092 @Override 093 public ObjectManager getObjectManager() { 094 return objectManager; 095 } 096 097 @Override 098 public <T> T invokeStatic(final Class<?> clazz, final String methodName, final Object ... arguments) { 099 requireNonNull(clazz, "clazz"); 100 requireNonNull(methodName, "methodName"); 101 return invokeStatic(clazz.getName(), methodName, (String[]) null, arguments); 102 } 103 104 @Override 105 public <T> T invokeStatic(final String className, final String methodName, final Object ... arguments) { 106 requireNonNull(className, "className"); 107 requireNonNull(methodName, "methodName"); 108 return invokeStatic(className, methodName, (String[]) null, arguments); 109 } 110 111 @Override 112 public <T> T invokeStatic(final Class<?> clazz, final String methodName, final Class<?>[] argumentTypes, final Object ... arguments) { 113 requireNonNull(clazz, "clazz"); 114 requireNonNull(methodName, "methodName"); 115 return invokeStatic(clazz.getName(), methodName, toClassNames(argumentTypes), arguments); 116 } 117 118 @Override 119 public <T> T invokeStatic(final String className, final String methodName, final String[] argumentTypeNames, final Object ... arguments) { 120 requireNonNull(className, "className"); 121 requireNonNull(methodName, "methodName"); 122 123 final MethodInvocationRequest methodInvocationRequest = MethodInvocationRequest.forStaticInvocation( 124 className, methodName, argumentTypeNames, arguments); 125 126 return invoke(methodInvocationRequest); 127 } 128 129 @Override 130 public <T> T invokeConstructor(final Class<T> clazz, final Object ... arguments) { 131 requireNonNull(clazz, "clazz"); 132 return invokeConstructor(clazz.getName(), (String[]) null, arguments); 133 } 134 135 @Override 136 public <T> T invokeConstructor(final String className, final Object ... arguments) { 137 requireNonNull(className, "className"); 138 return invokeConstructor(className, (String[]) null, arguments); 139 } 140 141 @Override 142 public <T> T invokeConstructor(final Class<T> clazz, final Class<?>[] argumentTypes, final Object ... arguments) { 143 requireNonNull(clazz, "clazz"); 144 return invokeConstructor(clazz.getName(), toClassNames(argumentTypes), arguments); 145 } 146 147 @Override 148 public <T> T invokeConstructor(final String className, final String[] argumentTypeNames, final Object ... arguments) { 149 requireNonNull(className, "className"); 150 151 final MethodInvocationRequest methodInvocationRequest = MethodInvocationRequest.forConstructorInvocation( 152 className, argumentTypeNames, arguments); 153 154 return invoke(methodInvocationRequest); 155 } 156 157 @Override 158 public <T> T invoke(final Object object, final String methodName, final Object ... arguments) { 159 requireNonNull(object, "object"); 160 requireNonNull(methodName, "methodName"); 161 162 if (!(object instanceof RemoteObjectProxy)) 163 throw new IllegalArgumentException("object is not an instance of RemoteObjectProxy!"); 164 165 return invoke(object, methodName, (Class<?>[]) null, arguments); 166 } 167 168 @Override 169 public <T> T invoke(final Object object, final String methodName, final Class<?>[] argumentTypes, final Object... arguments) { 170 requireNonNull(object, "object"); 171 requireNonNull(methodName, "methodName"); 172 return invoke(object, methodName, toClassNames(argumentTypes), arguments); 173 } 174 175 @Override 176 public <T> T invoke(final Object object, final String methodName, final String[] argumentTypeNames, final Object... arguments) { 177 requireNonNull(object, "object"); 178 requireNonNull(methodName, "methodName"); 179 180 final MethodInvocationRequest methodInvocationRequest = MethodInvocationRequest.forObjectInvocation( 181 object, methodName, argumentTypeNames, arguments); 182 183 return invoke(methodInvocationRequest); 184 } 185 186 private <T> T invoke(final MethodInvocationRequest methodInvocationRequest) { 187 requireNonNull(methodInvocationRequest, "methodInvocationRequest"); 188 189 InverseMethodInvocationResponse inverseMethodInvocationResponse = performInverseServiceRequest( 190 new InverseMethodInvocationRequest(methodInvocationRequest)); 191 192 requireNonNull(inverseMethodInvocationResponse, "inverseMethodInvocationResponse"); 193 194 MethodInvocationResponse methodInvocationResponse = inverseMethodInvocationResponse.getMethodInvocationResponse(); 195 196 while (methodInvocationResponse instanceof DelayedMethodInvocationResponse) { 197 final DelayedMethodInvocationResponse dmir = (DelayedMethodInvocationResponse) methodInvocationResponse; 198 final Uid delayedResponseId = dmir.getDelayedResponseId(); 199 200 inverseMethodInvocationResponse = performInverseServiceRequest( 201 new InverseMethodInvocationRequest(delayedResponseId)); 202 203 requireNonNull(inverseMethodInvocationResponse, "inverseMethodInvocationResponse"); 204 205 methodInvocationResponse = inverseMethodInvocationResponse.getMethodInvocationResponse(); 206 } 207 208 final Object result = methodInvocationResponse.getResult(); 209 return cast(result); 210 } 211 212 @Override 213 public void incRefCount(final ObjectRef objectRef, final Uid refId) { 214 incDecRefCountQueue.incRefCount(objectRef, refId); 215 } 216 217 @Override 218 public void decRefCount(final ObjectRef objectRef, final Uid refId) { 219 incDecRefCountQueue.decRefCount(objectRef, refId); 220 } 221 222 private String[] toClassNames(Class<?> ... classes) { 223 final String[] classNames; 224 if (classes == null) 225 classNames = null; 226 else { 227 classNames = new String[classes.length]; 228 for (int i = 0; i < classes.length; i++) 229 classNames[i] = classes[i].getName(); 230 } 231 return classNames; 232 } 233 234 public Object getRemoteObjectProxyOrCreate(ObjectRef objectRef) { 235 return objectManager.getRemoteObjectProxyManager().getRemoteObjectProxyOrCreate(objectRef, new RemoteObjectProxyFactory() { 236 @Override 237 public RemoteObjectProxy createRemoteObjectProxy(ObjectRef objectRef) { 238 return _createRemoteObjectProxy(objectRef); 239 } 240 }); 241 } 242 243 private RemoteObjectProxy _createRemoteObjectProxy(final ObjectRef objectRef) { 244 final Class<?>[] interfaces = getInterfaces(objectRef); 245 246 final ClassLoader classLoader = this.getClass().getClassLoader(); 247 return (RemoteObjectProxy) Proxy.newProxyInstance(classLoader, interfaces, 248 new RemoteObjectProxyInvocationHandler(this, objectRef)); 249 } 250 251 private Class<?>[] getInterfaces(final ObjectRef objectRef) { 252 ClassInfo classInfo = classInfoMap.getClassInfo(objectRef.getClassId()); 253 if (classInfo == null) { 254 classInfo = objectRef.getClassInfo(); 255 if (classInfo == null) 256 throw new IllegalStateException("There is no ClassInfo in the ClassInfoMap and neither in the ObjectRef! " + objectRef); 257 258 classInfoMap.putClassInfo(classInfo); 259 objectRef.setClassInfo(null); 260 } 261 262 final ClassManager classManager = objectManager.getClassManager(); 263 final Set<String> interfaceNames = classInfo.getInterfaceNames(); 264 final List<Class<?>> interfaces = new ArrayList<>(interfaceNames.size() + 1); 265 for (final String interfaceName : interfaceNames) { 266 Class<?> iface = null; 267 try { 268 iface = classManager.getClassOrFail(interfaceName); 269 } catch (RuntimeException x) { 270 if (ExceptionUtil.getCause(x, ClassNotFoundException.class) == null) 271 throw x; 272 } 273 if (iface != null) 274 interfaces.add(iface); 275 } 276 interfaces.add(RemoteObjectProxy.class); 277 return interfaces.toArray(new Class<?>[interfaces.size()]); 278 } 279 280 /** 281 * Invokes a service on the client-side. 282 * <p> 283 * Normally, a client initiates a request-response-cycle by sending a request to a server-side service and waiting 284 * for the response. In order to notify client-side listeners, we need the inverse, though: the server must invoke 285 * a service on the client-side. This can be easily done by sending an implementation of {@code InverseServiceRequest} 286 * to a {@code InverseServiceRequestHandler} implementation on the client-side using this method. 287 * 288 * @param request the request to be processed on the client-side. Must not be <code>null</code>. 289 * @return the response created and sent back by the client-side {@code InverseServiceRequestHandler}. 290 * @throws TimeoutException if this method did not receive a response within the timeout 291 * {@link #PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS}. 292 */ 293 public <T extends InverseServiceResponse> T performInverseServiceRequest(final InverseServiceRequest request) throws TimeoutException { 294 requireNonNull(request, "request"); 295 296 if (diedOfTimeout) 297 throw new IllegalStateException(String.format("InverseInvoker[%s] died of timeout, already!", objectManager.getClientId())); 298 299 final Uid requestId = request.getRequestId(); 300 requireNonNull(requestId, "request.requestId"); 301 302 synchronized (requestId2InverseServiceResponse) { 303 if (!requestIdsWaitingForResponse.add(requestId)) 304 throw new IllegalStateException("requestId already queued: " + requestId); 305 } 306 try { 307 synchronized (inverseServiceRequests) { 308 inverseServiceRequests.add(request); 309 inverseServiceRequests.notify(); 310 } 311 312 // The request is pushed, hence from now on, we wait for the response until the timeout in PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS. 313 final long startTimestamp = System.currentTimeMillis(); 314 315 synchronized (requestId2InverseServiceResponse) { 316 boolean first = true; 317 while (first || System.currentTimeMillis() - startTimestamp < PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS) { 318 if (first) 319 first = false; 320 else { 321 final long timeSpentTillNowMillis = System.currentTimeMillis() - startTimestamp; 322 final long waitTimeout = PERFORM_INVERSE_SERVICE_REQUEST_TIMEOUT_MS - timeSpentTillNowMillis; 323 if (waitTimeout > 0) { 324 try { 325 requestId2InverseServiceResponse.wait(waitTimeout); 326 } catch (InterruptedException e) { 327 doNothing(); 328 } 329 } 330 } 331 332 final InverseServiceResponse response = requestId2InverseServiceResponse.remove(requestId); 333 if (response != null) { 334 if (response instanceof NullResponse) 335 return null; 336 else if (response instanceof ErrorResponse) { 337 final Error error = ((ErrorResponse) response).getError(); 338 RemoteExceptionUtil.throwOriginalExceptionIfPossible(error); 339 throw new RemoteException(error); 340 } 341 else { 342 @SuppressWarnings("unchecked") 343 final T t = (T) response; 344 return t; 345 } 346 } 347 } 348 } 349 } finally { 350 boolean requestWasStillWaiting; 351 // in case, it was not yet polled, we make sure garbage does not pile up. 352 synchronized (requestId2InverseServiceResponse) { 353 requestWasStillWaiting = requestIdsWaitingForResponse.remove(requestId); 354 355 // Make sure, no garbage is left over by removing this together with the requestId from requestIdsWaitingForResponse. 356 requestId2InverseServiceResponse.remove(requestId); 357 } 358 359 if (requestWasStillWaiting) { 360 synchronized (inverseServiceRequests) { 361 inverseServiceRequests.remove(request); 362 } 363 } 364 } 365 366 if (request.isTimeoutDeadly()) 367 diedOfTimeout = true; 368 369 throw new TimeoutException(String.format("InverseInvoker[%s] encountered timeout while waiting for response matching requestId=%s! diedOfTimeout=%s", 370 objectManager.getClientId(), requestId, diedOfTimeout)); 371 } 372 373 public InverseServiceRequest pollInverseServiceRequest() { 374 final long startTimestamp = System.currentTimeMillis(); 375 376 synchronized (inverseServiceRequests) { 377 boolean first = true; 378 while (first || System.currentTimeMillis() - startTimestamp < POLL_INVERSE_SERVICE_REQUEST_TIMEOUT_MS) { 379 if (first) 380 first = false; 381 else { 382 final long timeSpentTillNowMillis = System.currentTimeMillis() - startTimestamp; 383 final long waitTimeout = POLL_INVERSE_SERVICE_REQUEST_TIMEOUT_MS - timeSpentTillNowMillis; 384 if (waitTimeout > 0) { 385 try { 386 inverseServiceRequests.wait(waitTimeout); 387 } catch (InterruptedException e) { 388 doNothing(); 389 } 390 } 391 } 392 393 final InverseServiceRequest request = inverseServiceRequests.poll(); 394 if (request != null) 395 return request; 396 }; 397 } 398 return null; 399 } 400 401 public void pushInverseServiceResponse(final InverseServiceResponse response) { 402 requireNonNull(response, "response"); 403 404 final Uid requestId = response.getRequestId(); 405 requireNonNull(requestId, "response.requestId"); 406 407 synchronized (requestId2InverseServiceResponse) { 408 if (!requestIdsWaitingForResponse.contains(requestId)) 409 throw new IllegalArgumentException(String.format("response.requestId=%s does not match any waiting request!", requestId)); 410 411 requestId2InverseServiceResponse.put(requestId, response); 412 requestId2InverseServiceResponse.notifyAll(); 413 } 414 } 415 416 @Override 417 public ClassInfoMap getClassInfoMap() { 418 return classInfoMap; 419 } 420}