package co.codewizards.cloudstore.core.concurrent;

import static java.util.Objects.*;

import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import co.codewizards.cloudstore.core.config.Config;
import co.codewizards.cloudstore.core.config.ConfigImpl;

/**
 * Singleton which runs {@link Callable}s on a background thread pool and lets the caller wait
 * for the result only for a limited time.
 * <p>
 * Every invocation is tagged with a {@code callIdentifier}. If the callable does not finish within
 * the configured timeout, {@link #call(String, CallableProvider)} throws a
 * {@link DeferredCompletionException}, but the underlying {@link Future} is kept. A later call with
 * an equal identifier waits on that same future instead of starting the work again.
 * <p>
 * Futures of finished invocations whose result was never picked up are discarded after the
 * configured expiry period.
 */
public class DeferrableExecutor {
    private static final Logger logger = LoggerFactory.getLogger(DeferrableExecutor.class);

    /**
     * The {@code key} for the timeout used with {@link Config#getPropertyAsInt(String, int)}.
     * <p>
     * The configuration can be overridden by a system property - see {@link Config#SYSTEM_PROPERTY_PREFIX}.
     */
    public static final String CONFIG_KEY_TIMEOUT = "deferrableExecutor.timeout"; //$NON-NLS-1$

    /** Default timeout in milliseconds (1 minute). */
    private static final int DEFAULT_TIMEOUT = 60 * 1000;

    /**
     * The {@code key} for the expiry period used with {@link Config#getPropertyAsInt(String, int)}.
     * <p>
     * The configuration can be overridden by a system property - see {@link Config#SYSTEM_PROPERTY_PREFIX}.
     */
    public static final String CONFIG_KEY_EXPIRY_PERIOD = "deferrableExecutor.expiryPeriod"; //$NON-NLS-1$

    /** Default expiry period in milliseconds (1 hour). */
    private static final int DEFAULT_EXPIRY_PERIOD = 60 * 60 * 1000;

    /** Maps an identifier to the one canonical instance that is used as lock object and map key. */
    private final Map<String, WeakReference<String>> canonicalCallIdentifierMap = new WeakHashMap<String, WeakReference<String>>();
    /** Futures of invocations that are still running or whose result has not been fetched yet. */
    private final Map<String, Future<?>> callIdentifier2Future = Collections.synchronizedMap(new HashMap<String, Future<?>>());
    /** Timestamp at which the callable of an identifier finished (successfully or not). */
    private final Map<String, Date> callIdentifier2DoneDate = Collections.synchronizedMap(new WeakHashMap<String, Date>());
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final Timer cleanUpExpiredEntriesTimer = new Timer("cleanUpExpiredEntriesTimer", true);
    /** Currently scheduled clean-up task; guarded by {@link #cleanUpExpiredEntriesTimer}. */
    private TimerTask cleanUpExpiredEntriesTimerTask;
    /** Expiry period the current task was scheduled with; guarded by {@link #cleanUpExpiredEntriesTimer}. */
    private int lastExpiryPeriod;

    private DeferrableExecutor() { }

    /** Holder class deferring creation of the singleton until it is first requested. */
    private static final class RunnableWithProgressExecutorHolder {
        private static final DeferrableExecutor instance = new DeferrableExecutor();
    }

    /**
     * Gets the singleton instance.
     * @return the single {@code DeferrableExecutor}; never <code>null</code>.
     */
    public static DeferrableExecutor getInstance() {
        return RunnableWithProgressExecutorHolder.instance;
    }

    // TODO maybe we should make it possible to pass the timeout from the client, because
    // the client knows its socket's read-timeout.
    /**
     * Runs the callable supplied by {@code callableProvider} in the background and waits for its
     * result for at most the configured timeout ({@link #CONFIG_KEY_TIMEOUT}, in milliseconds).
     * <p>
     * If a future is already registered for an equal {@code callIdentifier}, no new callable is
     * requested; the method waits on the existing future instead. The future is forgotten as soon as
     * its result or failure has been delivered to a caller.
     *
     * @param callIdentifier identifies the invocation across repeated calls. Must not be <code>null</code>.
     * @param callableProvider supplies the callable if none is registered for the identifier yet.
     * Must not be <code>null</code>.
     * @return the callable's result.
     * @throws DeferredCompletionException if the callable did not complete within the timeout or the
     * waiting thread was interrupted. The future stays registered in both cases.
     * @throws ExecutionException if the callable threw an exception.
     */
    @SuppressWarnings("unchecked")
    public <V> V call(String callIdentifier, final CallableProvider<V> callableProvider) throws DeferredCompletionException, ExecutionException {
        requireNonNull(callIdentifier, "callIdentifier");
        requireNonNull(callableProvider, "callableProvider");

        final int timeout = ConfigImpl.getInstance().getPropertyAsPositiveOrZeroInt(CONFIG_KEY_TIMEOUT, DEFAULT_TIMEOUT);

        cleanUpExpiredEntries();
        // Equal identifiers must resolve to the very same String instance, because it serves as lock.
        callIdentifier = canonicalizeCallIdentifier(callIdentifier);
        synchronized (callIdentifier) {
            Future<?> future = callIdentifier2Future.get(callIdentifier);
            if (future == null) {
                Callable<V> callable = callableProvider.getCallable();
                future = executorService.submit(new CallableWrapper<V>(callIdentifier, callable));
                callIdentifier2Future.put(callIdentifier, future);
            }

            Object result;
            try {
                result = future.get(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt status, so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new DeferredCompletionException(e);
            } catch (TimeoutException e) {
                throw new DeferredCompletionException(e);
            } catch (java.util.concurrent.ExecutionException e) {
                callIdentifier2Future.remove(callIdentifier); // remove in case of failure (keep while still running)
                throw new ExecutionException(e);
            }

            callIdentifier2Future.remove(callIdentifier); // remove in case of successful completion (keep while still running)
            return (V) result;
        }
    }

    /**
     * Wraps the actual callable in order to record the time at which it finished.
     * The timestamp is what {@link #cleanUpExpiredEntries()} evaluates.
     */
    private class CallableWrapper<V> implements Callable<V> {
        private final String identifier;
        private final Callable<V> delegate;

        public CallableWrapper(String identifier, Callable<V> delegate) {
            this.identifier = requireNonNull(identifier, "identifier");
            this.delegate = requireNonNull(delegate, "delegate");
        }

        @Override
        public V call() throws Exception {
            try {
                return delegate.call();
            } finally {
                // Recorded for normal completion as well as for failure.
                callIdentifier2DoneDate.put(identifier, new Date());
            }
        }

    }

    /**
     * Returns the canonical instance for the given identifier: the first instance seen for an
     * equal value, as long as that instance has not been garbage-collected.
     *
     * @param callIdentifier the identifier to canonicalize. Must not be <code>null</code>.
     * @return the canonical instance equal to {@code callIdentifier}; never <code>null</code>.
     */
    private String canonicalizeCallIdentifier(String callIdentifier) {
        synchronized (canonicalCallIdentifierMap) {
            WeakReference<String> ref = canonicalCallIdentifierMap.get(callIdentifier);
            String ci = ref == null ? null : ref.get();
            if (ci == null) {
                ci = callIdentifier;
                // The value is held weakly, too; otherwise it would keep its own key alive forever.
                canonicalCallIdentifierMap.put(ci, new WeakReference<String>(ci));
            }
            return ci;
        }
    }

    /**
     * Discards the futures (and done-timestamps) of all invocations which finished longer ago than
     * the expiry period. Also makes sure the periodic clean-up task is scheduled with the current
     * expiry period.
     */
    private void cleanUpExpiredEntries() {
        rescheduleExpiredEntriesTimerTaskIfExpiryPeriodChanged();

        List<String> expiredCallIdentifiers = new LinkedList<String>();
        Date expireDoneBeforeDate = new Date(System.currentTimeMillis() - getExpiryPeriod());
        // Iterating a synchronizedMap requires holding its monitor manually.
        synchronized (callIdentifier2DoneDate) {
            for (Map.Entry<String, Date> me : callIdentifier2DoneDate.entrySet()) {
                if (me.getValue().before(expireDoneBeforeDate))
                    expiredCallIdentifiers.add(me.getKey());
            }
        }
        // Removal happens outside the map's monitor, but under the identifier's lock used by call(...).
        for (String callIdentifier : expiredCallIdentifiers) {
            synchronized (callIdentifier) {
                callIdentifier2Future.remove(callIdentifier);
                callIdentifier2DoneDate.remove(callIdentifier);
            }
        }
    }

    /**
     * Schedules the periodic clean-up task, if there is none yet, or replaces it, if the configured
     * expiry period differs from the one the current task was scheduled with.
     */
    private void rescheduleExpiredEntriesTimerTaskIfExpiryPeriodChanged() {
        synchronized (cleanUpExpiredEntriesTimer) {
            final int expiryPeriod = getExpiryPeriod();
            if (cleanUpExpiredEntriesTimerTask == null || lastExpiryPeriod != expiryPeriod) {
                if (cleanUpExpiredEntriesTimerTask != null)
                    cleanUpExpiredEntriesTimerTask.cancel();

                scheduleExpiredEntriesTimerTask();
            }
        }
    }

    /**
     * Creates a new clean-up task and schedules it to run every half expiry period
     * (but at least every millisecond).
     */
    private void scheduleExpiredEntriesTimerTask() {
        synchronized (cleanUpExpiredEntriesTimer) {
            final int expiryPeriod = getExpiryPeriod();
            lastExpiryPeriod = expiryPeriod;

            cleanUpExpiredEntriesTimerTask = new TimerTask() {
                @Override
                public void run() {
                    cleanUpExpiredEntries();
                }
            };

            // Timer.schedule(...) rejects a period <= 0 with an IllegalArgumentException, which an
            // expiry period of 0 or 1 would produce after the division. Clamp to 1 ms.
            final long halfExpiryPeriod = Math.max(1, expiryPeriod / 2);
            cleanUpExpiredEntriesTimer.schedule(cleanUpExpiredEntriesTimerTask, halfExpiryPeriod, halfExpiryPeriod);
        }
    }

    /**
     * Reads the expiry period from the configuration ({@link #CONFIG_KEY_EXPIRY_PERIOD}).
     * @return the expiry period in milliseconds; presumably never negative, judging by the name of
     * the config accessor (to be confirmed against {@code ConfigImpl}).
     */
    private int getExpiryPeriod() {
        return ConfigImpl.getInstance().getPropertyAsPositiveOrZeroInt(CONFIG_KEY_EXPIRY_PERIOD, DEFAULT_EXPIRY_PERIOD);
    }
}