2

私のウェブサイトは、継続的に更新されるフィードからの時系列データにインデックスを付けています。Web サイトのユーザーは、データ内の特定の属性の値が一定期間にわたって一定の割合で変化したときにトリガーされるアラートを構成できる必要があります。

例: ユーザーの Twitter フォロワー数を追跡しているとします。(単純化された) データ フィードは次のようになります。

日付、フォロワー

  • 10:00、1
  • 10:01、2
  • 10:02、2
  • 10:03、15
  • ...

アラート:

  • 「フォロワー」が過去 1 時間で 15% 増加した場合に通知します。
  • 過去 40 分間で「フォロワー」が 10% 減少した場合は通知してください。

単純なデータ フィードは 1 つだけです。(願わくば) 何千ものアラートが定義されます。これらのアラートの多くは類似している可能性がありますが、一意のアラートがいくつあるかを見積もることは困難です。

編集: 前にこれについて言及するのを忘れていましたが、フォロワーの数はかなり頻繁に (毎分) 変化します。

データストアやその他の App Engine 機能を使用して、このようなメカニズムを実装する最も洗練された方法は何でしょうか? アラートは、比較的リアルタイム (+/- 数分) でトリガーする必要があります。

ありがとう!

4

4 に答える 4

0

ユーザーごとに 1 分間に複数回データを更新する必要がない場合:

  1. ユーザーに「アラート」を設定しLocalStructuredPropertyます。
  2. フィードから受信データ ポイントを「配置」する場合は、事前配置フックを使用して値を事前に計算します。

    • pre-put フックでユーザー エンティティを取得します。(NDB を使用していて、既にユーザーを取得している場合は、ローカル メモリから取得する必要があります)
    • そのユーザーのすべての「アラート」を取得し、非同期で処理します (タスクレット)
    • 全員のアラート データを独自のエンティティに格納し、クエリを高速化するために特別なキー名を使用します (たとえば、キー名を次のように設定すると、 a の代わりに a を<user>_<alert_type>_<time_in_seconds>_<percentage>実行できます。このオブジェクトでは、入ってきて、 time-limit specified. For one update every minutes, you can おそらく 1000+ datapoints as a list of tuples . このプロセスから、定義された構成に基づいてアラートが生成され、新しい値が格納されます。getquery(<timestamp>, <value>)

例(これは大まかな例です。データの保証が必要な場合はトランザクションを使用する必要があります):

class AlertConfiguration(ndb.Model):
  timespan_in_seconds = ndb.IntegerProperty('tis', indexed=False)
  percent_change = ndb.FloatProperty('pc', indexed=False)

class User(ndb.Model):
  alerts = LocalStructuredProperty(AlertConfiguration, repeated=True, name='a')
  ...

class DataPoint(ndb.Model):
   timestamp = ndb.DateTimeProperty('ts', auto_now_add=True)
   value = ndb.FloatProperty('v')
   user = ndb.KeyProperty(name='u', kind=User)

   def _pre_put_hook(self):
     alerts = self.user.get().alerts
     futures = []
     for alert in alerts:
       futures.append(process_alert(alert, self))
     yield futures

class AlertProcessor(ndb.Model):
  previous_data_points = ndb.JsonProperty(name='pdp', compressed=True)

@ndb.tasklet
def process_alert(alert_config, data_point):
  key_name = '{user}_{timespan}_{percentage}'.format(user=data_point.user.id(), timespan=alert_config.timespan_in_seconds, percentage=alert_config.percent_change)
  processor = yield AlertProcessor.get_or_insert_async(key_name)
  new_points = []
  found = False
  for point in processor.previous_data_points:
     delta = data_point.timestamp - datetime.strptime(point[0], '%c')
     seconds_diff = (86400 * delta.days) + delta.seconds
     if seconds_diff < alert_config.timespan_in_seconds:
       new_points.add(point)
       if not found:
         found = True
         if (data_point.value - point[1]) / data_point.value >= alert_config.percent_change:
            #E-mail alert here?
  new_points.append((data_point.timestamp.strftime('%c'), data_point.value))
  processor.previous_data_points = new_points
  yield processor.put_async()
于 2013-04-29T14:55:30.513 に答える
0

モデルを非正規化して、パフォーマンスと冗長性のバランスを見つけ、書き込み操作と読み取り操作を試みます。

例えば:

  1. このサービスはリアルタイムの変化に重点を置いているため、特定の属性ごとの乗算データを 1 つのデータストアにまとめて格納できます。たとえば、大規模なエンティティでは、同じユーザーのすべての変更が 5 日間で保存されます。したがって、時間の経過に伴う変化を計算するために追加のクエリは必要ありません。これは、Google が App Engine でコード ジャムをホストする方法でもあります。ツリー構造をデータストアに適用して、いくつかの追加機能を提供できます。

  2. アラートの一般的な方法は、データ モデル自体にデータの変更を直接監視しているユーザーを書き留めることです。

denormalize は実際にユースケースが何であるかを明確にする必要があるため、この設計は私の仮定のみに基づいています。

class Watcher(ndb.Model):
    # define the rule such as "Notify me if 'followers' has increased by 15% in the past 1 hour."
    pass


class Attribute(ndb.Model):
    name = ndb.StringProperty() # the name of this attribute such as "twitter_user_1:followers"
    data = ndb.PickleProperty() # a tree store all changes of the specify attribute

    watch_list = ndb.LocalStructureProperty(repeated=True, kind=Watcher) # who want to received the notification

したがって、このサービスは必要なすべての情報を 1 か所に集めることができます。

于 2013-04-22T18:35:20.760 に答える