/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.mirror.MirrorMaker;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MirrorHerder
extends DistributedHerder {
    private static final Logger log = LoggerFactory.getLogger(MirrorHerder.class);
    private final MirrorMakerConfig config;
    private final SourceAndTarget sourceAndTarget;
    private boolean wasLeader;

    public MirrorHerder(MirrorMakerConfig mirrorConfig, SourceAndTarget sourceAndTarget, DistributedConfig config, Time time, Worker worker, String kafkaClusterId, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String restUrl, RestClient restClient, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy, List<String> restNamespace, AutoCloseable ... uponShutdown) {
        super(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, restUrl, restClient, connectorClientConfigOverridePolicy, restNamespace, uponShutdown);
        this.config = mirrorConfig;
        this.sourceAndTarget = sourceAndTarget;
    }

    protected void rebalanceSuccess() {
        if (this.isLeader()) {
            if (!this.wasLeader) {
                log.info("This node {} is now a leader for {}. Configuring connectors...", (Object)this, (Object)this.sourceAndTarget);
                this.configureConnectors();
            }
            this.wasLeader = true;
        } else {
            this.wasLeader = false;
        }
    }

    private void configureConnectors() {
        MirrorMaker.CONNECTOR_CLASSES.forEach(this::maybeConfigureConnector);
    }

    private void maybeConfigureConnector(Class<?> connectorClass) {
        Map<String, String> desiredConfig = this.config.connectorBaseConfig(this.sourceAndTarget, connectorClass);
        Map actualConfig = this.configState.connectorConfig(connectorClass.getSimpleName());
        if (actualConfig == null || !actualConfig.equals(desiredConfig)) {
            this.configureConnector(connectorClass.getSimpleName(), desiredConfig);
        } else {
            log.info("This node is a leader for {} and configuration for {} is already up to date.", (Object)this.sourceAndTarget, (Object)connectorClass.getSimpleName());
        }
    }

    private void configureConnector(String connectorName, Map<String, String> connectorProps) {
        this.putConnectorConfig(connectorName, connectorProps, true, (e, x) -> {
            if (e == null) {
                log.info("{} connector configured for {}.", (Object)connectorName, (Object)this.sourceAndTarget);
            } else if (e instanceof NotLeaderException) {
                log.info("This node lost leadership for {} while trying to update the connector configuration for {}. Using existing connector configuration.", (Object)connectorName, (Object)this.sourceAndTarget);
            } else {
                log.error("Failed to configure {} connector for {}", new Object[]{connectorName, this.sourceAndTarget, e});
            }
        });
    }
}

