001package co.codewizards.cloudstore.core.concurrent;
002
003import static co.codewizards.cloudstore.core.util.Util.*;
004
005import java.lang.ref.WeakReference;
006import java.util.Collections;
007import java.util.Date;
008import java.util.HashMap;
009import java.util.LinkedList;
010import java.util.List;
011import java.util.Map;
012import java.util.Timer;
013import java.util.TimerTask;
014import java.util.WeakHashMap;
015import java.util.concurrent.Callable;
016import java.util.concurrent.ExecutorService;
017import java.util.concurrent.Executors;
018import java.util.concurrent.Future;
019import java.util.concurrent.TimeUnit;
020import java.util.concurrent.TimeoutException;
021
022import org.slf4j.Logger;
023import org.slf4j.LoggerFactory;
024
025import co.codewizards.cloudstore.core.config.Config;
026
027public class DeferrableExecutor {
028        private static final Logger logger = LoggerFactory.getLogger(DeferrableExecutor.class);
029
030        /**
031         * The {@code key} for the timeout used with {@link Config#getPropertyAsInt(String, int)}.
032         * <p>
033         * The configuration can be overridden by a system property - see {@link Config#SYSTEM_PROPERTY_PREFIX}.
034         */
035        public static final String CONFIG_KEY_TIMEOUT = "deferrableExecutor.timeout"; //$NON-NLS-1$
036
037        private static final int DEFAULT_TIMEOUT = 60 * 1000;
038
039        /**
040         * The {@code key} for the expiry period used with {@link Config#getPropertyAsInt(String, int)}.
041         * <p>
042         * The configuration can be overridden by a system property - see {@link Config#SYSTEM_PROPERTY_PREFIX}.
043         */
044        public static final String CONFIG_KEY_EXPIRY_PERIOD = "deferrableExecutor.expiryPeriod"; //$NON-NLS-1$
045
046        private static final int DEFAULT_EXPIRY_PERIOD = 60 * 60 * 1000;
047
048        private final Map<String, WeakReference<String>> canonicalCallIdentifierMap = new WeakHashMap<String, WeakReference<String>>();
049        private final Map<String, Future<?>> callIdentifier2Future = Collections.synchronizedMap(new HashMap<String, Future<?>>());
050        private final Map<String, Date> callIdentifier2DoneDate = Collections.synchronizedMap(new WeakHashMap<String, Date>());
051        private final ExecutorService executorService = Executors.newCachedThreadPool();
052        private final Timer cleanUpExpiredEntriesTimer = new Timer("cleanUpExpiredEntriesTimer", true);
053        private TimerTask cleanUpExpiredEntriesTimerTask;
054        private int lastExpiryPeriod;
055
056        private DeferrableExecutor() { }
057
058        private static final class RunnableWithProgressExecutorHolder {
059                private static final DeferrableExecutor instance = new DeferrableExecutor();
060        }
061
062        public static DeferrableExecutor getInstance() {
063                return RunnableWithProgressExecutorHolder.instance;
064        }
065
066        // TODO maybe we should make it possible to pass the timeout from the client, because
067        // the client knows its socket's read-timeout.
068        @SuppressWarnings("unchecked")
069        public <V> V call(String callIdentifier, final CallableProvider<V> callableProvider) throws DeferredCompletionException, ExecutionException {
070                assertNotNull("callIdentifier", callIdentifier);
071                assertNotNull("callableProvider", callableProvider);
072
073                final int timeout = Config.getInstance().getPropertyAsPositiveOrZeroInt(CONFIG_KEY_TIMEOUT, DEFAULT_TIMEOUT);
074
075                cleanUpExpiredEntries();
076                callIdentifier = canonicalizeCallIdentifier(callIdentifier);
077                synchronized (callIdentifier) {
078                        Future<?> future = callIdentifier2Future.get(callIdentifier);
079                        if (future == null) {
080                                Callable<V> callable = callableProvider.getCallable();
081                                future = executorService.submit(new CallableWrapper<V>(callIdentifier, callable));
082                                callIdentifier2Future.put(callIdentifier, future);
083                        }
084
085                        Object result;
086                        try {
087                                result = future.get(timeout, TimeUnit.MILLISECONDS);
088                        } catch (InterruptedException e) {
089                                throw new DeferredCompletionException(e);
090                        } catch (TimeoutException e) {
091                                throw new DeferredCompletionException(e);
092                        } catch (java.util.concurrent.ExecutionException e) {
093                                callIdentifier2Future.remove(callIdentifier); // remove in case of failure (keep while still running)
094                                throw new ExecutionException(e);
095                        }
096
097                        callIdentifier2Future.remove(callIdentifier); // remove in case of successful completion (keep while still running)
098                        return (V) result;
099                }
100        }
101
102        private class CallableWrapper<V> implements Callable<V> {
103                private final String identifier;
104                private final Callable<V> delegate;
105
106                public CallableWrapper(String identifier, Callable<V> delegate) {
107                        this.identifier = assertNotNull("identifier", identifier);
108                        this.delegate = assertNotNull("delegate", delegate);
109                }
110
111                @Override
112                public V call() throws Exception {
113                        try {
114                                return delegate.call();
115                        } finally {
116                                callIdentifier2DoneDate.put(identifier, new Date());
117                        }
118                }
119
120        }
121
122        private String canonicalizeCallIdentifier(String callIdentifier) {
123                synchronized (canonicalCallIdentifierMap) {
124                        WeakReference<String> ref = canonicalCallIdentifierMap.get(callIdentifier);
125                        String ci = ref == null ? null : ref.get();
126                        if (ci == null) {
127                                ci = callIdentifier;
128                                canonicalCallIdentifierMap.put(ci, new WeakReference<String>(ci));
129                        }
130                        return ci;
131                }
132        }
133
134        private void cleanUpExpiredEntries() {
135                rescheduleExpiredEntriesTimerTaskIfExpiryPeriodChanged();
136
137                List<String> expiredCallIdentifiers = new LinkedList<String>();
138                Date expireDoneBeforeDate = new Date(System.currentTimeMillis() - getExpiryPeriod());
139                synchronized (callIdentifier2DoneDate) {
140                        for (Map.Entry<String, Date> me : callIdentifier2DoneDate.entrySet()) {
141                                if (me.getValue().before(expireDoneBeforeDate))
142                                        expiredCallIdentifiers.add(me.getKey());
143                        }
144                }
145                for (String callIdentifier : expiredCallIdentifiers) {
146                        synchronized (callIdentifier) {
147                                callIdentifier2Future.remove(callIdentifier);
148                                callIdentifier2DoneDate.remove(callIdentifier);
149                        }
150                }
151        }
152
153        private void rescheduleExpiredEntriesTimerTaskIfExpiryPeriodChanged() {
154                synchronized (cleanUpExpiredEntriesTimer) {
155                        final int expiryPeriod = getExpiryPeriod();
156                        if (cleanUpExpiredEntriesTimerTask == null || lastExpiryPeriod != expiryPeriod) {
157                                if (cleanUpExpiredEntriesTimerTask != null)
158                                        cleanUpExpiredEntriesTimerTask.cancel();
159
160                                scheduleExpiredEntriesTimerTask();
161                        }
162                }
163        }
164
165        private void scheduleExpiredEntriesTimerTask() {
166                synchronized (cleanUpExpiredEntriesTimer) {
167                        final int expiryPeriod = getExpiryPeriod();
168                        lastExpiryPeriod = expiryPeriod;
169
170                        cleanUpExpiredEntriesTimerTask = new TimerTask() {
171                                @Override
172                                public void run() {
173                                        cleanUpExpiredEntries();
174                                }
175                        };
176
177                        cleanUpExpiredEntriesTimer.schedule(cleanUpExpiredEntriesTimerTask, expiryPeriod / 2, expiryPeriod / 2);
178                }
179        }
180
181        private int getExpiryPeriod() {
182                return Config.getInstance().getPropertyAsPositiveOrZeroInt(CONFIG_KEY_EXPIRY_PERIOD, DEFAULT_EXPIRY_PERIOD);
183        }
184}