001package co.codewizards.cloudstore.core.util.childprocess;
002
003import java.io.BufferedReader;
004import co.codewizards.cloudstore.core.io.ByteArrayOutputStream;
005import java.io.IOException;
006import java.io.StringReader;
007import java.io.UnsupportedEncodingException;
008
009import org.slf4j.Logger;
010import org.slf4j.LoggerFactory;
011
012/**
013 * Thread used to log standard-out or standard-error from a child-process.
014 * <p>
015 * Besides logging, it can write all data to a {@link StringBuffer}.
016 * <p>
017 * An instance of this class is usally not created explicitly, but implicitly via an instance of
018 * {@link DumpStreamThread}.
019 * @author Marco หงุ่ยตระกูล-Schulze - marco at codewizards dot co
020 */
021public class LogDumpedStreamThread extends Thread
022{
023        /**
024         * UTF-8 character set name.
025         */
026        private static final String CHARSET_NAME_UTF_8 = "UTF-8";
027
028        private static final Logger logger = LoggerFactory.getLogger(LogDumpedStreamThread.class);
029
030        /**
031         * If the first data which arrived here via {@link #write(byte[], int)} have not yet been written
032         * after this time (i.e. their age exceeds this time) in milliseconds, it is logged.
033         */
034        private static final long logMaxAge = 5000L;
035
036        /**
037         * If there was no write after this many milliseconds, the current buffer is logged.
038         */
039        private static final long logAfterNoWritePeriod = 500L;
040
041        /**
042         * If the buffer grows bigger than this size in bytes, it is logged - no matter when the last
043         * write occured.
044         */
045        private static final int logWhenBufferExceedsSize = 50 * 1024; // 50 KB
046
047        private final ByteArrayOutputStream bufferOutputStream = new ByteArrayOutputStream();
048        private volatile boolean forceInterrupt = false;
049        private Long firstNonLoggedWriteTimestamp = null;
050        private long lastWriteTimestamp = 0;
051
052        private volatile StringBuffer outputStringBuffer;
053        private volatile int outputStringBufferMaxLength = 1024 * 1024;
054
055        private Logger childProcessLogger;
056
057        public LogDumpedStreamThread(final String childProcessLoggerName) {
058                this(childProcessLoggerName == null ? null : LoggerFactory.getLogger(childProcessLoggerName));
059        }
060
061        public LogDumpedStreamThread(final Logger childProcessLogger)
062        {
063                if (childProcessLogger == null)
064                        this.childProcessLogger = logger;
065                else
066                        this.childProcessLogger = childProcessLogger;
067        }
068
069        public void write(final byte[] data, final int length)
070        {
071                if (data == null)
072                        throw new IllegalArgumentException("data == null"); //$NON-NLS-1$
073
074                synchronized (bufferOutputStream) {
075                        bufferOutputStream.write(data, 0, length);
076                        lastWriteTimestamp = System.currentTimeMillis();
077                        if (firstNonLoggedWriteTimestamp == null)
078                                firstNonLoggedWriteTimestamp = lastWriteTimestamp;
079                }
080        }
081
082        public void setOutputStringBuffer(final StringBuffer outputStringBuffer) {
083                this.outputStringBuffer = outputStringBuffer;
084        }
085        public StringBuffer getOutputStringBuffer() {
086                return outputStringBuffer;
087        }
088        public void setOutputStringBufferMaxLength(final int outputStringBufferMaxLength) {
089                this.outputStringBufferMaxLength = outputStringBufferMaxLength;
090        }
091        public int getOutputStringBufferMaxLength() {
092                return outputStringBufferMaxLength;
093        }
094
095        @Override
096        public void interrupt() {
097                forceInterrupt = true;
098                super.interrupt();
099        }
100
101        @Override
102        public boolean isInterrupted() {
103                return forceInterrupt || super.isInterrupted();
104        }
105
106        @Override
107        public void run() {
108                while (!isInterrupted()) {
109                        try {
110                                synchronized (bufferOutputStream) {
111                                        try {
112                                                bufferOutputStream.wait(500L);
113                                        } catch (final InterruptedException x) {
114//                                              doNothing(); TODO reference to the Util.doNothing class after Util moved into this util project!
115                                        }
116
117                                        processBuffer(false);
118                                }
119                        } catch (final Throwable e) {
120                                logger.error("run: " + e, e); //$NON-NLS-1$
121                        }
122                }
123                processBuffer(true);
124        }
125
126        public void flushBuffer()
127        {
128                processBuffer(true);
129        }
130
131        protected void processBuffer(final boolean force)
132        {
133                synchronized (bufferOutputStream) {
134                        if (bufferOutputStream.size() > 0) {
135                                final long firstNonLoggedWriteAge = firstNonLoggedWriteTimestamp == null ? 0 : System.currentTimeMillis() - firstNonLoggedWriteTimestamp;
136                                final long noWritePeriod = System.currentTimeMillis() - lastWriteTimestamp;
137                                if (force || firstNonLoggedWriteAge > logMaxAge || noWritePeriod > logAfterNoWritePeriod || bufferOutputStream.size() > logWhenBufferExceedsSize) {
138                                        String currentBufferString;
139                                        try {
140                                                currentBufferString = bufferOutputStream.toString(CHARSET_NAME_UTF_8);
141                                        } catch (final UnsupportedEncodingException e) {
142                                                throw new RuntimeException(e);
143                                        }
144
145                                        final StringBuffer outputStringBuffer = getOutputStringBuffer();
146                                        if (outputStringBuffer != null) {
147                                                final int newOutputStringBufferLength = outputStringBuffer.length() + currentBufferString.length();
148                                                if (newOutputStringBufferLength > outputStringBufferMaxLength) {
149                                                        int lastCharPositionToDelete = newOutputStringBufferLength - outputStringBufferMaxLength;
150                                                        // search for first line-break
151                                                        while (outputStringBuffer.length() > lastCharPositionToDelete && outputStringBuffer.charAt(lastCharPositionToDelete) != '\n')
152                                                                ++lastCharPositionToDelete;
153
154                                                        lastCharPositionToDelete = Math.min(lastCharPositionToDelete, outputStringBuffer.length() - 1);
155                                                        outputStringBuffer.delete(0, lastCharPositionToDelete + 1);
156                                                }
157
158                                                outputStringBuffer.append(currentBufferString);
159                                        }
160
161                                        childProcessLogger.info(
162                                                        '\n' + prefixEveryLine(currentBufferString)
163                                                        );
164
165                                        bufferOutputStream.reset();
166                                }
167                        }
168                }
169        }
170
171        private String prefixEveryLine(final String s)
172        {
173                try {
174                        final StringBuilder result = new StringBuilder();
175                        final String prefix = "  >>> "; //$NON-NLS-1$
176                        final BufferedReader r = new BufferedReader(new StringReader(s));
177                        String line;
178                        while (null != (line = r.readLine()))
179                                result.append(prefix).append(line).append('\n');
180
181                        return result.toString();
182                } catch (final IOException x) {
183                        throw new RuntimeException(x);
184                }
185        }
186}