package org.apache.beam.sdk.expansion.service;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Converter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/expansion/service/ExpansionService.class */
public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplBase implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ExpansionService.class);
    private Map<String, TransformProvider> registeredTransforms;
    private final PipelineOptions pipelineOptions;

    /* loaded from: input_file:org/apache/beam/sdk/expansion/service/ExpansionService$ExpansionServiceRegistrar.class */
    /**
     * Contributes transform providers to an {@link ExpansionService}.
     *
     * <p>The enclosing service discovers implementations through {@link ServiceLoader} and merges
     * the maps they return into a single lookup table.
     */
    public interface ExpansionServiceRegistrar {
        /**
         * Returns the providers contributed by this registrar. The expansion service looks
         * entries up by the URN of the requested transform's spec.
         */
        Map<String, TransformProvider> knownTransforms();
    }

    /* loaded from: input_file:org/apache/beam/sdk/expansion/service/ExpansionService$ExternalTransformRegistrarLoader.class */
    public static class ExternalTransformRegistrarLoader implements ExpansionServiceRegistrar {
        private static final SchemaRegistry SCHEMA_REGISTRY;
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Builds one {@link TransformProvider} for every builder instance advertised by each
         * {@link ExternalTransformRegistrar} found through {@link ServiceLoader}.
         *
         * <p>Each provider, when asked for a transform, parses the function spec payload as an
         * {@code ExternalConfigurationPayload}, converts it into the builder's configuration
         * class and passes that configuration to {@code buildExternal}. Any failure along the way
         * is rethrown as a {@link RuntimeException} naming the registered key and the spec.
         *
         * @return an immutable map from registered key to provider
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"}) // builders are registered with wildcard type parameters
        public Map<String, TransformProvider> knownTransforms() {
            // A typed builder is required: with a raw builder the lambda below would have no
            // functional-interface target type.
            ImmutableMap.Builder<String, TransformProvider> providers = ImmutableMap.builder();
            for (ExternalTransformRegistrar registrar : ServiceLoader.load(ExternalTransformRegistrar.class)) {
                for (Map.Entry<String, ? extends ExternalTransformBuilder> registration
                        : registrar.knownBuilderInstances().entrySet()) {
                    String urn = registration.getKey();
                    ExternalTransformBuilder transformBuilder = registration.getValue();
                    TransformProvider provider = functionSpec -> {
                        try {
                            ExternalTransforms.ExternalConfigurationPayload payload =
                                    ExternalTransforms.ExternalConfigurationPayload.parseFrom(functionSpec.getPayload());
                            Class configClass = getConfigClass(transformBuilder);
                            Object config = payloadToConfig(payload, configClass);
                            return transformBuilder.buildExternal(config);
                        } catch (Exception e) {
                            throw new RuntimeException(
                                    String.format("Failed to build transform %s from spec %s", urn, functionSpec), e);
                        }
                    };
                    providers.put(urn, provider);
                }
            }
            return providers.build();
        }

        /**
         * Finds the configuration class accepted by the builder's public {@code buildExternal}
         * method by inspecting its single parameter type.
         *
         * <p>The scan stops at the first {@code buildExternal} whose parameter is not
         * {@link Object}; if only {@code Object}-typed variants exist, {@code Object.class} is
         * returned.
         *
         * @param externalTransformBuilder the builder whose methods are inspected
         * @return the parameter type of {@code buildExternal}
         * @throws IllegalStateException if a {@code buildExternal} method does not take exactly one parameter
         * @throws AssertionError if the builder exposes no {@code buildExternal} method at all
         */
        @SuppressWarnings("unchecked")
        private static <ConfigT> Class<ConfigT> getConfigClass(ExternalTransformBuilder<ConfigT, ?, ?> externalTransformBuilder) {
            Class<?> builderClass = externalTransformBuilder.getClass();
            Class<?> configClass = null;
            for (Method candidate : builderClass.getMethods()) {
                if (!"buildExternal".equals(candidate.getName())) {
                    continue;
                }
                int parameterCount = candidate.getParameterCount();
                Preconditions.checkState(parameterCount == 1, "Build method for ExternalTransformBuilder %s must have exactly one parameter, but had %s parameters.", builderClass.getSimpleName(), parameterCount);
                configClass = candidate.getParameterTypes()[0];
                if (configClass != Object.class) {
                    // A concrete parameter type was found; no need to look any further.
                    break;
                }
            }
            if (configClass != null) {
                return (Class<ConfigT>) configClass;
            }
            throw new AssertionError("Failed to find buildExternal method.");
        }

        /**
         * Decodes the payload bytes into a {@link Row}.
         *
         * <p>The schema is taken from the payload itself. Field names containing an underscore
         * are converted from lower_underscore to lowerCamel before decoding; all other fields are
         * left as they are. A schema without fields yields an empty row without reading the
         * payload bytes.
         *
         * @param externalConfigurationPayload payload carrying both the schema and the encoded row
         * @return the decoded row, using the renamed schema
         * @throws RuntimeException if the row bytes cannot be decoded
         */
        private static <ConfigT> Row decodeRow(ExternalTransforms.ExternalConfigurationPayload externalConfigurationPayload) {
            Schema payloadSchema = SchemaTranslation.schemaFromProto(externalConfigurationPayload.getSchema());
            if (payloadSchema.getFieldCount() == 0) {
                return Row.withSchema(Schema.of()).build();
            }
            Converter<String, String> snakeToCamel = CaseFormat.LOWER_UNDERSCORE.converterTo(CaseFormat.LOWER_CAMEL);
            Schema camelCaseSchema = payloadSchema.getFields().stream().map(field -> {
                String originalName = Preconditions.checkNotNull(field.getName());
                if (!originalName.contains("_")) {
                    return field;
                }
                String camelName = snakeToCamel.convert(originalName);
                if (!$assertionsDisabled && camelName == null) {
                    throw new AssertionError("@AssumeAssertion(nullness): converter type is imprecise; it is nullness-preserving");
                }
                return field.withName(camelName);
            }).collect(Schema.toSchema());
            try {
                return RowCoder.of(camelCaseSchema).decode(externalConfigurationPayload.getPayload().newInput());
            } catch (IOException e) {
                throw new RuntimeException("Error decoding payload", e);
            }
        }

        /**
         * Converts a configuration payload into an instance of {@code cls}.
         *
         * <p>The schema-based conversion is tried first. If no schema is registered for
         * {@code cls}, a warning is logged and the instance is built through its no-argument
         * constructor and setters instead.
         *
         * @param externalConfigurationPayload payload carrying the schema and the encoded row
         * @param cls configuration class to instantiate
         * @return the populated configuration object
         * @throws IllegalArgumentException if the setter-based fallback fails reflectively
         */
        @VisibleForTesting
        public static <ConfigT> ConfigT payloadToConfig(ExternalTransforms.ExternalConfigurationPayload externalConfigurationPayload, Class<ConfigT> cls) {
            try {
                return payloadToConfigSchema(externalConfigurationPayload, cls);
            } catch (NoSuchSchemaException noSchema) {
                // Not fatal: fall through to the setter-based construction below.
                LOG.warn("Configuration class '{}' has no schema registered. Attempting to construct with setter approach.", cls.getName());
            }
            try {
                return payloadToConfigSetters(externalConfigurationPayload, cls);
            } catch (ReflectiveOperationException reflectionFailure) {
                throw new IllegalArgumentException(String.format("Failed to construct instance of configuration class '%s'", cls.getName()), reflectionFailure);
            }
        }

        /**
         * Builds the configuration object via the schema registered for {@code cls}.
         *
         * <p>The registry is consulted before the payload is decoded, so a missing schema is
         * reported without touching the payload bytes.
         *
         * @param externalConfigurationPayload payload carrying the schema and the encoded row
         * @param cls configuration class to instantiate
         * @return the configuration object produced by the registry's from-row function
         * @throws NoSuchSchemaException if the registry has no schema for {@code cls}
         * @throws IllegalArgumentException if the payload schema is not assignable to the class schema
         */
        private static <ConfigT> ConfigT payloadToConfigSchema(ExternalTransforms.ExternalConfigurationPayload externalConfigurationPayload, Class<ConfigT> cls) throws NoSuchSchemaException {
            Schema configSchema = SCHEMA_REGISTRY.getSchema(cls);
            SerializableFunction<Row, ConfigT> fromRow = SCHEMA_REGISTRY.getFromRowFunction(cls);
            Row configRow = decodeRow(externalConfigurationPayload);
            Schema payloadSchema = configRow.getSchema();
            if (!payloadSchema.assignableTo(configSchema)) {
                throw new IllegalArgumentException(String.format("Schema in expansion request payload is not assignable to the schema for the configuration object.%n%nPayload Schema: %s%n%nConfiguration Schema: %s", payloadSchema, configSchema));
            }
            return fromRow.apply(configRow);
        }

        /**
         * Builds the configuration object reflectively: instantiates {@code cls} through its
         * no-argument constructor (made accessible first) and, for every field of the decoded
         * row, calls the public setter {@code set<FieldName>} whose parameter is the raw type of
         * the coder for that field's type.
         *
         * @param externalConfigurationPayload payload carrying the schema and the encoded row
         * @param cls configuration class to instantiate
         * @return the populated configuration object
         * @throws ReflectiveOperationException if construction or a setter invocation fails
         * @throws IllegalArgumentException if a required setter does not exist
         */
        private static <ConfigT> ConfigT payloadToConfigSetters(ExternalTransforms.ExternalConfigurationPayload externalConfigurationPayload, Class<ConfigT> cls) throws ReflectiveOperationException {
            Row configRow = decodeRow(externalConfigurationPayload);
            Constructor<ConfigT> noArgConstructor = cls.getDeclaredConstructor();
            noArgConstructor.setAccessible(true);
            ConfigT config = noArgConstructor.newInstance();
            Class<?> configClass = config.getClass();
            for (Schema.Field field : configRow.getSchema().getFields()) {
                String fieldName = field.getName();
                Object fieldValue = configRow.getValue(fieldName);
                Coder<?> fieldCoder = SchemaCoder.coderForFieldType(field.getType());
                Class<?> setterParameterType = fieldCoder.getEncodedTypeDescriptor().getRawType();
                String setterName = "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
                Method setter;
                try {
                    setter = configClass.getMethod(setterName, setterParameterType);
                } catch (NoSuchMethodException e) {
                    throw new IllegalArgumentException(String.format("The configuration class %s is missing a setter %s for %s with type %s", configClass, setterName, fieldName, fieldCoder.getEncodedTypeDescriptor().getType().getTypeName()), e);
                }
                invokeSetter(config, fieldValue, setter);
            }
            return config;
        }

        /**
         * Reflectively invokes {@code method} on {@code configt} with {@code obj} as its only
         * argument.
         *
         * @param configt the target object
         * @param obj the value passed to the method
         * @param method the method to invoke
         * @throws IllegalAccessException if the method is not accessible
         * @throws InvocationTargetException if the invoked method itself throws
         */
        private static <ConfigT> void invokeSetter(ConfigT configt, Object obj, Method method) throws IllegalAccessException, InvocationTargetException {
            method.invoke(configt, obj);
        }

        // Class initialisation for the constants declared at the top of this class.
        static {
            // True when assertions are NOT enabled for ExpansionService; decodeRow consults this
            // flag before running its null check on converted field names.
            $assertionsDisabled = !ExpansionService.class.desiredAssertionStatus();
            // Registry used to look up schemas and from-row functions for configuration classes.
            SCHEMA_REGISTRY = SchemaRegistry.createDefault();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/expansion/service/ExpansionService$NotRunnableRunner.class */
    private static class NotRunnableRunner extends PipelineRunner<PipelineResult> {
        private NotRunnableRunner() {
        }

        public static NotRunnableRunner fromOptions(PipelineOptions pipelineOptions) {
            return new NotRunnableRunner();
        }

        public PipelineResult run(Pipeline pipeline) {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/expansion/service/ExpansionService$TransformProvider.class */
    public interface TransformProvider<InputT extends PInput, OutputT extends POutput> {
        /**
         * Turns the named input collections into the input object handed to the transform.
         *
         * <ul>
         *   <li>no inputs: the pipeline's begin marker;
         *   <li>exactly one input: that collection itself;
         *   <li>several inputs: a {@link PCollectionTuple} with one tag per map key.
         * </ul>
         *
         * @param pipeline the pipeline the inputs belong to
         * @param map input collections keyed by name; must not be null
         * @return the input, cast (unchecked) to the type expected by the transform
         */
        @SuppressWarnings({"rawtypes", "unchecked"}) // InputT is chosen by the implementor; the casts cannot be checked here
        default InputT createInput(Pipeline pipeline, Map<String, PCollection<?>> map) {
            Map<String, PCollection<?>> inputs = org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(map);
            if (inputs.isEmpty()) {
                return (InputT) pipeline.begin();
            }
            if (inputs.size() == 1) {
                return (InputT) Iterables.getOnlyElement(inputs.values());
            }
            PCollectionTuple tuple = PCollectionTuple.empty(pipeline);
            for (Map.Entry<String, PCollection<?>> input : inputs.entrySet()) {
                // Raw tag and collection: the element type of each input is unknown at this point.
                tuple = tuple.and(new TupleTag(input.getKey()), (PCollection) input.getValue());
            }
            return (InputT) tuple;
        }

        /**
         * Creates the transform described by the given function spec.
         *
         * @param functionSpec the spec taken from the transform in the expansion request
         * @return the transform to apply to the input built by {@code createInput}
         */
        PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec functionSpec);

        /**
         * Flattens the transform's output into a map of named collections.
         *
         * <ul>
         *   <li>{@link PDone}: an empty map;
         *   <li>{@link PCollection}: a single entry named {@code "output"};
         *   <li>{@link PCollectionTuple}: one entry per tag, keyed by the tag id;
         *   <li>{@link PCollectionList}: one entry per element, keyed by its index as a string.
         * </ul>
         *
         * @param outputt the value produced by applying the transform
         * @return the output collections keyed by name
         * @throws UnsupportedOperationException for any other output type
         */
        default Map<String, PCollection<?>> extractOutputs(OutputT outputt) {
            if (outputt instanceof PDone) {
                return Collections.emptyMap();
            }
            if (outputt instanceof PCollection) {
                return ImmutableMap.of("output", (PCollection<?>) outputt);
            }
            if (outputt instanceof PCollectionTuple) {
                Map<TupleTag<?>, PCollection<?>> taggedOutputs = ((PCollectionTuple) outputt).getAll();
                return taggedOutputs.entrySet().stream()
                        .collect(Collectors.toMap(tagged -> tagged.getKey().getId(), Map.Entry::getValue));
            }
            if (outputt instanceof PCollectionList) {
                List<? extends PCollection<?>> members = ((PCollectionList<?>) outputt).getAll();
                ImmutableMap.Builder<String, PCollection<?>> indexed = ImmutableMap.builder();
                for (int index = 0; index < members.size(); index++) {
                    indexed.put(Integer.toString(index), members.get(index));
                }
                return indexed.build();
            }
            throw new UnsupportedOperationException("Unknown output type: " + outputt.getClass());
        }

        /**
         * Builds the input, creates the transform from the spec, applies it under the given
         * name and returns the resulting collections by name.
         *
         * @param pipeline the pipeline to apply the transform in
         * @param str the name under which the transform is applied
         * @param functionSpec the spec describing the transform
         * @param map the input collections keyed by name
         * @return the transform's outputs as produced by {@code extractOutputs}
         */
        default Map<String, PCollection<?>> apply(Pipeline pipeline, String str, RunnerApi.FunctionSpec functionSpec, Map<String, PCollection<?>> map) {
            InputT input = createInput(pipeline, map);
            PTransform<InputT, OutputT> transform = getTransform(functionSpec);
            OutputT output = Pipeline.applyTransform(str, input, transform);
            return extractOutputs(output);
        }
    }

    /** Creates a service with pipeline options built from an empty argument list. */
    public ExpansionService() {
        this(new String[0]);
    }

    /**
     * Creates a service whose pipeline options are parsed from command-line style arguments.
     *
     * @param strArr arguments handed to {@code PipelineOptionsFactory.fromArgs}
     */
    public ExpansionService(String[] strArr) {
        this(PipelineOptionsFactory.fromArgs(strArr).create());
    }

    /**
     * Creates a service using the given options.
     *
     * @param pipelineOptions options consulted when pipelines are created for expansion
     */
    public ExpansionService(PipelineOptions pipelineOptions) {
        this.pipelineOptions = pipelineOptions;
    }

    /**
     * Returns the transform providers, loading them on first use and caching the result in
     * {@code registeredTransforms}.
     */
    private Map<String, TransformProvider> getRegisteredTransforms() {
        Map<String, TransformProvider> cached = this.registeredTransforms;
        if (cached != null) {
            return cached;
        }
        cached = loadRegisteredTransforms();
        this.registeredTransforms = cached;
        return cached;
    }

    /**
     * Collects the providers of every {@link ExpansionServiceRegistrar} found through
     * {@link ServiceLoader} into one immutable map and logs the registered keys.
     */
    private Map<String, TransformProvider> loadRegisteredTransforms() {
        ImmutableMap.Builder<String, TransformProvider> collected = ImmutableMap.builder();
        for (ExpansionServiceRegistrar registrar : ServiceLoader.load(ExpansionServiceRegistrar.class)) {
            collected.putAll(registrar.knownTransforms());
        }
        ImmutableMap<String, TransformProvider> transforms = collected.build();
        LOG.info("Registering external transforms: {}", transforms.keySet());
        return transforms;
    }

    /**
     * Expands the transform named in the request and returns it together with the components
     * it needs.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>create a fresh pipeline and configure its read experiments;
     *   <li>detect the classpath resources to stage and record them on the pipeline options;
     *   <li>rehydrate the request's input collections into that pipeline;
     *   <li>look up the provider for the transform's URN and apply it;
     *   <li>register the default environment and the output collections under the request's
     *       namespace, translate the pipeline to its proto form and pick the single root
     *       transform that was not already present in the request.
     * </ol>
     *
     * @param expansionRequest the request holding the transform to expand and its components
     * @return the expanded transform plus the components, with the expanded transform itself
     *     removed from the component map
     * @throws UnsupportedOperationException if no provider is registered for the transform's URN
     * @throws IllegalArgumentException if no classpath elements are found to stage
     * @throws RuntimeException if the classloader is unavailable or a collection cannot be
     *     rehydrated or registered
     */
    @VisibleForTesting
    ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest expansionRequest) {
        LOG.info("Expanding '{}' with URN '{}'", expansionRequest.getTransform().getUniqueName(), expansionRequest.getTransform().getSpec().getUrn());
        LOG.debug("Full transform: {}", expansionRequest.getTransform());
        // Remember what the caller already had so the newly created root can be identified later.
        Set<String> existingTransformIds = expansionRequest.getComponents().getTransformsMap().keySet();
        Pipeline pipeline = createPipeline();

        boolean useDeprecatedRead = ExperimentalOptions.hasExperiment(this.pipelineOptions, "use_deprecated_read")
                || ExperimentalOptions.hasExperiment(this.pipelineOptions, "beam_fn_api_use_deprecated_read");
        if (useDeprecatedRead) {
            LOG.warn("Using use_deprecated_read in portable runners is runner-dependent. The ExpansionService will respect that, but if your runner does not have support for native Read transform, your Pipeline will fail during Pipeline submission.");
        } else {
            ExperimentalOptions.addExperiment(pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
            ExperimentalOptions.addExperiment(pipeline.getOptions().as(ExperimentalOptions.class), "use_sdf_read");
        }

        ClassLoader classLoader = Environments.class.getClassLoader();
        if (classLoader == null) {
            throw new RuntimeException("Cannot detect classpath: classloader is null (is it the bootstrap classloader?)");
        }
        List<String> filesToStage = PipelineResources.detectClassPathResourcesToStage(classLoader, pipeline.getOptions());
        if (filesToStage.isEmpty()) {
            throw new IllegalArgumentException("No classpath elements found.");
        }
        LOG.debug("Staging to files from the classpath: {}", filesToStage.size());
        pipeline.getOptions().as(PortablePipelineOptions.class).setFilesToStage(filesToStage);

        RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(expansionRequest.getComponents()).withPipeline(pipeline);
        Map<String, PCollection<?>> inputs = expansionRequest.getTransform().getInputsMap().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, input -> {
                    try {
                        return rehydratedComponents.getPCollection(input.getValue());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));

        TransformProvider transformProvider = getRegisteredTransforms().get(expansionRequest.getTransform().getSpec().getUrn());
        if (transformProvider == null) {
            throw new UnsupportedOperationException("Unknown urn: " + expansionRequest.getTransform().getSpec().getUrn());
        }
        @SuppressWarnings("unchecked") // providers are registered with raw type parameters
        Map<String, PCollection<?>> outputs = transformProvider.apply(pipeline, expansionRequest.getTransform().getUniqueName(), expansionRequest.getTransform().getSpec(), inputs);

        SdkComponents sdkComponents = rehydratedComponents.getSdkComponents(Collections.emptyList()).withNewIdPrefix(expansionRequest.getNamespace());
        sdkComponents.registerEnvironment(Environments.createOrGetDefaultEnvironment(pipeline.getOptions().as(PortablePipelineOptions.class)));
        Map<String, String> outputIds = outputs.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, output -> {
                    try {
                        return sdkComponents.registerPCollection(output.getValue());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }));

        if (useDeprecatedRead) {
            SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
        }

        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
        // Exactly one root transform is expected that the request did not already contain.
        String expandedTransformId = Iterables.getOnlyElement(
                pipelineProto.getRootTransformIdsList().stream()
                        .filter(id -> !existingTransformIds.contains(id))
                        .collect(Collectors.toList()));
        RunnerApi.Components components = pipelineProto.getComponents();
        RunnerApi.PTransform expandedTransform = components.getTransformsOrThrow(expandedTransformId).toBuilder()
                .setUniqueName(expandedTransformId)
                .clearOutputs()
                .putAllOutputs(outputIds)
                .build();
        LOG.debug("Expanded to {}", expandedTransform);
        return ExpansionApi.ExpansionResponse.newBuilder()
                .setComponents(components.toBuilder().removeTransforms(expandedTransformId))
                .setTransform(expandedTransform)
                .addAllRequirements(pipelineProto.getRequirementsList())
                .build();
    }

    /**
     * Creates the pipeline an expansion is performed in.
     *
     * <p>The pipeline starts from default options. Only the default environment type and
     * config (when set) and the experiments are copied over from the options this service was
     * constructed with. The runner is set to {@code NotRunnableRunner}.
     *
     * @return a new pipeline configured as described above
     */
    protected Pipeline createPipeline() {
        PipelineOptions effectiveOptions = PipelineOptionsFactory.create();
        PortablePipelineOptions effectivePortableOptions = effectiveOptions.as(PortablePipelineOptions.class);
        PortablePipelineOptions specifiedPortableOptions = this.pipelineOptions.as(PortablePipelineOptions.class);
        // Typed Optionals so the setter method references have a proper Consumer<String> target.
        Optional<String> environmentType = Optional.ofNullable(specifiedPortableOptions.getDefaultEnvironmentType());
        environmentType.ifPresent(effectivePortableOptions::setDefaultEnvironmentType);
        Optional<String> environmentConfig = Optional.ofNullable(specifiedPortableOptions.getDefaultEnvironmentConfig());
        environmentConfig.ifPresent(effectivePortableOptions::setDefaultEnvironmentConfig);
        effectiveOptions.as(ExperimentalOptions.class).setExperiments(this.pipelineOptions.as(ExperimentalOptions.class).getExperiments());
        effectiveOptions.setRunner(NotRunnableRunner.class);
        return Pipeline.create(effectiveOptions);
    }

    /**
     * gRPC entry point: expands the request and sends exactly one response to the observer
     * before completing it.
     *
     * <p>A {@link RuntimeException} raised by the expansion is logged and reported to the
     * caller as a response whose error field holds the stack trace. Only the expansion itself
     * is guarded, so a failure while writing to the observer is not sent back to that same
     * observer as if it were an expansion error.
     *
     * @param expansionRequest the transform to expand
     * @param streamObserver receives the expansion result or the error response
     */
    @Override
    public void expand(ExpansionApi.ExpansionRequest expansionRequest, StreamObserver<ExpansionApi.ExpansionResponse> streamObserver) {
        ExpansionApi.ExpansionResponse response;
        try {
            response = expand(expansionRequest);
        } catch (RuntimeException e) {
            LOG.error("Failed to expand transform", e);
            response = ExpansionApi.ExpansionResponse.newBuilder().setError(Throwables.getStackTraceAsString(e)).build();
        }
        streamObserver.onNext(response);
        streamObserver.onCompleted();
    }

    /** Does nothing; present to satisfy {@link AutoCloseable}. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
    }

    /**
     * Starts the expansion service together with an artifact retrieval service on the given
     * port and blocks until the server terminates.
     *
     * <p>The registered transforms are printed to standard output before the server starts.
     *
     * @param strArr the port as first element, followed by any pipeline option arguments
     * @throws IllegalArgumentException if the port argument is missing or not an integer
     * @throws Exception if the server cannot be started or waiting for termination fails
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr == null || strArr.length == 0) {
            throw new IllegalArgumentException("Usage: ExpansionService <port> [pipeline options...]");
        }
        int port;
        try {
            port = Integer.parseInt(strArr[0]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(String.format("Port must be an integer, but was '%s'", strArr[0]), e);
        }
        System.out.println("Starting expansion service at localhost:" + port);
        // Everything after the port is interpreted as pipeline options.
        ExpansionService expansionService = new ExpansionService(Arrays.copyOfRange(strArr, 1, strArr.length));
        for (Map.Entry<String, TransformProvider> registered : expansionService.getRegisteredTransforms().entrySet()) {
            System.out.println("\t" + registered.getKey() + ": " + registered.getValue());
        }
        Server server = ServerBuilder.forPort(port)
                .addService(expansionService)
                .addService(new ArtifactRetrievalService())
                .build();
        server.start();
        server.awaitTermination();
    }
}
