5

私はAirflowにかなり慣れていません。ドキュメントを何度か読み、多数の S/O に関する質問やランダムな記事をオンラインで調べましたが、まだこの問題を修正していません。私は間違っていることを非常に単純に感じています。Docker for Windows があり、イメージをプルしpuckel/docker-airflowて、ホストから UI にアクセスできるように、ポートを公開してコンテナーを実行しました。mcr.microsoft.com/mssql/serverWideWorldImporters サンプル データベースを復元した別のコンテナーを実行しています。Airflow UI から、このデータベースへの接続を正常に作成でき、データ プロファイリング セクションからクエリを実行することもできました。以下の画像を確認してください: 接続の作成 接続へのクエリの成功

したがって、これは機能しますが、私のダグは 2 番目のタスクで失敗しますsqlData。コードは次のとおりです。

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime

copyData = DAG(
    dag_id='copyData',
    schedule_interval='@once',
    start_date=datetime(2019,1,1)
)


printHelloBash = BashOperator(
    task_id = "print_hello_Bash",
    bash_command = 'echo "Lets copy some data"',
    dag = copyData
)

mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
                       task_id="select_some_data",
                       mssql_conn_id=mssqlConnection,
                       database="WideWorldImporters",
                       dag = copyData,
                       depends_on_past=True
          )

queryDataSuccess = BashOperator(
    task_id = "confirm_data_queried",
    bash_command = 'echo "We queried data!"',
    dag = copyData
)

printHelloBash >> sqlData >> queryDataSuccess

最初のエラーは次のとおりです。

*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3  
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding  
Traceback (most recent call last):  
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
    _fernet = Fernet(fernet_key.encode('utf-8'))  
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
    key = base64.urlsafe_b64decode(key)  
  File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
    return b64decode(s)  
  File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
    return binascii.a2b_base64(s)
binascii.Error: Incorrect padding*

これは暗号化に関係していることに気付き、先に進んで and を実行pip install cryptographyしましpip install airflow[crytpo]たが、どちらもまったく同じ結果を返し、要件が既に満たされていることを知らせてくれました。最後に、fernet_key を生成するだけでよいという記述を見つけました。airflow.cfg ファイルのデフォルト キーはfernet_key = $FERNET_KEY. だから私が実行したコンテナのcliから:

python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

そして、私が置き換えたコードを手に入れまし$FERNET_KEYた。コンテナーを再起動し、dag を再実行したところ、エラーは次のようになりました。

[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -   
Traceback (most recent call last):  
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
    h.verify(data[-32:])  
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
    ctx.verify(signature)  
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
    raise InvalidSignature("Signature did not match digest.")  
cryptography.exceptions.InvalidSignature: Signature did not match digest.

最初の暗号ドキュメント スキャンのどれが互換性と関係がありますか?

私は今迷っており、この質問をして、これを解決するために間違った道を進んでいる可能性があるかどうかを確認することにしました。Airflowは素晴らしいように見えるので、どんな助けも大歓迎です。

4

1 に答える 1