/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.stream;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.stream.FileWatcherStrategy;
import org.apache.camel.component.stream.StreamEndpoint;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamConsumer
extends DefaultConsumer
implements Runnable {
    /** Logger shared by all instances of this consumer. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamConsumer.class);
    /** Comma-separated names of the stream types accepted in the endpoint uri. */
    private static final String TYPES = "in,file,http";
    /** Message of the exception thrown by {@code validateUri} for an unsupported uri. */
    private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{in,file,http}'";
    /** The entries of {@link #TYPES} as a list, used for the membership check in {@code validateUri}. */
    private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
    /** Executor running {@link #run()}; created in {@code doStart} and shut down in {@code doStop}. */
    private ExecutorService executor;
    /** Watcher created in {@code doStart} when the endpoint enables file watching; otherwise {@code null}. */
    private FileWatcherStrategy fileWatcher;
    /** Raised by the file watcher callback when the configured file changed; lowered after the stream is reopened. */
    private volatile boolean watchFileChanged;
    /** The stream currently being read; starts out as {@code System.in}. */
    private volatile InputStream inputStream = System.in;
    /** The stream opened by this consumer that it must close itself; {@code null} while reading {@code System.in}. */
    private volatile InputStream inputStreamToClose;
    /** The connection opened by {@code resolveStreamFromUrl}, kept so it can be disconnected later. */
    private volatile URLConnection urlConnectionToClose;
    /** The file most recently resolved by {@code resolveStreamFromFile}. */
    private volatile File file;
    /** The endpoint that supplies this consumer's configuration. */
    private StreamEndpoint endpoint;
    /** After construction: the bare stream type ({@code in}, {@code file} or {@code http}). */
    private String uri;
    /** Set once the initial prompt delay has been applied by {@code doPromptMessage}. */
    private volatile boolean initialPromptDone;
    /** Lines collected so far for the group currently being assembled by {@code processLine}. */
    private final List<String> lines = new CopyOnWriteArrayList<>();

    /**
     * Creates a consumer for the given stream endpoint.
     *
     * @param endpoint  the endpoint providing the configuration
     * @param processor the processor that receives the exchanges this consumer creates
     * @param uri       the endpoint uri, from which the stream type is extracted and validated
     * @throws Exception if the uri does not name one of the supported stream types
     */
    public StreamConsumer(StreamEndpoint endpoint, Processor processor, String uri) throws Exception {
        super(endpoint, processor);
        this.uri = uri;
        this.endpoint = endpoint;
        // Replaces this.uri with the bare stream type, or throws for an unsupported uri.
        validateUri(uri);
    }

    /**
     * Starts the consumer: validates the configuration, starts the optional file watcher, opens the
     * stream up front when the endpoint is not in scan mode, and finally submits this consumer to a
     * single-thread executor.
     *
     * @throws IllegalArgumentException if the {@code groupLines} option is negative
     * @throws Exception                if a parent service, the file watcher or the stream cannot be started
     */
    @Override
    protected void doStart() throws Exception {
        // Validate before anything is started: checking after the executor has been launched
        // would throw while leaving a running reader thread behind.
        if (this.endpoint.getGroupLines() < 0) {
            throw new IllegalArgumentException("Option groupLines must be 0 or positive number, was " + this.endpoint.getGroupLines());
        }
        super.doStart();
        if (this.endpoint.isFileWatcher()) {
            String dir = new File(this.endpoint.getFileName()).getParent();
            this.fileWatcher = new FileWatcherStrategy(dir, changed -> {
                String onlyName = changed.getName();
                String target = FileUtil.stripPath(this.endpoint.getFileName());
                LOG.trace("File changed: {}", (Object)onlyName);
                // Only changes to the configured file are of interest; the flag is consumed by the read loops.
                if (onlyName.equals(target)) {
                    this.watchFileChanged = true;
                }
            });
            this.fileWatcher.setCamelContext(this.getEndpoint().getCamelContext());
        }
        ServiceHelper.startService(this.fileWatcher);
        // When not scanning, open the stream now so that an unavailable source fails the startup.
        if (!this.endpoint.isScanStream()) {
            this.initializeStreamLineMode();
        }
        this.executor = this.endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, this.endpoint.getEndpointUri());
        this.executor.execute(this);
    }

    /**
     * Stops the consumer: shuts down the reader executor, stops the file watcher, discards any
     * buffered lines and releases the stream and URL connection opened by this consumer.
     *
     * @throws Exception if stopping a dependent or parent service fails
     */
    @Override
    public void doStop() throws Exception {
        ExecutorService running = this.executor;
        if (running != null) {
            this.endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(running);
            this.executor = null;
        }
        ServiceHelper.stopAndShutdownService(this.fileWatcher);
        this.lines.clear();
        IOHelper.close((Closeable)this.inputStreamToClose);
        URLConnection connection = this.urlConnectionToClose;
        if (connection != null) {
            closeURLConnection(connection);
            this.urlConnectionToClose = null;
        }
        super.doStop();
    }

    /**
     * Entry point of the reader thread: reads the stream in line mode or raw mode, depending on the
     * endpoint's {@code readLine} setting. An interruption restores the thread's interrupt flag;
     * any other exception is passed to the consumer's exception handler.
     */
    @Override
    public void run() {
        try {
            boolean lineMode = this.endpoint.isReadLine();
            if (!lineMode) {
                readFromStreamRawMode();
            } else {
                readFromStreamLineMode();
            }
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
        catch (Exception failure) {
            getExceptionHandler().handleException(failure);
        }
    }

    /**
     * Closes whatever stream/connection was opened before, reopens the stream for the configured
     * type and wraps it in a buffered reader.
     *
     * @return a reader over the freshly opened stream, or {@code null} when no stream is available
     * @throws Exception if the file or URL stream cannot be opened
     */
    private BufferedReader initializeStreamLineMode() throws Exception {
        IOHelper.close((Closeable)this.inputStreamToClose);
        if (this.urlConnectionToClose != null) {
            closeURLConnection(this.urlConnectionToClose);
        }
        boolean http = "http".equals(this.uri);
        if ("in".equals(this.uri)) {
            // System.in is not owned by this consumer, so there is nothing to close later.
            this.inputStream = System.in;
            this.inputStreamToClose = null;
        } else if ("file".equals(this.uri)) {
            InputStream opened = resolveStreamFromFile();
            this.inputStream = opened;
            this.inputStreamToClose = opened;
        } else if (http) {
            InputStream opened = resolveStreamFromUrl();
            this.inputStream = opened;
            this.inputStreamToClose = opened;
        }
        InputStream source = this.inputStream;
        if (source == null) {
            return null;
        }
        // The http stream is decoded without an explicit charset; the other types use the endpoint's charset.
        InputStreamReader reader = http
                ? new InputStreamReader(source)
                : new InputStreamReader(source, this.endpoint.getCharset());
        return IOHelper.buffered(reader);
    }

    /**
     * Closes whatever stream/connection was opened before and reopens the stream for the
     * configured type.
     *
     * @return the stream now in use; may be {@code null} when the file resolver returned no stream
     * @throws Exception if the file or URL stream cannot be opened
     */
    private InputStream initializeStreamRawMode() throws Exception {
        IOHelper.close((Closeable)this.inputStreamToClose);
        URLConnection previous = this.urlConnectionToClose;
        if (previous != null) {
            closeURLConnection(previous);
        }
        String type = this.uri;
        if ("in".equals(type)) {
            // System.in is not owned by this consumer, so there is nothing to close later.
            this.inputStream = System.in;
            this.inputStreamToClose = null;
        } else if ("file".equals(type)) {
            InputStream opened = resolveStreamFromFile();
            this.inputStream = opened;
            this.inputStreamToClose = opened;
        } else if ("http".equals(type)) {
            InputStream opened = resolveStreamFromUrl();
            this.inputStream = opened;
            this.inputStreamToClose = opened;
        }
        return this.inputStream;
    }

    /**
     * Reads the stream as raw bytes and hands every non-empty payload to {@link #processRaw}.
     * <p>
     * In scan mode the stream is polled until the consumer is stopped: a non-empty read is
     * processed, an empty read (or an unavailable stream) optionally reopens the stream when retry
     * is enabled and then sleeps for the scan delay. This mirrors the line-mode loop. Without scan
     * mode the stream is read until the first empty read.
     *
     * @throws Exception if the stream cannot be (re)opened or the processor fails
     */
    private void readFromStreamRawMode() throws Exception {
        long index = 0L;
        InputStream is = this.initializeStreamRawMode();
        if (this.endpoint.isScanStream()) {
            while (this.isRunAllowed()) {
                // A missing stream (e.g. the file is not readable yet) is treated like end of stream.
                byte[] data = this.readAllBytesQuietly(is);
                boolean eos = data == null || data.length == 0;
                if (!eos && this.isRunAllowed()) {
                    index = this.processRaw(data, index);
                } else if (eos && this.isRunAllowed() && this.endpoint.isRetry()) {
                    // Reopen only once the current stream is exhausted; with a file watcher,
                    // only when the watched file actually changed.
                    boolean reOpen = true;
                    if (this.endpoint.isFileWatcher()) {
                        reOpen = this.watchFileChanged;
                    }
                    if (reOpen) {
                        LOG.debug("File: {} changed/rollover, re-reading file from beginning", (Object)this.file);
                        is = this.initializeStreamRawMode();
                        if (this.endpoint.isFileWatcher()) {
                            this.watchFileChanged = false;
                        }
                    } else {
                        LOG.trace("File: {} not changed since last read", (Object)this.file);
                    }
                }
                // Sleep only when there was nothing to read.
                if (eos) {
                    try {
                        Thread.sleep(this.endpoint.getScanStreamDelay());
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        } else {
            boolean eos = false;
            while (!eos && this.isRunAllowed()) {
                if (this.endpoint.getPromptMessage() != null) {
                    this.doPromptMessage();
                }
                // Read into a fresh variable each pass so a failed read cannot re-deliver old data.
                byte[] data = this.readAllBytesQuietly(is);
                eos = data == null || data.length == 0;
                if (!eos) {
                    index = this.processRaw(data, index);
                }
            }
        }
    }

    /**
     * Reads all remaining bytes of the given stream, treating a missing stream or a read failure
     * as "nothing to read".
     *
     * @param is the stream to drain; may be {@code null}
     * @return the bytes read, or {@code null} when the stream is {@code null} or reading failed
     */
    private byte[] readAllBytesQuietly(InputStream is) {
        if (is == null) {
            return null;
        }
        try {
            return is.readAllBytes();
        }
        catch (IOException e) {
            // Best effort: the caller handles this like end of stream (sleep and/or reopen).
            LOG.debug("Error reading from stream, treating it as end of stream", e);
            return null;
        }
    }

    /**
     * Reads the stream line by line and hands the lines to {@link #processLine}.
     * <p>
     * Without scan mode the stream is read once until its end, using a one-line look-ahead to tell
     * whether the current line is the last one; afterwards any pending group is flushed. In scan
     * mode the stream is polled until the consumer is stopped, optionally reopening it on end of
     * stream when retry is enabled and sleeping for the scan delay whenever nothing was read.
     *
     * @throws Exception if the stream cannot be (re)opened, read, or the processor fails
     */
    private void readFromStreamLineMode() throws Exception {
        long index = 0L;
        BufferedReader reader = this.initializeStreamLineMode();

        if (!this.endpoint.isScanStream()) {
            String lookAhead = null;
            boolean eos = false;
            while (!eos && this.isRunAllowed()) {
                if (this.endpoint.getPromptMessage() != null) {
                    this.doPromptMessage();
                }
                // Use the line fetched by the previous look-ahead before reading a new one.
                String current;
                if (lookAhead != null) {
                    current = lookAhead;
                } else {
                    current = reader.readLine();
                }
                LOG.trace("Read line: {}", (Object)current);
                eos = current == null;
                if (!eos && this.isRunAllowed()) {
                    lookAhead = this.readAhead(reader);
                    boolean last = lookAhead == null;
                    index = this.processLine(current, last, index);
                }
            }
            // Flush whatever is still buffered for grouping.
            this.processLine(null, true, index);
            return;
        }

        while (this.isRunAllowed()) {
            String line = null;
            if (reader != null) {
                line = reader.readLine();
                LOG.trace("Read line: {}", (Object)line);
            }
            boolean eos = line == null;
            if (!eos) {
                if (this.isRunAllowed()) {
                    index = this.processLine(line, false, index);
                }
            } else if (this.isRunAllowed() && this.endpoint.isRetry()) {
                // With a file watcher, reopen only when the watched file actually changed.
                boolean reOpen = true;
                if (this.endpoint.isFileWatcher()) {
                    reOpen = this.watchFileChanged;
                }
                if (reOpen) {
                    LOG.debug("File: {} changed/rollover, re-reading file from beginning", (Object)this.file);
                    reader = this.initializeStreamLineMode();
                    if (this.endpoint.isFileWatcher()) {
                        this.watchFileChanged = false;
                    }
                } else {
                    LOG.trace("File: {} not changed since last read", (Object)this.file);
                }
            }
            // Sleep only when there was nothing to read.
            if (eos) {
                try {
                    Thread.sleep(this.endpoint.getScanStreamDelay());
                }
                catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /**
     * Processes one line, either on its own or as part of a group.
     * <p>
     * When the endpoint's {@code groupLines} option is positive, non-null lines are buffered and an
     * exchange is created once the buffer reaches that size or {@code last} is set. Otherwise every
     * non-null line becomes its own exchange.
     *
     * @param line  the line read, or {@code null} to only flush a pending group
     * @param last  whether this is the last line of the stream
     * @param index the index to assign to the next exchange
     * @return the index for the following exchange (incremented when an exchange was created)
     * @throws Exception if the processor fails
     */
    protected synchronized long processLine(String line, boolean last, long index) throws Exception {
        if (this.endpoint.getGroupLines() <= 0) {
            if (line != null) {
                Exchange single = this.createExchange(line, index, last);
                index++;
                this.getProcessor().process(single);
            }
            return index;
        }

        if (line != null) {
            this.lines.add(line);
        }
        boolean flush = !this.lines.isEmpty()
                && (this.lines.size() >= this.endpoint.getGroupLines() || last);
        if (flush) {
            // Group a snapshot so the shared buffer can be cleared before the exchange is processed.
            ArrayList<String> snapshot = new ArrayList<>(this.lines);
            Object body = this.endpoint.getGroupStrategy().groupLines(snapshot);
            Exchange grouped = this.createExchange(body, index, last);
            index++;
            this.lines.clear();
            this.getProcessor().process(grouped);
        }
        return index;
    }

    /**
     * Creates an exchange for a raw payload, marked as complete, and passes it to the processor.
     *
     * @param body  the bytes read from the stream
     * @param index the index to assign to the exchange
     * @return {@code index + 1}, the index for the following exchange
     * @throws Exception if the processor fails
     */
    protected synchronized long processRaw(byte[] body, long index) throws Exception {
        Exchange exchange = this.createExchange(body, index, true);
        this.getProcessor().process(exchange);
        return index + 1L;
    }

    /**
     * Waits for the configured prompt delay and then prints the prompt message to standard output,
     * but only while the consumer is reading from {@code System.in}.
     * <p>
     * A positive initial prompt delay is used the first time; afterwards the regular prompt delay
     * applies when it is positive. An interruption during the wait restores the interrupt flag and
     * the prompt is still printed.
     */
    protected void doPromptMessage() {
        long pause = 0L;
        boolean firstPrompt = !this.initialPromptDone && this.endpoint.getInitialPromptDelay() > 0L;
        if (firstPrompt) {
            this.initialPromptDone = true;
            pause = this.endpoint.getInitialPromptDelay();
        } else if (this.endpoint.getPromptDelay() > 0L) {
            pause = this.endpoint.getPromptDelay();
        }

        if (pause > 0L) {
            try {
                Thread.sleep(pause);
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        if (System.in == this.inputStream) {
            System.out.print(this.endpoint.getPromptMessage());
        }
    }

    /**
     * Reads the next line ahead of time so the caller can tell whether the current line is the
     * last one. No look-ahead is done for the {@code in} stream type.
     *
     * @param br the reader to read from
     * @return the next line, or {@code null} at end of stream or for the {@code in} type
     * @throws IOException if reading fails
     */
    private String readAhead(BufferedReader br) throws IOException {
        boolean standardInput = this.uri.equals("in");
        return standardInput ? null : br.readLine();
    }

    /**
     * Opens the file configured on the endpoint and remembers it in {@code file}.
     *
     * @return a stream over the file, or {@code null} when the file is not readable and the
     *         endpoint is in scan mode (the caller may retry later)
     * @throws IllegalArgumentException if the file is not readable and the endpoint is not in scan mode
     * @throws IOException              if the file cannot be opened
     */
    private InputStream resolveStreamFromFile() throws IOException {
        String fileName = this.endpoint.getFileName();
        StringHelper.notEmpty(fileName, "fileName");
        this.file = new File(fileName);
        if (LOG.isDebugEnabled()) {
            LOG.debug("File to be scanned: {}, path: {}", (Object)this.file.getName(), (Object)this.file.getAbsolutePath());
        }
        if (this.file.canRead()) {
            return new FileInputStream(this.file);
        }
        if (this.endpoint.isScanStream()) {
            // Scan mode is lenient: the file may show up later.
            return null;
        }
        // Report the actual problem; the uri itself has already been validated at this point.
        throw new IllegalArgumentException("Cannot read file: " + this.file.getAbsolutePath());
    }

    /**
     * Parses a comma-separated list of {@code name=value} or {@code name:value} pairs.
     * <p>
     * Each pair is split on its <em>first</em> {@code =} or {@code :} only, so values that contain
     * these characters themselves (URLs, base64 padding, ...) are kept intact. Pairs without a
     * separator are skipped; names and values are trimmed and a value may be empty. Values that
     * contain a comma are not supported, because the comma separates the pairs.
     *
     * @param headerList the header list to parse; must not be {@code null}
     * @return a stream of trimmed name/value entries, in the order they appear in the list
     */
    Stream<Map.Entry<String, String>> parseHeaders(String headerList) {
        return Arrays.stream(headerList.split(","))
                // The limit of 2 stops splitting after the first separator and keeps trailing characters.
                .map(pair -> pair.split("[=:]", 2))
                .filter(parts -> parts.length == 2)
                .map(parts -> Map.entry(parts[0].trim(), parts[1].trim()));
    }

    /**
     * Opens a connection to the URL configured on the endpoint, applies the configured request
     * headers and returns the response stream. The connection is remembered in
     * {@code urlConnectionToClose} so it can be disconnected later.
     *
     * @return the input stream of the connection
     * @throws IOException if the connection cannot be opened or its stream cannot be obtained; an
     *                     HTTP connection is disconnected before the exception is rethrown
     */
    private InputStream resolveStreamFromUrl() throws IOException {
        String address = this.endpoint.getHttpUrl();
        StringHelper.notEmpty(address, "httpUrl");

        URLConnection connection = new URL(address).openConnection();
        this.urlConnectionToClose = connection;
        connection.setUseCaches(false);

        String headerList = this.endpoint.getHttpHeaders();
        if (headerList != null) {
            this.parseHeaders(headerList)
                    .forEach(header -> connection.setRequestProperty((String)header.getKey(), (String)header.getValue()));
        }

        try {
            return connection.getInputStream();
        }
        catch (IOException failure) {
            if (connection instanceof HttpURLConnection) {
                ((HttpURLConnection)connection).disconnect();
            }
            throw failure;
        }
    }

    /**
     * Extracts the stream type from the endpoint uri, stores it in {@code this.uri} and verifies
     * that it is one of the supported types.
     * <p>
     * The type is the text between the first and second {@code :} up to an optional {@code ?},
     * trimmed and without a leading {@code //}.
     *
     * @param uri the endpoint uri to inspect
     * @throws IllegalArgumentException if the uri has no type part or the type is not supported
     */
    private void validateUri(String uri) throws IllegalArgumentException {
        String[] colonParts = uri.split(":");
        if (colonParts.length < 2) {
            throw new IllegalArgumentException(INVALID_URI);
        }
        String[] pathAndQuery = colonParts[1].split("\\?");
        if (pathAndQuery.length < 1) {
            throw new IllegalArgumentException(INVALID_URI);
        }

        String type = pathAndQuery[0].trim();
        if (type.startsWith("//")) {
            type = type.substring(2);
        }
        this.uri = type;

        if (!TYPES_LIST.contains(type)) {
            throw new IllegalArgumentException(INVALID_URI);
        }
    }

    /**
     * Creates an exchange carrying the given body together with the stream index and completion
     * headers.
     *
     * @param body  the message body (a line, a grouped body or raw bytes)
     * @param index the value of the {@code CamelStreamIndex} header
     * @param last  the value of the {@code CamelStreamComplete} header
     * @return the new exchange
     */
    protected Exchange createExchange(Object body, long index, boolean last) {
        Exchange exchange = createExchange(true);
        exchange.getIn().setBody(body);
        // The primitives are boxed explicitly; the header values are a Long and a Boolean.
        exchange.getIn().setHeader("CamelStreamIndex", Long.valueOf(index));
        exchange.getIn().setHeader("CamelStreamComplete", Boolean.valueOf(last));
        return exchange;
    }

    /**
     * Disconnects the given connection when it is an HTTP connection; other connection types
     * (and {@code null}) are left untouched. Failures while disconnecting are ignored.
     *
     * @param con the connection to disconnect; may be {@code null}
     */
    private static void closeURLConnection(URLConnection con) {
        if (!(con instanceof HttpURLConnection)) {
            return;
        }
        HttpURLConnection http = (HttpURLConnection)con;
        try {
            http.disconnect();
        }
        catch (Exception ignored) {
            // Best-effort cleanup: there is nothing useful to do if disconnecting fails.
        }
    }
}

