1

私は、Google App Engineデータストアからデータを取得し、MapReduceパイプラインを設定してGoogleCloudStorageを介してBigQueryにデータを移動する方法を示すこのCodelabをフォローしようとしています。Google App Engine Datastoreエンティティを設定し、テストとしてデータを収集したい特定の株式に関するツイートを収集するプロセスがあります。例で概説したようにすべてを実行したと思いますが、データを分割してCloudStorageにロードするすべての作業を実行するシャードがUnicodeEncodeErrorsを発生させています。これが、開発アプリサーバーでアプリをテストした場所からのログです。

INFO     2012-12-18 20:41:07,645 dev_appserver.py:3103] "POST /mapreduce/worker_callback HTTP/1.1" 500 -
WARNING  2012-12-18 20:41:07,648 taskqueue_stub.py:1981] Task appengine-mrshard-1582400592541472B07B9-0-0 failed to execute. This task will retry in 0.100 seconds
ERROR    2012-12-18 20:41:09,453 webapp2.py:1552] 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)
Traceback (most recent call last):
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1535, in __call__
rv = self.handle_exception(request, response, e)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1529, in __call__
rv = self.router.dispatch(request, response)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1278, in default_dispatcher
return route.handler_adapter(request, response)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1102, in __call__
return handler.dispatch()
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 572, in dispatch
return self.handle_exception(e, self.app.debug)
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 570, in dispatch
return method(*args, **kwargs)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\base_handler.py", line 65, in post
self.handle()
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 181, in handle
entity, input_reader, ctx, tstate)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 298, in process_data
output_writer.write(output, ctx)
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\output_writers.py", line 659, in write
ctx.get_pool("file_pool").append(self._filename, str(data))
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)

コードは次のとおりです。

import json
import webapp2
import urllib2
import time
import calendar
import datetime
import httplib2

from google.appengine.ext import db
from google.appengine.api import taskqueue
from google.appengine.ext import blobstore
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
from google.appengine.api import urlfetch

from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from apiclient.discovery import build
from oauth2client.appengine import AppAssertionCredentials

SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'project_id' # Your Project ID here
BQ_DATASET_ID = 'datastore_data'
GS_BUCKET = 'bucketname'
ENTITY_KIND = 'main.streamdata'

class streamdata(db.Model):
    querydate = db.DateTimeProperty(auto_now_add = True)
    ticker = db.StringProperty()
    created_at = db.StringProperty()
    tweet_id = db.StringProperty()
    text = db.TextProperty()
    source = db.StringProperty()

class DatastoreMapperPipeline(base_handler.PipelineBase):

    def run(self, entity_type):

        output = yield mapreduce_pipeline.MapperPipeline(
          "Datastore Mapper %s" % entity_type,
          "main.datastore_map",
          "mapreduce.input_readers.DatastoreInputReader",
          output_writer_spec="mapreduce.output_writers.FileOutputWriter",
          params={
              "input_reader":{
                  "entity_kind": entity_type,
                  },
              "output_writer":{
                  "filesystem": "gs",
                  "gs_bucket_name": GS_BUCKET,
                  "output_sharding":"none",
                  }
              },
              shards=10)

        yield CloudStorageToBigQuery(output)

class CloudStorageToBigQuery(base_handler.PipelineBase):

    def run(self, csv_output):

        credentials = AppAssertionCredentials(scope=SCOPE)
        http = credentials.authorize(httplib2.Http())
        bigquery_service = build("bigquery", "v2", http=http)

        jobs = bigquery_service.jobs()
        table_name = 'datastore_data_%s' % datetime.datetime.utcnow().strftime(
            '%m%d%Y_%H%M%S')
        files = [str(f.replace('/gs/', 'gs://')) for f in csv_output]
        result = jobs.insert(projectId=PROJECT_ID,
                            body=build_job_data(table_name,files))

        result.execute()

def build_job_data(table_name, files):
  return {"projectId": PROJECT_ID,
          "configuration":{
              "load": {
                  "sourceUris": files,
                  "schema":{
                      "fields":[
                          {
                              "name":"querydate",
                              "type":"INTEGER",
                          },
                          {
                              "name":"ticker",
                              "type":"STRING",
                          },
                          {
                              "name":"created_at",
                              "type":"STRING",
                          },
                          {
                              "name":"tweet_id",
                              "type":"STRING",
                          },
                          {   "name":"text",
                              "type":"TEXT",
                          },
                          {    
                              "name":"source",
                              "type":"STRING",
                          }
                          ]
                      },
                  "destinationTable":{
                      "projectId": PROJECT_ID,
                      "datasetId": BQ_DATASET_ID,
                      "tableId": table_name,
                      },
                  "maxBadRecords": 0,
                  }
              }
          }

def datastore_map(entity_type):
    data = db.to_dict(entity_type)
    resultlist = [timestamp_to_posix(data.get('querydate')),
                    data.get('ticker'),
                    data.get('created_at'),
                    data.get('tweet_id'),
                    data.get('text'),
                    data.get('source')]
    result = ','.join(['"%s"' % field for field in resultlist])
    yield("%s\n" % result)

def timestamp_to_posix(timestamp):
    return int(time.mktime(timestamp.timetuple()))

class DatastoretoBigQueryStart(webapp2.RequestHandler):
    def get(self):
        pipeline = DatastoreMapperPipeline(ENTITY_KIND)
        pipeline.start()
        path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
        self.redirect(path)

class StreamHandler(webapp2.RequestHandler):

    def get(self):

        tickers = ['AAPL','GOOG', 'IBM', 'BAC', 'INTC',
                   'DELL', 'C', 'JPM', 'WFM', 'WMT', 
                   'AMZN', 'HOT', 'SPG', 'SWY', 'HTSI', 
                   'DUK', 'CEG', 'XOM', 'F', 'WFC', 
                   'CSCO', 'UAL', 'LUV', 'DAL', 'COST', 'YUM',
                   'TLT', 'HYG', 'JNK', 'LQD', 'MSFT',
                   'GE', 'LVS', 'MGM', 'TWX', 'DIS', 'CMCSA',
                   'TWC', 'ORCL', 'WPO', 'NYT', 'GM', 'JCP', 
                   'LNKD', 'OPEN', 'NFLX', 'SBUX', 'GMCR', 
                   'SPLS', 'BBY', 'BBBY', 'YHOO', 'MAR', 
                   'L', 'LOW', 'HD', 'HOV', 'TOL', 'NVR', 'RYL', 
                   'GIS', 'K', 'POST', 'KRFT', 'CHK', 'GGP', 
                   'RSE', 'RWT', 'AIG', 'CB', 'BRK.A', 'CAT']

        for i in set(tickers):

            url = 'http://search.twitter.com/search.json?q='
            resultcount = '&rpp=100'
            language = '&lang=en'
            encoding = '%40%24'
            tickerstring = url + encoding + i + resultcount + language
            tickurl = urllib2.Request(tickerstring)
            tweets = urllib2.urlopen(tickurl)
            code = tweets.getcode()

            if code == 200:
                results = json.load(tweets, 'utf-8')
                if "results" in results:
                    entries = results["results"]
                    for entry in entries:
                        tweet = streamdata()
                        created = entry['created_at']
                        tweetid = entry['id_str']
                        tweettxt = entry['text']
                        tweet.ticker = i
                        tweet.created_at = created
                        tweet.tweet_id = tweetid
                        tweet.text = tweettxt
                        tweet.source = "Twitter"
                        tweet.put()

class MainHandler(webapp2.RequestHandler):

    def get(self):
        self.response.out.write('<a href="/start">Click here</a> to start the Datastore to BigQuery pipeline. ')
        self.response.out.write('<a href="/add_data">Click here</a> to start adding data to the datastore. ')


app = webapp2.WSGIApplication([
                               ('/', MainHandler),
                               ('/start', DatastoretoBigQueryStart), 
                               ('/add_data', StreamHandler)], 
                              debug=True)

誰かが持っているかもしれないどんな洞察も大きな助けになるでしょう。

どうもありがとう。

4

2 に答える 2

3

Unicode データをバイト文字列に変換しています:

ctx.get_pool("file_pool").append(self._filename, str(data))

エンコーディングを指定せずにこれを行うと、Python はデフォルトの ASCII にフォールバックします。代わりに、データに含まれるすべての Unicode コードポイントを処理できる別のエンコーディングに落ち着く必要があります。

ほとんどのテキストでは、UTF-8 が適しています。西洋以外のテキスト (アラビア語、アジア語など) が多い場合は、UTF-16 の方が効率的かもしれません。いずれの場合も、明示的にエンコードする必要があります。

ctx.get_pool("file_pool").append(self._filename, data.encode('utf8'))

そのファイルからデータを読み戻すときは、filedata.decode('utf8')Unicode にデコードするために使用します。

Python と Unicode の詳細については、 Python Unicode HOWTOを参照してください。

于 2012-12-18T21:12:30.227 に答える
0
ctx.get_pool("file_pool").append(self._filename, str(data))

データに Unicode 文字が含まれている場合、これは失敗します。試す

ctx.get_pool("file_pool").append(self._filename, unicode(data))
于 2012-12-18T21:03:25.067 に答える