package org.eclipse.hono.client.amqp;

import com.fasterxml.jackson.core.JsonLocation;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.handler.TimeoutHandler;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.net.HttpURLConnection;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RequestResponseResult;
import org.eclipse.hono.util.TriTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-amqp-common-1.8.2.jar:org/eclipse/hono/client/amqp/RequestResponseClient.class */
/**
 * A client that sends request messages over an AMQP sender link and correlates them with the
 * response messages arriving on a receiver link.
 * <p>
 * Every pending request is tracked in a reply map keyed by the request's correlation ID
 * (falling back to its message ID). An entry is removed when a matching response arrives, when
 * the peer settles the request with a failure outcome, or when the request timeout fires.
 * <p>
 * NOTE(review): the reply map is a plain {@code HashMap}. Requests are registered from within
 * {@code HonoConnection.executeOnContext}, so access is presumably confined to a single context;
 * confirm against {@code HonoConnection} before using this class from other threads.
 *
 * @param <R> The type of result produced for a response message.
 */
public class RequestResponseClient<R extends RequestResponseResult<?>> extends AbstractHonoClient {

    private static final Logger LOG = LoggerFactory.getLogger(RequestResponseClient.class);

    /** Value returned by {@link #getResponseCacheDefaultTimeout()} if the configuration provides none. */
    private static final long FALLBACK_RESPONSE_CACHE_TIMEOUT = 600L;

    private final String linkTargetAddress;
    private final String replyToAddress;
    private final SendMessageSampler sampler;
    /**
     * Pending requests by correlation ID. Each tuple holds
     * (1) the promise to complete with the outcome,
     * (2) the function mapping the response message and its delivery to the result type and
     * (3) the tracing span covering the request.
     */
    private final Map<Object, TriTuple<Promise<R>, BiFunction<Message, ProtonDelivery, R>, Span>> replyMap;
    private final String requestEndPointName;
    private final String responseEndPointName;
    private final String tenantId;
    private final Supplier<String> messageIdSupplier;
    private long requestTimeoutMillis;

    /**
     * Creates a client without opening any links.
     * <p>
     * Without a tenant, requests target {@code <requestEndPointName>} and replies are expected at
     * {@code <responseEndPointName>/<replyId>}. With a tenant, the tenant ID is inserted as the
     * second segment of both addresses.
     *
     * @param connection The connection to the service.
     * @param requestEndPointName The first segment of the link target address.
     * @param responseEndPointName The first segment of the reply-to address.
     * @param tenantId The tenant to scope the addresses to or {@code null} for unscoped addresses.
     * @param replyId The last segment of the reply-to address.
     * @param messageIdSupplier The supplier of request message IDs or {@code null} to use
     *                          generated IDs of the form {@code <requestEndPointName>-client-<UUID>}.
     * @param sampler The sampler to report send outcomes to.
     * @throws NullPointerException if any of the endpoint names, the reply ID or the sampler is {@code null}.
     */
    private RequestResponseClient(
            final HonoConnection connection,
            final String requestEndPointName,
            final String responseEndPointName,
            final String tenantId,
            final String replyId,
            final Supplier<String> messageIdSupplier,
            final SendMessageSampler sampler) {

        super(connection);
        this.replyMap = new HashMap<>();
        this.requestEndPointName = Objects.requireNonNull(requestEndPointName);
        this.responseEndPointName = Objects.requireNonNull(responseEndPointName);
        Objects.requireNonNull(replyId);
        this.sampler = Objects.requireNonNull(sampler);
        this.requestTimeoutMillis = connection.getConfig().getRequestTimeout();
        if (messageIdSupplier != null) {
            this.messageIdSupplier = messageIdSupplier;
        } else {
            this.messageIdSupplier = this::createMessageId;
        }
        if (tenantId == null) {
            this.linkTargetAddress = requestEndPointName;
            this.replyToAddress = String.format("%s/%s", responseEndPointName, replyId);
        } else {
            this.linkTargetAddress = String.format("%s/%s", requestEndPointName, tenantId);
            this.replyToAddress = String.format("%s/%s/%s", responseEndPointName, tenantId, replyId);
        }
        this.tenantId = tenantId;
    }

    /**
     * Creates a client that uses the same endpoint name for requests and responses, a random
     * UUID as reply ID and generated message IDs, and opens its links.
     *
     * @param connection The connection to the service.
     * @param endpointName The endpoint name used for both the target and the reply-to address.
     * @param tenantId The tenant to scope the addresses to or {@code null}.
     * @param sampler The sampler to report send outcomes to.
     * @param senderCloseHook The handler handed to the connection when creating the sender link
     *                        (presumably notified when the peer closes the link; may be {@code null}).
     * @param receiverCloseHook The handler handed to the connection when creating the receiver link
     *                          (presumably notified when the peer closes the link; may be {@code null}).
     * @param <T> The type of result produced for a response message.
     * @return A future that succeeds with the client once both links have been created.
     * @throws NullPointerException if the endpoint name or the sampler is {@code null}.
     */
    public static <T extends RequestResponseResult<?>> Future<RequestResponseClient<T>> forEndpoint(
            final HonoConnection connection,
            final String endpointName,
            final String tenantId,
            final SendMessageSampler sampler,
            final Handler<String> senderCloseHook,
            final Handler<String> receiverCloseHook) {

        return forEndpoint(
                connection,
                endpointName,
                endpointName,
                tenantId,
                UUID.randomUUID().toString(),
                null,
                sampler,
                senderCloseHook,
                receiverCloseHook);
    }

    /**
     * Creates a client for the given endpoints and opens its links.
     *
     * @param connection The connection to the service.
     * @param requestEndPointName The first segment of the link target address.
     * @param responseEndPointName The first segment of the reply-to address.
     * @param tenantId The tenant to scope the addresses to or {@code null}.
     * @param replyId The last segment of the reply-to address.
     * @param messageIdSupplier The supplier of request message IDs or {@code null} to use generated IDs.
     * @param sampler The sampler to report send outcomes to.
     * @param senderCloseHook The handler handed to the connection when creating the sender link
     *                        (may be {@code null}).
     * @param receiverCloseHook The handler handed to the connection when creating the receiver link
     *                          (may be {@code null}).
     * @param <T> The type of result produced for a response message.
     * @return A future that succeeds with the client once both links have been created.
     * @throws NullPointerException if any of the endpoint names, the reply ID or the sampler is {@code null}.
     */
    public static <T extends RequestResponseResult<?>> Future<RequestResponseClient<T>> forEndpoint(
            final HonoConnection connection,
            final String requestEndPointName,
            final String responseEndPointName,
            final String tenantId,
            final String replyId,
            final Supplier<String> messageIdSupplier,
            final SendMessageSampler sampler,
            final Handler<String> senderCloseHook,
            final Handler<String> receiverCloseHook) {

        final RequestResponseClient<T> client = new RequestResponseClient<>(
                connection,
                requestEndPointName,
                responseEndPointName,
                tenantId,
                replyId,
                messageIdSupplier,
                sampler);
        // map the (void) outcome of link creation to the client itself
        return client.createLinks(senderCloseHook, receiverCloseHook).map(client);
    }

    /**
     * Gets the default timeout for cached responses.
     *
     * @return The value from the connection's configuration if it is a
     *         {@code RequestResponseClientConfigProperties}, {@code 600} otherwise.
     *         NOTE(review): the unit is not visible here; confirm against the configuration class.
     */
    protected final long getResponseCacheDefaultTimeout() {
        final Object config = connection.getConfig();
        if (config instanceof RequestResponseClientConfigProperties) {
            return ((RequestResponseClientConfigProperties) config).getResponseCacheDefaultTimeout();
        }
        return FALLBACK_RESPONSE_CACHE_TIMEOUT;
    }

    /**
     * Sets the period of time after which a pending request is failed.
     *
     * @param timeoutMillis The timeout in milliseconds. A value of {@code 0} disables the timeout.
     * @throws IllegalArgumentException if the value is negative.
     */
    public final void setRequestTimeout(final long timeoutMillis) {
        if (timeoutMillis < 0) {
            throw new IllegalArgumentException("request timeout must be >= 0");
        }
        this.requestTimeoutMillis = timeoutMillis;
    }

    /**
     * Generates a message ID of the form {@code <requestEndPointName>-client-<UUID>}.
     *
     * @return The identifier.
     */
    private String createMessageId() {
        return String.format("%s-client-%s", requestEndPointName, UUID.randomUUID());
    }

    /**
     * Creates the receiver and sender links without any close hooks.
     *
     * @return A future that succeeds once both links have been created.
     */
    public final Future<Void> createLinks() {
        return createLinks(null, null);
    }

    /**
     * Creates the receiver link for the reply-to address followed by the sender link for the
     * target address, and records the capabilities offered by the peer on the sender link.
     *
     * @param senderCloseHook The handler handed to the connection when creating the sender link
     *                        (may be {@code null}).
     * @param receiverCloseHook The handler handed to the connection when creating the receiver link
     *                          (may be {@code null}).
     * @return A future that succeeds once both links have been created.
     */
    public final Future<Void> createLinks(
            final Handler<String> senderCloseHook,
            final Handler<String> receiverCloseHook) {

        // the receiver is created first so that it exists before any request can be sent
        return createReceiver(replyToAddress, receiverCloseHook)
                .compose(newReceiver -> {
                    this.receiver = newReceiver;
                    return createSender(linkTargetAddress, senderCloseHook);
                })
                .compose(newSender -> {
                    LOG.debug("request-response client for peer [{}] created", connection.getConfig().getHost());
                    this.offeredCapabilities = Optional.ofNullable(newSender.getRemoteOfferedCapabilities())
                            .map(capabilities -> Collections.unmodifiableList(Arrays.asList(capabilities)))
                            .orElse(Collections.emptyList());
                    this.sender = newSender;
                    return Future.succeededFuture();
                });
    }

    /**
     * Creates an <em>at least once</em> sender link.
     *
     * @param targetAddress The address to send requests to.
     * @param closeHook The handler to pass to the connection (may be {@code null}).
     * @return The outcome of the attempt to create the link.
     */
    private Future<ProtonSender> createSender(final String targetAddress, final Handler<String> closeHook) {
        return connection.createSender(targetAddress, ProtonQoS.AT_LEAST_ONCE, closeHook);
    }

    /**
     * Creates an <em>at least once</em> receiver link whose messages are passed to
     * {@link #handleResponse(ProtonDelivery, Message)}.
     *
     * @param sourceAddress The address to receive responses from.
     * @param closeHook The handler to pass to the connection (may be {@code null}).
     * @return The outcome of the attempt to create the link.
     */
    private Future<ProtonReceiver> createReceiver(final String sourceAddress, final Handler<String> closeHook) {
        return connection.createReceiver(sourceAddress, ProtonQoS.AT_LEAST_ONCE, this::handleResponse, closeHook);
    }

    /**
     * Matches a response message with its pending request and completes the request.
     * <ul>
     * <li>A response without a pending request is rejected.</li>
     * <li>A response that the request's mapping function turns into {@code null} fails the
     * request with a 500 status and is released.</li>
     * <li>Any other response succeeds the request and is accepted unless the mapping function
     * or result handler has already settled the delivery.</li>
     * </ul>
     *
     * @param delivery The delivery of the response message.
     * @param message The response message.
     */
    private void handleResponse(final ProtonDelivery delivery, final Message message) {

        final TriTuple<Promise<R>, BiFunction<Message, ProtonDelivery, R>, Span> pendingRequest =
                replyMap.remove(message.getCorrelationId());

        if (pendingRequest == null) {
            LOG.debug("discarding unexpected response [reply-to: {}, correlation ID: {}]",
                    replyToAddress, message.getCorrelationId());
            ProtonHelper.rejected(delivery, true);
            return;
        }

        final R response = pendingRequest.two().apply(message, delivery);
        final Span span = pendingRequest.three();

        if (response == null) {
            LOG.debug("discarding malformed response [reply-to: {}, correlation ID: {}]",
                    replyToAddress, message.getCorrelationId());
            pendingRequest.one().handle(Future.failedFuture(new ServerErrorException(
                    HttpURLConnection.HTTP_INTERNAL_ERROR,
                    "cannot process response from service [" + responseEndPointName + "]")));
            ProtonHelper.released(delivery, true);
            return;
        }

        LOG.debug("received response [reply-to: {}, subject: {}, correlation ID: {}, status: {}, cache-directive: {}]",
                replyToAddress, message.getSubject(), message.getCorrelationId(),
                response.getStatus(), response.getCacheDirective());
        if (span != null) {
            span.log("response from peer accepted");
        }
        pendingRequest.one().handle(Future.succeededFuture(response));
        if (!delivery.isSettled()) {
            LOG.debug("client provided response handler did not settle message, auto-accepting ...");
            ProtonHelper.accepted(delivery, true);
        }
    }

    /**
     * Fails a pending request with the cause of a failed result.
     *
     * @param correlationId The key of the pending request.
     * @param result The failed result providing the cause.
     * @return {@code true} if a pending request existed and has been failed.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if the result has succeeded.
     */
    private boolean cancelRequest(final Object correlationId, final AsyncResult<R> result) {
        Objects.requireNonNull(correlationId);
        Objects.requireNonNull(result);
        if (result.succeeded()) {
            throw new IllegalArgumentException("result must be failed");
        }
        return cancelRequest(correlationId, result::cause);
    }

    /**
     * Removes a pending request and fails it with the supplied error.
     * <p>
     * The supplier is only invoked if a pending request exists.
     *
     * @param correlationId The key of the pending request.
     * @param failureSupplier The supplier of the error to fail the request with.
     * @return {@code true} if a pending request existed and has been failed.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    private boolean cancelRequest(final Object correlationId, final Supplier<Throwable> failureSupplier) {
        Objects.requireNonNull(correlationId);
        Objects.requireNonNull(failureSupplier);

        final TriTuple<Promise<R>, BiFunction<Message, ProtonDelivery, R>, Span> pendingRequest =
                replyMap.remove(correlationId);
        if (pendingRequest == null) {
            return false;
        }
        final Throwable cause = failureSupplier.get();
        LOG.debug("canceling request [target: {}, correlation ID: {}]: {}",
                linkTargetAddress, correlationId, cause.getMessage());
        pendingRequest.one().fail(cause);
        return true;
    }

    /**
     * Creates a request message carrying this client's reply-to address and a fresh message ID.
     *
     * @param subject The subject of the message.
     * @param address The address of the message (may be {@code null}).
     * @param appProperties The application properties to set on the message (may be {@code null}
     *                      if the inherited helper accepts that; not visible here).
     * @return The message.
     * @throws NullPointerException if the subject is {@code null}.
     */
    private Message createMessage(final String subject, final String address, final Map<String, Object> appProperties) {
        Objects.requireNonNull(subject);
        final Message request = ProtonHelper.message();
        AbstractHonoClient.setApplicationProperties(request, appProperties);
        request.setAddress(address);
        request.setReplyTo(replyToAddress);
        request.setMessageId(messageIdSupplier.get());
        request.setSubject(subject);
        return request;
    }

    /**
     * Creates a request addressed to this client's link target address and sends it.
     *
     * @param subject The subject of the request.
     * @param appProperties The application properties of the request.
     * @param payload The payload of the request.
     * @param contentType The second argument passed to {@code MessageHelper.setPayload}
     *                    (presumably the payload's content type; confirm against that helper).
     * @param responseMapper The function mapping a response message to the result type.
     * @param currentSpan The tracing span covering the request.
     * @return A future indicating the outcome of the request.
     * @throws NullPointerException if subject, response mapper or span is {@code null}.
     */
    public final Future<R> createAndSendRequest(
            final String subject,
            final Map<String, Object> appProperties,
            final Buffer payload,
            final String contentType,
            final Function<Message, R> responseMapper,
            final Span currentSpan) {

        return createAndSendRequest(
                subject, linkTargetAddress, appProperties, payload, contentType, responseMapper, currentSpan);
    }

    /**
     * Creates a request for the given address and sends it.
     *
     * @param subject The subject of the request.
     * @param address The address to set on the request.
     * @param appProperties The application properties of the request.
     * @param payload The payload of the request.
     * @param contentType The second argument passed to {@code MessageHelper.setPayload}
     *                    (presumably the payload's content type; confirm against that helper).
     * @param responseMapper The function mapping a response message to the result type.
     * @param currentSpan The tracing span covering the request.
     * @return A future indicating the outcome of the request.
     * @throws NullPointerException if subject, response mapper or span is {@code null}.
     */
    public final Future<R> createAndSendRequest(
            final String subject,
            final String address,
            final Map<String, Object> appProperties,
            final Buffer payload,
            final String contentType,
            final Function<Message, R> responseMapper,
            final Span currentSpan) {

        Objects.requireNonNull(responseMapper);
        final BiFunction<Message, ProtonDelivery, R> deliveryAgnosticMapper =
                (message, delivery) -> responseMapper.apply(message);
        return createAndSendRequest(
                subject, address, appProperties, payload, contentType, deliveryAgnosticMapper, currentSpan);
    }

    /**
     * Creates a request for the given address and sends it.
     * <p>
     * The returned future fails with a 503 {@code ServerErrorException} right away if the
     * sender or receiver link is not open.
     *
     * @param subject The subject of the request.
     * @param address The address to set on the request.
     * @param appProperties The application properties of the request.
     * @param payload The payload of the request.
     * @param contentType The second argument passed to {@code MessageHelper.setPayload}
     *                    (presumably the payload's content type; confirm against that helper).
     * @param responseMapper The function mapping a response message and its delivery to the
     *                       result type. Returning {@code null} marks the response as malformed.
     * @param currentSpan The tracing span covering the request.
     * @return A future indicating the outcome of the request.
     * @throws NullPointerException if subject, span or response mapper is {@code null}.
     */
    public final Future<R> createAndSendRequest(
            final String subject,
            final String address,
            final Map<String, Object> appProperties,
            final Buffer payload,
            final String contentType,
            final BiFunction<Message, ProtonDelivery, R> responseMapper,
            final Span currentSpan) {

        Objects.requireNonNull(subject);
        Objects.requireNonNull(currentSpan);
        Objects.requireNonNull(responseMapper);

        if (!isOpen()) {
            return Future.failedFuture(new ServerErrorException(
                    HttpURLConnection.HTTP_UNAVAILABLE, "sender and/or receiver link is not open"));
        }
        final Message request = createMessage(subject, address, appProperties);
        MessageHelper.setPayload(request, contentType, payload);
        return sendRequest(request, responseMapper, currentSpan);
    }

    /**
     * Sends a request message and registers it as pending until a response arrives, the peer
     * settles it with a failure outcome or the request times out.
     *
     * @param request The message to send.
     * @param responseMapper The function mapping a response message and its delivery to the result type.
     * @param currentSpan The tracing span to tag and to log send details to.
     * @return A future indicating the outcome of the request.
     */
    private Future<R> sendRequest(
            final Message request,
            final BiFunction<Message, ProtonDelivery, R> responseMapper,
            final Span currentSpan) {

        final String requestTargetAddress = Optional.ofNullable(request.getAddress()).orElse(linkTargetAddress);
        Tags.MESSAGE_BUS_DESTINATION.set(currentSpan, requestTargetAddress);
        Tags.SPAN_KIND.set(currentSpan, Tags.SPAN_KIND_CLIENT);
        Tags.HTTP_METHOD.set(currentSpan, request.getSubject());
        if (tenantId != null) {
            currentSpan.setTag(MessageHelper.APP_PROPERTY_TENANT_ID, tenantId);
        }

        return connection.executeOnContext(result -> {

            if (sender.sendQueueFull()) {
                LOG.debug("cannot send request to peer, no credit left for link [link target: {}]", linkTargetAddress);
                result.fail(new ServerErrorException(
                        HttpURLConnection.HTTP_UNAVAILABLE, "no credit available for sending request"));
                sampler.queueFull(tenantId);
                return;
            }

            final Object correlationId = Optional.ofNullable(request.getCorrelationId())
                    .orElse(request.getMessageId());

            final Map<String, Object> details = new HashMap<>(3);
            if (correlationId instanceof String) {
                details.put(TracingHelper.TAG_CORRELATION_ID.getKey(), correlationId);
            }
            details.put(TracingHelper.TAG_CREDIT.getKey(), sender.getCredit());
            details.put(TracingHelper.TAG_QOS.getKey(), sender.getQoS().toString());
            currentSpan.log(details);

            final TriTuple<Promise<R>, BiFunction<Message, ProtonDelivery, R>, Span> pendingRequest =
                    TriTuple.of(result, responseMapper, currentSpan);
            TracingHelper.injectSpanContext(connection.getTracer(), currentSpan.context(), request);
            // register before sending so that neither a response nor a disposition can be missed
            replyMap.put(correlationId, pendingRequest);

            final SendMessageSampler.Sample sample = sampler.start(tenantId);

            sender.send(request, deliveryUpdated -> {
                // carries the failure to cancel the request with; deliberately distinct from
                // "result", which is the promise handed back to the caller
                final Promise<R> failedResult = Promise.promise();
                final DeliveryState remoteState = deliveryUpdated.getRemoteState();
                sample.completed(remoteState);

                if (remoteState instanceof Rejected) {
                    final Rejected rejected = (Rejected) remoteState;
                    if (rejected.getError() != null) {
                        LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}]: {}",
                                requestTargetAddress, request.getSubject(), correlationId, rejected.getError());
                        failedResult.fail(StatusCodeMapper.fromTransferError(rejected.getError()));
                    } else {
                        LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}]",
                                requestTargetAddress, request.getSubject(), correlationId);
                        failedResult.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST));
                    }
                    cancelRequest(correlationId, failedResult.future());

                } else if (remoteState instanceof Accepted) {
                    LOG.trace("service has accepted request [target address: {}, subject: {}, correlation ID: {}]",
                            requestTargetAddress, request.getSubject(), correlationId);
                    currentSpan.log("request accepted by peer");
                    // without a reply-to address no response will arrive, so the acceptance
                    // itself concludes the request
                    if (request.getReplyTo() == null) {
                        if (replyMap.remove(correlationId) != null) {
                            result.complete();
                        } else {
                            LOG.trace("accepted request won't be acted upon, request already cancelled [target address: {}, subject: {}, correlation ID: {}]",
                                    requestTargetAddress, request.getSubject(), correlationId);
                        }
                    }

                } else if (remoteState instanceof Released) {
                    LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}], remote state: {}",
                            requestTargetAddress, request.getSubject(), correlationId, remoteState);
                    failedResult.fail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE));
                    cancelRequest(correlationId, failedResult.future());

                } else if (remoteState instanceof Modified) {
                    LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}], remote state: {}",
                            requestTargetAddress, request.getSubject(), correlationId, remoteState);
                    // undeliverable-here is an optional field, a missing value is treated as false
                    final boolean undeliverableHere =
                            Boolean.TRUE.equals(((Modified) remoteState).getUndeliverableHere());
                    if (undeliverableHere) {
                        failedResult.fail(new ClientErrorException(HttpURLConnection.HTTP_NOT_FOUND));
                    } else {
                        failedResult.fail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE));
                    }
                    cancelRequest(correlationId, failedResult.future());

                } else if (remoteState == null) {
                    LOG.warn("got undefined delivery state for service request{} [target address: {}, subject: {}, correlation ID: {}]",
                            !sender.isOpen() ? ", sender link was closed in between" : "",
                            requestTargetAddress, request.getSubject(), correlationId);
                    failedResult.fail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE));
                    cancelRequest(correlationId, failedResult.future());
                }
            });

            // capture the value used to arm the timer so that the error message reports it
            // even if the timeout is reconfigured while the request is pending
            final long timeoutMillis = requestTimeoutMillis;
            if (timeoutMillis > 0) {
                connection.getVertx().setTimer(timeoutMillis, timerId -> {
                    final boolean cancelled = cancelRequest(
                            correlationId,
                            () -> new ServerErrorException(
                                    HttpURLConnection.HTTP_UNAVAILABLE,
                                    "request timed out after " + timeoutMillis + "ms"));
                    if (cancelled) {
                        sample.timeout();
                    }
                });
            }

            if (LOG.isDebugEnabled()) {
                final String deviceId = MessageHelper.getDeviceId(request);
                if (deviceId == null) {
                    LOG.debug("sent request [target address: {}, subject: {}, correlation ID: {}] to service",
                            requestTargetAddress, request.getSubject(), correlationId);
                } else {
                    LOG.debug("sent request [target address: {}, subject: {}, correlation ID: {}, device ID: {}] to service",
                            requestTargetAddress, request.getSubject(), correlationId, deviceId);
                }
            }
        });
    }

    /**
     * Checks whether this client can be used to send requests.
     *
     * @return {@code true} if both the sender and the receiver link are open and connected.
     */
    public final boolean isOpen() {
        return HonoProtonHelper.isLinkOpenAndConnected(sender)
                && HonoProtonHelper.isLinkOpenAndConnected(receiver);
    }

    /**
     * Closes the links of this client.
     *
     * @return The outcome of closing the links.
     */
    public final Future<Void> close() {
        LOG.debug("closing request-response client ...");
        return closeLinks();
    }
}
