1

Gunicorn サーバーを使用して実行している Falcon フレームワークでアプリを作成しました。サーバーが起動すると、アプリは最初にランダム フォレスト モデルを学習します。

# Extra-trees ensemble with 150 estimators; n_jobs=-1 asks scikit-learn to
# run the fit in parallel on all available cores.
forest = sklearn.ensemble.ExtraTreesClassifier(n_estimators=150, n_jobs=-1)
# x and t are defined outside this snippet (presumably the feature matrix
# and the target labels -- confirm against the full application code).
forest.fit(x, t)

次に、投稿されたリクエストの確率を返します。iPython でコードを実行すると、これは私のサーバーで正常に動作します (このモデルのトレーニングには 15 秒かかり、12 コアで実行されます)。

アプリを書いている間は n_estimators=10 に設定しており、すべて正常に動作していました。アプリの微調整が終わったので、n_estimators を 150 に戻しました。ところが、gunicorn -c ./app.conf app:app で Gunicorn を起動すると、htop 上では forest.fit(x, t) が数秒間すべてのコアで実行されているのが確認でき、その後、すべてのコアの使用率が 0 に低下します。その後、Gunicorn ワーカーが 10 分後にタイムアウトするまで、このメソッドはいつまでも終了しません。

Gunicorn と Falcon、そして WSGI 技術全般を使うのはこれが初めてで、問題の原因もトラブルシューティングの方法もわかりません。

編集:

gunicorn の設定ファイル:

# app.conf -- Gunicorn settings for the prediction service.
# Start the server with: gunicorn -c ./app.conf app:app
import sys

# Make the project's modules importable by the worker processes.
sys.path.append('/home/user/project/Module')

# Address and port the server listens on.
bind = "127.0.0.1:8123"

# Worker timeout in seconds (20 minutes).
timeout = 20 * 60

Falcon のコード:

class Processor(object):
    """Train the two classifiers and serve probability predictions.

    On construction the processor prepares two datasets from the supplied
    features, fits ``model_1`` and ``model_2`` on them and then computes the
    data statistics. ``predict_proba`` later converts an incoming request,
    generates its features and asks one of the two models for a probability.

    NOTE(review): ``data_statistics``, ``model_1`` and ``model_2`` are class
    attributes, so every instance shares the same objects. ``__init__`` fits
    the shared estimators in place.
    """
    # Data statistics used in feature calculations.
    data_statistics = {}

    # Classification targets; replaced per instance by prepare_datasets().
    targets = ()

    # Feature columns fed to model_1 and model_2 respectively
    # (contents elided in this listing).
    cols1 = [ #...
            ]

    cols2 = [ #...
            ]
    # Both estimators use 150 trees; n_jobs=-1 requests all available cores.
    model_1 = ExtraTreesClassifier(n_estimators=150, n_jobs=-1)
    model_2 = ExtraTreesClassifier(n_estimators=150, n_jobs=-1)

    def __init__(self, features_dataset, tr_prepro):
        """Fit both models and compute the data statistics.

        Parameters
        ----------
        features_dataset
            Training data; indexed by column list in prepare_datasets().
        tr_prepro
            Preprocessed data handed to calculate_data_statistics().
        """
        # Get the datasets
        da_1, da_2 = self.prepare_datasets(features_dataset)
        # Train models
# ----THIS IS WHERE THE PROGRAM HANGS -----------------------------------
        self.model_1.fit(da_1.x, utils.vectors_to_labels(da_1.t))
# -----------------------------------------------------------------------
        self.model_2.fit(da_2.x, utils.vectors_to_labels(da_2.t))
        # Generate data statistics for feature calculations
        self.calculate_data_statistics(tr_prepro)

    def prepare_datasets(self, features_dataset):
        """Build the two training datasets and record the target labels.

        Selects the columns in ``sel_cols``, drops rows with missing values,
        builds and binarizes the dataset through the project helpers, stores
        ``da_bin.t_labels`` in ``self.targets`` and returns one attribute
        selection per model.

        Returns
        -------
        tuple
            ``(da_1, da_2)`` restricted to ``cols1`` and ``cols2``.
        """
        # Columns to keep (contents elided in this listing).
        sel_cols = [ #...
                   ]

        # Build dataset
        d = features_dataset[sel_cols].dropna()
        # NOTE(review): `scalers` is returned by the helper but never used
        # in the visible code.
        da, scalers = ft.build_dataset(d, scaling='std', target_feature='outcome')

        # Binarize data
        da_bin = utils.binirize_dataset(da)

        # What are the classification targets
        self.targets = da_bin.t_labels

        # Prepare the datasets
        da_1 = da_bin.select_attributes(self.cols1)
        da_2 = da_bin.select_attributes(self.cols2)
        return da_1, da_2

    def calculate_data_statistics(self, tr_prepro):
        """Compute data and feature statistics (body elided in this listing).

        The visible code only logs the start and the end of the computation.
        """
        logger.info('Getting data and feature statistics...')
        #...
        logger.info('Done.')

    def import_data(self, data):
        """Convert a request payload to a DataFrame (body elided).

        NOTE(review): `tr` is not defined in the visible code; the conversion
        that creates it has been left out of this listing.
        """
        # convert dictionary generated from json to Pandas DataFrame
        return tr

    def generate_features(self, tr):
        """Return `tr` after feature generation (processing steps elided)."""
        # Preprocessing, Feature calculations, imputation
        return tr

    def predict_proba(self, data):
        """Return the rows of a request with a 'prob' column added.

        The request is imported and its features are generated, then the
        rows are split into two frames (selection logic elided in this
        listing). Each non-empty frame is scored by its own model and the
        two frames are concatenated row-wise.
        """
        # Convert Data
        tr = self.import_data(data)
        # Generate features
        tr = self.generate_features(tr)
        # Select model based on missing values - either no. 1 or no. 2
        tr_1 = #...
        tr_2 = #...
        # Get the probabilities from different models.
        # The probability column is picked by the position of 'POSITIVE' in
        # self.targets -- assumes that order matches the column order of the
        # model output; TODO confirm.
        if tr_1.shape[0] > 0:
            tr_1.loc[:, 'prob'] = self.model_1.predict_proba(tr_1.loc[:, self.cols1])[:, self.targets.index('POSITIVE')]
        if tr_2.shape[0] > 0:
            tr_2.loc[:, 'prob'] = self.model_2.predict_proba(tr_2.loc[:, self.cols2])[:, self.targets.index('POSITIVE')]
        return pd.concat([tr_1, tr_2], axis=0)

    @staticmethod
    def export_single_result(tr):
        """Return sample_id, batch_id and prob of the row labelled 0 as a dict.

        NOTE(review): `.loc[0, ...]` is a label-based lookup, so this assumes
        `tr` has exactly one row with index label 0 -- confirm against the
        frames produced by predict_proba().
        """
        result = {'sample_id': tr.loc[0, 'sample_id'],
                  'batch_id': tr.loc[0, 'batch_id'],
                  'prob': tr.loc[0, 'prob']
                  }
        return result

class JSONTranslator(object):
    """Middleware that decodes JSON request bodies and encodes JSON responses."""

    def process_request(self, req, resp):
        """Parse the request body as JSON and store it in req.context['data'].

        Requests without a content length (None or 0) are left untouched.

        Throws
        ------
        HTTP 400 (Bad Request)
            The body turned out to be empty.
        HTTP 753 ('Syntax Error')
            The body is not valid UTF-8 encoded JSON.
        """
        if req.content_length in (None, 0):
            # No payload announced, so there is nothing to translate.
            return

        raw = req.stream.read()
        if not raw:
            raise falcon.HTTPBadRequest('Empty request body',
                                        'A valid JSON document is required.')

        try:
            text = raw.decode('utf-8')
            req.context['data'] = json.loads(text)
        except (ValueError, UnicodeDecodeError):
            raise falcon.HTTPError(falcon.HTTP_753,
                                   'Malformed JSON',
                                   'Could not decode the request body. The '
                                   'JSON was incorrect or not encoded as '
                                   'UTF-8.')

    def process_response(self, req, resp, resource):
        """Serialize req.context['result_json'] into the response body.

        The response is left unchanged when no 'result_json' entry exists.
        """
        if 'result_json' in req.context:
            payload = req.context['result_json']
            resp.body = json.dumps(payload)

class ProbResource(object):
    """Resource that validates a JSON request and returns its probability."""

    def __init__(self, processor):
        """Load the request schema and remember the processor.

        Parameters
        ----------
        processor
            Object providing predict_proba(data) and export_single_result(tr).
        """
        schema_path = config.__ROOT__ + "app_docs/broadcast_schema.json"
        # Use a context manager so the file handle is closed deterministically.
        with open(schema_path) as schema_file:
            self.schema_raw = schema_file.read()
        self.schema = json.loads(self.schema_raw)
        self.processor = processor

    def validate_request(self, req):
        """ Validate the request json against the schema.

        Throws
        ------
        HTTP 400 (Bad Request)
            No JSON document was attached to the request.
        HTTP 753 ('Syntax Error')
            The document does not conform to the schema.
        """
        # The JSON middleware skips requests without a body, in which case
        # 'data' is never set; report that as a client error instead of
        # letting a KeyError surface as a server error.
        if 'data' not in req.context:
            raise falcon.HTTPBadRequest('Empty request body',
                                        'A valid JSON document is required.')
        data = req.context['data']
        # validate the json
        try:
            v = jsonschema.Draft4Validator(self.schema)  # using jsonschema draft 4
            # Sort by string form so the combined message is deterministic.
            err_msg = ''.join(
                str(error) for error in sorted(v.iter_errors(data), key=str))

            if err_msg:
                raise falcon.HTTPError(falcon.HTTP_753,
                                       'JSON failed validation',
                                       err_msg)
        except jsonschema.ValidationError:
            print("Failed to use schema:\n" + str(self.schema_raw))
            raise

    def on_get(self, req, resp):
        """Handles GET requests

        Validates the request, asks the processor for the probability and
        stores the result in req.context['result_json'].

        Throws
        ------
        HTTP 404 (Not Found)
            Any exception raised while computing or exporting the result.
        """
        self.validate_request(req)
        data = req.context['data']
        try:
            # get probability
            tr = self.processor.predict_proba(data)
            # convert pandas dataframe to dictionary
            result = self.processor.export_single_result(tr)
            # send the dictionary away
            req.context['result_json'] = result
        except Exception as ex:
            # str(ex) works on both Python 2 and 3; exceptions have no
            # `.message` attribute on Python 3.
            raise falcon.HTTPError(falcon.HTTP_404, 'Error', str(ex))
        resp.status = falcon.HTTP_200


# Load the training features and the preprocessed samples from disk.
features_data = fastserialize.load(config.__ROOT__ + 'data/samples.s')
prepro_data = fastserialize.load(config.__ROOT__ + 'data/prepro/samples_preprocessed.s')

# Build the processor, which trains both models in its constructor - this is
# where the code hangs. The class defined in this file is `Processor`; the
# previous name `SampleProcessor` is not defined here.
sp = Processor(features_data, prepro_data)

# WSGI application; the middleware translates JSON on the way in and out.
app = falcon.API(middleware=[JSONTranslator()])

prob = ProbResource(sp)

app.add_route('/prob', prob)
4

0 に答える 0