package org.genemania.broker;

import java.io.IOException;
import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.genemania.Constants;
import org.genemania.broker.Constants;
import org.genemania.dto.EnrichmentEngineResponseDto;
import org.genemania.dto.NetworkDto;
import org.genemania.dto.RelatedGenesEngineRequestDto;
import org.genemania.dto.RelatedGenesEngineResponseDto;
import org.genemania.dto.UploadNetworkEngineResponseDto;
import org.genemania.engine.IMania;
import org.genemania.engine.Mania2;
import org.genemania.engine.cache.DataCache;
import org.genemania.engine.cache.FileSerializedObjectCache;
import org.genemania.engine.cache.MemObjectCache;
import org.genemania.exception.ApplicationException;
import org.genemania.message.RelatedGenesRequestMessage;
import org.genemania.message.RelatedGenesResponseMessage;
import org.genemania.message.UploadNetworkRequestMessage;
import org.genemania.message.UploadNetworkResponseMessage;
import org.genemania.util.ApplicationConfig;
import org.genemania.util.BrokerUtils;

/* JADX WARN: Classes with same name are omitted:
  
 */
/* loaded from: input_file:org/genemania/broker/Worker.class */
public class Worker implements MessageListener, ExceptionListener, TransportListener {
    private static Logger LOG = Logger.getLogger(Worker.class);
    private static final String CONFIG_MESSAGE_EXPIRATION_MILLIS = "messageExpirationMillis";
    private String appVer;
    private String brokerUrl;
    private Session session;
    private MessageConsumer requestHandler;
    private MessageProducer responseHandler;
    private IMania engine;
    private String cacheDir;
    private String mqRequestsQueueName;
    private long messageExpirationMillis;
    private int processedMessages = 0;
    private long checkisActivePollingIntervalMillis = 60000;
    private boolean active = true;

    public static void main(String[] strArr) {
        new Worker().start();
    }

    public Worker() {
        config();
    }

    public void start() {
        try {
            if (StringUtils.isEmpty(this.cacheDir)) {
                LOG.error("Missing required parameter: engine cache dir. Exiting...");
                System.exit(1);
            }
            this.engine = new Mania2(new DataCache(new MemObjectCache(new FileSerializedObjectCache(this.cacheDir))));
            LOG.info("GeneMANIA Worker ver: " + this.appVer);
            LOG.info("Engine ver: " + this.engine.getVersion());
            LOG.info("cache dir: " + this.cacheDir);
            LOG.info("broker URL: " + this.brokerUrl);
            LOG.info("request Queue Name: " + this.mqRequestsQueueName);
            LOG.info("messageExpirationMillis: " + this.messageExpirationMillis);
            startNewConnection();
            waitForExit();
        } catch (JMSException e) {
            LOG.error("Worker startup error", e);
        }
    }

    public void waitForExit() {
        while (isActive()) {
            try {
                Thread.sleep(this.checkisActivePollingIntervalMillis);
            } catch (InterruptedException e) {
            }
        }
    }

    @Override // javax.jms.MessageListener
    public synchronized void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            LOG.warn("Unexpected message instance type: " + message.getClass().getName());
            return;
        }
        try {
            TextMessage textMessage = (TextMessage) message;
            LOG.debug("new " + message.getJMSType() + " message received on queue " + ((Queue) message.getJMSDestination()).getQueueName() + "[correlation id: " + message.getJMSCorrelationID() + DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END);
            String invokeEngine = invokeEngine(textMessage.getJMSType(), textMessage.getText());
            LOG.debug("Responding to " + textMessage.getJMSDestination() + ", msg id " + textMessage.getJMSCorrelationID() + ", response body size " + invokeEngine.length());
            TextMessage createTextMessage = this.session.createTextMessage();
            createTextMessage.setJMSDestination(textMessage.getJMSReplyTo());
            createTextMessage.setJMSDeliveryMode(2);
            createTextMessage.setJMSCorrelationID(message.getJMSCorrelationID());
            createTextMessage.setText(invokeEngine);
            this.responseHandler.send(createTextMessage.getJMSDestination(), createTextMessage);
            this.processedMessages++;
            LOG.info("successfully processed messages: " + this.processedMessages);
        } catch (JMSException e) {
            LOG.error("JMS Exception: ", e);
        }
    }

    String invokeEngine(String str, String str2) {
        String buildErrorMessage;
        if (MessageType.RELATED_GENES.equals(MessageType.fromCode(str))) {
            buildErrorMessage = getRelatedGenes(RelatedGenesRequestMessage.fromXml(str2)).toXml();
        } else if (MessageType.TEXT2NETWORK.equals(MessageType.fromCode(str))) {
            buildErrorMessage = uploadNetwork(UploadNetworkRequestMessage.fromXml(str2)).toXml();
        } else {
            LOG.warn("Unknown message type: " + str);
            buildErrorMessage = buildErrorMessage("Unknown message type");
        }
        return buildErrorMessage;
    }

    @Override // javax.jms.ExceptionListener
    public synchronized void onException(JMSException jMSException) {
        LOG.error("JMS Exception detected.", jMSException);
    }

    private void config() {
        ApplicationConfig applicationConfig = ApplicationConfig.getInstance();
        this.appVer = applicationConfig.getProperty(Constants.CONFIG_PROPERTIES.APP_VER);
        this.brokerUrl = applicationConfig.getProperty(Constants.CONFIG_PROPERTIES.BROKER_URL);
        this.mqRequestsQueueName = applicationConfig.getProperty(Constants.CONFIG_PROPERTIES.MQ_REQUESTS_QUEUE_NAME);
        this.cacheDir = applicationConfig.getProperty(Constants.CONFIG_PROPERTIES.CACHE_DIR);
        this.messageExpirationMillis = Integer.parseInt(applicationConfig.getProperty(CONFIG_MESSAGE_EXPIRATION_MILLIS));
    }

    private void startNewConnection() throws JMSException {
        Connection createConnection = new PooledConnectionFactory(this.brokerUrl).createConnection();
        createConnection.setExceptionListener(this);
        ((ActiveMQConnection) ((PooledConnection) createConnection).getConnection()).addTransportListener(this);
        this.session = createConnection.createSession(false, 1);
        this.responseHandler = this.session.createProducer(null);
        this.responseHandler.setTimeToLive(this.messageExpirationMillis);
        Queue createQueue = this.session.createQueue(this.mqRequestsQueueName);
        this.requestHandler = this.session.createConsumer(createQueue);
        this.requestHandler.setMessageListener(this);
        createConnection.start();
        LOG.info("Listening to " + createQueue.getQueueName());
    }

    private RelatedGenesResponseMessage getRelatedGenes(RelatedGenesRequestMessage relatedGenesRequestMessage) {
        RelatedGenesResponseMessage relatedGenesResponseMessage = new RelatedGenesResponseMessage();
        try {
            RelatedGenesEngineRequestDto msg2dto = BrokerUtils.msg2dto(relatedGenesRequestMessage);
            RelatedGenesEngineResponseDto findRelated = this.engine.findRelated(msg2dto);
            printEngineReturn(findRelated);
            EnrichmentEngineResponseDto enrichmentEngineResponseDto = null;
            try {
                enrichmentEngineResponseDto = this.engine.computeEnrichment(BrokerUtils.buildEnrichmentRequestFrom(msg2dto, findRelated, relatedGenesRequestMessage.getOntologyId()));
            } catch (Exception e) {
                LOG.error("Failed to compute enrichment", e);
            }
            if (enrichmentEngineResponseDto != null) {
                LOG.debug("enriched categories size:" + enrichmentEngineResponseDto.getEnrichedCategories().size());
                relatedGenesResponseMessage = BrokerUtils.dto2msg(findRelated, enrichmentEngineResponseDto);
            } else {
                relatedGenesResponseMessage = BrokerUtils.dto2msg(findRelated);
                LOG.warn("enriched categories response DTO is null");
            }
            relatedGenesResponseMessage.setNodes(findRelated.getNodes());
            relatedGenesResponseMessage.setOrganismId(relatedGenesRequestMessage.getOrganismId());
            printConverterReturn(relatedGenesResponseMessage);
        } catch (ApplicationException e2) {
            LOG.error("Failed to get related genes: ", e2);
            relatedGenesResponseMessage.setErrorCode(2);
            relatedGenesResponseMessage.setErrorMessage(e2.getMessage());
        }
        return relatedGenesResponseMessage;
    }

    private UploadNetworkResponseMessage uploadNetwork(UploadNetworkRequestMessage uploadNetworkRequestMessage) {
        UploadNetworkResponseMessage uploadNetworkResponseMessage = new UploadNetworkResponseMessage();
        try {
            UploadNetworkEngineResponseDto uploadNetwork = this.engine.uploadNetwork(BrokerUtils.msg2dto(uploadNetworkRequestMessage));
            LOG.debug(uploadNetwork.toString());
            uploadNetworkResponseMessage = BrokerUtils.dto2msg(uploadNetwork);
        } catch (ApplicationException e) {
            LOG.error("Failed to load network", e);
            uploadNetworkResponseMessage.setErrorCode(2);
            uploadNetworkResponseMessage.setErrorMessage(e.getMessage());
        }
        return uploadNetworkResponseMessage;
    }

    private void printConverterReturn(RelatedGenesResponseMessage relatedGenesResponseMessage) {
        StringBuffer stringBuffer = new StringBuffer("dto2msg returned " + relatedGenesResponseMessage.getNetworks().size() + " networks=[");
        Iterator<NetworkDto> it = relatedGenesResponseMessage.getNetworks().iterator();
        while (it.hasNext()) {
            stringBuffer.append(it.next().getId());
            if (it.hasNext()) {
                stringBuffer.append(" ");
            }
        }
        stringBuffer.append(DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END);
        stringBuffer.append(" and " + relatedGenesResponseMessage.getAnnotations().size() + " annotations.");
        LOG.debug(stringBuffer);
    }

    private void printEngineReturn(RelatedGenesEngineResponseDto relatedGenesEngineResponseDto) {
        StringBuffer stringBuffer = new StringBuffer("engine returned " + relatedGenesEngineResponseDto.getNetworks().size() + " networks=[");
        Iterator<NetworkDto> it = relatedGenesEngineResponseDto.getNetworks().iterator();
        while (it.hasNext()) {
            stringBuffer.append(it.next().getId());
            if (it.hasNext()) {
                stringBuffer.append(" ");
            }
        }
        stringBuffer.append(DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END);
        if (relatedGenesEngineResponseDto.getAttributes() != null) {
            stringBuffer.append(" and " + relatedGenesEngineResponseDto.getAttributes().size() + " attributes");
        } else {
            stringBuffer.append(" and 0 attributes");
        }
        LOG.debug(stringBuffer);
    }

    private String buildErrorMessage(String str) {
        RelatedGenesResponseMessage relatedGenesResponseMessage = new RelatedGenesResponseMessage();
        relatedGenesResponseMessage.setErrorCode(2);
        relatedGenesResponseMessage.setErrorMessage(str);
        return relatedGenesResponseMessage.toXml();
    }

    public synchronized boolean isActive() {
        return this.active;
    }

    public synchronized void setActive(boolean z) {
        this.active = z;
    }

    @Override // org.apache.activemq.transport.TransportListener
    public void onCommand(Object obj) {
        LOG.trace("Transport event 'Command': " + obj);
    }

    @Override // org.apache.activemq.transport.TransportListener
    public void onException(IOException iOException) {
        LOG.warn("Transport event 'Exception'", iOException);
    }

    @Override // org.apache.activemq.transport.TransportListener
    public void transportInterupted() {
        LOG.info("transport event 'Interrupted'");
    }

    @Override // org.apache.activemq.transport.TransportListener
    public void transportResumed() {
        LOG.info("transport event 'Resumed'");
    }
}
