package org.eclipse.hono.client.kafka.producer;

import io.opentracing.References;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.EncodeException;
import io.vertx.ext.web.handler.TimeoutHandler;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaProducerConfigProperties;
import org.eclipse.hono.client.kafka.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;

/* loaded from: input_file:BOOT-INF/lib/hono-client-kafka-common-1.8.2.jar:org/eclipse/hono/client/kafka/producer/AbstractKafkaBasedMessageSender.class */
public abstract class AbstractKafkaBasedMessageSender implements Lifecycle {
    protected final Tracer tracer;
    private final KafkaProducerConfigProperties config;
    private final KafkaProducerFactory<String, Buffer> producerFactory;
    private final String producerName;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private boolean stopped = false;

    public AbstractKafkaBasedMessageSender(KafkaProducerFactory<String, Buffer> kafkaProducerFactory, String str, KafkaProducerConfigProperties kafkaProducerConfigProperties, Tracer tracer) {
        Objects.requireNonNull(kafkaProducerFactory);
        Objects.requireNonNull(str);
        Objects.requireNonNull(kafkaProducerConfigProperties);
        Objects.requireNonNull(tracer);
        this.producerFactory = kafkaProducerFactory;
        this.producerName = str;
        this.config = kafkaProducerConfigProperties;
        this.tracer = tracer;
    }

    @Override // org.eclipse.hono.util.Lifecycle
    public Future<Void> start() {
        this.stopped = false;
        getOrCreateProducer();
        return Future.succeededFuture();
    }

    @Override // org.eclipse.hono.util.Lifecycle
    public Future<Void> stop() {
        this.stopped = true;
        return this.producerFactory.closeProducer(this.producerName);
    }

    protected void send(String str, String str2, String str3, Buffer buffer, Map<String, Object> map, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(map);
        Span startSpan = startSpan(str, str2, str3, References.FOLLOWS_FROM, spanContext);
        send(str, str2, str3, buffer, encodePropertiesAsKafkaHeaders(map, startSpan), startSpan);
    }

    protected void send(String str, String str2, String str3, Buffer buffer, List<KafkaHeader> list, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(list);
        send(str, str2, str3, buffer, list, startSpan(str, str2, str3, References.FOLLOWS_FROM, spanContext));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Future<Void> sendAndWaitForOutcome(String str, String str2, String str3, Buffer buffer, Map<String, Object> map, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(map);
        Span startSpan = startSpan(str, str2, str3, References.CHILD_OF, spanContext);
        return send(str, str2, str3, buffer, encodePropertiesAsKafkaHeaders(map, startSpan), startSpan);
    }

    protected Future<Void> sendAndWaitForOutcome(String str, String str2, String str3, Buffer buffer, List<KafkaHeader> list, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(list);
        return send(str, str2, str3, buffer, list, startSpan(str, str2, str3, References.CHILD_OF, spanContext));
    }

    private Future<Void> send(String str, String str2, String str3, Buffer buffer, List<KafkaHeader> list, Span span) {
        if (this.stopped) {
            return Future.failedFuture(new ServerErrorException(TimeoutHandler.DEFAULT_ERRORCODE, "sender already stopped"));
        }
        KafkaProducerRecord<String, Buffer> create = KafkaProducerRecord.create(str, str3, buffer);
        Promise promise = Promise.promise();
        this.log.trace("sending message to Kafka [topic: {}, tenantId: {}, deviceId: {}]", str, str2, str3);
        create.addHeaders(list);
        KafkaTracingHelper.injectSpanContext(this.tracer, create, span.context());
        logProducerRecord(span, create);
        getOrCreateProducer().send(create, promise);
        return promise.future().recover(th -> {
            logError(span, str, str2, str3, th);
            span.finish();
            return Future.failedFuture(new ServerErrorException(getErrorCode(th), th));
        }).map(recordMetadata -> {
            logRecordMetadata(span, str3, recordMetadata);
            span.finish();
            return null;
        });
    }

    private KafkaProducer<String, Buffer> getOrCreateProducer() {
        return this.producerFactory.getOrCreateProducer(this.producerName, this.config);
    }

    private List<KafkaHeader> encodePropertiesAsKafkaHeaders(Map<String, Object> map, Span span) {
        ArrayList arrayList = new ArrayList();
        map.forEach((str, obj) -> {
            try {
                arrayList.add(KafkaRecordHelper.createKafkaHeader(str, obj));
            } catch (EncodeException e) {
                this.log.info("failed to serialize property with key [{}] to Kafka header", str);
                span.log("failed to create Kafka header from property: " + str);
            }
        });
        return arrayList;
    }

    private Span startSpan(String str, String str2, String str3, String str4, SpanContext spanContext) {
        return KafkaTracingHelper.newProducerSpan(this.tracer, str, str4, spanContext).setTag(TracingHelper.TAG_TENANT_ID.getKey(), str2).setTag(TracingHelper.TAG_DEVICE_ID.getKey(), str3);
    }

    private void logProducerRecord(Span span, KafkaProducerRecord<String, Buffer> kafkaProducerRecord) {
        String str = (String) kafkaProducerRecord.headers().stream().map(kafkaHeader -> {
            return kafkaHeader.key() + "=" + kafkaHeader.value();
        }).collect(Collectors.joining(StringArrayPropertyEditor.DEFAULT_SEPARATOR, "{", "}"));
        this.log.trace("producing message [topic: {}, key: {}, partition: {}, timestamp: {}, headers: {}]", kafkaProducerRecord.topic(), kafkaProducerRecord.key(), kafkaProducerRecord.partition(), kafkaProducerRecord.timestamp(), str);
        span.log("producing message with headers: " + str);
    }

    private void logRecordMetadata(Span span, String str, RecordMetadata recordMetadata) {
        this.log.trace("message produced to Kafka [topic: {}, key: {}, partition: {}, offset: {}, timestamp: {}]", recordMetadata.getTopic(), str, Integer.valueOf(recordMetadata.getPartition()), Long.valueOf(recordMetadata.getOffset()), Long.valueOf(recordMetadata.getTimestamp()));
        span.log("message produced to Kafka");
        KafkaTracingHelper.setRecordMetadataTags(span, recordMetadata);
        Tags.HTTP_STATUS.set(span, (Integer) 202);
    }

    private void logError(Span span, String str, String str2, String str3, Throwable th) {
        this.log.debug("sending message failed [topic: {}, key: {}, tenantId: {}, deviceId: {}]", str, str3, str2, str3, th);
        Tags.HTTP_STATUS.set(span, Integer.valueOf(getErrorCode(th)));
        TracingHelper.logError(span, th);
    }

    private int getErrorCode(Throwable th) {
        return TimeoutHandler.DEFAULT_ERRORCODE;
    }
}
