0

別の AWS アカウント B にある S3 ファイルを処理する AWS Glue ジョブをセットアップしています。アカウント A の IAM ロール (Glue ジョブの IAM ロール) は、STS を使用して、対象ファイルへのアクセス権を持つアカウント B のロールを引き受けます。アカウント B の IAM ロールは、アカウント A の Glue ジョブロールと信頼関係を結んでいます。アクセスキーとシークレットキーを出力できたので、STS は正常に機能していると考えています。

以下のエラーが発生します。

An error occurred while calling o83.json. com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;

Access Denied Exception が発生したときの S3A コネクタの正しい実装は何ですか。

これが私のコードです:

from __future__ import print_function
from pyspark import SparkContext 
from pyspark import SparkConf
from pyspark import SQLContext 
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.sql import HiveContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import explode_outer
from pyspark.sql.functions import substring_index
from pyspark.sql.functions import input_file_name
from pyspark.sql import functions as f 
import sys
import os
import os
import boto3
import sys
import errno
import time
import datetime
from datetime import timedelta, date
from pyspark.sql.functions import split
from pyspark.sql.functions import substring
from boto3.session import Session

# Build (or reuse) the Hive-enabled Spark session used by the whole job.
spark = (
    SparkSession.builder
    .appName("JsonInputFormat")
    .enableHiveSupport()
    .getOrCreate()
)

sc = spark.sparkContext

# Hive settings: allow dynamic partitioning and extended nesting levels
# for deeply nested columns.
hive_context = HiveContext(sc)

hive_context.setConf("hive.exec.dynamic.partition", "true")
hive_context.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hive_context.setConf("hive.serialization.extend.nesting.levels", "true")


# Second context object; the rest of the script issues SQL through it.
sqlCtx = HiveContext(sc)

# Assume the cross-account role. STS returns *temporary* credentials, which
# are a triple: access key id, secret access key AND session token.
client = boto3.client('sts')
response = client.assume_role(RoleArn='ROLE_TO_ASSUME', RoleSessionName='AssumeRoleSession1')
credentials = response['Credentials']


ACCESS_KEY = credentials['AccessKeyId']
SECRET_KEY = credentials['SecretAccessKey']
# Without the session token the temporary key pair is rejected by S3
# (403 AccessDenied), so it must travel with the other two values.
SESSION_TOKEN = credentials['SessionToken']
print('access key is {}'.format(ACCESS_KEY))
# The secret key is deliberately NOT printed: job logs are not a safe place
# for credentials.
print("Hadoop version: " + sc._gateway.jvm.org.apache.hadoop.util.VersionInfo.getVersion())
session = Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    aws_session_token=SESSION_TOKEN,
)
s3      = session.resource('s3')

# Hand the temporary credentials to the S3A connector. The provider has to
# be TemporaryAWSCredentialsProvider so that S3A reads fs.s3a.session.token
# in addition to the access/secret key pair.
# NOTE(review): this provider ships with Hadoop 2.8+; check the
# "Hadoop version" line printed above on the target Glue runtime.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", SECRET_KEY)
hadoop_conf.set("fs.s3a.session.token", SESSION_TOKEN)
hadoop_conf.set("com.amazonaws.services.s3a.enableV4", "true")
hadoop_conf.set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

def flatten_schema(schema):
    """Return the dotted paths of all leaf fields in a schema dict.

    ``schema`` is the nested dict produced by ``schema.jsonValue()``.
    Struct fields are descended into; anything else (including arrays)
    is treated as a leaf and contributes one path to the result. Each
    named node's path is printed as it is visited.
    """
    leaf_paths = []
    # Explicit stack instead of recursion. Children are pushed in reverse
    # so nodes are still visited, printed and collected in document order.
    pending = [(schema, "")]
    while pending:
        node, prefix = pending.pop()

        # Extend the path with this node's own name, if it has one.
        node_name = node.get("name")
        if node_name is not None:
            prefix = "{0}.{1}".format(prefix, node_name) if prefix else node_name
            print('path is {}'.format(prefix))

        children = node.get("fields")
        nested_type = node.get("type")
        if isinstance(children, list):
            # Struct: visit every member field under the current path.
            pending.extend((child, prefix) for child in reversed(children))
        elif isinstance(nested_type, dict):
            # Field whose type is itself a complex type description.
            pending.append((nested_type, prefix))
        else:
            # Atomic type: record the accumulated path.
            leaf_paths.append(prefix)
    return leaf_paths

# Resolve column names case-sensitively for the rest of the job.
sqlCtx.sql("set spark.sql.caseSensitive=true")

# The job processes the partition for the previous calendar day.
yesterday = date.today() - timedelta(1)
daybefore = yesterday.strftime("%Y-%m-%d")
currentdate = time.strftime("%Y-%m-%d")
key = 'KEY={}'.format(str(daybefore))
bucket = 'BUKCET_NAME'
# First read (one directory level below the key) is only used to infer
# the schema.
df_base = spark.read.json('s3a://{}/{}/*/'.format(bucket, key))

base_schema = df_base.schema

datePrefix = str(daybefore)
source = 's3a://{}/{}'.format(bucket, key)

# Second read applies the inferred schema explicitly.
df1 = spark.read.json(source, schema=base_schema)


# Pass 1: turn every nested leaf ("a.b.c") into a top-level column "a_b_c".
schema = df1.schema.jsonValue()
columns_list = flatten_schema(schema)

print('columns list is {}'.format(columns_list))


df2 = df1.select(*(col(x).alias(x.replace('.', '_')) for x in columns_list))
print('df2 is {}'.format(df2))
# Explode the `contents` column into `contents_flat` (explode_outer also
# keeps rows whose array is null or empty), then drop the original column.
df3 = df2.select("*", explode_outer(df2.contents).alias("contents_flat"))
df3 = df3.drop("contents")
print('df3 is {}'.format(df3))


# Pass 2: flatten whatever structure the exploded elements introduced.
schema4 = df3.schema.jsonValue()
columns_list4 = flatten_schema(schema4)
print('columns list 4 is {}'.format(columns_list4))
df5 = df3.select(*(col(x).alias(x.replace('.', '_')) for x in columns_list4))
print('df5 is {}'.format(df5))

# Rename the temporary "contents_flat" prefix back to "contents".
schema5 = df5.schema.jsonValue()
columns_list5 = flatten_schema(schema5)
print('columns list 5 is {}'.format(columns_list5))
df6 = df5.select(*(col(x).alias(x.replace('contents_flat', 'contents')) for x in columns_list5))
print('df6 is {}'.format(df6))

schema6 = df6.schema.jsonValue()
columns_list6 = flatten_schema(schema6)
print('column list 6 is {}'.format(columns_list6))
# Plain re-selection of the listed columns, with no renaming in this step.
df7 = df6.select(*(col(x) for x in columns_list6))

# Pass 3: flatten once more, then explode the latLng warnings column.
schema7 = df7.schema.jsonValue()
print('schema7 is {}'.format(schema7))
columns_list7 = flatten_schema(schema7)
print('columns list 7 is {}'.format(columns_list7))
df7 = df7.select(*(col(x).alias(x.replace('.', '_')) for x in columns_list7))
df7 = df7.select("*", explode_outer(df7.business_address_latLng_warnings).alias("business_address_latLng_warnings_flat"))
df7 = df7.drop("business_address_latLng_warnings")

print('df7 is {}'.format(df7))

# Derive `filedate` from the source file path: the first 10 characters
# that follow 'short_date=' in the file name.
df8 = df7.withColumn("filename", input_file_name())
split_col = split(df8['filename'], 'short_date=')
df9 = df8.withColumn('shortfilename', split_col.getItem(1))
df_final = df9.withColumn('filedate', substring('shortfilename', 1, 10)).drop('shortfilename')
print('df_final is {}'.format(df_final))

# NOTE(review): the target path is written as-is; the {bucket}/{folder1}
# placeholders are never substituted (no .format call, and `folder1` is not
# defined in this script) — confirm the intended destination.
df_final.write.mode('append').csv('s3://{bucket}/{folder1}/', header='true')

spark.stop()
4

0 に答える 0