を使用してダウンストリームのメイン スレッドに変更しようとしていますが、または.receive(on: DispatchQueue.main)
を使用すると入力を受け取りません。スレッドを変更しなければ、適切な入力を受け取ります。.subscribe(:)
.sink(receiveValue:)
出版社
extension URLSessionWebSocketTask {
struct ReceivePublisher: Publisher {
typealias Output = Message
typealias Failure = Error
let task: URLSessionWebSocketTask
func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure {
task.receive { result in
switch result {
case .success(let message): _ = subscriber.receive(message)
case .failure(let error): subscriber.receive(completion: .failure(error))
}
}
}
}
}
extension URLSessionWebSocketTask {
func receivePublisher() -> ReceivePublisher {
ReceivePublisher(task: self)
}
}
加入者
extension ViewModel: Subscriber {
typealias Input = URLSessionWebSocketTask.Message
typealias Failure = Error
func receive(subscription: Subscription) {}
func receive(_ input: URLSessionWebSocketTask.Message) -> Subscribers.Demand {
// Handle input here.
// When using `.receive(on:)` this method is not called when should be.
return .unlimited
}
func receive(completion: Subscribers.Completion<Error>) {}
}
申し込む
socketTask.receivePublisher()
.receive(on: DispatchQueue.main)
.subscribe(viewModel)
socketTask.resume()