zhangjian
2023-05-30 dabbcc356af21f9f2f88ac69ff07994e6e32e4fc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import { Subject } from '../Subject';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { refCount as higherOrderRefCount } from '../operators/refCount';
import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
import { hasLift } from '../util/lift';
 
/**
 * @class ConnectableObservable<T>
 * @deprecated Will be removed in v8. Use {@link connectable} to create a connectable observable.
 * If you are using the `refCount` method of `ConnectableObservable`, use the {@link share} operator
 * instead.
 * Details: https://rxjs.dev/deprecations/multicasting
 */
export class ConnectableObservable<T> extends Observable<T> {
  protected _subject: Subject<T> | null = null;
  protected _refCount: number = 0;
  protected _connection: Subscription | null = null;
 
  /**
   * @param source The source observable
   * @param subjectFactory The factory that creates the subject used internally.
   * @deprecated Will be removed in v8. Use {@link connectable} to create a connectable observable.
   * `new ConnectableObservable(source, factory)` is equivalent to
   * `connectable(source, { connector: factory })`.
   * When the `refCount()` method is needed, the {@link share} operator should be used instead:
   * `new ConnectableObservable(source, factory).refCount()` is equivalent to
   * `source.pipe(share({ connector: factory }))`.
   * Details: https://rxjs.dev/deprecations/multicasting
   */
  constructor(public source: Observable<T>, protected subjectFactory: () => Subject<T>) {
    super();
    // If we have lift, monkey patch that here. This is done so custom observable
    // types will compose through multicast. Otherwise the resulting observable would
    // simply be an instance of `ConnectableObservable`.
    if (hasLift(source)) {
      this.lift = source.lift;
    }
  }
 
  /** @internal */
  protected _subscribe(subscriber: Subscriber<T>) {
    return this.getSubject().subscribe(subscriber);
  }
 
  protected getSubject(): Subject<T> {
    const subject = this._subject;
    if (!subject || subject.isStopped) {
      this._subject = this.subjectFactory();
    }
    return this._subject!;
  }
 
  protected _teardown() {
    this._refCount = 0;
    const { _connection } = this;
    this._subject = this._connection = null;
    _connection?.unsubscribe();
  }
 
  /**
   * @deprecated {@link ConnectableObservable} will be removed in v8. Use {@link connectable} instead.
   * Details: https://rxjs.dev/deprecations/multicasting
   */
  connect(): Subscription {
    let connection = this._connection;
    if (!connection) {
      connection = this._connection = new Subscription();
      const subject = this.getSubject();
      connection.add(
        this.source.subscribe(
          createOperatorSubscriber(
            subject as any,
            undefined,
            () => {
              this._teardown();
              subject.complete();
            },
            (err) => {
              this._teardown();
              subject.error(err);
            },
            () => this._teardown()
          )
        )
      );
 
      if (connection.closed) {
        this._connection = null;
        connection = Subscription.EMPTY;
      }
    }
    return connection;
  }
 
  /**
   * @deprecated {@link ConnectableObservable} will be removed in v8. Use the {@link share} operator instead.
   * Details: https://rxjs.dev/deprecations/multicasting
   */
  refCount(): Observable<T> {
    return higherOrderRefCount()(this) as Observable<T>;
  }
}