项目亮点之异步队列

异步队列

点赞,回复评论的时候,表面上是赞数增加了,其实还有很多其他的工作要做。比如,被赞的人收到消息提醒,成就值增加等。一些行为会引起一系列连锁反应。如果在执行点赞功能时立马处理,会影响程序运行效率。而且会造成代码冗余,比如发布新闻,和回复评论都可以使得成就值增加,如果都跟着写在后面的代码里会把成就值增加这段代码写两遍,所以大型服务需要服务化异步化

服务化:某一个单独的业务独立成一个工程,提供接口。不只是service层的一个类。 暴露一些接口,比如数据库服务,如果一个部门要去数据库查询,小公司可能写个SQL语句。对于大公司,需要一个组单独做数据库服务,暴露接口给别的部门用。好处是防止别的部门有数据库权限,数据库单独有一个组维护,出问题找他们运维就好。
异步化:用户点赞,用户首先要知道的是这个赞已经点上了。用户提交Oj,用户要立马知道的是代码有没有通过。而后面应该的东西,比如积分增加了,用户不会有立马想知道的需求,如果间隔几秒钟再更新,用户也不会有很大的意见。
异步化的好处:
一、能够分开紧急动作和非紧急动作。能够先执行对用户最重要的事务,反馈给用户。其他的相应该事务的动作可以在后台延迟执行,

二、增加程序的鲁棒性。举提交在线OJ为例,如果用户提交程序代码后,服务器进行判题,判题通过后执行增加积分,成就值这些动作,如果程序在判题后运行失败,服务器不能立刻返回给用户结果。如果是异步化分开后,起码能保证主业务,即判题结果能及时反馈给用户。积分挂了,只会造成用户看不到最新的积分,这个对用户的影响比较小。如下图所示:

屏幕快照 2019-03-16 下午9.49.31-2825469

实现代码

在项目中异步架构实现结构:

屏幕快照 2019-03-16 下午9.53.13

1.利用JSON进行对象序列化和反序列化

JSON存到Redis中,后面可以知道是以String形式存储的

操作封装在JedisAdapter中

    public String get(String key) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            return getJedis().get(key);
        } catch (Exception e) {
            logger.error("发生异常" + e.getMessage());
            return null;
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

    public void set(String key, String value) {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            jedis.set(key, value);
        } catch (Exception e) {
            logger.error("发生异常" + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
    public void setObject(String key, Object obj) {
        set(key, JSON.toJSONString(obj));
    }

    public <T> T getObject(String key, Class<T> clazz) {
        String value = get(key);
        if (value != null) {
            return JSON.parseObject(value, clazz);
        }
        return null;
    }

其中将对象序列化过程中:是使用JSON.toJSONString(Object obj)的方法将对象序列化为JSON串

代码如下:

/**
 * 重点:This method serializes the specified object into its equivalent Json representation. Note that this method works fine if the any of the object fields are of generic type,
 * just the object itself should not be of a generic type. If you want to write out the object to a
 * {@link Writer}, use {@link #writeJSONString(Writer, Object, SerializerFeature[])} instead.
 *
 * @param object the object for which json representation is to be created setting for fastjson
 * @return Json representation of {@code object}.
 */
public static String toJSONString(Object object) {
    return toJSONString(object, emptyFilters);
}

知识点补充

==测试中序列化和反序列化过程中用到了redis中String的操作==:

例子:

redis> SET mykey "Hello"
OK
redis> GET mykey
"Hello"
redis> 

2.构建事件模型EventModel

EventModel:代表进入队列的具体活动

public class EventModel {
    //这个触发事件是什么类型
    private EventType type;
    //触发对象的原目标,谁触发的
    private int actorId;
    //触发的对象是什么。是新闻还是点赞类型的事件,是哪个事件
    private int entityId;
    private int entityType;
    //触发对象的拥有者(如果是点赞新闻事件,那就是发这条新闻的UserId)
    private int entityOwnerId;
    //在触发的现场有哪些数据需要保存下来
    private Map<String,String> exts=new HashMap<>();

    public EventModel() {

    }
    public EventModel(EventType type) {
        this.type = type;
    }

    public String getExt(String name) {
        return exts.get(name);
    }

    public EventModel setExt(String name, String value) {
        exts.put(name, value);
        return this;
    }

    public EventType getType() {
        return type;
    }

    public EventModel setType(EventType type) {
        this.type = type;
        return this;
    }

    public int getActorId() {
        return actorId;
    }

    public EventModel setActorId(int actorId) {
        this.actorId = actorId;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public EventModel setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public EventModel setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityOwnerId() {
        return entityOwnerId;
    }

    public EventModel setEntityOwnerId(int entityOwnerId) {
        this.entityOwnerId = entityOwnerId;
        return this;
    }
}

事件的类型:有点赞、评论、登录、发邮件等类型

public enum EventType {
    LIKE(0),
    COMMENT(1),
    LOGIN(2),
    MAIL(3);
    private int value;
    EventType(int value) {
        this.value = value;
    }
    public int getValue() {
        return value;
    }
}

3.活动的生产者和消费者

生产者(EventProducer)

这个类的功能其实就是把事件模型(代表着一次事件)存储到redis的列表(List)中,其中key是规定标记需要异步化处理的事件的统一名称(在该项目中是”EVENT”),value是EventModel被序列化后的JSON串。

简单的说,这个List存放EventModel,以后消费者就在这个List中去event去执行。

@Service
public class EventProducer {

    @Autowired
    JedisAdapter jedisAdapter;

    public boolean fireEvent(EventModel eventModel) {
        try {
            String json = JSONObject.toJSONString(eventModel);
            String key = RedisKeyUtil.getEventQueueKey();
            jedisAdapter.lpush(key, json);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

知识点补充

List的Lpush操作:

将所有指定的值插入到存于 key 的列表的头部。如果 key 不存在,那么在进行 push 操作前会创建一个空列表。 如果 key 对应的值不是一个 list 的话,那么会返回一个错误。

可以使用一个命令把多个元素 push 进入列表,只需在命令末尾加上多个指定的参数。元素是从最左端的到最右端的、一个接一个被插入到 list 的头部。 所以对于这个命令例子 LPUSH mylist a b c,返回的列表是 c 为第一个元素, b 为第二个元素, a 为第三个元素。


消费者(EventConsumer)

为了拓展性,首先定义一个EventHandler接口:

接口中

public interface EventHandler {
    //执行事件
    void doHandle(EventModel eventModel);
    //handler执行哪些EventModel(如点赞呀,评论呀,登录异常这些。。。)
    List<EventType> getSupportEventTypes();
}

具体的执行类:

目前写了2中执行类:LikeHandlerLoginExceptionHandler

LikeHandler:执行点赞的异步操作

@Component
public class LikeHandler implements EventHandler {

    @Autowired
    MessageService messageService;
    @Autowired
    UserService userService;

    @Override
    public void doHandle(EventModel model) {
        Message message = new Message();
        User user = userService.getUser(model.getActorId());
        message.setToId(model.getEntityOwnerId());
        message.setContent("用户" + user.getName() +
                " 赞了你的资讯,http://127.0.0.1:8080/news/"
                + String.valueOf(model.getEntityId()));
        // SYSTEM ACCOUNT
        message.setFromId(3);
        message.setCreatedDate(new Date());
        messageService.addMessage(message);
    }

    @Override
    public List<EventType> getSupportEventTypes() {
        return Arrays.asList(EventType.LIKE);
    }
}

点赞的异步操作实际就是谁给你点赞了,系统会通知你新闻被点赞了

LoginExceptionHandler

@Component
public class LoginExceptionHandler implements EventHandler {
    @Autowired
    MessageService messageService;
    @Autowired
    MailSender mailSender;
    @Override
    public void doHandle(EventModel eventModel) {
        Message message=new Message();
        message.setCreatedDate(new Date());
        //加入系统默认ID是3
        message.setFromId(3);
        message.setToId(eventModel.getActorId());
        message.setContent("你上次的登陆IP异常");
        messageService.addMessage(message);

        Map<String, Object> map = new HashMap();
        map.put("username", eventModel.getExt("username"));
        mailSender.sendWithHTMLTemplate(eventModel.getExt("email"),"登录异常",
                "mails/welcome.html",map);
    }

    @Override
    public List<EventType> getSupportEventTypes() {
        return Arrays.asList(EventType.LOGIN);
    }
}

登录异常的异步操作就是给你发个说明异常的email

Consumer

@Service
public class EventConsumer implements InitializingBean, ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
    private Map<EventType, List<EventHandler>> config = new HashMap<>();
    private ApplicationContext applicationContext;
    @Autowired
    private JedisAdapter jedisAdapter;

    @Override
    public void afterPropertiesSet() throws Exception {
      //这个很关键:根据传入的上下文(applicationContext),查找EventHandler.class类
        Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);
        if (beans != null) {
            for (Map.Entry<String, EventHandler> entry : beans.entrySet()) {
              //得到每个EventHandler实现类支持的事件类型
                List<EventType> eventTypes = entry.getValue().getSupportEventTypes();
                for (EventType type : eventTypes) {
                  //每种事件类型对应一个执行该类型事件的执行者列表
                    if (!config.containsKey(type)) {
                        config.put(type, new ArrayList<EventHandler>());
                    }
                    // 注册每个事件的处理函数
                    config.get(type).add(entry.getValue());
                }
            }
        }

        // 启动线程去消费事件
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // 从队列一直消费
                while (true) {
                    String key = RedisKeyUtil.getEventQueueKey();
                    //brpop返回的是key和value
                    List<String> messages = jedisAdapter.brpop(0, key);
                    // 第一个元素是队列名字
                    for (String message : messages) {
                      //因为brpop操作返回一个双元素,第一个是key,第二个是value
                        if (message.equals(key)) {
                            continue;
                        }

                        EventModel eventModel = JSON.parseObject(message, EventModel.class);

                        if (!config.containsKey(eventModel.getType())) {
                            logger.error("不能识别的事件");
                            continue;
                        }
                      // 找到这个事件的处理handler列表
                        for (EventHandler handler : config.get(eventModel.getType())) {
                            handler.doHandle(eventModel);
                        }
                    }
                }
            }
        });
        thread.start();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

知识补充:

1.补充2个接口:InitializingBeanApplicationContextAware

InitializingBean接口:为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。

ApplicationContextAware的最本质的应用就是:对当前bean传入对应的Spring上下文。

appContext.getBeansOfType()方法:

<T> Map<String, T> getBeansOfType(Class<T> var1) throws BeansException;

1.该方法返回一个map类型的实例,map中的key为bean的名字,key对应的内容bean的实例。

2.该方法有两种类型的重载:
getBeansOfType(Class),获取某一类的所有的bean。

2.BRPOP

是一个阻塞的列表弹出原语。 它是 RPOP 的阻塞版本,因为这个命令会在给定list无法弹出任何元素的时候阻塞连接。 该命令会按照给出的 key 顺序查看 list,并在找到的第一个非空 list 的尾部弹出一个元素。

​ 返回值:

​ 当没有元素可以被弹出时返回一个 nil 的多批量值,并且 timeout 过期。

​ 当有元素弹出时会返回一个双元素的多批量值,其中第一个元素是弹出元素的 key,第二个元素是 value

​ 例子:

redis> DEL list1 list2
(integer) 0
redis> RPUSH list1 a b c
(integer) 3
redis> BRPOP list1 list2 0
1) "list1"
2) "c
redis 127.0.0.1:6379> BRPOP list1 100

在以上实例中,操作会被阻塞,如果指定的列表 key list1 存在数据则会返回第一个元素,否则在等待100秒后会返回 nil 。

开启异步操作

拿点赞操作举例:

@RequestMapping(path = {"/like"}, method = {RequestMethod.GET, RequestMethod.POST})
@ResponseBody
public String like(@Param("newId") int newsId) {
    long likeCount = likeService.like(hostHolder.getUser().getId(), EntityType.ENTITY_NEWS, newsId);
    // 更新喜欢数
    News news = newsService.getById(newsId);
    newsService.updateLikeCount(newsId, (int) likeCount);
    //这行
    eventProducer.fireEvent(new EventModel(EventType.LIKE)
            .setEntityOwnerId(news.getUserId())
            .setActorId(hostHolder.getUser().getId()).setEntityId(newsId));
    return ToutiaoUtil.getJSONString(0, String.valueOf(likeCount));
}

上述代码,我们调用eventProducer.fireEvent()方法,将点赞操作封装成一个EventModel对象,并序列化保存到redis的列表(EVENT)中,然后服务端会调用EventConsumer对象,”消费”事件列表中EVENT。


   转载规则


《项目亮点之异步队列》 xuxinghua 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录
I I