package org.kitesdk.data.spi.filesystem;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Flushable;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import org.kitesdk.data.PartitionStrategy;
import org.kitesdk.data.Syncable;
import org.kitesdk.data.URIBuilder;
import org.kitesdk.data.ValidationException;
import org.kitesdk.data.spi.AbstractDatasetWriter;
import org.kitesdk.data.spi.ClockReady;
import org.kitesdk.data.spi.DescriptorUtil;
import org.kitesdk.data.spi.EntityAccessor;
import org.kitesdk.data.spi.PartitionListener;
import org.kitesdk.data.spi.ReaderWriterState;
import org.kitesdk.data.spi.RollingWriter;
import org.kitesdk.data.spi.StorageKey;
import org.kitesdk.data.spi.filesystem.FileSystemWriter;
import org.kitesdk.shaded.com.google.common.annotations.VisibleForTesting;
import org.kitesdk.shaded.com.google.common.base.Objects;
import org.kitesdk.shaded.com.google.common.base.Preconditions;
import org.kitesdk.shaded.com.google.common.cache.CacheBuilder;
import org.kitesdk.shaded.com.google.common.cache.CacheLoader;
import org.kitesdk.shaded.com.google.common.cache.LoadingCache;
import org.kitesdk.shaded.com.google.common.cache.RemovalListener;
import org.kitesdk.shaded.com.google.common.cache.RemovalNotification;
import org.kitesdk.shaded.com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kitesdk/data/spi/filesystem/PartitionedDatasetWriter.class */
abstract class PartitionedDatasetWriter<E, W extends FileSystemWriter<E>> extends AbstractDatasetWriter<E> implements RollingWriter {
    /** Logger used by this writer and by its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(PartitionedDatasetWriter.class);
    /** Default limit on the number of cached partition writers. */
    private static final int DEFAULT_WRITER_CACHE_SIZE = 10;
    /** The view this writer writes into; also used to validate entities before a new partition writer is opened. */
    protected FileSystemView<E> view;
    /** Maximum size of the partition-writer cache built in {@code initialize()}. */
    private final int maxWriters;
    /** Partition strategy taken from the dataset descriptor. */
    private final PartitionStrategy partitionStrategy;
    /** Open partition writers keyed by partition; created in {@code initialize()}, so it is null before that call. */
    protected LoadingCache<StorageKey, W> cachedWriters;
    /** Scratch key that is re-populated for every written entity; it is copied before being used as a cache key. */
    private final StorageKey reusedKey;
    /** Accessor used to compute the storage key of an entity. */
    private final EntityAccessor<E> accessor;
    /** Values supplied by the view, passed to the accessor when computing storage keys. */
    private final Map<String, Object> provided;
    /** Lifecycle state; starts as NEW, becomes OPEN in {@code initialize()} and CLOSED in {@code close()}. */
    protected ReaderWriterState state;
    /** Target file size handed to partition writers (read through {@code ConfAccessor}). */
    protected long targetFileSize;
    /** Roll interval in milliseconds handed to partition writers (read through {@code ConfAccessor}). */
    protected long rollIntervalMillis;

    /* loaded from: input_file:org/kitesdk/data/spi/filesystem/PartitionedDatasetWriter$ConfAccessor.class */
    /**
     * Supplies the current rolling configuration to the cache loaders, which
     * query it every time they create a partition writer.
     */
    public interface ConfAccessor {
        /** Returns the target file size to pass to a newly created partition writer. */
        long getTargetFileSize();

        /** Returns the roll interval, in milliseconds, to pass to a newly created partition writer. */
        long getRollIntervalMillis();
    }

    /**
     * Cache loader that creates and initializes a {@link FileSystemWriter} for a
     * partition the first time its {@link StorageKey} is requested.
     *
     * @param <E> the entity type written by the created writers
     */
    @VisibleForTesting
    static class DatasetWriterCacheLoader<E> extends CacheLoader<StorageKey, FileSystemWriter<E>> {
        private final FileSystemView<E> view;
        private final PathConversion convert;
        private final ConfAccessor conf;

        /**
         * @param fileSystemView view whose dataset the writers are created for
         * @param confAccessor source of the roll interval and target file size
         */
        public DatasetWriterCacheLoader(FileSystemView<E> fileSystemView, ConfAccessor confAccessor) {
            this.view = fileSystemView;
            this.conf = confAccessor;
            this.convert = new PathConversion(fileSystemView.getDataset().getDescriptor().getSchema());
        }

        /**
         * Creates the writer for the partition identified by {@code storageKey},
         * notifies the dataset's partition listener (if any) and initializes the writer.
         *
         * @param storageKey key of the partition to open a writer for
         * @return the initialized writer
         * @throws IllegalStateException if the view's dataset is not a {@code FileSystemDataset}
         */
        @Override // org.kitesdk.shaded.com.google.common.cache.CacheLoader
        public FileSystemWriter<E> load(StorageKey storageKey) throws Exception {
            Preconditions.checkState(
                    this.view.getDataset() instanceof FileSystemDataset,
                    "FileSystemWriters cannot create writer for " + this.view.getDataset());
            final FileSystemDataset dataset = (FileSystemDataset) this.view.getDataset();
            // Partition path relative to the dataset root directory.
            final Path partitionPath = this.convert.fromKey(storageKey);

            final FileSystemWriter<E> writer = FileSystemWriter.newWriter(
                    dataset.getFileSystem(),
                    new Path(dataset.getDirectory(), partitionPath),
                    this.conf.getRollIntervalMillis(),
                    this.conf.getTargetFileSize(),
                    dataset.getDescriptor());

            // The listener is told about the partition before the writer is initialized.
            final PartitionListener listener = dataset.getPartitionListener();
            if (listener != null) {
                listener.partitionAdded(dataset.getNamespace(), dataset.getName(), partitionPath.toString());
            }

            writer.initialize();
            return writer;
        }
    }

    /**
     * Removal listener that closes a partition writer when its entry is removed
     * from the writer cache.
     *
     * @param <E> the entity type of the closed writers
     */
    private static class DatasetWriterCloser<E> implements RemovalListener<StorageKey, DatasetWriter<E>> {
        private DatasetWriterCloser() {
        }

        /**
         * Logs the removal and closes the removed writer.
         *
         * @param removalNotification the cache entry that was removed
         */
        @Override // org.kitesdk.shaded.com.google.common.cache.RemovalListener
        public void onRemoval(RemovalNotification<StorageKey, DatasetWriter<E>> removalNotification) {
            final StorageKey partition = removalNotification.getKey();
            final DatasetWriter<E> removedWriter = removalNotification.getValue();
            PartitionedDatasetWriter.LOG.debug("Closing writer:{} for partition:{}", removedWriter, partition);
            removedWriter.close();
        }
    }

    /**
     * Cache loader that creates and initializes a
     * {@link FileSystemWriter.IncrementalWriter} for a partition the first time
     * its {@link StorageKey} is requested.
     *
     * @param <E> the entity type written by the created writers
     */
    @VisibleForTesting
    static class IncrementalDatasetWriterCacheLoader<E> extends CacheLoader<StorageKey, FileSystemWriter.IncrementalWriter<E>> {
        private final FileSystemView<E> view;
        private final PathConversion convert;
        private final ConfAccessor conf;

        /**
         * @param fileSystemView view whose dataset the writers are created for
         * @param confAccessor source of the roll interval and target file size
         */
        public IncrementalDatasetWriterCacheLoader(FileSystemView<E> fileSystemView, ConfAccessor confAccessor) {
            this.view = fileSystemView;
            this.conf = confAccessor;
            this.convert = new PathConversion(fileSystemView.getDataset().getDescriptor().getSchema());
        }

        /**
         * Creates the writer for the partition identified by {@code storageKey},
         * notifies the dataset's partition listener (if any) and initializes the writer.
         *
         * @param storageKey key of the partition to open a writer for
         * @return the initialized writer, cast to {@code IncrementalWriter}
         * @throws IllegalStateException if the view's dataset is not a {@code FileSystemDataset}
         */
        @Override // org.kitesdk.shaded.com.google.common.cache.CacheLoader
        @SuppressWarnings(value = {"BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"}, justification = "Writer is known to be IncrementalWriter")
        public FileSystemWriter.IncrementalWriter<E> load(StorageKey storageKey) throws Exception {
            Preconditions.checkState(
                    this.view.getDataset() instanceof FileSystemDataset,
                    "FileSystemWriters cannot create writer for " + this.view.getDataset());
            final FileSystemDataset dataset = (FileSystemDataset) this.view.getDataset();
            // Partition path relative to the dataset root directory.
            final Path partitionPath = this.convert.fromKey(storageKey);

            final FileSystemWriter<E> writer = FileSystemWriter.newWriter(
                    dataset.getFileSystem(),
                    new Path(dataset.getDirectory(), partitionPath),
                    this.conf.getRollIntervalMillis(),
                    this.conf.getTargetFileSize(),
                    dataset.getDescriptor());

            // The listener is told about the partition before the writer is initialized.
            final PartitionListener listener = dataset.getPartitionListener();
            if (listener != null) {
                listener.partitionAdded(dataset.getNamespace(), dataset.getName(), partitionPath.toString());
            }

            writer.initialize();
            return (FileSystemWriter.IncrementalWriter<E>) writer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/kitesdk/data/spi/filesystem/PartitionedDatasetWriter$IncrementalPartitionedDatasetWriter.class */
    public static class IncrementalPartitionedDatasetWriter<E> extends PartitionedDatasetWriter<E, FileSystemWriter.IncrementalWriter<E>> implements Flushable, Syncable {
        private IncrementalPartitionedDatasetWriter(FileSystemView<E> fileSystemView) {
            super(fileSystemView);
        }

        @Override // org.kitesdk.data.spi.filesystem.PartitionedDatasetWriter
        protected CacheLoader<StorageKey, FileSystemWriter.IncrementalWriter<E>> createCacheLoader() {
            return new IncrementalDatasetWriterCacheLoader(this.view, new ConfAccessor() { // from class: org.kitesdk.data.spi.filesystem.PartitionedDatasetWriter.IncrementalPartitionedDatasetWriter.1
                @Override // org.kitesdk.data.spi.filesystem.PartitionedDatasetWriter.ConfAccessor
                public long getTargetFileSize() {
                    return IncrementalPartitionedDatasetWriter.this.targetFileSize;
                }

                @Override // org.kitesdk.data.spi.filesystem.PartitionedDatasetWriter.ConfAccessor
                public long getRollIntervalMillis() {
                    return IncrementalPartitionedDatasetWriter.this.rollIntervalMillis;
                }
            });
        }

        @Override // org.kitesdk.data.Flushable, java.io.Flushable
        public void flush() {
            Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to flush a writer in state:%s", this.state);
            PartitionedDatasetWriter.LOG.debug("Flushing all cached writers for view:{}", this.view);
            for (W w : this.cachedWriters.asMap().values()) {
                PartitionedDatasetWriter.LOG.debug("Flushing partition writer:{}", w);
                w.flush();
            }
        }

        @Override // org.kitesdk.data.Syncable
        public void sync() {
            Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to sync a writer in state:%s", this.state);
            PartitionedDatasetWriter.LOG.debug("Syncing all cached writers for view:{}", this.view);
            for (W w : this.cachedWriters.asMap().values()) {
                PartitionedDatasetWriter.LOG.debug("Syncing partition writer:{}", w);
                w.sync();
            }
        }
    }

    /**
     * Partitioned writer whose per-partition writers are plain
     * {@link FileSystemWriter}s; it adds no flush or sync operations.
     *
     * @param <E> the entity type being written
     */
    public static class NonDurablePartitionedDatasetWriter<E> extends PartitionedDatasetWriter<E, FileSystemWriter<E>> {
        private NonDurablePartitionedDatasetWriter(FileSystemView<E> fileSystemView) {
            super(fileSystemView);
        }

        /**
         * Builds the loader that creates partition writers. The roll settings
         * are read through the accessor at load time, so the loader sees the
         * current values of this writer's fields.
         */
        @Override // org.kitesdk.data.spi.filesystem.PartitionedDatasetWriter
        protected CacheLoader<StorageKey, FileSystemWriter<E>> createCacheLoader() {
            final ConfAccessor liveSettings = new ConfAccessor() {
                @Override // org.kitesdk.data.spi.filesystem.PartitionedDatasetWriter.ConfAccessor
                public long getTargetFileSize() {
                    return NonDurablePartitionedDatasetWriter.this.targetFileSize;
                }

                @Override // org.kitesdk.data.spi.filesystem.PartitionedDatasetWriter.ConfAccessor
                public long getRollIntervalMillis() {
                    return NonDurablePartitionedDatasetWriter.this.rollIntervalMillis;
                }
            };
            return new DatasetWriterCacheLoader<E>(this.view, liveSettings);
        }
    }

    /**
     * Creates the partitioned writer variant that matches the dataset's format.
     * <ul>
     *   <li>Parquet: incremental only when
     *       {@code FileSystemProperties.NON_DURABLE_PARQUET_PROP} is disabled on
     *       the descriptor, otherwise non-durable.</li>
     *   <li>Avro and CSV: incremental.</li>
     *   <li>Any other format: non-durable.</li>
     * </ul>
     *
     * @param fileSystemView the view to write into
     * @return a new, not yet initialized writer
     */
    public static <E> PartitionedDatasetWriter<E, ?> newWriter(FileSystemView<E> fileSystemView) {
        final DatasetDescriptor descriptor = fileSystemView.getDataset().getDescriptor();
        final Format format = descriptor.getFormat();

        if (Formats.PARQUET.equals(format)) {
            if (DescriptorUtil.isDisabled(FileSystemProperties.NON_DURABLE_PARQUET_PROP, descriptor)) {
                return new IncrementalPartitionedDatasetWriter<E>(fileSystemView);
            }
            return new NonDurablePartitionedDatasetWriter<E>(fileSystemView);
        }

        if (Formats.AVRO.equals(format) || Formats.CSV.equals(format)) {
            return new IncrementalPartitionedDatasetWriter<E>(fileSystemView);
        }
        return new NonDurablePartitionedDatasetWriter<E>(fileSystemView);
    }

    /**
     * Captures the view and reads the writer configuration from its dataset
     * descriptor. The writer starts in the NEW state; the writer cache is not
     * created here.
     *
     * @param fileSystemView the view to write into; its dataset must be partitioned
     * @throws IllegalArgumentException if the dataset is not partitioned
     */
    private PartitionedDatasetWriter(FileSystemView<E> fileSystemView) {
        final DatasetDescriptor descriptor = fileSystemView.getDataset().getDescriptor();
        Preconditions.checkArgument(descriptor.isPartitioned(), "Dataset " + fileSystemView.getDataset() + " is not partitioned");

        this.view = fileSystemView;
        this.partitionStrategy = descriptor.getPartitionStrategy();

        // Default cache size is the strategy's cardinality, unless that is
        // negative or larger than the default cap.
        int defaultMaxWriters = this.partitionStrategy.getCardinality();
        if (defaultMaxWriters < 0 || defaultMaxWriters > DEFAULT_WRITER_CACHE_SIZE) {
            defaultMaxWriters = DEFAULT_WRITER_CACHE_SIZE;
        }
        this.maxWriters = DescriptorUtil.getInt(FileSystemProperties.WRITER_CACHE_SIZE_PROP, descriptor, defaultMaxWriters);

        this.state = ReaderWriterState.NEW;
        this.reusedKey = new StorageKey(this.partitionStrategy);
        this.accessor = fileSystemView.getAccessor();
        this.provided = fileSystemView.getProvidedValues();

        // For Parquet the descriptor's target-file-size property is not consulted.
        final boolean parquet = Formats.PARQUET.equals(descriptor.getFormat());
        this.targetFileSize = parquet
                ? -1L
                : DescriptorUtil.getLong(FileSystemProperties.TARGET_FILE_SIZE_PROP, descriptor, -1L);

        // The property is expressed in seconds; the field holds milliseconds.
        final long rollIntervalSeconds = DescriptorUtil.getLong(FileSystemProperties.ROLL_INTERVAL_S_PROP, descriptor, -1L);
        this.rollIntervalMillis = 1000 * rollIntervalSeconds;
    }

    /**
     * Opens this writer: validates the dataset format, builds the bounded
     * cache of partition writers and moves the state from NEW to OPEN.
     *
     * @throws IllegalStateException if the writer is not in the NEW state
     * @throws ValidationException if the dataset format is not supported
     */
    @Override // org.kitesdk.data.spi.InitializeAccessor
    public void initialize() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.NEW), "Unable to open a writer from state:%s", this.state);

        final DatasetDescriptor descriptor = this.view.getDataset().getDescriptor();
        final boolean formatSupported = FileSystemWriter.isSupportedFormat(descriptor);
        ValidationException.check(formatSupported, "Not a supported format: %s", descriptor.getFormat());

        LOG.debug("Opening partitioned dataset writer w/strategy:{}", this.partitionStrategy);

        // Writers removed from the cache are closed by the removal listener.
        this.cachedWriters = CacheBuilder.newBuilder()
                .maximumSize(this.maxWriters)
                .removalListener(new DatasetWriterCloser<E>())
                .build(createCacheLoader());
        this.state = ReaderWriterState.OPEN;
    }

    /**
     * Supplies the loader used to create a partition writer when the writer
     * cache has no entry for a storage key. Called once from
     * {@code initialize()} while the cache is being built.
     *
     * @return the cache loader for partition writers of type {@code W}
     */
    protected abstract CacheLoader<StorageKey, W> createCacheLoader();

    /**
     * Writes an entity to the writer of the partition it belongs to, creating
     * that partition writer if it is not already cached.
     *
     * @param e the entity to write
     * @throws IllegalStateException if this writer is not open
     * @throws IllegalArgumentException if a new partition writer is needed and
     *         the view does not include the entity, or if creating the
     *         partition writer fails
     */
    @Override // org.kitesdk.data.DatasetWriter
    public void write(E e) {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to write to a writer in state:%s", this.state);

        // Populate the shared scratch key with this entity's partition values.
        this.accessor.keyFor(e, this.provided, this.reusedKey);

        W partitionWriter = this.cachedWriters.getIfPresent(this.reusedKey);
        if (partitionWriter != null) {
            partitionWriter.write(e);
            return;
        }

        // The view check only runs when a new partition writer has to be opened.
        Preconditions.checkArgument(this.view.includes(e), "View %s does not include entity %s", this.view, e);
        try {
            // The scratch key is mutated on every call, so the cache gets its own copy.
            partitionWriter = this.cachedWriters.getUnchecked(StorageKey.copy(this.reusedKey));
        } catch (UncheckedExecutionException loadFailure) {
            throw new IllegalArgumentException("Problem creating view for entity: " + e, loadFailure.getCause());
        }
        partitionWriter.write(e);
    }

    /**
     * Closes every cached partition writer and marks this writer CLOSED. Does
     * nothing unless the writer is currently OPEN.
     *
     * <p>A failure while closing one partition writer does not stop the
     * remaining writers from being closed. After all writers have been
     * attempted, the first failure is rethrown with any later failures
     * attached as suppressed exceptions.
     *
     * @throws RuntimeException the first exception thrown by a partition
     *         writer's {@code close()}, if any
     */
    @Override // org.kitesdk.data.DatasetWriter, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.state.equals(ReaderWriterState.OPEN)) {
            return;
        }
        LOG.debug("Closing all cached writers for view:{}", this.view);

        RuntimeException firstFailure = null;
        for (W writer : this.cachedWriters.asMap().values()) {
            LOG.debug("Closing partition writer:{}", writer);
            try {
                writer.close();
            } catch (RuntimeException failure) {
                // Keep going so one bad partition cannot leave the others open.
                if (firstFailure == null) {
                    firstFailure = failure;
                } else if (failure != firstFailure) {
                    // addSuppressed rejects self-suppression, hence the identity check.
                    firstFailure.addSuppressed(failure);
                }
            }
        }

        // Every writer has had its close attempt, so this writer is no longer usable.
        this.state = ReaderWriterState.CLOSED;
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Stores the new roll interval and, when this writer is open, applies it
     * to every cached partition writer that is a {@link RollingWriter}.
     *
     * @param j the roll interval in milliseconds
     */
    @Override // org.kitesdk.data.spi.RollingWriter
    public void setRollIntervalMillis(long j) {
        this.rollIntervalMillis = j;
        if (ReaderWriterState.OPEN != this.state) {
            // No cached writers to update; new writers read the stored value.
            return;
        }
        for (W partitionWriter : this.cachedWriters.asMap().values()) {
            if (partitionWriter instanceof RollingWriter) {
                ((RollingWriter) partitionWriter).setRollIntervalMillis(j);
            }
        }
    }

    /**
     * Stores the new target file size and, when this writer is open, applies
     * it to every cached partition writer that is a {@link RollingWriter}.
     *
     * @param j the target file size
     */
    @Override // org.kitesdk.data.spi.RollingWriter
    public void setTargetFileSize(long j) {
        this.targetFileSize = j;
        if (ReaderWriterState.OPEN != this.state) {
            // No cached writers to update; new writers read the stored value.
            return;
        }
        for (W partitionWriter : this.cachedWriters.asMap().values()) {
            if (partitionWriter instanceof RollingWriter) {
                ((RollingWriter) partitionWriter).setTargetFileSize(j);
            }
        }
    }

    /**
     * Forwards a clock tick to every cached partition writer that is
     * {@link ClockReady}. Does nothing unless this writer is open.
     */
    @Override // org.kitesdk.data.spi.ClockReady
    public void tick() {
        if (ReaderWriterState.OPEN != this.state) {
            return;
        }
        for (W partitionWriter : this.cachedWriters.asMap().values()) {
            if (partitionWriter instanceof ClockReady) {
                ((ClockReady) partitionWriter).tick();
            }
        }
    }

    /**
     * Reports whether this writer is in the OPEN state.
     *
     * @return {@code true} if the current state equals {@code ReaderWriterState.OPEN}
     */
    @Override // org.kitesdk.data.DatasetWriter
    public boolean isOpen() {
        final ReaderWriterState current = this.state;
        return current.equals(ReaderWriterState.OPEN);
    }

    /**
     * Describes this writer by its partition strategy, writer-cache limit,
     * view and currently cached writers.
     */
    @Override
    public String toString() {
        return Objects.toStringHelper(this)
                .add("partitionStrategy", this.partitionStrategy)
                .add("maxWriters", this.maxWriters)
                .add(URIBuilder.VIEW_SCHEME, this.view)
                .add("cachedWriters", this.cachedWriters)
                .toString();
    }
}
