18045010223
7 天以前 afe371d39a054b2f2a9e5875b945584eec8a8141
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package cn.org.hentai.jtt1078.publisher;
 
import cn.org.hentai.jtt1078.entity.Media;
import cn.org.hentai.jtt1078.subscriber.Subscriber;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * Created by houcheng on 2019-12-11.
 */
public final class PublishManager
{
    static Logger logger = LoggerFactory.getLogger(PublishManager.class);
    ConcurrentHashMap<String, Channel> channels;
 
    private PublishManager()
    {
        channels = new ConcurrentHashMap<String, Channel>();
    }
 
    public Subscriber subscribe(String tag, Media.Type type, ChannelHandlerContext ctx)
    {
        Channel chl = channels.get(tag);
        if (chl == null)
        {
            chl = new Channel(tag);
            channels.put(tag, chl);
        }
        Subscriber subscriber = null;
        if (type.equals(Media.Type.Video)) subscriber = chl.subscribe(ctx);
        else throw new RuntimeException("unknown media type: " + type);
 
        subscriber.setName("subscriber-" + tag + "-" + subscriber.getId());
        subscriber.start();
 
        return subscriber;
    }
 
    public void publishAudio(String tag, int sequence, long timestamp, int payloadType, byte[] data)
    {
        Channel chl = channels.get(tag);
        if (chl != null) chl.writeAudio(timestamp, payloadType, data);
    }
 
    public void publishVideo(String tag, int sequence, long timestamp, int payloadType, byte[] data)
    {
        Channel chl = channels.get(tag);
        if (chl != null) chl.writeVideo(sequence, timestamp, payloadType, data);
    }
 
    public Channel open(String tag)
    {
        Channel chl = channels.get(tag);
        if (chl == null)
        {
            chl = new Channel(tag);
            channels.put(tag, chl);
        }
        if (chl.isPublishing()) throw new RuntimeException("channel already publishing");
        return chl;
    }
 
    public void close(String tag)
    {
        Channel chl = channels.remove(tag);
        if (chl != null) chl.close();
    }
 
    public void unsubscribe(String tag, long watcherId)
    {
        Channel chl = channels.get(tag);
        if (chl != null) chl.unsubscribe(watcherId);
        logger.info("unsubscribe: {} - {}", tag, watcherId);
    }
    static final PublishManager instance = new PublishManager();
    public static void init() { }
 
    public static PublishManager getInstance()
    {
        return instance;
    }
}