forked from RudolfTheOne/ThetaTracker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_fetch.py
115 lines (95 loc) · 5.49 KB
/
data_fetch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import logging
import requests
import sys
import math
from dateutil.parser import parse
from datetime import datetime, timedelta
def make_api_request(api_key, endpoint, timeout=10):
    """Send an HTTP GET to ``endpoint`` and return the decoded JSON body.

    Args:
        api_key: Not used by this function; the callers in this module
            already embed the key in ``endpoint``. Kept so existing call
            sites keep working.
        endpoint: Full URL to request.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests.get`` can block indefinitely on a stalled
            connection.

    Returns:
        The parsed JSON payload, or ``None`` when the connection fails or
        the body is not valid JSON.

    Raises:
        SystemExit: With status 1 when the server answers with a status
            code other than 200.
    """
    try:
        response = requests.get(endpoint, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Error: Unable to connect to the API. {e}")
        return None

    # Check if the request was successful
    if response.status_code != 200:
        print(f"Error: Unable to fetch data. Status code: {response.status_code}")
        sys.exit(1)

    try:
        return response.json()
    except ValueError as e:
        # A 200 response with a non-JSON body (e.g. an HTML error page) is
        # reported the same way as a connection failure: callers already
        # treat None as "no data".
        print(f"Error: The API response was not valid JSON. {e}")
        return None
def is_market_open(api_key):
    """Tell whether the current time falls inside a regular option-market session.

    Requests the OPTION market-hours endpoint and inspects every product
    listed under the ``"option"`` key of the response.

    Returns:
        True as soon as one product flagged ``isOpen`` has a
        ``regularMarket`` session whose start/end bracket the current
        time; False when no such session is found.

    Raises:
        SystemExit: With status 1 when the response is empty or has no
            ``"option"`` section.
    """
    hours_url = f"https://api.tdameritrade.com/v1/marketdata/OPTION/hours?apikey={api_key}"
    payload = make_api_request(api_key, hours_url)

    # Guard clause: bail out when the response lacks the expected section.
    if not payload or "option" not in payload:
        print("Error: The API response did not contain the expected data.")
        sys.exit(1)

    for market in payload["option"].values():
        # Skip products that are not flagged as open.
        if "isOpen" not in market or not market["isOpen"]:
            continue

        for window in market["sessionHours"]["regularMarket"]:
            opens_at = parse(window["start"])
            closes_at = parse(window["end"])
            # Take "now" in the session's own timezone so the comparison
            # is between compatible datetimes.
            now = datetime.now(opens_at.tzinfo)
            if opens_at <= now <= closes_at:
                return True

    return False
def filter_and_sort_options(data, max_delta, buying_power, sorting_method, top_n=5):
    """Select options within the delta limit, add sizing metrics, and return the best few.

    Every option found under ``data["putExpDateMap"]`` is updated in place
    with ``underlyingPrice``. Options whose absolute delta lies in
    ``[0, max_delta]`` additionally receive:

    * ``delta`` - the absolute delta as a float,
    * ``no_of_contracts_to_write`` - floor(buying_power / (strike * 100)),
    * ``message`` - only when fewer than one contract is affordable,
    * ``premium_usd`` - contracts * bid * 100, rounded to 2 places,
    * ``premium_per_day`` - premium_usd / daysToExpiration (or premium_usd
      itself when daysToExpiration is 0), rounded to 2 places,
    * ``arr`` - premium_usd / buying_power * 365 / days * 100, rounded to
      3 places.

    Args:
        data: Option-chain dict with ``putExpDateMap`` and ``underlyingPrice``.
        max_delta: Upper bound (inclusive) for the absolute delta.
        buying_power: Capital available for writing contracts.
        sorting_method: Option key to sort by, descending.
        top_n: Maximum number of options to return (default 5).

    Returns:
        A list of at most ``top_n`` option dicts, sorted descending by
        ``sorting_method``.
    """
    options = []
    put_exp_date_map = data.get("putExpDateMap", {})

    for strikes in put_exp_date_map.values():
        for contracts in strikes.values():
            for option in contracts:
                option["underlyingPrice"] = data["underlyingPrice"]

                delta = abs(float(option["delta"]))
                # A NaN delta fails this comparison and is skipped as well.
                if not 0 <= delta <= max_delta:
                    continue

                option["delta"] = delta

                contract_count = math.floor(buying_power / (float(option["strikePrice"]) * 100))
                option["no_of_contracts_to_write"] = contract_count
                if contract_count < 1:
                    option["message"] = "Not enough buying power"

                premium_usd = round(contract_count * float(option["bid"]) * 100, 2)
                option["premium_usd"] = premium_usd

                days = option["daysToExpiration"]
                option["premium_per_day"] = round(premium_usd / days if days != 0 else premium_usd, 2)

                # Contracts expiring today report 0 days; annualise them as
                # a one-day holding period (matching premium_per_day above)
                # instead of dividing by zero.
                holding_days = int(days) or 1
                if buying_power:
                    option["arr"] = round(premium_usd / buying_power * 365 / holding_days * 100, 3)
                else:
                    # No capital means no contracts and no return.
                    option["arr"] = 0.0

                options.append(option)

    # "message" values are strings, so options lacking one must fall back
    # to a string too; a numeric default would make the sort raise TypeError.
    missing = "" if sorting_method == "message" else -1
    options.sort(key=lambda option: option.get(sorting_method, missing), reverse=True)
    return options[:top_n]
def _has_upcoming_earnings(ticker, from_date_str, to_date_str, finnhub_api_key, timeout=10):
    """Return True when Finnhub lists an earnings event for ``ticker`` in the date window.

    Any failure (connection error, non-200 status, empty or non-JSON body)
    is logged and reported as False so one bad lookup never aborts the run.
    """
    finnhub_endpoint = f"https://finnhub.io/api/v1/calendar/earnings?from={from_date_str}&to={to_date_str}&symbol={ticker}&token={finnhub_api_key}"
    try:
        response = requests.get(finnhub_endpoint, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        # Log only the exception type: the full message can echo the URL,
        # which contains the API token.
        logging.error(f"Error: Unable to fetch earnings data for {ticker}. {type(exc).__name__}")
        return False

    if response.status_code != 200 or not response.text:
        logging.error(f"Error: Unable to fetch earnings data for {ticker}. HTTP status code: {response.status_code}")
        return False

    try:
        earnings_data = response.json()
    except ValueError:
        logging.error(f"Error: Unable to fetch earnings data for {ticker}. Response was not valid JSON.")
        return False

    return isinstance(earnings_data, dict) and bool(earnings_data.get("earningsCalendar"))


def _numeric_sort_value(value):
    """Convert ``value`` to a float for sorting; missing, non-numeric or NaN values sort last."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return float("-inf")
    return float("-inf") if math.isnan(number) else number


def fetch_option_chain(api_key, tickers, contract_type, from_date, to_date, max_delta, buying_power, sorting_method,
                       finnhub_api_key):
    """Fetch, filter and rank option contracts for every ticker.

    For each ticker the option chain is requested, reduced with
    ``filter_and_sort_options``, and each surviving option is tagged with
    ``ticker``, ``line_number`` and ``has_earnings`` (whether Finnhub
    reports an earnings event between ``from_date`` and the option's
    expiration date, computed as now + ``daysToExpiration`` days).

    Args:
        api_key: API key for the option-chain endpoint.
        tickers: Iterable of ``(ticker, line_number)`` pairs.
        contract_type: Value passed as ``contractType`` in the chain request.
        from_date: Start of the expiration window (object with ``strftime``).
        to_date: End of the expiration window (object with ``strftime``).
        max_delta: Maximum absolute delta; converted with ``float``.
        buying_power: Available capital; converted with ``float``.
        sorting_method: Option key used for ranking, descending.
        finnhub_api_key: Token for the Finnhub earnings calendar.

    Returns:
        A single list with the selected options of all tickers, sorted
        descending by ``sorting_method``. Tickers whose chain could not be
        fetched are skipped.
    """
    from_date_str = from_date.strftime('%Y-%m-%d')
    to_date_str = to_date.strftime('%Y-%m-%d')

    all_options = []
    for ticker, line_number in tickers:  # unpack ticker and line number
        endpoint = f"https://api.tdameritrade.com/v1/marketdata/chains?apikey={api_key}&symbol={ticker}&contractType={contract_type}&fromDate={from_date_str}&toDate={to_date_str}"
        data = make_api_request(api_key, endpoint)

        # Check if the 'putExpDateMap' key exists in the data
        if not data or "putExpDateMap" not in data:
            print(f"Error: Unable to fetch options for {ticker}.")
            continue

        options = filter_and_sort_options(data, float(max_delta), float(buying_power), sorting_method)

        # Options sharing an expiration date need only one earnings lookup.
        earnings_by_expiry = {}

        # Add ticker value, line number and earnings flag to each option
        for option in options:
            option["ticker"] = ticker
            option["line_number"] = line_number

            expiration_date = datetime.now() + timedelta(days=option["daysToExpiration"])
            expiration_date_str = expiration_date.strftime('%Y-%m-%d')

            if expiration_date_str not in earnings_by_expiry:
                earnings_by_expiry[expiration_date_str] = _has_upcoming_earnings(
                    ticker, from_date_str, expiration_date_str, finnhub_api_key
                )
            option["has_earnings"] = earnings_by_expiry[expiration_date_str]

        all_options.extend(options)

    # Sort all options regardless of their ticker
    if sorting_method == "message":
        all_options.sort(key=lambda option: option.get(sorting_method, ""), reverse=True)
    else:
        # Options lacking the key (or holding a non-numeric value) go last
        # instead of crashing the sort.
        all_options.sort(key=lambda option: _numeric_sort_value(option.get(sorting_method)), reverse=True)

    return all_options