Nirrek が説明したように、データ ソースをプッシュするために接続する必要がある場合、そのソースのイベント イテレータを作成する必要があります。
上記のメカニズムを再利用可能にすることができることを付け加えたいと思います。そのため、異なるソースごとにイベント イテレータを再作成する必要はありません。
解決策は、およびメソッドを使用して汎用チャネルを作成することです。Generator 内からメソッドを呼び出して、メソッドをデータ ソースのリスナー インターフェイスに接続できます。put
take
take
put
これが可能な実装です。メッセージを待っている人がいない場合、チャネルはメッセージをバッファリングすることに注意してください (たとえば、ジェネレータがリモート呼び出しでビジー状態である場合)。
function createChannel () {
const messageQueue = []
const resolveQueue = []
function put (msg) {
// anyone waiting for a message ?
if (resolveQueue.length) {
// deliver the message to the oldest one waiting (First In First Out)
const nextResolve = resolveQueue.shift()
nextResolve(msg)
} else {
// no one is waiting ? queue the event
messageQueue.push(msg)
}
}
// returns a Promise resolved with the next message
function take () {
// do we have queued messages ?
if (messageQueue.length) {
// deliver the oldest queued message
return Promise.resolve(messageQueue.shift())
} else {
// no queued messages ? queue the taker until a message arrives
return new Promise((resolve) => resolveQueue.push(resolve))
}
}
return {
take,
put
}
}
その後、上記のチャネルは、外部のプッシュ データ ソースをリッスンしたいときにいつでも使用できます。あなたの例では
function createChangeChannel (replication) {
const channel = createChannel()
// every change event will call put on the channel
replication.on('change', channel.put)
return channel
}
function * startReplication (getState) {
// Wait for the configuration to be set. This can happen multiple
// times during the life cycle, for example when the user wants to
// switch database/workspace.
while (yield take(DATABASE_SET_CONFIGURATION)) {
let state = getState()
let wrapper = state.database.wrapper
// Wait for a connection to work.
yield apply(wrapper, wrapper.connect)
// Trigger replication, and keep the promise.
let replication = wrapper.replicate()
if (replication) {
yield call(monitorChangeEvents, createChangeChannel(replication))
}
}
}
function * monitorChangeEvents (channel) {
while (true) {
const info = yield call(channel.take) // Blocks until the promise resolves
yield put(databaseActions.replicationChange(info))
}
}