1

私の目標は、TWS API と一緒にティッカーのリストを使用して、会社のスナップショット (reqFundamentalData() -> "ReportSnapshot") の一部と、これらのティッカーの財務諸表 (reqFundamentalData() -> "ReportsFinStatements") を抽出することです。データフレームに入れ、寄木細工のファイルとして保存します。
提供されたソリューションをマージしようとしました:

  1. ティッカーのリスト TWS API を使用して株式のファンダメンタル データをダウンロードすると、最初のデータ エントリのみが実行され、その他は無視されます。誰がこれを解決しますか?
  2. XML をデータフレームとして保存する XML を Pandas に変換する
  3. データ の保存 TWS API から csv ファイルへのデータの保存

コード:

from datetime import datetime
from bs4 import BeautifulSoup as bs
import pandas as pd

from ibapi.client import EClient
from ibapi.contract import Contract
from ibapi.wrapper import EWrapper
import logging

import random
import pathlib
import time
from datetime import date
import datetime
from pathlib import Path

class TestApp(EWrapper, EClient):
def __init__(self, addr, port, client_id):
    EWrapper.__init__(self)   # new - book
    EClient.__init__(self, self)

    self.firstReqId = 8001
    self.contracts = {}  # keep in dict so you can lookup
    self.contNumber = self.firstReqId

    # add dataframes to store the result
    self.df_company_info = pd.DataFrame(data=None, index=None, columns=None)
    self.df_fin_stmts = pd.DataFrame(data=None, index=None, columns=None)

def addContracts(self, cont):
    self.contracts[self.contNumber] = cont  # add to dict using 8001 first time
    self.contNumber += 1  # next id will be 8002 etc.

def nextValidId(self, orderId: int):
    # now you are connected, ask for data, no need for sleeps
    # this isn't the only way to know the api is started but it's what IB recommends
    self.contNumber = self.firstReqId   # start with first reqId
    self.getNextData()

def error(self, reqId, errorCode, errorString):
    print("Error: ", reqId, "", errorCode, "", errorString)

    # if there was an error in one of your requests, just contimue with next id
    if reqId > 0 and self.contracts.get(self.contNumber):
        # err in reqFundametalData based on reqid existing in map
        print('err in', self.contracts[reqId].symbol)
        self.getNextData() # try next one

def fundamentalData(self, reqId, fundamental_data):
    self.fundamentalData = fundamental_data
    try:
        if self.fundamentalData is not None:
            # convert XML to dictionary entry
            dict_company_info = self.CompanyInfoXMLtoDict(self.fundamentalData)
            # add dict entry to dataframe
            df_add_row = pd.DataFrame([dict_company_info])
            self.df_company_info = self.df_company_info.append(df_add_row, ignore_index=True)
    except KeyError:
        print('Ticker: ' + str(self.contNumber) + ' could not get company_info')
    except TypeError:
        print('Ticker: ' + str(self.contNumber) + ' could not get company_info')
    except ValueError:
        print('Ticker: ' + str(self.contNumber) + ' could not get company_info')
    except IndexError:
        print('Ticker: ' + str(self.contNumber) + ' could not get company_info')

    self.getNextData()

def getNextData(self):
    if self.contracts.get(self.contNumber):     # means a contract exists
        # so req data
        self.reqFundamentalData(self.contNumber, self.contracts[self.contNumber], "ReportSnapshot", [])
        self.contNumber += 1    # now get ready for next request
    else:   # means no more sequentially numbered contracts
        print('done')
        self.disconnect()   # just exit


def CompanyInfoXMLtoDict(self, fundamentals):
    soup = bs(fundamentals, 'xml')
    
    df_company_info = pd.DataFrame(data=None, index=None, columns=None)
    ticker = ''
    longName = ''
    fullTimeEmployees = 0

    # search for a tag e.g. </IssueID>
    for issues in soup.find_all('IssueID'):
        # within this tag -> search of unique ID e.g. IssueID type=...
        if issues.get('Type') == "Ticker":
            ticker = issues.get_text()
            break

    for coID_i in soup.find_all('CoID'):
        if coID_i.get('Type') == "CompanyName":
            longName = coID_i.get_text()
            break

    for employees_i in soup.find_all('Employees'):
        fullTimeEmployees = employees_i.get_text()
        break

    # create result entry row
    if ticker is not None and ticker != '':
        new_row_dict = {'ticker': ticker, 'name': longName,
                        'num_employees': fullTimeEmployees}
    else:
        new_row_dict = {}

    return new_row_dict

def FinStmtsXMLtoDF(self, fundamentals, ticker, stmts_type):
    today = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    today_date = date.today().strftime("%Y-%m-%d")
   
    if stmts_type == 'annual':
        period_type = 'Annual'
    else:
        period_type = 'Interim'

    soup = bs(fundamentals, 'xml')

    # build dict
    stmts_terms = {}
    for terms in soup.find_all("mapItem"):
        # add entry to dict -> dict for maping of code to description
        stmts_terms[terms.get('coaItem')] = terms.get_text()

    bal_l = []
    inc_l = []
    cas_l = []
    for period in soup.find_all('FiscalPeriod'):
        # quarterly vs. annually
        if period.get('Type') == period_type:
            for statement in period.find_all('Statement'):
                if statement.find('UpdateType').get('Code') != 'CLA':
                    dic = {}

                    stmts_type = statement.get('Type')
                    # source_date = statement.find('Source').get('Date')
                    statement_date = statement.find('StatementDate').text
                    # dic['date'] = source_date
                    dic['rep_date'] = statement_date

                    for item in statement.find_all('lineItem'):
                        # dic[item.get('coaCode')] = item.text
                        dic[stmts_terms.get(item.get('coaCode'), 'DEFAULT')] = item.text

                    if stmts_type == 'BAL':
                        bal_l.append(dic)
                        # print(stmts_type, date, dic)
                    elif stmts_type == 'INC':
                        inc_l.append(dic)
                    elif stmts_type == 'CAS':
                        cas_l.append(dic)

    df_balance_sheet = pd.DataFrame(bal_l).sort_values('rep_date')
    df_income_statement = pd.DataFrame(inc_l).sort_values('rep_date')
    df_cash_flow = pd.DataFrame(cas_l).sort_values('rep_date')

    # merge all stmts for same rep_date
    df_fin_stmts = pd.DataFrame(data=None, index=None, columns=None)
    df_fin_stmts = df_balance_sheet.merge(df_income_statement, how='left',
                           left_on=['rep_date'],
                           right_on=['rep_date'])
    df_fin_stmts = df_fin_stmts.merge(df_cash_flow, how='left',
                                         left_on=['rep_date'],
                                         right_on=['rep_date'])

    df_fin_stmts.insert(loc=0, column='ticker', value=ticker)
    df_fin_stmts.insert(loc=1, column='date_updated', value=today_date)

    return df_fin_stmts

def main():
    # ----- config
    project_data_folder = '/home/data/'
    project_data_folder = Path(project_data_folder)
    # ticker are stored in a csv file
    csv_master_ticker = Path('home/data/ticker/ticker-list.csv')

    # load list of tickers
    df = pd.read_csv(csv_master_ticker)
    list_master_ticker = df['ticker'].tolist()

    fusion_company_info = pd.DataFrame(data=None, index=None, columns=None)
    fusion_fin_stmts = pd.DataFrame(data=None, index=None, columns=None)
    fusion_q_fin_stmts = pd.DataFrame(data=None, index=None, columns=None)

    client = TestApp('127.0.0.1', 7496, 0)
    
    for ticker in list_master_ticker:
  
       # remove additional postfix for exchange e.g. XYZ.F -> XYZ
       ticker_clean = ticker.rstrip('.')

       contract = Contract()
       contract.symbol = ticker_clean
       contract.secType = 'STK'
       contract.exchange = "SMART"
       contract.currency = 'USD'

       client.addContracts(contract)

    client.connect('127.0.0.1', 7496, 0)
    client.run()

    if fusion_company_info.empty:
        fusion_company_info = client.df_company_info
    else:
        fusion_company_info = pd.concat([fusion_company_info, client.df_company_info])

            tws_company_info_file_name = 'tws_company_info.parquet'
        file_name = project_data_folder / tws_company_info_file_name
        try:
            if fusion_company_info is not None:
                if not fusion_company_info.empty:
                    fusion_company_info.to_parquet(file_name, engine='pyarrow')

        #    financial statements - annual
        tws_fin_stmts_file_name = 'tws_fin_stmts.parquet'
        file_name = project_data_folder / tws_fin_stmts_file_name
        try:
            if fusion_fin_stmts is not None:
                if not fusion_fin_stmts.empty:
                    fusion_fin_stmts.to_parquet(file_name, engine='pyarrow')

    

エラー メッセージが表示される

Traceback (most recent call last):
      File "...\ibapi\client.py", line 239, in run
        self.decoder.interpret(fields)
      File "...\ibapi\decoder.py", line 1278, in interpret
        self.interpretWithSignature(fields, handleInfo)
      File "...\ibapi\decoder.py", line 1259, in interpretWithSignature
        method(*args)
    TypeError: 'str' object is not callable
    python-BaseException

このエラー メッセージについて誰か助けてもらえますか? forループを削除して、単一のティッカーに対してのみ実行すると、たとえば

client.contracts = {}
contract = Contract()
contract.symbol = 'AMD'
contract.secType = 'STK'
contract.currency = 'USD'
contract.exchange = "SMART"
client.addContracts(contract)
client.connect('127.0.0.1', 7496, 0)
client.run()

エラー メッセージが表示されず、データフレームの self.company_info に AMD の正しいデータが入力されます。

一般的な質問:

  1. reqFundamentalData() を介して、会社情報「ReportSnapshot」だけでなく、財務諸表「ReportsFinStatements」(df_fin_stmts および関数「FinStmtsXMLtoDF」) も 1 回の要求/実行で取得できますか?

  2. 私はpythonを初めて使用し、関数がコード内で呼び出された場合にのみ関数が実行されることを期待しますが、TWS API(ソケット、reqID)を使用すると、動作が異なるように見え、どの関数がいつ呼び出されるかが完全にはわかりません次々と。たとえば、reqFundamentalData() を実行することによって、基本データ () 関数が呼び出されることをどのように知ることができますか。または、たとえば、 nextValidID() が何らかの形でトリガーされますが、プログラム内で明示的に呼び出されません。どの関数がどの順序で呼び出されるかのプロセスを紹介する良いチュートリアルはありますか?

どうもありがとうございました

4

0 に答える 0