ユーザーごとに 1 分間に複数回データを更新する必要がない場合:
- ユーザーに「アラート」を設定し
LocalStructuredProperty
ます。
フィードから受信データ ポイントを「配置」する場合は、事前配置フックを使用して値を事前に計算します。
- pre-put フックでユーザー エンティティを取得します。(NDB を使用していて、既にユーザーを取得している場合は、ローカル メモリから取得する必要があります)
- そのユーザーのすべての「アラート」を取得し、非同期で処理します (タスクレット)
- 全員のアラート データを独自のエンティティに格納し、クエリを高速化するために特別なキー名を使用します (たとえば、キー名を次のように設定すると、 a の代わりに a を
<user>_<alert_type>_<time_in_seconds>_<percentage>
実行できます。このオブジェクトでは、入ってきて、 time-limit specified. For one update every minutes, you can おそらく 1000+ datapoints as a list of tuples . このプロセスから、定義された構成に基づいてアラートが生成され、新しい値が格納されます。get
query
(<timestamp>, <value>)
例(これは大まかな例です。データの保証が必要な場合はトランザクションを使用する必要があります):
class AlertConfiguration(ndb.Model):
timespan_in_seconds = ndb.IntegerProperty('tis', indexed=False)
percent_change = ndb.FloatProperty('pc', indexed=False)
class User(ndb.Model):
alerts = LocalStructuredProperty(AlertConfiguration, repeated=True, name='a')
...
class DataPoint(ndb.Model):
timestamp = ndb.DateTimeProperty('ts', auto_now_add=True)
value = ndb.FloatProperty('v')
user = ndb.KeyProperty(name='u', kind=User)
def _pre_put_hook(self):
alerts = self.user.get().alerts
futures = []
for alert in alerts:
futures.append(process_alert(alert, self))
yield futures
class AlertProcessor(ndb.Model):
previous_data_points = ndb.JsonProperty(name='pdp', compressed=True)
@ndb.tasklet
def process_alert(alert_config, data_point):
key_name = '{user}_{timespan}_{percentage}'.format(user=data_point.user.id(), timespan=alert_config.timespan_in_seconds, percentage=alert_config.percent_change)
processor = yield AlertProcessor.get_or_insert_async(key_name)
new_points = []
found = False
for point in processor.previous_data_points:
delta = data_point.timestamp - datetime.strptime(point[0], '%c')
seconds_diff = (86400 * delta.days) + delta.seconds
if seconds_diff < alert_config.timespan_in_seconds:
new_points.add(point)
if not found:
found = True
if (data_point.value - point[1]) / data_point.value >= alert_config.percent_change:
#E-mail alert here?
new_points.append((data_point.timestamp.strftime('%c'), data_point.value))
processor.previous_data_points = new_points
yield processor.put_async()