0

を使用してダウンストリームのメイン スレッドに変更しようとしていますが、または.receive(on: DispatchQueue.main)を使用すると入力を受け取りません。スレッドを変更しなければ、適切な入力を受け取ります。.subscribe(:).sink(receiveValue:)

出版社

extension URLSessionWebSocketTask {
  struct ReceivePublisher: Publisher {
    typealias Output = Message
    typealias Failure = Error

    let task: URLSessionWebSocketTask

    func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure {
      task.receive { result in
        switch result {
        case .success(let message): _ = subscriber.receive(message)
        case .failure(let error): subscriber.receive(completion: .failure(error))
        }
      }
    }
  }
}

extension URLSessionWebSocketTask {
  func receivePublisher() -> ReceivePublisher {
    ReceivePublisher(task: self)
  }
}

加入者

extension ViewModel: Subscriber {
  typealias Input = URLSessionWebSocketTask.Message
  typealias Failure = Error

  func receive(subscription: Subscription) {}

  func receive(_ input: URLSessionWebSocketTask.Message) -> Subscribers.Demand {
    // Handle input here.
    // When using `.receive(on:)` this method is not called when should be.
    return .unlimited
  }

  func receive(completion: Subscribers.Completion<Error>) {}
}

申し込む

socketTask.receivePublisher()
      .receive(on: DispatchQueue.main)
      .subscribe(viewModel)
socketTask.resume()
4

1 に答える 1