001package co.codewizards.cloudstore.core.concurrent;
002
003import static java.util.Objects.*;
004
005import java.lang.ref.WeakReference;
006import java.util.Collections;
007import java.util.Date;
008import java.util.HashMap;
009import java.util.LinkedList;
010import java.util.List;
011import java.util.Map;
012import java.util.Timer;
013import java.util.TimerTask;
014import java.util.WeakHashMap;
015import java.util.concurrent.Callable;
016import java.util.concurrent.ExecutorService;
017import java.util.concurrent.Executors;
018import java.util.concurrent.Future;
019import java.util.concurrent.TimeUnit;
020import java.util.concurrent.TimeoutException;
021
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025import co.codewizards.cloudstore.core.config.Config;
026import co.codewizards.cloudstore.core.config.ConfigImpl;
027
028public class DeferrableExecutor {
029        private static final Logger logger = LoggerFactory.getLogger(DeferrableExecutor.class);
030
031        /**
032         * The {@code key} for the timeout used with {@link Config#getPropertyAsInt(String, int)}.
033         * <p>
034         * The configuration can be overridden by a system property - see {@link Config#SYSTEM_PROPERTY_PREFIX}.
035         */
036        public static final String CONFIG_KEY_TIMEOUT = "deferrableExecutor.timeout"; //$NON-NLS-1$
037
038        private static final int DEFAULT_TIMEOUT = 60 * 1000;
039
040        /**
041         * The {@code key} for the expiry period used with {@link Config#getPropertyAsInt(String, int)}.
042         * <p>
043         * The configuration can be overridden by a system property - see {@link Config#SYSTEM_PROPERTY_PREFIX}.
044         */
045        public static final String CONFIG_KEY_EXPIRY_PERIOD = "deferrableExecutor.expiryPeriod"; //$NON-NLS-1$
046
047        private static final int DEFAULT_EXPIRY_PERIOD = 60 * 60 * 1000;
048
049        private final Map<String, WeakReference<String>> canonicalCallIdentifierMap = new WeakHashMap<String, WeakReference<String>>();
050        private final Map<String, Future<?>> callIdentifier2Future = Collections.synchronizedMap(new HashMap<String, Future<?>>());
051        private final Map<String, Date> callIdentifier2DoneDate = Collections.synchronizedMap(new WeakHashMap<String, Date>());
052        private final ExecutorService executorService = Executors.newCachedThreadPool();
053        private final Timer cleanUpExpiredEntriesTimer = new Timer("cleanUpExpiredEntriesTimer", true);
054        private TimerTask cleanUpExpiredEntriesTimerTask;
055        private int lastExpiryPeriod;
056
057        private DeferrableExecutor() { }
058
059        private static final class RunnableWithProgressExecutorHolder {
060                private static final DeferrableExecutor instance = new DeferrableExecutor();
061        }
062
063        public static DeferrableExecutor getInstance() {
064                return RunnableWithProgressExecutorHolder.instance;
065        }
066
067        // TODO maybe we should make it possible to pass the timeout from the client, because
068        // the client knows its socket's read-timeout.
069        @SuppressWarnings("unchecked")
070        public <V> V call(String callIdentifier, final CallableProvider<V> callableProvider) throws DeferredCompletionException, ExecutionException {
071                requireNonNull(callIdentifier, "callIdentifier");
072                requireNonNull(callableProvider, "callableProvider");
073
074                final int timeout = ConfigImpl.getInstance().getPropertyAsPositiveOrZeroInt(CONFIG_KEY_TIMEOUT, DEFAULT_TIMEOUT);
075
076                cleanUpExpiredEntries();
077                callIdentifier = canonicalizeCallIdentifier(callIdentifier);
078                synchronized (callIdentifier) {
079                        Future<?> future = callIdentifier2Future.get(callIdentifier);
080                        if (future == null) {
081                                Callable<V> callable = callableProvider.getCallable();
082                                future = executorService.submit(new CallableWrapper<V>(callIdentifier, callable));
083                                callIdentifier2Future.put(callIdentifier, future);
084                        }
085
086                        Object result;
087                        try {
088                                result = future.get(timeout, TimeUnit.MILLISECONDS);
089                        } catch (InterruptedException e) {
090                                throw new DeferredCompletionException(e);
091                        } catch (TimeoutException e) {
092                                throw new DeferredCompletionException(e);
093                        } catch (java.util.concurrent.ExecutionException e) {
094                                callIdentifier2Future.remove(callIdentifier); // remove in case of failure (keep while still running)
095                                throw new ExecutionException(e);
096                        }
097
098                        callIdentifier2Future.remove(callIdentifier); // remove in case of successful completion (keep while still running)
099                        return (V) result;
100                }
101        }
102
103        private class CallableWrapper<V> implements Callable<V> {
104                private final String identifier;
105                private final Callable<V> delegate;
106
107                public CallableWrapper(String identifier, Callable<V> delegate) {
108                        this.identifier = requireNonNull(identifier, "identifier");
109                        this.delegate = requireNonNull(delegate, "delegate");
110                }
111
112                @Override
113                public V call() throws Exception {
114                        try {
115                                return delegate.call();
116                        } finally {
117                                callIdentifier2DoneDate.put(identifier, new Date());
118                        }
119                }
120
121        }
122
123        private String canonicalizeCallIdentifier(String callIdentifier) {
124                synchronized (canonicalCallIdentifierMap) {
125                        WeakReference<String> ref = canonicalCallIdentifierMap.get(callIdentifier);
126                        String ci = ref == null ? null : ref.get();
127                        if (ci == null) {
128                                ci = callIdentifier;
129                                canonicalCallIdentifierMap.put(ci, new WeakReference<String>(ci));
130                        }
131                        return ci;
132                }
133        }
134
135        private void cleanUpExpiredEntries() {
136                rescheduleExpiredEntriesTimerTaskIfExpiryPeriodChanged();
137
138                List<String> expiredCallIdentifiers = new LinkedList<String>();
139                Date expireDoneBeforeDate = new Date(System.currentTimeMillis() - getExpiryPeriod());
140                synchronized (callIdentifier2DoneDate) {
141                        for (Map.Entry<String, Date> me : callIdentifier2DoneDate.entrySet()) {
142                                if (me.getValue().before(expireDoneBeforeDate))
143                                        expiredCallIdentifiers.add(me.getKey());
144                        }
145                }
146                for (String callIdentifier : expiredCallIdentifiers) {
147                        synchronized (callIdentifier) {
148                                callIdentifier2Future.remove(callIdentifier);
149                                callIdentifier2DoneDate.remove(callIdentifier);
150                        }
151                }
152        }
153
154        private void rescheduleExpiredEntriesTimerTaskIfExpiryPeriodChanged() {
155                synchronized (cleanUpExpiredEntriesTimer) {
156                        final int expiryPeriod = getExpiryPeriod();
157                        if (cleanUpExpiredEntriesTimerTask == null || lastExpiryPeriod != expiryPeriod) {
158                                if (cleanUpExpiredEntriesTimerTask != null)
159                                        cleanUpExpiredEntriesTimerTask.cancel();
160
161                                scheduleExpiredEntriesTimerTask();
162                        }
163                }
164        }
165
166        private void scheduleExpiredEntriesTimerTask() {
167                synchronized (cleanUpExpiredEntriesTimer) {
168                        final int expiryPeriod = getExpiryPeriod();
169                        lastExpiryPeriod = expiryPeriod;
170
171                        cleanUpExpiredEntriesTimerTask = new TimerTask() {
172                                @Override
173                                public void run() {
174                                        cleanUpExpiredEntries();
175                                }
176                        };
177
178                        cleanUpExpiredEntriesTimer.schedule(cleanUpExpiredEntriesTimerTask, expiryPeriod / 2, expiryPeriod / 2);
179                }
180        }
181
182        private int getExpiryPeriod() {
183                return ConfigImpl.getInstance().getPropertyAsPositiveOrZeroInt(CONFIG_KEY_EXPIRY_PERIOD, DEFAULT_EXPIRY_PERIOD);
184        }
185}