package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchema;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/rabbitmq/RMQSource.class */
public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long> implements ResultTypeQueryable<OUT> {
    /** Serialization version identifier of this source. */
    private static final long serialVersionUID = 1;
    /** Logger shared by all instances of this source. */
    private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
    /** Connection settings; supplies the connection factory, prefetch count, delivery timeout and host. */
    private final RMQConnectionConfig rmqConnectionConfig;
    /** Name of the queue that is declared in {@code setupQueue()} and consumed from in {@code open()}. */
    protected final String queueName;
    /**
     * When {@code true} (and auto-ack is off) every message must carry a non-null correlation id,
     * which is registered through {@code addId} before the delivery tag is recorded.
     */
    private final boolean usesCorrelationId;
    /** Schema that turns a delivery (envelope, properties, body) into records and detects end of stream. */
    protected RMQDeserializationSchema<OUT> deliveryDeserializer;
    /** Connection created in {@code open()}; transient, so it is not part of the serialized source. */
    protected transient Connection connection;
    /** Channel created on {@link #connection} in {@code open()}. */
    protected transient Channel channel;
    /** Consumer registered on {@link #channel}; {@code run()} polls it for deliveries. */
    protected transient QueueingConsumer consumer;
    /** Decided in {@code open()}: {@code false} when checkpointing is enabled, {@code true} otherwise. */
    protected transient boolean autoAck;
    /** Loop flag for {@code run()}; set at the end of {@code open()}, cleared by {@code cancel()} or on end of stream. */
    private volatile transient boolean running;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/rabbitmq/RMQSource$RMQCollectorImpl.class */
    /**
     * Collector handed to the deserialization schema. It forwards records to the Flink source
     * context and records the identifiers (correlation id / delivery tag) of the message that
     * is currently being deserialized.
     */
    public class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {
        /** Downstream context that receives the deserialized records. */
        private final SourceFunction.SourceContext<OUT> ctx;
        /** Becomes true once a record was recognised as the end-of-stream marker. */
        private boolean endOfStreamSignalled;
        /** Fallback correlation id of the message currently being processed. */
        private String correlationId;
        /** Fallback delivery tag of the message currently being processed. */
        private long deliveryTag;
        /** True once identifiers have been registered for the current message. */
        private boolean customIdentifiersSet;

        private RMQCollectorImpl(SourceFunction.SourceContext<OUT> sourceContext) {
            this.ctx = sourceContext;
            this.endOfStreamSignalled = false;
            this.customIdentifiersSet = false;
        }

        /**
         * Emits a record. If no identifiers were registered for the current message yet, the
         * fallback identifiers are registered first; the record is dropped when that
         * registration is rejected. End-of-stream records are not emitted, they only raise the
         * end-of-stream flag.
         */
        public void collect(OUT out) {
            if (!this.customIdentifiersSet
                    && !setMessageIdentifiers(this.correlationId, this.deliveryTag)) {
                return;
            }
            if (isEndOfStream(out)) {
                this.endOfStreamSignalled = true;
                return;
            }
            this.ctx.collect(out);
        }

        /**
         * Stores the identifiers to fall back on for the next message and re-arms identifier
         * registration.
         */
        public void setFallBackIdentifiers(String str, long j) {
            this.customIdentifiersSet = false;
            this.correlationId = str;
            this.deliveryTag = j;
        }

        /**
         * Registers the identifiers of the current message.
         *
         * @return {@code true} if the message should be processed; {@code false} if
         *     {@code addId} rejected the correlation id (presumably because it was seen before)
         * @throws IllegalStateException if identifiers were already registered for this message
         */
        @Override // org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchema.RMQCollector
        public boolean setMessageIdentifiers(String str, long j) {
            if (this.customIdentifiersSet) {
                throw new IllegalStateException("You can set only a single set of identifiers for a block of messages.");
            }
            this.customIdentifiersSet = true;
            // With auto-ack nothing has to be tracked for a later acknowledgement.
            if (!RMQSource.this.autoAck) {
                if (RMQSource.this.usesCorrelationId) {
                    Preconditions.checkNotNull(str, "RabbitMQ source was instantiated with usesCorrelationId set to true yet we couldn't extract the correlation id from it !");
                    boolean accepted = RMQSource.this.addId(str);
                    if (!accepted) {
                        return false;
                    }
                }
                RMQSource.this.sessionIds.add(Long.valueOf(j));
            }
            return true;
        }

        /** Tells whether the stream has ended, either earlier or with the given record. */
        boolean isEndOfStream(OUT out) {
            if (this.endOfStreamSignalled) {
                return true;
            }
            return RMQSource.this.deliveryDeserializer.isEndOfStream(out);
        }

        /** Returns whether an end-of-stream record has been observed by {@code collect}. */
        public boolean isEndOfStreamSignalled() {
            return this.endOfStreamSignalled;
        }

        /** Nothing to release; the collector owns no resources. */
        public void close() {
        }
    }

    /**
     * Creates a source that does not use correlation ids and deserializes message bodies with
     * a plain {@link DeserializationSchema}.
     *
     * @param rMQConnectionConfig connection settings
     * @param str name of the queue to consume from
     * @param deserializationSchema schema applied to each message body
     */
    public RMQSource(RMQConnectionConfig rMQConnectionConfig, String str, DeserializationSchema<OUT> deserializationSchema) {
        // The argument's static type already selects the DeserializationSchema overload;
        // a raw-type cast would only turn this into an unchecked invocation.
        this(rMQConnectionConfig, str, false, deserializationSchema);
    }

    /**
     * Creates a source that deserializes message bodies with a plain
     * {@link DeserializationSchema}, which is wrapped in an
     * {@code RMQDeserializationSchemaWrapper}.
     *
     * @param rMQConnectionConfig connection settings
     * @param str name of the queue to consume from
     * @param z whether messages are expected to carry a correlation id
     * @param deserializationSchema schema applied to each message body
     */
    public RMQSource(RMQConnectionConfig rMQConnectionConfig, String str, boolean z, DeserializationSchema<OUT> deserializationSchema) {
        super(String.class);
        this.rmqConnectionConfig = rMQConnectionConfig;
        this.queueName = str;
        this.usesCorrelationId = z;
        // Diamond instead of a raw type so the wrapper stays type-checked against OUT.
        this.deliveryDeserializer = new RMQDeserializationSchemaWrapper<>(deserializationSchema);
    }

    /**
     * Creates a source that does not use correlation ids and deserializes deliveries with an
     * {@link RMQDeserializationSchema}.
     *
     * @param rMQConnectionConfig connection settings
     * @param str name of the queue to consume from
     * @param rMQDeserializationSchema schema applied to each delivery
     */
    public RMQSource(RMQConnectionConfig rMQConnectionConfig, String str, RMQDeserializationSchema<OUT> rMQDeserializationSchema) {
        // The argument's static type already selects the RMQDeserializationSchema overload;
        // a raw-type cast would only turn this into an unchecked invocation.
        this(rMQConnectionConfig, str, false, rMQDeserializationSchema);
    }

    /**
     * Creates a source that deserializes deliveries with an {@link RMQDeserializationSchema}.
     *
     * @param rMQConnectionConfig connection settings
     * @param str name of the queue to consume from
     * @param z whether messages are expected to carry a correlation id
     * @param rMQDeserializationSchema schema applied to each delivery
     */
    public RMQSource(RMQConnectionConfig rMQConnectionConfig, String str, boolean z, RMQDeserializationSchema<OUT> rMQDeserializationSchema) {
        super(String.class);
        this.deliveryDeserializer = rMQDeserializationSchema;
        this.usesCorrelationId = z;
        this.queueName = str;
        this.rmqConnectionConfig = rMQConnectionConfig;
    }

    /**
     * Obtains the connection factory from the connection configuration. Protected so that
     * subclasses can supply a different factory.
     *
     * @return the factory used to open the RabbitMQ connection
     * @throws Exception if the configuration cannot produce a factory
     */
    protected ConnectionFactory setupConnectionFactory() throws Exception {
        final ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
        return factory;
    }

    /**
     * Opens a new connection using the factory returned by {@code setupConnectionFactory()}.
     *
     * @return the newly opened connection
     * @throws Exception if the factory cannot be obtained or the connection cannot be opened
     */
    @VisibleForTesting
    protected Connection setupConnection() throws Exception {
        final ConnectionFactory factory = setupConnectionFactory();
        return factory.newConnection();
    }

    /**
     * Creates a channel on the given connection and, if a prefetch count is configured,
     * applies it as a channel-wide QoS setting.
     *
     * @param connection the open connection to create the channel on
     * @return the new channel, or {@code null} if the connection has no channel available
     * @throws Exception if the channel cannot be created or configured
     */
    private Channel setupChannel(Connection connection) throws Exception {
        Channel newChannel = connection.createChannel();
        if (newChannel == null) {
            // createChannel() may return null when no channel is available; leave the
            // reporting to the caller instead of failing with a NullPointerException below.
            return null;
        }
        if (this.rmqConnectionConfig.getPrefetchCount().isPresent()) {
            newChannel.basicQos(this.rmqConnectionConfig.getPrefetchCount().get().intValue(), true);
        }
        return newChannel;
    }

    /**
     * Declares the source queue on the current channel by delegating to
     * {@code Util.declareQueueDefaults}. Protected so that subclasses can customise the
     * declaration.
     *
     * @throws IOException if the declaration fails
     */
    @VisibleForTesting
    protected void setupQueue() throws IOException {
        Util.declareQueueDefaults(channel, queueName);
    }

    /**
     * Connects to RabbitMQ, declares the queue, registers the consumer and opens the
     * deserialization schema.
     *
     * <p>When the runtime context is a {@link StreamingRuntimeContext} with checkpointing
     * enabled, messages are acknowledged manually and the channel is switched to transactional
     * mode; otherwise auto-ack is used.
     *
     * @param configuration configuration forwarded to the superclass
     * @throws RuntimeException if no channel is available, or wrapping an {@link IOException}
     *     raised while setting up the connection
     * @throws Exception for any other failure raised by the superclass or the set-up helpers
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        try {
            this.connection = setupConnection();
            this.channel = setupChannel(this.connection);
            if (this.channel == null) {
                // The connection was opened successfully; do not leak it.
                releaseConnectionQuietly();
                throw new RuntimeException("None of RabbitMQ channels are available");
            }
            setupQueue();
            this.consumer = new QueueingConsumer(this.channel);
            if (getRuntimeContext() instanceof StreamingRuntimeContext
                    && ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
                this.autoAck = false;
                // Acknowledgements are committed per checkpoint, so use a transactional channel.
                this.channel.txSelect();
            } else {
                this.autoAck = true;
            }
            LOG.debug("Starting RabbitMQ source with autoAck status: {}", this.autoAck);
            this.channel.basicConsume(this.queueName, this.autoAck, this.consumer);
            this.deliveryDeserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
            this.running = true;
        } catch (IOException e) {
            releaseConnectionQuietly();
            throw new RuntimeException("Cannot create RMQ connection with " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e);
        }
    }

    /**
     * Closes channel and connection while ignoring failures, then forgets them so that a
     * later {@code close()} does not touch handles that were already released.
     */
    private void releaseConnectionQuietly() {
        IOUtils.closeAllQuietly(this.channel, this.connection);
        this.channel = null;
        this.connection = null;
    }

    /**
     * Cancels the consumer and closes channel and connection. Both steps are always
     * attempted; if both fail with an {@link IOException}, the first failure is thrown and
     * the second one is attached to it as a suppressed exception.
     *
     * @throws Exception if the superclass fails to close, or wrapping the I/O failure(s)
     *     raised while cancelling the consumer or closing the resources
     */
    public void close() throws Exception {
        super.close();
        // Typed as Exception (not Throwable) so it can be rethrown under "throws Exception".
        Exception failure = null;
        try {
            if (this.consumer != null && this.channel != null) {
                this.channel.basicCancel(this.consumer.getConsumerTag());
            }
        } catch (IOException e) {
            failure = new RuntimeException("Error while cancelling RMQ consumer on " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e);
        }
        try {
            IOUtils.closeAll(this.channel, this.connection);
        } catch (IOException e) {
            failure = ExceptionUtils.firstOrSuppressed(new RuntimeException("Error while closing RMQ source with " + this.queueName + " at " + this.rmqConnectionConfig.getHost(), e), failure);
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Hands one delivery to the deserialization schema. Before deserializing, the message's
     * own correlation id and delivery tag are stored on the collector as fallback identifiers.
     *
     * @param delivery the message received from the queue
     * @param rMQCollectorImpl collector that receives the deserialized records
     * @throws IOException if deserialization fails
     */
    private void processMessage(Delivery delivery, RMQSource<OUT>.RMQCollectorImpl rMQCollectorImpl) throws IOException {
        final AMQP.BasicProperties props = delivery.getProperties();
        final byte[] payload = delivery.getBody();
        final Envelope env = delivery.getEnvelope();

        final String fallbackCorrelationId = props.getCorrelationId();
        final long fallbackDeliveryTag = env.getDeliveryTag();
        rMQCollectorImpl.setFallBackIdentifiers(fallbackCorrelationId, fallbackDeliveryTag);

        deliveryDeserializer.deserialize(env, props, payload, rMQCollectorImpl);
    }

    /**
     * Main loop of the source: polls the consumer for deliveries and processes each one under
     * the checkpoint lock until the source is cancelled or the end of the stream is signalled.
     *
     * @param sourceContext context that receives the records and provides the checkpoint lock
     * @throws Exception if polling or processing a delivery fails
     */
    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        final RMQSource<OUT>.RMQCollectorImpl collector = new RMQCollectorImpl(sourceContext);
        final long timeout = rmqConnectionConfig.getDeliveryTimeout();
        while (running) {
            // Poll outside the lock so waiting for a message does not hold the checkpoint lock.
            final Delivery delivery = consumer.nextDelivery(timeout);
            synchronized (sourceContext.getCheckpointLock()) {
                if (delivery != null) {
                    processMessage(delivery, collector);
                }
                if (collector.isEndOfStreamSignalled()) {
                    running = false;
                    break;
                }
            }
        }
    }

    /** Requests termination of the loop in {@code run} by clearing the running flag. */
    public void cancel() {
        running = false;
    }

    /**
     * Acknowledges every given delivery tag individually and then commits the channel
     * transaction.
     *
     * @param list delivery tags to acknowledge
     * @throws RuntimeException wrapping the {@link IOException} if acknowledging or committing fails
     */
    protected void acknowledgeSessionIDs(List<Long> list) {
        try {
            for (Long deliveryTag : list) {
                // multiple = false: acknowledge exactly this delivery tag.
                channel.basicAck(deliveryTag.longValue(), false);
            }
            channel.txCommit();
        } catch (IOException ioFailure) {
            throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", ioFailure);
        }
    }

    /**
     * Reports the type of the records this source emits, as declared by the deserialization
     * schema.
     *
     * @return type information of the produced records
     */
    public TypeInformation<OUT> getProducedType() {
        final TypeInformation<OUT> producedType = deliveryDeserializer.getProducedType();
        return producedType;
    }
}
