001package co.codewizards.cloudstore.ls.client.handler;
002
003import static co.codewizards.cloudstore.core.util.Util.*;
004import static java.util.Objects.*;
005
006import java.lang.ref.WeakReference;
007import java.util.concurrent.Executor;
008import java.util.concurrent.Executors;
009import java.util.concurrent.atomic.AtomicInteger;
010
011import org.slf4j.Logger;
012import org.slf4j.LoggerFactory;
013
014import co.codewizards.cloudstore.core.Uid;
015import co.codewizards.cloudstore.core.dto.Error;
016import co.codewizards.cloudstore.ls.client.LocalServerClient;
017import co.codewizards.cloudstore.ls.core.dto.ErrorResponse;
018import co.codewizards.cloudstore.ls.core.dto.InverseServiceRequest;
019import co.codewizards.cloudstore.ls.core.dto.InverseServiceResponse;
020import co.codewizards.cloudstore.ls.core.dto.NullResponse;
021import co.codewizards.cloudstore.ls.rest.client.LocalServerRestClient;
022import co.codewizards.cloudstore.ls.rest.client.request.PollInverseServiceRequest;
023import co.codewizards.cloudstore.ls.rest.client.request.PushInverseServiceResponse;
024
025public class InverseServiceRequestHandlerThread extends Thread {
026
027        private static final Logger logger = LoggerFactory.getLogger(InverseServiceRequestHandlerThread.class);
028
029        private static final AtomicInteger nextThreadId = new AtomicInteger();
030        private volatile boolean interrupted;
031
032        private final WeakReference<LocalServerClient> localServerClientRef;
033        private final WeakReference<LocalServerRestClient> localServerRestClientRef;
034        private final InverseServiceRequestHandlerManager inverseServiceRequestHandlerManager = InverseServiceRequestHandlerManager.getInstance();
035        private final Executor executor = Executors.newCachedThreadPool();
036
037        public InverseServiceRequestHandlerThread(final LocalServerClient localServerClient) {
038                this.localServerClientRef = new WeakReference<LocalServerClient>(requireNonNull(localServerClient, "localServerClient"));
039                this.localServerRestClientRef = new WeakReference<LocalServerRestClient>(requireNonNull(localServerClient.getLocalServerRestClient(), "localServerRestClient"));
040                setName(getClass().getSimpleName() + '-' + nextThreadId.getAndIncrement());
041                setDaemon(true);
042        }
043
044        @Override
045        public void interrupt() {
046                // We use our own field instead of isInterrupted() to make absolutely sure, we end the thread. The isInterrupted()
047                // flag may be reset by an InterruptedException, while our flag cannot be reset.
048                interrupted = true;
049                super.interrupt();
050        }
051
052        @Override
053        public boolean isInterrupted() {
054                return interrupted || super.isInterrupted();
055        }
056
057        @Override
058        public void run() {
059                int consecutiveErrorCounter = 0;
060                while (! isInterrupted()) {
061                        try {
062                                final InverseServiceRequest inverseServiceRequest = getLocalServerRestClientOrFail().execute(new PollInverseServiceRequest());
063                                if (inverseServiceRequest != null)
064                                        executor.execute(new HandlerRunnable(inverseServiceRequest));
065
066                                consecutiveErrorCounter = 0;
067                        } catch (Exception x) {
068                                logger.error(x.toString(), x);
069
070                                // Wait a bit before retrying (increasingly longer) in order to prevent the log from filling up too quickly.
071                                // We wait 1 second longer after each consecutive error up to a maximum of 1 minute.
072                                consecutiveErrorCounter = Math.min(60, ++consecutiveErrorCounter);
073                                try { Thread.sleep(consecutiveErrorCounter * 1000L); } catch (Exception y) { doNothing(); }
074                        }
075                }
076        }
077
078        private class HandlerRunnable implements Runnable {
079                private final InverseServiceRequest inverseServiceRequest;
080
081                public HandlerRunnable(final InverseServiceRequest inverseServiceRequest) {
082                        this.inverseServiceRequest = requireNonNull(inverseServiceRequest, "inverseServiceRequest");
083                }
084
085                @Override
086                public void run() {
087                        requireNonNull(inverseServiceRequest, "inverseServiceRequest");
088                        final Uid requestId = inverseServiceRequest.getRequestId();
089                        requireNonNull(requestId, "inverseServiceRequest.requestId");
090
091                        final LocalServerRestClient localServerRestClient = getLocalServerRestClientOrFail();
092
093                        InverseServiceResponse inverseServiceResponse = null;
094                        try {
095                                @SuppressWarnings("unchecked")
096                                final InverseServiceRequestHandler<InverseServiceRequest, InverseServiceResponse> handler = inverseServiceRequestHandlerManager.getInverseServiceRequestHandlerOrFail(inverseServiceRequest);
097
098                                final LocalServerClient localServerClient = getLocalServerClientOrFail();
099
100                                handler.setLocalServerClient(localServerClient);
101
102                                inverseServiceResponse = handler.handle(inverseServiceRequest);
103                                if (inverseServiceResponse == null)
104                                        inverseServiceResponse = new NullResponse(requestId);
105
106                                if (!requestId.equals(inverseServiceResponse.getRequestId()))
107                                        throw new IllegalStateException(String.format("Implementation error in %s: handle(...) returned a response with a requestId different from the request!", handler.getClass().getName()));
108
109                        } catch (final Exception x) {
110                                logger.warn("handleInverseServiceRequest: " + x, x);
111                                final ErrorResponse errorResponse = new ErrorResponse(requestId, new Error(x));
112                                localServerRestClient.execute(new PushInverseServiceResponse(errorResponse));
113                        }
114
115                        // Send this outside of the try-catch, because it might otherwise cause 2 responses for the same requestId to be delivered to the server.
116                        if (inverseServiceResponse != null)
117                                localServerRestClient.execute(new PushInverseServiceResponse(inverseServiceResponse));
118                }
119        }
120
121        private LocalServerClient getLocalServerClientOrFail() {
122                final LocalServerClient localServerClient = localServerClientRef.get();
123                if (localServerClient == null)
124                        throw new IllegalStateException("LocalServerClient already garbage-collected!");
125
126                return localServerClient;
127        }
128
129        private LocalServerRestClient getLocalServerRestClientOrFail() {
130                final LocalServerRestClient localServerRestClient = localServerRestClientRef.get();
131                if (localServerRestClient == null)
132                        throw new IllegalStateException("LocalServerRestClient already garbage-collected!");
133
134                return localServerRestClient;
135        }
136}