SpringBoot开发案例之整合mail队列进阶篇

redis-queue.png

前情提要

上一篇文章,我们为了解决实际场景中遇到的问题,使得其更像一个安全高效的邮件服务平台,我们引入了LinkedBlockingQueue队列对邮件发送进行流量削锋、间隔发送以及重复内容检测。

当然,文章末尾也就此方案提出了几点疑问,就比如邮件服务挂了,队列还没消费完怎么办?

怎么办?怎么办?还能怎么办,等着被老板扣工资吧!!!

mail-sai.jpg

有没有一种想屎的感觉的?

解决方案

由于LinkedBlockingQueue是进程内的队列,如果容器无情的挂掉以后,随之队列中的内容也便荡然无存。

其实你也可以不用去屎,山人自有妙计。

这里给大家介绍一款进程外的队列实现,redis,对没错就是有些人熟悉 有些人陌生的 NoSql数据库。

代码案例

pom.xml 引入以下依赖:

<!-- spring-boot-starter-redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.4.7.RELEASE</version>
</dependency>

定义接口(部分代码)

     public void sendRedisQueue(Email mail) throws Exception;

定义实现(部分代码)

@Override
    public void sendRedisQueue(Email mail) throws Exception {
        redisTemplate.convertAndSend("mail",mail);
    }

重写CachingConfigurerSupport

import java.lang.reflect.Method;

import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
    /**
     * 自定义key(消息队列 暂时用不到 自行忽略)
     * 此方法将会根据类名+方法名+所有参数的值生成唯一的一个key,即使@Cacheable中的value属性一样,key也会不一样。
     * @Author  科帮网
     * @return 
     * @Date    2017年8月13日
     * 更新日志
     * 2017年8月13日  科帮网 首次创建
     *
     */
    @Bean
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method,
                    Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object obj : params) {
                    sb.append(obj.toString());
                }
                return sb.toString();
            }
        };
    }
    /**
     * 缓存管理器
     * @Author  科帮网
     * @param redisTemplate
     * @return  CacheManager
     * @Date    2017年8月13日
     * 更新日志
     * 2017年8月13日  科帮网 首次创建
     */
    @SuppressWarnings("rawtypes")
    @Bean
    public CacheManager cacheManager(RedisTemplate redisTemplate) {
        return new RedisCacheManager(redisTemplate);
    }
    /**
     * 序列化Java对象
     * @Author  科帮网
     * @param factory
     * @return  RedisTemplate<String,String>
     * @Date    2017年8月13日
     * 更新日志
     * 2017年8月13日  科帮网 首次创建
     *
     */
    @Bean
    public RedisTemplate<String, String> redisTemplate(
            RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        setSerializer(template); //使用Jackson序列化
        template.afterPropertiesSet();
        return template;
    }
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void setSerializer(StringRedisTemplate template) {
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(
                Object.class);
        ObjectMapper om = new ObjectMapper();
        //om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        //om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
    }
}

配置RedisListener监听

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
 * 注意 扫描监听 否则无法接收消息
 * 创建者 科帮网
 * 创建时间    2017年8月13日
 *
 */
@Component
public class RedisListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
    @Bean
    RedisMessageListenerContainer container(
            RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        LOGGER.info("启动监听"); 
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("mail"));
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    Receiver receiver(CountDownLatch latch) {
        return new Receiver(latch);
    }
    
    @Bean
    CountDownLatch latch() {
        return new CountDownLatch(1);
    }

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}

定义Receiver接收着

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.itstyle.mail.model.Email;
import com.itstyle.mail.service.IMailService;

public class Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
    @Autowired
    private IMailService mailService;
    private CountDownLatch latch;

    @Autowired
    public Receiver(CountDownLatch latch) {
        this.latch = latch;
    }

    public void receiveMessage(String message) {
        LOGGER.info("接收email消息 <{}>",message);
        if(message == null){
            LOGGER.info("接收email消息 <" + null + ">");
        }else {
            ObjectMapper mapper = new ObjectMapper();  
            try {
                Email email = mapper.readValue(message, Email.class);
                mailService.send(email);
                LOGGER.info("接收email消息内容 <{}>",email.getContent());
            } catch (JsonParseException e) {
                e.printStackTrace();
            } catch (JsonMappingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        latch.countDown();
    }
}

SpringbootMailApplication测试

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ImportResource;

import com.itstyle.mail.model.Email;
import com.itstyle.mail.service.IMailService;

@SpringBootApplication
@ComponentScan(basePackages={"com.itstyle.mail"})
@ImportResource({"classpath:spring-context-dubbo.xml","classpath:spring-context-task.xml"})
public class SpringbootMailApplication implements CommandLineRunner {
    @Autowired
    private IMailService mailService;
    public static void main(String[] args) {
        SpringApplication.run(SpringbootMailApplication.class, args);
    }

    @Override
    public void run(String... args) {
        try {
            Email mail = new Email();
            mail.setEmail(new String[]{"345849402@qq.com"});
            mail.setSubject("你个小逗比");
            mail.setContent("科帮网欢迎您");
            mail.setTemplate("welcome");
            for(int i=0;i<1000;i++){
                mailService.sendRedisQueue(mail);
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

详细代码案例:码云

qrcode_for_gh_bf7a27ade681_258.jpg

作者: 小柒

出处: https://blog.52itstyle.com

分享是快乐的,也见证了个人成长历程,文章大多都是工作经验总结以及平时学习积累,基于自身认知不足之处在所难免,也请大家指正,共同进步。

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出, 如有问题, 可邮件(345849402@qq.com)咨询。