package org.yzh.protocol.codec; import io.github.yezhihao.protostar.SchemaManager; import io.netty.buffer.ByteBuf; import lombok.extern.slf4j.Slf4j; import org.yzh.protocol.basics.JTMessage; import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; /** * 分包消息管理 * @author yezhihao * https://gitee.com/yezhihao/jt808-server */ @Slf4j public class MultiPacketDecoder extends JTMessageDecoder { private final Map multiPacketsMap; private final MultiPacketListener multiPacketListener; public MultiPacketDecoder(String... basePackages) { this(new SchemaManager(basePackages)); } public MultiPacketDecoder(SchemaManager schemaManager) { this(schemaManager, null); } public MultiPacketDecoder(SchemaManager schemaManager, MultiPacketListener multiPacketListener) { super(schemaManager); if (multiPacketListener == null) { this.multiPacketsMap = new WeakHashMap<>(); this.multiPacketListener = null; } else { this.multiPacketsMap = new ConcurrentHashMap<>(); this.multiPacketListener = multiPacketListener; startListener(); } } @Override protected ByteBuf[] addAndGet(JTMessage message, ByteBuf packetData) { String clientId = message.getClientId(); int messageId = message.getMessageId(); int packageTotal = message.getPackageTotal(); int packetNo = message.getPackageNo(); String key = new StringBuilder(21).append(clientId).append('/').append(messageId).append('/').append(packageTotal).toString(); MultiPacket multiPacket = multiPacketsMap.get(key); if (multiPacket == null) multiPacketsMap.put(key, multiPacket = new MultiPacket(message)); if (packetNo == 1) multiPacket.setSerialNo(message.getSerialNo()); ByteBuf[] packages = multiPacket.addAndGet(packetNo, packetData); log.debug("<<<<<分包消息{}", multiPacket); if (packages == null) return null; multiPacketsMap.remove(key); return packages; } private void startListener() { Thread thread = new Thread(() -> { long timeout = multiPacketListener.timeout; for (; ; ) { long nextDelay = timeout; long now = System.currentTimeMillis(); for (Map.Entry entry : multiPacketsMap.entrySet()) { MultiPacket packet = entry.getValue(); long time = timeout - (now - packet.getLastAccessedTime()); if (time <= 0) { if (!multiPacketListener.receiveTimeout(packet)) { log.warn("<<<<<分包接收超时{}", packet); multiPacketsMap.remove(entry.getKey()); packet.release(); } } else { nextDelay = Math.min(time, nextDelay); } } try { Thread.sleep(nextDelay); } catch (InterruptedException e) { log.error("MultiPacketListener", e); } } }); thread.setName("MultiPacketListener"); thread.setPriority(Thread.MIN_PRIORITY); thread.setDaemon(true); thread.start(); } }