/*
 * Decompiled with CFR 0.152.
 */
package com.bimface.message.redis;

import com.alibaba.fastjson.JSON;
import com.bimface.message.MessageListenerAware;
import com.bimface.message.MessageService;
import com.bimface.message.QueueMessage;
import com.bimface.message.exception.MessageException;
import com.bimface.utils.IdGenerator;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

public class RedisMessageServiceImpl
implements MessageService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final String QUEUE_KEY_PREFIX = "bimface:mq:";
    private RedisTemplate<String, String> redisStringTemplate;
    private Map<String, Integer> queuePollTime = new HashMap<String, Integer>();
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    public void setRedisStringTemplate(RedisTemplate<String, String> redisStringTemplate) {
        this.redisStringTemplate = redisStringTemplate;
    }

    private String getQueueKey(String queueName, Integer priority) {
        return QUEUE_KEY_PREFIX + queueName + ":" + priority;
    }

    private String getHideQueueKey(String queueName) {
        return QUEUE_KEY_PREFIX + queueName + ":hide";
    }

    private String getQueueMapKey(String queueName) {
        return QUEUE_KEY_PREFIX + queueName + ":map";
    }

    @Override
    public void send(String queueName, String messageId, Object object, Integer priority) throws MessageException {
        if (priority == null) {
            priority = 2;
        } else if (priority > 3) {
            priority = 3;
        } else if (priority < 0) {
            priority = 0;
        }
        if (StringUtils.isEmpty((Object)messageId)) {
            messageId = String.valueOf(IdGenerator.nextId());
        }
        QueueMessage queueMessage = new QueueMessage();
        queueMessage.setMessageId(messageId);
        queueMessage.setMessageBody(JSON.toJSONString((Object)object));
        queueMessage.setPriority(priority == 0 ? 1 : priority);
        String message = JSON.toJSONString((Object)queueMessage);
        this.pushMessage(queueName, priority, messageId, message);
    }

    private void pushMessage(final String queueName, final Integer priority, final String messageId, final String message) {
        this.redisStringTemplate.execute((SessionCallback)new SessionCallback<Object>(){

            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                RedisOperations operations = redisOperations;
                operations.multi();
                operations.opsForHash().put((Object)RedisMessageServiceImpl.this.getQueueMapKey(queueName), (Object)messageId, (Object)message);
                if (priority == 0) {
                    operations.opsForList().rightPush((Object)RedisMessageServiceImpl.this.getQueueKey(queueName, 1), (Object)messageId);
                } else {
                    operations.opsForList().leftPush((Object)RedisMessageServiceImpl.this.getQueueKey(queueName, priority), (Object)messageId);
                }
                operations.exec();
                return null;
            }
        });
    }

    @Override
    public void receiveAndHandle(List<String> queueNames, MessageListenerAware listener, Map<String, Integer> queueMessageHandleTimeout) throws MessageException {
        if (CollectionUtils.isEmpty(queueNames)) {
            return;
        }
        String queueGroupKey = "";
        for (String queueName : queueNames) {
            queueGroupKey = queueGroupKey + ":" + queueName;
        }
        Integer pollTime = this.queuePollTime.get(queueGroupKey);
        int remainingPollTime = 0;
        int queueWaitTime = 0;
        if (pollTime != null) {
            remainingPollTime = pollTime;
            queueWaitTime = (int)Math.ceil(1.0 * (double)pollTime.intValue() / (double)(queueNames.size() * 2));
        }
        for (int priority = 1; priority <= 3; ++priority) {
            boolean getMessage = false;
            for (String queueName : queueNames) {
                String messageId = null;
                String hideQueueKey = this.getHideQueueKey(queueName);
                String queueKey = this.getQueueKey(queueName, priority);
                try {
                    if (remainingPollTime <= 0) {
                        messageId = (String)this.redisStringTemplate.opsForList().rightPopAndLeftPush((Object)queueKey, (Object)hideQueueKey);
                    } else {
                        messageId = (String)this.redisStringTemplate.opsForList().rightPopAndLeftPush((Object)queueKey, (Object)hideQueueKey, (long)queueWaitTime, TimeUnit.SECONDS);
                        remainingPollTime -= queueWaitTime;
                    }
                }
                catch (Throwable e) {
                    try {
                        Thread.sleep(pollTime != null ? (long)(pollTime * 1000) : 2000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                if (messageId == null) continue;
                getMessage = true;
                pollTime = null;
                this.queuePollTime.remove(queueGroupKey);
                String mapKey = this.getQueueMapKey(queueName);
                String message = (String)this.redisStringTemplate.opsForHash().get((Object)mapKey, (Object)messageId);
                if (message == null) {
                    this.logger.warn("message not found, messageId:[{}]", (Object)messageId);
                    this.redisStringTemplate.opsForList().remove((Object)hideQueueKey, 1L, (Object)messageId);
                    continue;
                }
                QueueMessage queueMessage = (QueueMessage)JSON.parseObject((String)message, QueueMessage.class);
                int holdTimes = queueMessage.getHoldTimes();
                if (holdTimes < 3) {
                    queueMessage.setHoldTimes(++holdTimes);
                    Integer timeout = null;
                    if (queueMessageHandleTimeout != null) {
                        timeout = queueMessageHandleTimeout.get(queueName);
                    }
                    if (timeout == null) {
                        timeout = 10;
                    }
                    Calendar c = Calendar.getInstance();
                    c.add(13, timeout);
                    queueMessage.setHoldOvertime(c.getTime());
                    this.redisStringTemplate.opsForHash().put((Object)mapKey, (Object)messageId, (Object)JSON.toJSONString((Object)queueMessage));
                    try {
                        listener.onMessage(queueMessage);
                    }
                    catch (Throwable e) {
                        if (holdTimes < 3) {
                            this.logger.error("handle message error", e);
                            this.unhideMessage(queueName, priority, messageId);
                        } else {
                            this.logger.error("message consume over 3 times, discard! message:[{}]", (Object)message);
                            this.deleteHideMessage(queueName, messageId);
                        }
                        throw new MessageException(e);
                    }
                } else {
                    this.logger.error("message consume over 3 times, discard! message:[{}]", (Object)message);
                }
                this.deleteHideMessage(queueName, messageId);
            }
            if (!getMessage) continue;
            return;
        }
        if (pollTime == null) {
            pollTime = 2;
        } else if (pollTime < 8) {
            pollTime = pollTime * 2;
        }
        this.queuePollTime.put(queueGroupKey, pollTime);
    }

    @Override
    public void unhideMessage(final String queueName, final int priority, final String messageId) {
        this.redisStringTemplate.execute((SessionCallback)new SessionCallback<Object>(){

            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                RedisOperations operations = redisOperations;
                operations.multi();
                operations.opsForList().rightPush((Object)RedisMessageServiceImpl.this.getQueueKey(queueName, priority), (Object)messageId);
                operations.opsForList().remove((Object)RedisMessageServiceImpl.this.getHideQueueKey(queueName), 1L, (Object)messageId);
                operations.exec();
                return null;
            }
        });
    }

    @Override
    public void deleteHideMessage(final String queueName, final String messageId) {
        this.redisStringTemplate.execute((SessionCallback)new SessionCallback<Object>(){

            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                RedisOperations operations = redisOperations;
                operations.multi();
                operations.opsForHash().delete((Object)RedisMessageServiceImpl.this.getQueueMapKey(queueName), new Object[]{messageId});
                operations.opsForList().remove((Object)RedisMessageServiceImpl.this.getHideQueueKey(queueName), 1L, (Object)messageId);
                operations.exec();
                return null;
            }
        });
    }

    @Override
    public void deleteMessage(final String queueName, final String messageId, final Integer priority) {
        this.redisStringTemplate.execute((SessionCallback)new SessionCallback<Object>(){

            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                RedisOperations operations = redisOperations;
                operations.multi();
                operations.opsForHash().delete((Object)RedisMessageServiceImpl.this.getQueueMapKey(queueName), new Object[]{messageId});
                operations.opsForList().remove((Object)RedisMessageServiceImpl.this.getQueueKey(queueName, priority), 1L, (Object)messageId);
                operations.opsForList().remove((Object)RedisMessageServiceImpl.this.getHideQueueKey(queueName), 1L, (Object)messageId);
                operations.exec();
                return null;
            }
        });
    }

    @Override
    public void processOvertimeMessage(final String queueName, long delay) {
        this.executor.schedule(new Runnable(){

            @Override
            public void run() {
                long nextDelay = RedisMessageServiceImpl.this.processHeadOvertimeMessage(queueName);
                RedisMessageServiceImpl.this.processOvertimeMessage(queueName, nextDelay);
            }
        }, delay, TimeUnit.SECONDS);
    }

    private long processHeadOvertimeMessage(String queueName) {
        String hideQueueKey = this.getHideQueueKey(queueName);
        String mapKey = this.getQueueMapKey(queueName);
        String messageId = (String)this.redisStringTemplate.opsForList().index((Object)hideQueueKey, 0L);
        if (messageId == null) {
            return 600L;
        }
        String message = (String)this.redisStringTemplate.opsForHash().get((Object)mapKey, (Object)messageId);
        if (message == null) {
            this.logger.warn("message not found, messageId:[{}]", (Object)messageId);
            this.redisStringTemplate.opsForList().remove((Object)hideQueueKey, 1L, (Object)messageId);
            return 0L;
        }
        QueueMessage queueMessage = (QueueMessage)JSON.parseObject((String)message, QueueMessage.class);
        Date overtime = queueMessage.getHoldOvertime();
        if (overtime == null) {
            this.logger.warn("hide message without overtime, messageId:[{}]", (Object)messageId);
            return 2L;
        }
        if (overtime.before(new Date())) {
            if (queueMessage.getHoldTimes() < 3) {
                this.unhideMessage(queueName, queueMessage.getPriority(), messageId);
            } else {
                this.logger.error("message consume over 3 times, discard! message:[{}]", (Object)message);
                this.deleteHideMessage(queueName, messageId);
            }
            return 0L;
        }
        return (overtime.getTime() - System.currentTimeMillis()) / 1000L + 1L;
    }

    @Override
    public void changePriority(final String queueName, final String messageId, Integer priority) throws MessageException {
        if (priority == null) {
            priority = 2;
        } else if (priority > 3) {
            priority = 3;
        } else if (priority < 0) {
            priority = 0;
        }
        final Integer priorityF = priority;
        final String mapKey = this.getQueueMapKey(queueName);
        String message = (String)this.redisStringTemplate.opsForHash().get((Object)mapKey, (Object)messageId);
        if (message == null) {
            throw new MessageException("message [" + messageId + "] not found");
        }
        final QueueMessage queueMessage = (QueueMessage)JSON.parseObject((String)message, QueueMessage.class);
        final Integer oldPriority = queueMessage.getPriority();
        if (priority.equals(oldPriority) || queueMessage.getHoldOvertime() != null) {
            return;
        }
        this.redisStringTemplate.execute((SessionCallback)new SessionCallback<Object>(){

            public Object execute(RedisOperations redisOperations) throws DataAccessException {
                RedisOperations operations = redisOperations;
                operations.multi();
                if (priorityF == 0) {
                    operations.opsForList().rightPush((Object)RedisMessageServiceImpl.this.getQueueKey(queueName, 1), (Object)messageId);
                    queueMessage.setPriority(1);
                } else {
                    operations.opsForList().leftPush((Object)RedisMessageServiceImpl.this.getQueueKey(queueName, priorityF), (Object)messageId);
                    queueMessage.setPriority(priorityF);
                }
                operations.opsForHash().put((Object)mapKey, (Object)messageId, (Object)JSON.toJSONString((Object)queueMessage));
                operations.opsForList().remove((Object)RedisMessageServiceImpl.this.getQueueKey(queueName, oldPriority), 1L, (Object)messageId);
                operations.exec();
                return null;
            }
        });
    }
}

