package co.codewizards.cloudstore.core.concurrent;

import static co.codewizards.cloudstore.core.util.Util.*;

import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import co.codewizards.cloudstore.core.config.Config;

/**
 * Singleton which executes {@link Callable}s on a cached thread pool and lets the caller wait
 * for the result for a limited (configurable) time only.
 * <p>
 * Every invocation is registered under a {@code callIdentifier}. If the result is not available
 * within the timeout, {@link #call(String, CallableProvider)} throws a
 * {@link DeferredCompletionException} and the {@link Future} stays registered, so that a later
 * invocation with an equal identifier waits for the very same {@code Future} instead of
 * submitting a new {@code Callable}.
 * <p>
 * A {@code Future} whose {@code Callable} finished but whose result was never fetched is evicted
 * once the expiry period elapsed. This clean-up runs at the beginning of each {@code call(...)}
 * and periodically on a daemon {@link Timer}.
 */
public class DeferrableExecutor {
    private static final Logger logger = LoggerFactory.getLogger(DeferrableExecutor.class);

    /**
     * The {@code key} for the timeout used with {@link Config#getPropertyAsInt(String, int)}.
     * <p>
     * The configuration can be overridden by a system property - see {@link Config#SYSTEM_PROPERTY_PREFIX}.
     */
    public static final String CONFIG_KEY_TIMEOUT = "deferrableExecutor.timeout"; //$NON-NLS-1$

    /** Default timeout in milliseconds (1 minute). */
    private static final int DEFAULT_TIMEOUT = 60 * 1000;

    /**
     * The {@code key} for the expiry period used with {@link Config#getPropertyAsInt(String, int)}.
     * <p>
     * The configuration can be overridden by a system property - see {@link Config#SYSTEM_PROPERTY_PREFIX}.
     */
    public static final String CONFIG_KEY_EXPIRY_PERIOD = "deferrableExecutor.expiryPeriod"; //$NON-NLS-1$

    /** Default expiry period in milliseconds (1 hour). */
    private static final int DEFAULT_EXPIRY_PERIOD = 60 * 60 * 1000;

    /**
     * Maps an identifier to the one {@code String} instance used as lock object for all equal
     * identifiers. Access is guarded by synchronizing on the map itself.
     */
    private final Map<String, WeakReference<String>> canonicalCallIdentifierMap = new WeakHashMap<String, WeakReference<String>>();

    /** The registered (running or finished, but not yet fetched) futures by canonical identifier. */
    private final Map<String, Future<?>> callIdentifier2Future = Collections.synchronizedMap(new HashMap<String, Future<?>>());

    /** The timestamp at which the {@code Callable} registered under an identifier finished. */
    private final Map<String, Date> callIdentifier2DoneDate = Collections.synchronizedMap(new WeakHashMap<String, Date>());

    private final ExecutorService executorService = Executors.newCachedThreadPool();

    /** Daemon timer running the periodic clean-up; also serves as lock for the two fields below. */
    private final Timer cleanUpExpiredEntriesTimer = new Timer("cleanUpExpiredEntriesTimer", true);
    private TimerTask cleanUpExpiredEntriesTimerTask;
    private int lastExpiryPeriod;

    private DeferrableExecutor() { }

    /** Initialization-on-demand holder of the singleton instance. */
    private static final class InstanceHolder {
        private static final DeferrableExecutor instance = new DeferrableExecutor();
    }

    /**
     * Gets the singleton instance.
     * @return the singleton instance; never <code>null</code>.
     */
    public static DeferrableExecutor getInstance() {
        return InstanceHolder.instance;
    }

    // TODO maybe we should make it possible to pass the timeout from the client, because
    // the client knows its socket's read-timeout.
    /**
     * Executes the {@link Callable} obtained from the given {@code callableProvider} and waits
     * for its result - at most for the time configured via {@link #CONFIG_KEY_TIMEOUT}.
     * <p>
     * If a {@code Future} is already registered for an equal {@code callIdentifier}, no new
     * {@code Callable} is requested from the provider; the existing {@code Future} is awaited
     * instead. Invocations with equal identifiers are serialized, because they synchronize on
     * the same canonical identifier instance.
     *
     * @param callIdentifier identifies the invocation. Must not be <code>null</code>.
     * @param callableProvider provides the {@code Callable}, if none is registered for the
     * identifier, yet. Must not be <code>null</code>.
     * @return the result of the {@code Callable}.
     * @throws DeferredCompletionException if the result was not available within the timeout
     * (the {@code Future} stays registered) or if the waiting thread was interrupted (the
     * thread's interrupt status is restored before throwing).
     * @throws ExecutionException if the {@code Callable} failed. The {@code Future} is
     * unregistered in this case.
     */
    public <V> V call(String callIdentifier, final CallableProvider<V> callableProvider) throws DeferredCompletionException, ExecutionException {
        assertNotNull("callIdentifier", callIdentifier);
        assertNotNull("callableProvider", callableProvider);

        final int timeout = Config.getInstance().getPropertyAsPositiveOrZeroInt(CONFIG_KEY_TIMEOUT, DEFAULT_TIMEOUT);

        cleanUpExpiredEntries();
        final String canonicalIdentifier = canonicalizeCallIdentifier(callIdentifier);
        synchronized (canonicalIdentifier) {
            Future<?> future = callIdentifier2Future.get(canonicalIdentifier);
            if (future == null) {
                final Callable<V> callable = callableProvider.getCallable();
                future = executorService.submit(new CallableWrapper<V>(canonicalIdentifier, callable));
                callIdentifier2Future.put(canonicalIdentifier, future);
            }

            final Object result;
            try {
                result = future.get(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt status, because we translate the exception and the
                // code further up the stack would otherwise never learn about the interruption.
                Thread.currentThread().interrupt();
                throw new DeferredCompletionException(e);
            } catch (TimeoutException e) {
                throw new DeferredCompletionException(e); // keep the future registered while still running
            } catch (java.util.concurrent.ExecutionException e) {
                forget(canonicalIdentifier); // remove in case of failure (keep while still running)
                throw new ExecutionException(e);
            }

            forget(canonicalIdentifier); // remove in case of successful completion (keep while still running)

            // The future registered under this identifier was created from a Callable<V>.
            @SuppressWarnings("unchecked")
            final V typedResult = (V) result;
            return typedResult;
        }
    }

    /**
     * Unregisters both the {@code Future} and the done-timestamp of the given identifier.
     * <p>
     * Both are removed together, because a done-timestamp left behind would later cause
     * {@link #cleanUpExpiredEntries()} to evict a new, still running {@code Future} registered
     * under an equal identifier.
     * <p>
     * All callers hold the lock on the canonical identifier.
     */
    private void forget(final String callIdentifier) {
        callIdentifier2Future.remove(callIdentifier);
        callIdentifier2DoneDate.remove(callIdentifier);
    }

    /**
     * Delegates to another {@link Callable} and records the timestamp at which the delegate
     * finished - no matter whether it returned normally or threw an exception.
     */
    private class CallableWrapper<V> implements Callable<V> {
        private final String identifier;
        private final Callable<V> delegate;

        public CallableWrapper(String identifier, Callable<V> delegate) {
            this.identifier = assertNotNull("identifier", identifier);
            this.delegate = assertNotNull("delegate", delegate);
        }

        @Override
        public V call() throws Exception {
            try {
                return delegate.call();
            } finally {
                callIdentifier2DoneDate.put(identifier, new Date());
            }
        }

    }

    /**
     * Gets the canonical instance for the given identifier, i.e. the one {@code String} instance
     * shared by all equal identifiers, which can thus be used as lock object.
     * <p>
     * If there is no canonical instance (or it was garbage-collected), the given instance
     * becomes the canonical one.
     */
    private String canonicalizeCallIdentifier(String callIdentifier) {
        synchronized (canonicalCallIdentifierMap) {
            final WeakReference<String> ref = canonicalCallIdentifierMap.get(callIdentifier);
            String canonical = ref == null ? null : ref.get();
            if (canonical == null) {
                canonical = callIdentifier;
                // The value references the key only weakly - otherwise the entry of the
                // WeakHashMap could never be garbage-collected.
                canonicalCallIdentifierMap.put(canonical, new WeakReference<String>(canonical));
            }
            return canonical;
        }
    }

    /**
     * Unregisters all futures whose {@code Callable} finished longer ago than the expiry period,
     * after making sure the periodic clean-up timer task is scheduled with the current period.
     */
    private void cleanUpExpiredEntries() {
        rescheduleExpiredEntriesTimerTaskIfExpiryPeriodChanged();

        final List<String> expiredCallIdentifiers = new LinkedList<String>();
        final Date expireDoneBeforeDate = new Date(System.currentTimeMillis() - getExpiryPeriod());

        // Collect first and remove afterwards: iterating a synchronized map requires holding
        // its lock, and we must not modify the map while iterating its entry-set.
        synchronized (callIdentifier2DoneDate) {
            for (Map.Entry<String, Date> me : callIdentifier2DoneDate.entrySet()) {
                if (me.getValue().before(expireDoneBeforeDate))
                    expiredCallIdentifiers.add(me.getKey());
            }
        }

        for (String callIdentifier : expiredCallIdentifiers) {
            synchronized (callIdentifier) {
                // Re-check while holding the identifier's lock: the result might have been
                // fetched (and a new call might have been registered) in the meantime.
                final Date doneDate = callIdentifier2DoneDate.get(callIdentifier);
                if (doneDate != null && doneDate.before(expireDoneBeforeDate))
                    forget(callIdentifier);
            }
        }
    }

    /**
     * Schedules the clean-up timer task, if it is not yet scheduled or if the configured expiry
     * period changed since it was scheduled. A previously scheduled task is cancelled.
     */
    private void rescheduleExpiredEntriesTimerTaskIfExpiryPeriodChanged() {
        synchronized (cleanUpExpiredEntriesTimer) {
            final int expiryPeriod = getExpiryPeriod();
            if (cleanUpExpiredEntriesTimerTask != null && lastExpiryPeriod == expiryPeriod)
                return; // already scheduled with the current period

            if (cleanUpExpiredEntriesTimerTask != null)
                cleanUpExpiredEntriesTimerTask.cancel();

            scheduleExpiredEntriesTimerTask(expiryPeriod);
        }
    }

    /**
     * Schedules a new clean-up timer task running every {@code expiryPeriod / 2} milliseconds
     * (but at least every millisecond).
     *
     * @param expiryPeriod the expiry period in milliseconds.
     */
    private void scheduleExpiredEntriesTimerTask(final int expiryPeriod) {
        synchronized (cleanUpExpiredEntriesTimer) {
            lastExpiryPeriod = expiryPeriod;

            cleanUpExpiredEntriesTimerTask = new TimerTask() {
                @Override
                public void run() {
                    try {
                        cleanUpExpiredEntries();
                    } catch (Exception x) {
                        // An exception escaping from here would terminate the timer thread and
                        // make every future attempt to schedule a task fail.
                        logger.error("cleanUpExpiredEntries failed: " + x, x); //$NON-NLS-1$
                    }
                }
            };

            // Timer.schedule(...) rejects a period <= 0, which 'expiryPeriod / 2' yields for
            // an expiry period of 0 or 1.
            final long period = Math.max(1L, expiryPeriod / 2);
            cleanUpExpiredEntriesTimer.schedule(cleanUpExpiredEntriesTimerTask, period, period);
        }
    }

    /**
     * Gets the expiry period in milliseconds, as configured via {@link #CONFIG_KEY_EXPIRY_PERIOD}.
     */
    private int getExpiryPeriod() {
        return Config.getInstance().getPropertyAsPositiveOrZeroInt(CONFIG_KEY_EXPIRY_PERIOD, DEFAULT_EXPIRY_PERIOD);
    }
}