游戏内消息处理

时间:2021-09-25 05:34:31

发送消息有4种渠道
1. io
2. bus
3. stage
4. public
前面已经讲过了关于EasyManager的初始化。

  public class EasyManager implements ApplicationContextAware {
//实现ApplicationContextAware,能够得到applicationContext
private String scanPackage;
private ApplicationContext applicationContext;
private Map<String, EasyResolver> resolvers = new HashMap<>();

public void init() {
//开始扫描
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
provider.addIncludeFilter(new AnnotationTypeFilter(EasyWorker.class));
Set<BeanDefinition> candidates = provider.findCandidateComponents(scanPackage);
for (BeanDefinition candidate : candidates) {
try {
String className = candidate.getBeanClassName();
Class<?> cls = Class.forName(className);
EasyWorker easyWorker = cls.getAnnotation(EasyWorker.class);
if (null != easyWorker) {
try {
Object target = load ? applicationContext.getBean(cls) : null;
Method[] methods = cls.getDeclaredMethods();
for (Method m : methods) {
EasyMapping commandMapping = m.getAnnotation(EasyMapping.class);
if (null != commandMapping) {
resolvers.put(commandMapping.mapping(), new EasyResolver(easyWorker.group(), easyWorker.module(), commandMapping.mapping(), m, target));
cmdList.add(commandMapping.mapping());
}
}
} catch (Exception e) {
throw new ServiceException("error in analyzeClass", e);
}
}
} catch (ClassNotFoundException e) {
}
}
}
//执行的
public void execute(String command, Message message) {
long start = System.nanoTime();

EasyResolver resolver = getResolver(command);
if (null != resolver) {
resolver.execute(message); //method.invoke(target,msg);
}
}

public EasyResolver getResolver(String command) {
return resolvers.get(command);
}

public List<String> getCmdList() {
return cmdList;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}

## 先来讲解下stage的消息处理 ##
游戏内消息处理
## BalanceBusinessExector的调用流程图##
游戏内消息处理

下面来重点看下BalanceBusinessExector
在使用分组执行器之前,会先创建一个RouteInfo(group,info) group只要为了寻找到分组执行器,而info主要是为了寻找到Executor。
  • BalanceBusinessExector

根据配置创建几组分组执行器

 Map<String, ExecutorPoolGroup> groups = new HashMap<>();
public BalanceBusinessExector(long time, Map<String, Integer> groupConfigMap) {
for(String key : groupConfigMap.keyValue()) {
groups.put(key, new ExecutorPoolGroup(groupConfigMap.get(size));
}
}
public void execute(IRunnable runnable, RouteInfo routeInfo) {
groups.get(routeInfo.getGroup()).execute(routeInfo.getInfo(), runnable);
}
  • ExecutorPoolGroup
    根据配置先创建几个执行器,利用guava cache来,根据key来寻找相对应的Executor( ThreadLocalRandom.current() 这个是ThreadLoacl,每个线程一个变量,所以不会引起并发。随机给出一个Executor)
    LoadingCache<String, Executor> routeCacheMap ;
Executor[] executors;
public ExecutorPoolGroup(int size) {
executors = new Executor[size];
this.routeCacheMap = CacheBuilder.newBuilder()
.expireAfterAccess(clean_gap, TimeUnit.MILLISECONDS)
.build(new CacheLoader<String, Executor>() {
public Executor load(String key) throws Exception {
int threadIndex = ThreadLocalRandom.current().nextInt(executors.length);
return executors[threadIndex];
});
}
public void execute(String info, IRunnable runnable) {
this.routeCacheMap.getUnchecked(info).execute(runnable);
}
  • Executor

    将要执行的IRunnable放入到队列中,并有个线程会触发.

  private final IMsgQueue<IRunnable> queue;
private Executor(final String name) {
this.name = name;
this.queue = MsgQueueFactory.getInstance().createQueue(name).start();
}
public void execute(IRunnable command) {
queue.add(command);
}

总结: 初始化时: 先创建几组分组执行器,在执行器中,再创建几个执行器
调用时:eg:发送到场景中,先根据key(stage)来找出分组执行器。再根据stageId来从缓存中找出相应的Executor,然后将IRunnable放入到队列中. 这个就叫做负载均衡
使用这个可以比较平均的找到每个Executor。

下面看着2种队列

  • MsgBlockingQueue
    他其实就用到BlockingQueue队列,他是一种先进先出,当大小为0时,线程一直在等待,一直到有数据时,线程才唤醒.
    主要是启动一个线程,从队列中取出,然后利用java反射调用action.
    private final BlockingQueue<IRunnable> queue = new LinkedBlockingQueue<>();
//加入到队列中
public boolean add(IRunnable e) {
return queue.add(e);
}
// 启动
public void start(final String name) {
Thread t = new Thread(name) {
public void run() {
while (true) {
IRunnable task = queue.take();
task.run(); //method.invoke(target,msg);
}
}
};
t.start();
}

private Disruptor<DisruptorEvent> disruptor;

private RingBuffer<DisruptorEvent> ringBuffer;

private RunnableEventHandler runnableEventHandler = new RunnableEventHandler();

private ExecutorService executor = Executors.newCachedThreadPool(new GameThreadFactory(name));

private final int BUFFER_SIZE = Util.ceilingNextPowerOfTwo(1024*64);

@Override
public boolean add(final IRunnable e) {
long remainingCapacity = ringBuffer.remainingCapacity();
if( remainingCapacity < 10){
execute(e);
return true;
}
//放到指针上,并且发布
long next = ringBuffer.next();
try{
DisruptorEvent runnable = ringBuffer.get(next);
runnable.copy(e);
}finally{
ringBuffer.publish(next);
}
return true;
}
//直接调用
private void execute(IRunnable e) {
executor.submit(new Runnable() {
public void run() {
DisruptorEvent runnable = new DisruptorEvent(name);
runnable.copy(e);
runnableEventHandler.onEvent(runnable, 1, true);
}
});
}
public void start(String name) {
disruptor = new Disruptor<>(new RunnableEventFactory(name), BUFFER_SIZE, executor);
disruptor.handleEventsWith(runnableEventHandler);
ringBuffer = disruptor.start();
}