package org.apache.falcon.workflow;

import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/falcon/workflow/WorkflowJobEndNotificationService.class */
public class WorkflowJobEndNotificationService implements FalconService {
    private Set<WorkflowExecutionListener> listeners = new LinkedHashSet();
    private Map<String, Properties> contextMap = new ConcurrentHashMap();
    private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class);
    public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName();
    private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();

    @Override // org.apache.falcon.service.FalconService
    public String getName() {
        return SERVICE_NAME;
    }

    Map<String, Properties> getContextMap() {
        return this.contextMap;
    }

    @Override // org.apache.falcon.service.FalconService
    public void init() throws FalconException {
        String property = StartupProperties.get().getProperty("workflow.execution.listeners");
        if (StringUtils.isEmpty(property)) {
            return;
        }
        for (String str : property.split(",")) {
            String trim = str.trim();
            if (!trim.isEmpty()) {
                registerListener((WorkflowExecutionListener) ReflectionUtils.getInstanceByClassName(trim));
            }
        }
    }

    @Override // org.apache.falcon.service.FalconService
    public void destroy() throws FalconException {
        this.listeners.clear();
    }

    public void registerListener(WorkflowExecutionListener workflowExecutionListener) {
        this.listeners.add(workflowExecutionListener);
    }

    public void unregisterListener(WorkflowExecutionListener workflowExecutionListener) {
        this.listeners.remove(workflowExecutionListener);
    }

    public void notifyFailure(WorkflowExecutionContext workflowExecutionContext) {
        notifyWorkflowEnd(workflowExecutionContext);
    }

    public void notifySuccess(WorkflowExecutionContext workflowExecutionContext) {
        notifyWorkflowEnd(workflowExecutionContext);
    }

    public void notifyStart(WorkflowExecutionContext workflowExecutionContext) {
        updateContextFromWFConf(workflowExecutionContext);
        LOG.debug("Sending workflow start notification to listeners with context : {} ", workflowExecutionContext);
        for (WorkflowExecutionListener workflowExecutionListener : this.listeners) {
            try {
                workflowExecutionListener.onStart(workflowExecutionContext);
            } catch (Throwable th) {
                LOG.error("Error in listener {}", workflowExecutionListener.getClass().getName(), th);
            }
        }
    }

    public void notifySuspend(WorkflowExecutionContext workflowExecutionContext) {
        updateContextFromWFConf(workflowExecutionContext);
        LOG.debug("Sending workflow suspend notification to listeners with context : {} ", workflowExecutionContext);
        for (WorkflowExecutionListener workflowExecutionListener : this.listeners) {
            try {
                workflowExecutionListener.onSuspend(workflowExecutionContext);
            } catch (Throwable th) {
                LOG.error("Error in listener {}", workflowExecutionListener.getClass().getName(), th);
            }
        }
        instrumentAlert(workflowExecutionContext);
        this.contextMap.remove(workflowExecutionContext.getWorkflowId());
    }

    public void notifyWait(WorkflowExecutionContext workflowExecutionContext) {
        updateContextFromWFConf(workflowExecutionContext);
        LOG.debug("Sending workflow wait notification to listeners with context : {} ", workflowExecutionContext);
        for (WorkflowExecutionListener workflowExecutionListener : this.listeners) {
            try {
                workflowExecutionListener.onWait(workflowExecutionContext);
            } catch (Throwable th) {
                LOG.error("Error in listener {}", workflowExecutionListener.getClass().getName(), th);
            }
        }
    }

    private void updateContextFromWFConf(WorkflowExecutionContext workflowExecutionContext) {
        try {
            Properties properties = this.contextMap.get(workflowExecutionContext.getWorkflowId());
            if (properties == null) {
                Entity entity = CONFIG_STORE.get(EntityType.valueOf(workflowExecutionContext.getEntityType()), workflowExecutionContext.getEntityName());
                if (entity == null) {
                    return;
                }
                Iterator<String> it = EntityUtil.getClustersDefinedInColos(entity).iterator();
                while (it.hasNext()) {
                    try {
                        InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine().getJobDetails(it.next(), workflowExecutionContext.getWorkflowId()).getInstances();
                        if (instances != null && instances.length > 0) {
                            properties = getWFProps(instances[0].getWfParams());
                            properties.setProperty(WorkflowExecutionArgs.RUN_ID.getName(), Integer.toString(instances[0].getRunId()));
                        }
                        this.contextMap.put(workflowExecutionContext.getWorkflowId(), properties);
                    } catch (FalconException e) {
                    }
                }
            }
            if (properties == null || properties.isEmpty()) {
                return;
            }
            for (WorkflowExecutionArgs workflowExecutionArgs : WorkflowExecutionArgs.values()) {
                if (properties.containsKey(workflowExecutionArgs.getName())) {
                    workflowExecutionContext.setValue(workflowExecutionArgs, properties.getProperty(workflowExecutionArgs.getName()));
                }
            }
        } catch (FalconException e2) {
            LOG.error("Unable to retrieve entity {} of type {} from config store.", e2);
        }
    }

    private Properties getWFProps(InstancesResult.KeyValuePair[] keyValuePairArr) {
        Properties properties = new Properties();
        for (InstancesResult.KeyValuePair keyValuePair : keyValuePairArr) {
            properties.put(keyValuePair.getKey(), keyValuePair.getValue());
        }
        return properties;
    }

    private void notifyWorkflowEnd(WorkflowExecutionContext workflowExecutionContext) {
        if (workflowExecutionContext.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
            boolean z = false;
            try {
                z = WorkflowEngineFactory.getWorkflowEngine().isNotificationEnabled(workflowExecutionContext.getClusterName(), workflowExecutionContext.getWorkflowId());
            } catch (FalconException e) {
                LOG.debug("Received error while checking if notification is enabled. Hence, assuming notification is not enabled.");
            }
            if (z) {
                LOG.info("Ignoring message from post processing as engine notification is enabled.");
                return;
            }
            updateContextWithTime(workflowExecutionContext);
        } else {
            updateContextFromWFConf(workflowExecutionContext);
        }
        LOG.debug("Sending workflow end notification to listeners with context : {} ", workflowExecutionContext);
        for (WorkflowExecutionListener workflowExecutionListener : this.listeners) {
            try {
                if (workflowExecutionContext.hasWorkflowSucceeded()) {
                    workflowExecutionListener.onSuccess(workflowExecutionContext);
                    instrumentAlert(workflowExecutionContext);
                } else {
                    workflowExecutionListener.onFailure(workflowExecutionContext);
                    if (workflowExecutionContext.hasWorkflowBeenKilled() || workflowExecutionContext.hasWorkflowFailed()) {
                        instrumentAlert(workflowExecutionContext);
                    }
                }
            } catch (Throwable th) {
                LOG.error("Error in listener {}", workflowExecutionListener.getClass().getName(), th);
            }
        }
        this.contextMap.remove(workflowExecutionContext.getWorkflowId());
    }

    private void updateContextWithTime(WorkflowExecutionContext workflowExecutionContext) {
        try {
            InstancesResult jobDetails = WorkflowEngineFactory.getWorkflowEngine().getJobDetails(workflowExecutionContext.getClusterName(), workflowExecutionContext.getWorkflowId());
            Date date = jobDetails.getInstances()[0].startTime;
            Date date2 = jobDetails.getInstances()[0].endTime;
            Date date3 = new Date();
            if (date == null) {
                date = date3;
            }
            if (date2 == null) {
                date2 = date3;
            }
            workflowExecutionContext.setValue(WorkflowExecutionArgs.WF_START_TIME, Long.toString(date.getTime()));
            workflowExecutionContext.setValue(WorkflowExecutionArgs.WF_END_TIME, Long.toString(date2.getTime()));
        } catch (FalconException e) {
            LOG.error("Unable to retrieve job details for " + workflowExecutionContext.getWorkflowId() + " on cluster " + workflowExecutionContext.getClusterName(), e);
        }
    }

    private void instrumentAlert(WorkflowExecutionContext workflowExecutionContext) {
        String clusterName = workflowExecutionContext.getClusterName();
        String entityName = workflowExecutionContext.getEntityName();
        String entityType = workflowExecutionContext.getEntityType();
        String name = workflowExecutionContext.getOperation().name();
        String workflowId = workflowExecutionContext.getWorkflowId();
        String workflowUser = workflowExecutionContext.getWorkflowUser();
        String nominalTimeAsISO8601 = workflowExecutionContext.getNominalTimeAsISO8601();
        String valueOf = String.valueOf(workflowExecutionContext.getWorkflowRunId());
        Date date = new Date();
        Date date2 = workflowExecutionContext.getWorkflowEndTime() == 0 ? date : new Date(workflowExecutionContext.getWorkflowEndTime());
        Date date3 = workflowExecutionContext.getWorkflowStartTime() == 0 ? date : new Date(workflowExecutionContext.getWorkflowStartTime());
        Long valueOf2 = Long.valueOf((date2.getTime() - date3.getTime()) * 1000000);
        if (workflowExecutionContext.hasWorkflowFailed()) {
            GenericAlert.instrumentFailedInstance(clusterName, entityType, entityName, nominalTimeAsISO8601, workflowId, workflowUser, valueOf, name, SchemaHelper.formatDateUTC(date3), "", "", valueOf2.longValue());
        } else {
            GenericAlert.instrumentSucceededInstance(clusterName, entityType, entityName, nominalTimeAsISO8601, workflowId, workflowUser, valueOf, name, SchemaHelper.formatDateUTC(date3), valueOf2.longValue());
        }
    }
}
