Salesforce アクセス トークンと、これらのアクセス トークンを使用した Salesforce へのクエリは、すべてsimple_salesforce
Python クライアントで正常に動作します。問題は、Web アプリケーションでこれらのトークンを更新することです。以下のようなグローバル変数を使用してそれを試みましたが、それはスレッドセーフではなく、単体テストを作成するのが難しいようです。
sf = SalesforceConnector.connect()
@bp.route("", methods=["GET"])
def index():
global sf
try:
payload = get_salesforce_records(sf)
except Exception as e:
print(e)
sf = SalesforceConnector.connect()
payload = get_salesforce_records(sf)
return jsonify(payload), 200
上記のコードで達成しようとしているのは、ルーター ファイルが読み込まれたときに salesforce オブジェクトをインスタンス化することです。SalesforceConnector
は、Salesforce の新しいトークンを取得し、そのアクセス トークンを使用して salesforce オブジェクトを取得するクラスです。でクエリが失敗した場合はget_salesforce_records
、再試行して新しいオブジェクトをグローバル変数に保存します。
アクセストークンの有効期限が切れたときに並列リクエストが流入したときに複数の新しいセールスフォースオブジェクトがインスタンス化されないように、インデックス機能を実装するより良い方法はありますか?
更新:それ以来、トークンの更新のロジックを、トークンを更新し、トークンの有効期限が切れていることがわかったときにクエリを実行するクラスに移動しました。しかし、複数の並列リクエストが原因で発生する並列リフレッシュの中心的な問題は変わりません!