异步队列
点赞,回复评论的时候,表面上是赞数增加了,其实还有很多其他的工作要做。比如,被赞的人收到消息提醒,成就值增加等。一些行为会引起一系列连锁反应。如果在执行点赞功能时立马处理,会影响程序运行效率。而且会造成代码冗余,比如发布新闻,和回复评论都可以使得成就值增加,如果都跟着写在后面的代码里会把成就值增加这段代码写两遍,所以大型服务需要服务化和异步化。
服务化:某一个单独的业务独立成一个工程,提供接口。不只是service层的一个类。 暴露一些接口,比如数据库服务,如果一个部门要去数据库查询,小公司可能写个SQL语句。对于大公司,需要一个组单独做数据库服务,暴露接口给别的部门用。好处是防止别的部门有数据库权限,数据库单独有一个组维护,出问题找他们运维就好。
异步化:用户点赞,用户首先要知道的是这个赞已经点上了。用户提交Oj,用户要立马知道的是代码有没有通过。而后面应该的东西,比如积分增加了,用户不会有立马想知道的需求,如果间隔几秒钟再更新,用户也不会有很大的意见。
异步化的好处:
一、能够分开紧急动作和非紧急动作。能够先执行对用户最重要的事务,反馈给用户。其他的相应该事务的动作可以在后台延迟执行,
二、增加程序的鲁棒性。举提交在线OJ为例,如果用户提交程序代码后,服务器进行判题,判题通过后执行增加积分,成就值这些动作,如果程序在判题后运行失败,服务器不能立刻返回给用户结果。如果是异步化分开后,起码能保证主业务,即判题结果能及时反馈给用户。积分挂了,只会造成用户看不到最新的积分,这个对用户的影响比较小。如下图所示:
实现代码
在项目中异步架构实现结构:
1.利用JSON进行对象序列化和反序列化
JSON存到Redis中,后面可以知道是以String形式存储的
操作封装在JedisAdapter中:
public String get(String key) {
Jedis jedis = null;
try {
jedis = pool.getResource();
return getJedis().get(key);
} catch (Exception e) {
logger.error("发生异常" + e.getMessage());
return null;
} finally {
if (jedis != null) {
jedis.close();
}
}
}
public void set(String key, String value) {
Jedis jedis = null;
try {
jedis = pool.getResource();
jedis.set(key, value);
} catch (Exception e) {
logger.error("发生异常" + e.getMessage());
} finally {
if (jedis != null) {
jedis.close();
}
}
}
public void setObject(String key, Object obj) {
set(key, JSON.toJSONString(obj));
}
public <T> T getObject(String key, Class<T> clazz) {
String value = get(key);
if (value != null) {
return JSON.parseObject(value, clazz);
}
return null;
}
其中将对象序列化过程中:是使用JSON.toJSONString(Object obj)的方法将对象序列化为JSON串
代码如下:
/**
* 重点:This method serializes the specified object into its equivalent Json representation. Note that this method works fine if the any of the object fields are of generic type,
* just the object itself should not be of a generic type. If you want to write out the object to a
* {@link Writer}, use {@link #writeJSONString(Writer, Object, SerializerFeature[])} instead.
*
* @param object the object for which json representation is to be created setting for fastjson
* @return Json representation of {@code object}.
*/
public static String toJSONString(Object object) {
return toJSONString(object, emptyFilters);
}
知识点补充
==测试中序列化和反序列化过程中用到了redis中String的操作==:
例子:
redis> SET mykey "Hello"
OK
redis> GET mykey
"Hello"
redis>
2.构建事件模型EventModel
EventModel:代表进入队列的具体活动
public class EventModel {
//这个触发事件是什么类型
private EventType type;
//触发对象的原目标,谁触发的
private int actorId;
//触发的对象是什么。是新闻还是点赞类型的事件,是哪个事件
private int entityId;
private int entityType;
//触发对象的拥有者(如果是点赞新闻事件,那就是发这条新闻的UserId)
private int entityOwnerId;
//在触发的现场有哪些数据需要保存下来
private Map<String,String> exts=new HashMap<>();
public EventModel() {
}
public EventModel(EventType type) {
this.type = type;
}
public String getExt(String name) {
return exts.get(name);
}
public EventModel setExt(String name, String value) {
exts.put(name, value);
return this;
}
public EventType getType() {
return type;
}
public EventModel setType(EventType type) {
this.type = type;
return this;
}
public int getActorId() {
return actorId;
}
public EventModel setActorId(int actorId) {
this.actorId = actorId;
return this;
}
public int getEntityId() {
return entityId;
}
public EventModel setEntityId(int entityId) {
this.entityId = entityId;
return this;
}
public int getEntityType() {
return entityType;
}
public EventModel setEntityType(int entityType) {
this.entityType = entityType;
return this;
}
public int getEntityOwnerId() {
return entityOwnerId;
}
public EventModel setEntityOwnerId(int entityOwnerId) {
this.entityOwnerId = entityOwnerId;
return this;
}
}
事件的类型:有点赞、评论、登录、发邮件等类型
public enum EventType {
LIKE(0),
COMMENT(1),
LOGIN(2),
MAIL(3);
private int value;
EventType(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}
3.活动的生产者和消费者
生产者(EventProducer)
这个类的功能其实就是把事件模型(代表着一次事件)存储到redis的列表(List)中,其中key是规定标记需要异步化处理的事件的统一名称(在该项目中是”EVENT”),value是EventModel被序列化后的JSON串。
简单的说,这个List存放EventModel,以后消费者就在这个List中去event去执行。
@Service
public class EventProducer {
@Autowired
JedisAdapter jedisAdapter;
public boolean fireEvent(EventModel eventModel) {
try {
String json = JSONObject.toJSONString(eventModel);
String key = RedisKeyUtil.getEventQueueKey();
jedisAdapter.lpush(key, json);
return true;
} catch (Exception e) {
return false;
}
}
}
知识点补充:
List的Lpush操作:
将所有指定的值插入到存于 key 的列表的头部。如果 key 不存在,那么在进行 push 操作前会创建一个空列表。 如果 key 对应的值不是一个 list 的话,那么会返回一个错误。
可以使用一个命令把多个元素 push 进入列表,只需在命令末尾加上多个指定的参数。元素是从最左端的到最右端的、一个接一个被插入到 list 的头部。 所以对于这个命令例子 LPUSH mylist a b c
,返回的列表是 c 为第一个元素, b 为第二个元素, a 为第三个元素。
消费者(EventConsumer)
为了拓展性,首先定义一个EventHandler接口:
接口中
public interface EventHandler {
//执行事件
void doHandle(EventModel eventModel);
//handler执行哪些EventModel(如点赞呀,评论呀,登录异常这些。。。)
List<EventType> getSupportEventTypes();
}
具体的执行类:
目前写了2中执行类:LikeHandler和LoginExceptionHandler
LikeHandler:执行点赞的异步操作
@Component
public class LikeHandler implements EventHandler {
@Autowired
MessageService messageService;
@Autowired
UserService userService;
@Override
public void doHandle(EventModel model) {
Message message = new Message();
User user = userService.getUser(model.getActorId());
message.setToId(model.getEntityOwnerId());
message.setContent("用户" + user.getName() +
" 赞了你的资讯,http://127.0.0.1:8080/news/"
+ String.valueOf(model.getEntityId()));
// SYSTEM ACCOUNT
message.setFromId(3);
message.setCreatedDate(new Date());
messageService.addMessage(message);
}
@Override
public List<EventType> getSupportEventTypes() {
return Arrays.asList(EventType.LIKE);
}
}
点赞的异步操作实际就是谁给你点赞了,系统会通知你新闻被点赞了
LoginExceptionHandler:
@Component
public class LoginExceptionHandler implements EventHandler {
@Autowired
MessageService messageService;
@Autowired
MailSender mailSender;
@Override
public void doHandle(EventModel eventModel) {
Message message=new Message();
message.setCreatedDate(new Date());
//加入系统默认ID是3
message.setFromId(3);
message.setToId(eventModel.getActorId());
message.setContent("你上次的登陆IP异常");
messageService.addMessage(message);
Map<String, Object> map = new HashMap();
map.put("username", eventModel.getExt("username"));
mailSender.sendWithHTMLTemplate(eventModel.getExt("email"),"登录异常",
"mails/welcome.html",map);
}
@Override
public List<EventType> getSupportEventTypes() {
return Arrays.asList(EventType.LOGIN);
}
}
登录异常的异步操作就是给你发个说明异常的email
Consumer
@Service
public class EventConsumer implements InitializingBean, ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
private Map<EventType, List<EventHandler>> config = new HashMap<>();
private ApplicationContext applicationContext;
@Autowired
private JedisAdapter jedisAdapter;
@Override
public void afterPropertiesSet() throws Exception {
//这个很关键:根据传入的上下文(applicationContext),查找EventHandler.class类
Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);
if (beans != null) {
for (Map.Entry<String, EventHandler> entry : beans.entrySet()) {
//得到每个EventHandler实现类支持的事件类型
List<EventType> eventTypes = entry.getValue().getSupportEventTypes();
for (EventType type : eventTypes) {
//每种事件类型对应一个执行该类型事件的执行者列表
if (!config.containsKey(type)) {
config.put(type, new ArrayList<EventHandler>());
}
// 注册每个事件的处理函数
config.get(type).add(entry.getValue());
}
}
}
// 启动线程去消费事件
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// 从队列一直消费
while (true) {
String key = RedisKeyUtil.getEventQueueKey();
//brpop返回的是key和value
List<String> messages = jedisAdapter.brpop(0, key);
// 第一个元素是队列名字
for (String message : messages) {
//因为brpop操作返回一个双元素,第一个是key,第二个是value
if (message.equals(key)) {
continue;
}
EventModel eventModel = JSON.parseObject(message, EventModel.class);
if (!config.containsKey(eventModel.getType())) {
logger.error("不能识别的事件");
continue;
}
// 找到这个事件的处理handler列表
for (EventHandler handler : config.get(eventModel.getType())) {
handler.doHandle(eventModel);
}
}
}
}
});
thread.start();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
知识补充:
1.补充2个接口:InitializingBean和ApplicationContextAware
InitializingBean接口:为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。
ApplicationContextAware的最本质的应用就是:对当前bean传入对应的Spring上下文。
appContext.getBeansOfType()方法:
<T> Map<String, T> getBeansOfType(Class<T> var1) throws BeansException;
1.该方法返回一个map类型的实例,map中的key为bean的名字,key对应的内容bean的实例。
2.该方法有两种类型的重载:
getBeansOfType(Class),获取某一类的所有的bean。
2.BRPOP
是一个阻塞的列表弹出原语。 它是 RPOP 的阻塞版本,因为这个命令会在给定list无法弹出任何元素的时候阻塞连接。 该命令会按照给出的 key 顺序查看 list,并在找到的第一个非空 list 的尾部弹出一个元素。
返回值:
当没有元素可以被弹出时返回一个 nil
的多批量值,并且 timeout 过期。
当有元素弹出时会返回一个双元素的多批量值,其中第一个元素是弹出元素的 key
,第二个元素是 value
。
例子:
redis> DEL list1 list2
(integer) 0
redis> RPUSH list1 a b c
(integer) 3
redis> BRPOP list1 list2 0
1) "list1"
2) "c
redis 127.0.0.1:6379> BRPOP list1 100
在以上实例中,操作会被阻塞,如果指定的列表 key list1 存在数据则会返回第一个元素,否则在等待100秒后会返回 nil 。
开启异步操作
拿点赞操作举例:
@RequestMapping(path = {"/like"}, method = {RequestMethod.GET, RequestMethod.POST})
@ResponseBody
public String like(@Param("newId") int newsId) {
long likeCount = likeService.like(hostHolder.getUser().getId(), EntityType.ENTITY_NEWS, newsId);
// 更新喜欢数
News news = newsService.getById(newsId);
newsService.updateLikeCount(newsId, (int) likeCount);
//这行
eventProducer.fireEvent(new EventModel(EventType.LIKE)
.setEntityOwnerId(news.getUserId())
.setActorId(hostHolder.getUser().getId()).setEntityId(newsId));
return ToutiaoUtil.getJSONString(0, String.valueOf(likeCount));
}
上述代码,我们调用eventProducer.fireEvent()方法,将点赞操作封装成一个EventModel对象,并序列化保存到redis的列表(EVENT)中,然后服务端会调用EventConsumer对象,”消费”事件列表中EVENT。