package org.apache.beam.runners.fnexecution.artifact;

import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.class */
public class ArtifactStagingService extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
    private static final Logger LOG;
    private final ArtifactDestinationProvider destinationProvider;
    private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> toStage = new ConcurrentHashMap();
    private final ConcurrentMap<String, Map<String, List<RunnerApi.ArtifactInformation>>> staged = new ConcurrentHashMap();
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$runners$fnexecution$artifact$ArtifactStagingService$State = new int[State.values().length];

        static {
            try {
                $SwitchMap$org$apache$beam$runners$fnexecution$artifact$ArtifactStagingService$State[State.START.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$beam$runners$fnexecution$artifact$ArtifactStagingService$State[State.RESOLVE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$beam$runners$fnexecution$artifact$ArtifactStagingService$State[State.GET.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$beam$runners$fnexecution$artifact$ArtifactStagingService$State[State.GETCHUNK.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService$ArtifactDestination.class */
    public static abstract class ArtifactDestination {
        public static ArtifactDestination create(String str, ByteString byteString, OutputStream outputStream) {
            return new AutoValue_ArtifactStagingService_ArtifactDestination(str, byteString, outputStream);
        }

        public static ArtifactDestination fromFile(String str) throws IOException {
            return fromFile(str, Channels.newOutputStream(FileSystems.create(FileSystems.matchNewResource(str, false), "application/octet-stream")));
        }

        public static ArtifactDestination fromFile(String str, OutputStream outputStream) {
            return create(ArtifactRetrievalService.FILE_ARTIFACT_URN, RunnerApi.ArtifactFilePayload.newBuilder().setPath(str).build().toByteString(), outputStream);
        }

        public abstract String getTypeUrn();

        public abstract ByteString getTypePayload();

        public abstract OutputStream getOutputStream();
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService$ArtifactDestinationProvider.class */
    public interface ArtifactDestinationProvider {
        ArtifactDestination getDestination(String str, String str2) throws IOException;

        void removeStagedArtifacts(String str) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService$OverflowingSemaphore.class */
    public static class OverflowingSemaphore {
        private int totalPermits;
        private int usedPermits = 0;
        private Exception exception;

        public OverflowingSemaphore(int i) {
            this.totalPermits = i;
        }

        synchronized void aquire(int i) throws Exception {
            while (this.usedPermits >= this.totalPermits) {
                if (this.exception != null) {
                    throw this.exception;
                }
                wait();
            }
            this.usedPermits += i;
        }

        synchronized void release(int i) {
            this.usedPermits -= i;
            notifyAll();
        }

        synchronized void setException(Exception exc) {
            this.exception = exc;
            notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService$StagingDriver.class */
    public static class StagingDriver implements StreamObserver<ArtifactApi.ArtifactRequestWrapper> {
        private final ArtifactRetrievalService retrievalService;
        private final StreamObserver<ArtifactApi.ArtifactResponseWrapper> responseObserver;
        private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

        public StagingDriver(ArtifactRetrievalService artifactRetrievalService, ArtifactStagingServiceGrpc.ArtifactStagingServiceStub artifactStagingServiceStub, String str) {
            this.retrievalService = artifactRetrievalService;
            this.responseObserver = artifactStagingServiceStub.reverseArtifactRetrievalService(this);
            this.responseObserver.onNext(ArtifactApi.ArtifactResponseWrapper.newBuilder().setStagingToken(str).build());
        }

        public CompletableFuture<?> getCompletionFuture() {
            return this.completionFuture;
        }

        public void onNext(ArtifactApi.ArtifactRequestWrapper artifactRequestWrapper) {
            if (this.completionFuture.isCompletedExceptionally()) {
                try {
                    this.completionFuture.get();
                } catch (Throwable th) {
                    this.responseObserver.onError(th);
                    return;
                }
            }
            if (artifactRequestWrapper.hasResolveArtifact()) {
                this.retrievalService.resolveArtifacts(artifactRequestWrapper.getResolveArtifact(), new StreamObserver<ArtifactApi.ResolveArtifactsResponse>() { // from class: org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService.StagingDriver.1
                    public void onNext(ArtifactApi.ResolveArtifactsResponse resolveArtifactsResponse) {
                        StagingDriver.this.responseObserver.onNext(ArtifactApi.ArtifactResponseWrapper.newBuilder().setResolveArtifactResponse(resolveArtifactsResponse).build());
                    }

                    public void onError(Throwable th2) {
                        StagingDriver.this.completionFuture.completeExceptionally(th2);
                        StagingDriver.this.responseObserver.onError(th2);
                    }

                    public void onCompleted() {
                    }
                });
            } else {
                if (artifactRequestWrapper.hasGetArtifact()) {
                    this.retrievalService.getArtifact(artifactRequestWrapper.getGetArtifact(), new StreamObserver<ArtifactApi.GetArtifactResponse>() { // from class: org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService.StagingDriver.2
                        public void onNext(ArtifactApi.GetArtifactResponse getArtifactResponse) {
                            StagingDriver.this.responseObserver.onNext(ArtifactApi.ArtifactResponseWrapper.newBuilder().setGetArtifactResponse(getArtifactResponse).build());
                        }

                        public void onError(Throwable th2) {
                            StagingDriver.this.completionFuture.completeExceptionally(th2);
                            StagingDriver.this.responseObserver.onError(th2);
                        }

                        public void onCompleted() {
                            StagingDriver.this.responseObserver.onNext(ArtifactApi.ArtifactResponseWrapper.newBuilder().setGetArtifactResponse(ArtifactApi.GetArtifactResponse.newBuilder().build()).setIsLast(true).build());
                        }
                    });
                    return;
                }
                StatusException statusException = new StatusException(Status.INVALID_ARGUMENT.withDescription("Expected either a resolve or get request."));
                this.completionFuture.completeExceptionally(statusException);
                this.responseObserver.onError(statusException);
            }
        }

        public void onError(Throwable th) {
            this.completionFuture.completeExceptionally(th);
        }

        public void onCompleted() {
            this.responseObserver.onCompleted();
            this.completionFuture.complete(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService$State.class */
    public enum State {
        START,
        RESOLVE,
        GET,
        GETCHUNK,
        DONE,
        ERROR
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService$StoreArtifact.class */
    public class StoreArtifact implements Callable<RunnerApi.ArtifactInformation> {
        private String stagingToken;
        private String name;
        private RunnerApi.ArtifactInformation originalArtifact;
        private BlockingQueue<ByteString> bytesQueue;
        private OverflowingSemaphore totalPendingBytes;

        public StoreArtifact(String str, String str2, RunnerApi.ArtifactInformation artifactInformation, BlockingQueue<ByteString> blockingQueue, OverflowingSemaphore overflowingSemaphore) {
            this.stagingToken = str;
            this.name = str2;
            this.originalArtifact = artifactInformation;
            this.bytesQueue = blockingQueue;
            this.totalPendingBytes = overflowingSemaphore;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public RunnerApi.ArtifactInformation call() throws IOException {
            try {
                ArtifactDestination destination = ArtifactStagingService.this.destinationProvider.getDestination(this.stagingToken, this.name);
                ArtifactStagingService.LOG.debug("Storing artifact for {}.{} at {}", new Object[]{this.stagingToken, this.name, destination});
                ByteString take = this.bytesQueue.take();
                while (take.size() > 0) {
                    this.totalPendingBytes.release(take.size());
                    take.writeTo(destination.getOutputStream());
                    take = this.bytesQueue.take();
                }
                destination.getOutputStream().close();
                return this.originalArtifact.toBuilder().setTypeUrn(destination.getTypeUrn()).setTypePayload(destination.getTypePayload()).build();
            } catch (IOException | InterruptedException e) {
                this.totalPendingBytes.setException(e);
                ArtifactStagingService.LOG.error("Exception staging artifacts", e);
                if (e instanceof IOException) {
                    throw ((IOException) e);
                }
                throw new RuntimeException(e);
            }
        }
    }

    public ArtifactStagingService(ArtifactDestinationProvider artifactDestinationProvider) {
        this.destinationProvider = artifactDestinationProvider;
    }

    public void registerJob(String str, Map<String, List<RunnerApi.ArtifactInformation>> map) {
        if (!$assertionsDisabled && this.toStage.containsKey(str)) {
            throw new AssertionError();
        }
        this.toStage.put(str, map);
    }

    public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(String str) {
        this.toStage.remove(str);
        return this.staged.remove(str);
    }

    public void removeStagedArtifacts(String str) throws IOException {
        this.destinationProvider.removeStagedArtifacts(str);
    }

    public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(final String str) {
        return new ArtifactDestinationProvider() { // from class: org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService.1
            @Override // org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService.ArtifactDestinationProvider
            public ArtifactDestination getDestination(String str2, String str3) throws IOException {
                return ArtifactDestination.fromFile(stagingDir(str2).resolve(str3, ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString());
            }

            @Override // org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService.ArtifactDestinationProvider
            public void removeStagedArtifacts(String str2) throws IOException {
                ResourceId stagingDir = stagingDir(str2);
                ArrayList arrayList = new ArrayList();
                Iterator it = FileSystems.matchResources(ImmutableList.of(stagingDir.resolve("*", ResolveOptions.StandardResolveOptions.RESOLVE_FILE))).iterator();
                while (it.hasNext()) {
                    Iterator it2 = ((MatchResult) it.next()).metadata().iterator();
                    while (it2.hasNext()) {
                        arrayList.add(((MatchResult.Metadata) it2.next()).resourceId());
                    }
                }
                FileSystems.delete(arrayList, new MoveOptions[]{MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES});
                FileSystems.delete(ImmutableList.of(stagingDir), new MoveOptions[]{MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES});
            }

            private ResourceId stagingDir(String str2) {
                return FileSystems.matchNewResource(str, true).resolve(Hashing.sha256().hashString(str2, Charsets.UTF_8).toString(), ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
            }
        };
    }

    public StreamObserver<ArtifactApi.ArtifactResponseWrapper> reverseArtifactRetrievalService(final StreamObserver<ArtifactApi.ArtifactRequestWrapper> streamObserver) {
        return new StreamObserver<ArtifactApi.ArtifactResponseWrapper>() { // from class: org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService.2
            public static final int THREAD_POOL_SIZE = 10;
            public static final int MAX_PENDING_BYTES = 104857600;
            String stagingToken;
            Map<String, List<RunnerApi.ArtifactInformation>> toResolve;
            Map<String, List<Future<RunnerApi.ArtifactInformation>>> stagedFutures;
            ExecutorService stagingExecutor;
            OverflowingSemaphore totalPendingBytes;
            Queue<String> pendingResolves;
            String currentEnvironment;
            Queue<RunnerApi.ArtifactInformation> pendingGets;
            BlockingQueue<ByteString> currentOutput;
            IdGenerator idGenerator = IdGenerators.incrementingLongs();
            State state = State.START;

            @SuppressFBWarnings(value = {"SF_SWITCH_FALLTHROUGH"}, justification = "fallthrough intended")
            public synchronized void onNext(ArtifactApi.ArtifactResponseWrapper artifactResponseWrapper) {
                switch (AnonymousClass3.$SwitchMap$org$apache$beam$runners$fnexecution$artifact$ArtifactStagingService$State[this.state.ordinal()]) {
                    case 1:
                        this.stagingToken = artifactResponseWrapper.getStagingToken();
                        ArtifactStagingService.LOG.info("Staging artifacts for {}.", this.stagingToken);
                        this.toResolve = (Map) ArtifactStagingService.this.toStage.get(this.stagingToken);
                        if (this.toResolve == null) {
                            streamObserver.onError(new StatusException(Status.INVALID_ARGUMENT.withDescription("Unknown staging token " + this.stagingToken)));
                            return;
                        }
                        this.stagedFutures = new ConcurrentHashMap();
                        this.pendingResolves = new ArrayDeque();
                        this.pendingResolves.addAll(this.toResolve.keySet());
                        this.stagingExecutor = Executors.newFixedThreadPool(10);
                        this.totalPendingBytes = new OverflowingSemaphore(MAX_PENDING_BYTES);
                        resolveNextEnvironment(streamObserver);
                        return;
                    case 2:
                        this.currentEnvironment = this.pendingResolves.remove();
                        this.stagedFutures.put(this.currentEnvironment, new ArrayList());
                        this.pendingGets = new ArrayDeque();
                        for (RunnerApi.ArtifactInformation artifactInformation : artifactResponseWrapper.getResolveArtifactResponse().getReplacementsList()) {
                            Optional<RunnerApi.ArtifactInformation> local = getLocal(artifactInformation);
                            if (local.isPresent()) {
                                this.stagedFutures.get(this.currentEnvironment).add(CompletableFuture.completedFuture(local.get()));
                            } else {
                                this.pendingGets.add(artifactInformation);
                                streamObserver.onNext(ArtifactApi.ArtifactRequestWrapper.newBuilder().setGetArtifact(ArtifactApi.GetArtifactRequest.newBuilder().setArtifact(artifactInformation)).build());
                            }
                        }
                        ArtifactStagingService.LOG.info("Getting {} artifacts for {}.{}.", new Object[]{Integer.valueOf(this.pendingGets.size()), this.stagingToken, this.pendingResolves.peek()});
                        if (this.pendingGets.isEmpty()) {
                            resolveNextEnvironment(streamObserver);
                            return;
                        } else {
                            this.state = State.GET;
                            return;
                        }
                    case 3:
                        RunnerApi.ArtifactInformation remove = this.pendingGets.remove();
                        String createFilename = createFilename(this.currentEnvironment, remove);
                        try {
                            ArtifactStagingService.LOG.debug("Storing artifacts for {} as {}", this.stagingToken, createFilename);
                            this.currentOutput = new ArrayBlockingQueue(100);
                            this.stagedFutures.get(this.currentEnvironment).add(this.stagingExecutor.submit(new StoreArtifact(this.stagingToken, createFilename, remove, this.currentOutput, this.totalPendingBytes)));
                        } catch (Exception e) {
                            ArtifactStagingService.LOG.error("Error submitting.", e);
                            streamObserver.onError(e);
                        }
                        this.state = State.GETCHUNK;
                        break;
                    case 4:
                        break;
                    default:
                        streamObserver.onError(new StatusException(Status.INVALID_ARGUMENT.withDescription("Illegal state " + this.state)));
                        return;
                }
                try {
                    ByteString data = artifactResponseWrapper.getGetArtifactResponse().getData();
                    if (data.size() > 0) {
                        this.totalPendingBytes.aquire(data.size());
                        this.currentOutput.put(data);
                    }
                    if (artifactResponseWrapper.getIsLast()) {
                        this.currentOutput.put(ByteString.EMPTY);
                        if (this.pendingGets.isEmpty()) {
                            resolveNextEnvironment(streamObserver);
                        } else {
                            this.state = State.GET;
                            ArtifactStagingService.LOG.debug("Waiting for {}", this.pendingGets.peek());
                        }
                    }
                } catch (Exception e2) {
                    ArtifactStagingService.LOG.error("Error submitting.", e2);
                    onError(e2);
                }
            }

            private void resolveNextEnvironment(StreamObserver<ArtifactApi.ArtifactRequestWrapper> streamObserver2) {
                if (this.pendingResolves.isEmpty()) {
                    finishStaging(streamObserver2);
                    return;
                }
                this.state = State.RESOLVE;
                ArtifactStagingService.LOG.info("Resolving artifacts for {}.{}.", this.stagingToken, this.pendingResolves.peek());
                streamObserver2.onNext(ArtifactApi.ArtifactRequestWrapper.newBuilder().setResolveArtifact(ArtifactApi.ResolveArtifactsRequest.newBuilder().addAllArtifacts(this.toResolve.get(this.pendingResolves.peek()))).build());
            }

            private void finishStaging(StreamObserver<ArtifactApi.ArtifactRequestWrapper> streamObserver2) {
                ArtifactStagingService.LOG.debug("Finishing staging for {}.", this.stagingToken);
                HashMap hashMap = new HashMap();
                try {
                    for (Map.Entry<String, List<Future<RunnerApi.ArtifactInformation>>> entry : this.stagedFutures.entrySet()) {
                        ArrayList arrayList = new ArrayList();
                        Iterator<Future<RunnerApi.ArtifactInformation>> it = entry.getValue().iterator();
                        while (it.hasNext()) {
                            arrayList.add(it.next().get());
                        }
                        hashMap.put(entry.getKey(), arrayList);
                    }
                    ArtifactStagingService.this.staged.put(this.stagingToken, hashMap);
                    this.stagingExecutor.shutdown();
                    this.state = State.DONE;
                    ArtifactStagingService.LOG.info("Artifacts fully staged for {}.", this.stagingToken);
                    streamObserver2.onCompleted();
                } catch (Exception e) {
                    ArtifactStagingService.LOG.error("Error staging artifacts", e);
                    streamObserver2.onError(e);
                    this.state = State.ERROR;
                }
            }

            private Optional<RunnerApi.ArtifactInformation> getLocal(RunnerApi.ArtifactInformation artifactInformation) {
                return Optional.empty();
            }

            private String createFilename(String str, RunnerApi.ArtifactInformation artifactInformation) {
                try {
                    List splitToList = Splitter.onPattern("[^A-Za-z-_.]]").splitToList(artifactInformation.getRoleUrn().equals(ArtifactRetrievalService.STAGING_TO_ARTIFACT_URN) ? RunnerApi.ArtifactStagingToRolePayload.parseFrom(artifactInformation.getRolePayload()).getStagedName() : artifactInformation.getTypeUrn().equals(ArtifactRetrievalService.FILE_ARTIFACT_URN) ? RunnerApi.ArtifactFilePayload.parseFrom(artifactInformation.getTypePayload()).getPath() : artifactInformation.getTypeUrn().equals(ArtifactRetrievalService.URL_ARTIFACT_URN) ? RunnerApi.ArtifactUrlPayload.parseFrom(artifactInformation.getTypePayload()).getUrl() : "artifact");
                    return clip(String.format("%s-%s-%s", this.idGenerator.getId(), clip(str, 25), (String) splitToList.get(splitToList.size() - 1)), 100);
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException((Throwable) e);
                }
            }

            private String clip(String str, int i) {
                return str.length() < i ? str : str.substring(0, i);
            }

            public void onError(Throwable th) {
                this.stagingExecutor.shutdownNow();
                ArtifactStagingService.LOG.error("Error staging artifacts", th);
                this.state = State.ERROR;
            }

            public void onCompleted() {
                Preconditions.checkArgument(this.state == State.DONE);
            }
        };
    }

    @Override // org.apache.beam.runners.fnexecution.FnService, java.lang.AutoCloseable
    public void close() throws Exception {
    }

    public static void offer(ArtifactRetrievalService artifactRetrievalService, ArtifactStagingServiceGrpc.ArtifactStagingServiceStub artifactStagingServiceStub, String str) throws ExecutionException, InterruptedException {
        new StagingDriver(artifactRetrievalService, artifactStagingServiceStub, str).getCompletionFuture().get();
    }

    static {
        $assertionsDisabled = !ArtifactStagingService.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(ArtifactStagingService.class);
    }
}
