0

Authlib を使用して OAuth2 プロバイダーとクライアントを実装しようとしています。ドキュメントに従ってプロバイダーを実装し、正しく書けているように見えるのですが、クライアントが認可ページにリダイレクトされ、リソースへのアクセスを承認した後に、次のエラーが発生します: {"error": "invalid_grant"}。残念ながら、原因を突き止めることができませんでした。

以下が私のコードです:

models.py

from pony.orm import *
from authlib.oauth2.rfc6749 import ClientMixin

# Pony ORM database object; the entity classes in this module subclass db.Entity.
db = Database()

class User(db.Entity):
    """Account record holding a display name, a login name and a password."""
    id = PrimaryKey(int, auto=True)
    name = Required(str)
    username = Required(str)
    # NOTE(review): no hashing step is visible in this file; the value appears
    # to be stored exactly as submitted -- confirm.
    password = Required(str)
    clients = Set("Client")
    tokens = Set("Token")

    def get_user_id(self):
        """Return this user's primary key."""
        return self.id

    def get_username(self):
        """Return the ``name`` field.

        NOTE(review): this returns ``name``, not ``username`` -- confirm that
        is intended.
        """
        return self.name


class Client(db.Entity, ClientMixin):
    id = PrimaryKey(int, auto=True)
    name = Required(str)
    client_id = Required(str)
    client_secret = Required(str)
    redirect_url = Required(str)
    users = Set("User")
    tokens = Set("Token")

    def check_redirect_uri(self, redirect_uri): 
        print "Client::check_redirect_uri:", redirect_uri
        return True

    def check_response_type(self, response_type):
        print "Client::check_response_type: ", response_type
        return response_type in ["authorization_code", "code", "password"]

# Authorization token
class Token(db.Entity):
    """Stored token value together with the user and client it belongs to.

    Rows of this entity are created both by
    ``AuthorizationCodeGrant.create_authorization_code`` and by ``save_token``.
    """
    id = PrimaryKey(int, auto=True)
    # The opaque token string itself.
    value = Required(str)
    user = Required("User")
    client = Required("Client")
    scope = Optional(str)


@db_session
def create_new_user(name, username, password):
    """Insert a ``User`` row built from the given fields and commit it."""
    fields = dict(name=name, username=username, password=password)
    User(**fields)
    commit()

@db_session
def create_new_client(name, client_id, client_secret, redirect_url):
    """Insert a ``Client`` row built from the given fields and commit it."""
    fields = dict(
        name=name,
        client_id=client_id,
        client_secret=client_secret,
        redirect_url=redirect_url,
    )
    Client(**fields)
    commit()



from authlib.oauth2.rfc6749 import grants
from authlib.common.security import generate_token

class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):

    @db_session
    def create_authorization_code(self, client, grant_user, request):
        code = generate_token(48)
        token = Token(value=token, user=User[int(grant_user.get_user_id)],
                      client=Client[int(client.client_id)], scope=request.scope)
        commit()
        print "AuthorizationCodeGrant::create_authorization_code: token:", token
        return code

    def parse_authorization_code(self, code, client):
        print "AuthorizationCodeGrant::parse_authorization_code"
        token = Token.select(lambda t: t.value == code and t.client_id == client.client_id).first()
        return token

    def delete_authorization_code(self, authorization_code):
        print "AuthorizationCodeGrant::delete autorization code: ", authorization_code.user_id

    def authenticate_user(self, authorization_code):
        print "authenticate_user"
        return User[int(authorization_code.user_id)]


# Bind to the SQLite file (create_db=True creates it if missing) and map the
# entities, creating any missing tables.
db.bind(provider='sqlite', filename='database.sqlite', create_db=True)
db.generate_mapping(create_tables=True)
# SQL debug output is switched on only after the mapping has been generated.
set_sql_debug(True)

server.py


# Flask application for the provider; templates are looked up in the 'templates' folder.
app = Flask(__name__, template_folder='templates')

@db_session
def query_client(client_id):
    print "query_client id", client_id
    client = Client.select(lambda c: c.client_id == client_id).first()
    return client

@db_session
def save_token(token_data, request):
    if request.user:
        user_id = request.user.get_user_id()
        print "user_id", user_id
    else:
        user_id = request.client.user_id
        print "user_id", user_id
    client =   Client[int(request.client.client_id)]
    user =  User[int(user_id)]
    token = Token(user=user, client=client)
    commit()


# Authorization server wired to the client lookup and token storage callbacks
# defined above.
server = AuthorizationServer(
    query_client=query_client,
    save_token=save_token,
)

# AuthorizationCodeGrant is defined in models.py
server.register_grant(AuthorizationCodeGrant)
from authlib.flask.oauth2 import ResourceProtector

# Used as a decorator on API routes that require a scope.
# NOTE(review): no token validator is registered on it anywhere in this file
# -- confirm one is added elsewhere.
require_oauth = ResourceProtector()


def current_user():
    """Return the ``User`` stored in the Flask session, or None.

    None is returned when the session has no id (or the id was reset to None
    by logout) and when the lookup fails for any reason.
    """
    uid = session.get('id')
    if uid is None:
        return None
    try:
        return User[int(uid)]
    except Exception:
        # Deliberately best-effort: callers treat None as "not logged in".
        # Catching Exception instead of a bare except lets SystemExit and
        # KeyboardInterrupt propagate.
        return None


@app.route('/', methods=['GET', 'POST'])
@db_session
def index():
    """Render the user page when logged in, otherwise the landing page."""
    # A fresh session has no 'id' key at all; .get avoids the KeyError that
    # session['id'] raised on a first visit.
    if session.get('id'):
        user = current_user()
        return render_template('user.html', user=user)
    return render_template('index.html')


@app.route('/login', methods=['GET', 'POST'])
@db_session
def login():
    """Show the login form on GET; on POST start a session for the given username.

    NOTE(review): the submitted password is never compared with the stored
    one -- confirm this is an intentional demo shortcut.
    """
    if request.method == 'GET':
        # A fresh session has no 'id' key; .get avoids the KeyError that
        # session['id'] raised before the first login.
        if session.get('id'):
            user = current_user()
            return render_template('user.html', user=user)
        return render_template('login.html')
    else:
        username = request.form['username']
        user = User.select(lambda c: c.username == username).first()
        if user:
            session['id'] = user.id
            return render_template('user.html', user=user)
        else:
            return render_template('error.html', msg="Usuario nao encontrado")


@app.route('/newlogin', methods=['GET', 'POST'])
def create_login():
    """Show the sign-up form on GET; otherwise create the user and redirect to '/'."""
    if request.method == 'GET':
        return render_template('newlogin.html')
    form = request.form
    create_new_user(form['name'], form['username'], form['password'])
    return redirect("/")

@app.route('/logout', methods=['GET', 'POST'])
def logout():
    """Clear the logged-in user and redirect to the landing page."""
    # The key is reset to None rather than removed: index() and login() read
    # session['id'] directly and would raise KeyError if it were missing.
    session['id'] = None
    return redirect('/')


@app.route('/clients', methods=['GET', 'POST'])
@db_session
def clients():
    """Register a new client on POST, then list all clients."""
    if request.method == 'POST':
        form = request.form
        # client_id and client_secret are freshly generated random strings.
        create_new_client(
            form['name'],
            gen_salt(24),
            gen_salt(48),
            redirect_url=form['redirecturl'],
        )
    return render_template('clients.html', clients=Client.select())


@app.route('/oauth/authorize', methods=['GET', 'POST'])
@db_session
def authorize():
    user = current_user()
    if request.method == 'GET':
        try:
            grant = server.validate_consent_request(end_user=user)
            print "Server: /authorize: grant:"
            pprint(vars(grant))
        except OAuth2Error as error:
            return error.error
        return render_template('authorize.html', user=user, grant=grant)
    if not user and 'username' in request.form:
        username = request.form.get('username')
        user = User.select(lambda u: u.username == username).first()
    if request.form['confirm']:
        grant_user = user
    else:
        grant_user = None
    print "Server: /authorize: grant_user:", grant_user    
    return server.create_authorization_response(grant_user=grant_user)

@app.route('/oauth/token', methods=['GET', 'POST'])
def issue_token():
    """Token endpoint: return whatever the authorization server builds for this request."""
    token_response = server.create_token_response()
    return token_response

@app.route('/user/profile')
@require_oauth("profile")
@db_session
def get_profile():
    """Return the current user's public fields as JSON.

    A Pony entity cannot be passed to ``jsonify`` directly, so the fields are
    listed explicitly (the password is left out on purpose). The lookup needs
    a db_session, which this view did not open before.

    NOTE(review): the user comes from the browser session cookie, not from
    the presented access token -- confirm this is intended for an API route.
    """
    user = current_user()
    if user is None:
        return jsonify(None)
    return jsonify(id=user.id, name=user.name, username=user.username)



if __name__ == '__main__':
    import os
    # Presumably these flags allow OAuth over plain HTTP for local runs -- confirm.
    os.environ.update({
        'OAUTHLIB_INSECURE_TRANSPORT': 'true',
        'AUTHLIB_INSECURE_TRANSPORT': '1',
    })
    app.secret_key = 'development'
    app.debug = True
    # The authorization server is attached to the app only when run as a script.
    server.init_app(app)
    app.run()

client.py

# Flask application acting as the OAuth2 client.
app = Flask(__name__)
oauth = OAuth(app)

# Remote provider registration. The token URL must match the provider's
# '/oauth/token' route; '/oauth/access_token' is not served by server.py.
# NOTE(review): 'grant_type' is kept as in the original; confirm that
# oauth.register() actually uses it.
client = oauth.register(
    name='ecl-app1',
    client_id='wKbpjHNvSSeirBO9SzYdWOg4',
    client_secret='1SNDG8hxpNjP2rrlapVAPy8d9CHdVQ8Kzd4NqHydV7VdIrv7',
    access_token_url='http://localhost:5000/oauth/token',
    authorize_url='http://localhost:5000/oauth/authorize',
    api_base_url='http://localhost:5000/',
    grant_type='code',
    client_kwargs={'scope': 'profile'},
)

@app.route('/')
def index():
    redirect_uri = url_for('authorize', _external=True)

    print "App Client: index: redirect_uri:", redirect_uri
    resp = client.authorize_redirect(redirect_uri)
    print "App Client: index: authorize_redirect: ", resp
    #print(resp.status)
    #print(resp.headers)
    #print(resp.get_data())
    return client.authorize_redirect(redirect_uri)

@app.route('/authorize')
def authorize():
    print "App Client: authorize:"
    token = client.authorize_access_token()
    # you can save the token into database
    profile = client.get('/user')
    return "ok"




if __name__ == '__main__':
    import os
    # Presumably these flags allow OAuth over plain HTTP for local runs -- confirm.
    os.environ.update({
        'OAUTHLIB_INSECURE_TRANSPORT': 'true',
        'AUTHLIB_INSECURE_TRANSPORT': '1',
    })
    app.secret_key = 'development'
    app.debug = True
    app.run(port=9000)

4

回答 1 件