001package co.codewizards.cloudstore.ls.client.handler; 002 003import static co.codewizards.cloudstore.core.util.Util.*; 004import static java.util.Objects.*; 005 006import java.lang.ref.WeakReference; 007import java.util.concurrent.Executor; 008import java.util.concurrent.Executors; 009import java.util.concurrent.atomic.AtomicInteger; 010 011import org.slf4j.Logger; 012import org.slf4j.LoggerFactory; 013 014import co.codewizards.cloudstore.core.Uid; 015import co.codewizards.cloudstore.core.dto.Error; 016import co.codewizards.cloudstore.ls.client.LocalServerClient; 017import co.codewizards.cloudstore.ls.core.dto.ErrorResponse; 018import co.codewizards.cloudstore.ls.core.dto.InverseServiceRequest; 019import co.codewizards.cloudstore.ls.core.dto.InverseServiceResponse; 020import co.codewizards.cloudstore.ls.core.dto.NullResponse; 021import co.codewizards.cloudstore.ls.rest.client.LocalServerRestClient; 022import co.codewizards.cloudstore.ls.rest.client.request.PollInverseServiceRequest; 023import co.codewizards.cloudstore.ls.rest.client.request.PushInverseServiceResponse; 024 025public class InverseServiceRequestHandlerThread extends Thread { 026 027 private static final Logger logger = LoggerFactory.getLogger(InverseServiceRequestHandlerThread.class); 028 029 private static final AtomicInteger nextThreadId = new AtomicInteger(); 030 private volatile boolean interrupted; 031 032 private final WeakReference<LocalServerClient> localServerClientRef; 033 private final WeakReference<LocalServerRestClient> localServerRestClientRef; 034 private final InverseServiceRequestHandlerManager inverseServiceRequestHandlerManager = InverseServiceRequestHandlerManager.getInstance(); 035 private final Executor executor = Executors.newCachedThreadPool(); 036 037 public InverseServiceRequestHandlerThread(final LocalServerClient localServerClient) { 038 this.localServerClientRef = new WeakReference<LocalServerClient>(requireNonNull(localServerClient, "localServerClient")); 039 this.localServerRestClientRef = new WeakReference<LocalServerRestClient>(requireNonNull(localServerClient.getLocalServerRestClient(), "localServerRestClient")); 040 setName(getClass().getSimpleName() + '-' + nextThreadId.getAndIncrement()); 041 setDaemon(true); 042 } 043 044 @Override 045 public void interrupt() { 046 // We use our own field instead of isInterrupted() to make absolutely sure, we end the thread. The isInterrupted() 047 // flag may be reset by an InterruptedException, while our flag cannot be reset. 048 interrupted = true; 049 super.interrupt(); 050 } 051 052 @Override 053 public boolean isInterrupted() { 054 return interrupted || super.isInterrupted(); 055 } 056 057 @Override 058 public void run() { 059 int consecutiveErrorCounter = 0; 060 while (! isInterrupted()) { 061 try { 062 final InverseServiceRequest inverseServiceRequest = getLocalServerRestClientOrFail().execute(new PollInverseServiceRequest()); 063 if (inverseServiceRequest != null) 064 executor.execute(new HandlerRunnable(inverseServiceRequest)); 065 066 consecutiveErrorCounter = 0; 067 } catch (Exception x) { 068 logger.error(x.toString(), x); 069 070 // Wait a bit before retrying (increasingly longer) in order to prevent the log from filling up too quickly. 071 // We wait 1 second longer after each consecutive error up to a maximum of 1 minute. 072 consecutiveErrorCounter = Math.min(60, ++consecutiveErrorCounter); 073 try { Thread.sleep(consecutiveErrorCounter * 1000L); } catch (Exception y) { doNothing(); } 074 } 075 } 076 } 077 078 private class HandlerRunnable implements Runnable { 079 private final InverseServiceRequest inverseServiceRequest; 080 081 public HandlerRunnable(final InverseServiceRequest inverseServiceRequest) { 082 this.inverseServiceRequest = requireNonNull(inverseServiceRequest, "inverseServiceRequest"); 083 } 084 085 @Override 086 public void run() { 087 requireNonNull(inverseServiceRequest, "inverseServiceRequest"); 088 final Uid requestId = inverseServiceRequest.getRequestId(); 089 requireNonNull(requestId, "inverseServiceRequest.requestId"); 090 091 final LocalServerRestClient localServerRestClient = getLocalServerRestClientOrFail(); 092 093 InverseServiceResponse inverseServiceResponse = null; 094 try { 095 @SuppressWarnings("unchecked") 096 final InverseServiceRequestHandler<InverseServiceRequest, InverseServiceResponse> handler = inverseServiceRequestHandlerManager.getInverseServiceRequestHandlerOrFail(inverseServiceRequest); 097 098 final LocalServerClient localServerClient = getLocalServerClientOrFail(); 099 100 handler.setLocalServerClient(localServerClient); 101 102 inverseServiceResponse = handler.handle(inverseServiceRequest); 103 if (inverseServiceResponse == null) 104 inverseServiceResponse = new NullResponse(requestId); 105 106 if (!requestId.equals(inverseServiceResponse.getRequestId())) 107 throw new IllegalStateException(String.format("Implementation error in %s: handle(...) returned a response with a requestId different from the request!", handler.getClass().getName())); 108 109 } catch (final Exception x) { 110 logger.warn("handleInverseServiceRequest: " + x, x); 111 final ErrorResponse errorResponse = new ErrorResponse(requestId, new Error(x)); 112 localServerRestClient.execute(new PushInverseServiceResponse(errorResponse)); 113 } 114 115 // Send this outside of the try-catch, because it might otherwise cause 2 responses for the same requestId to be delivered to the server. 116 if (inverseServiceResponse != null) 117 localServerRestClient.execute(new PushInverseServiceResponse(inverseServiceResponse)); 118 } 119 } 120 121 private LocalServerClient getLocalServerClientOrFail() { 122 final LocalServerClient localServerClient = localServerClientRef.get(); 123 if (localServerClient == null) 124 throw new IllegalStateException("LocalServerClient already garbage-collected!"); 125 126 return localServerClient; 127 } 128 129 private LocalServerRestClient getLocalServerRestClientOrFail() { 130 final LocalServerRestClient localServerRestClient = localServerRestClientRef.get(); 131 if (localServerRestClient == null) 132 throw new IllegalStateException("LocalServerRestClient already garbage-collected!"); 133 134 return localServerRestClient; 135 } 136}