diff --git a/Pipfile b/Pipfile
index 3e0f20f7..11418b9b 100644
--- a/Pipfile
+++ b/Pipfile
@@ -10,7 +10,7 @@ pyinstaller = "*"
requests = "==2.24.0"
click = "*"
selenium = "*"
-chromedriver-py = "==89.0.4389.23"
+chromedriver-py = "==88.0.4324.96"
furl = "*"
twilio = "*"
discord-webhook = "*"
@@ -23,7 +23,7 @@ prompt_toolkit = "*"
aiohttp = "*"
pyobjc = { version = "*", sys_platform = "== 'darwin'" }
async-timeout = "*"
-amazoncaptcha = "==0.4.4"
+amazoncaptcha = "*"
browser-cookie3 = "*"
coloredlogs = "*"
apprise = "*"
@@ -35,6 +35,7 @@ stdiomask = "*"
packaging = "*"
config = "*"
lxml = "*"
+dnspython = "*"
[requires]
python_version = "3.8"
diff --git a/_bbuy-fe.bat b/_bbuy-fe.bat
deleted file mode 100644
index f3619fc2..00000000
--- a/_bbuy-fe.bat
+++ /dev/null
@@ -1 +0,0 @@
-pipenv run python app.py bestbuy --sku 6429440
diff --git a/_bbuy-gbyte.bat b/_bbuy-gbyte.bat
deleted file mode 100644
index 533ea9c1..00000000
--- a/_bbuy-gbyte.bat
+++ /dev/null
@@ -1 +0,0 @@
-pipenv run python app.py bestbuy --sku 6430621
\ No newline at end of file
diff --git a/_bbuy-msi.bat b/_bbuy-msi.bat
deleted file mode 100644
index 87b64f34..00000000
--- a/_bbuy-msi.bat
+++ /dev/null
@@ -1 +0,0 @@
-pipenv run python app.py bestbuy --sku 6432399
\ No newline at end of file
diff --git a/_nvidiadotcom.bat b/_nvidiadotcom.bat
deleted file mode 100644
index 942493a5..00000000
--- a/_nvidiadotcom.bat
+++ /dev/null
@@ -1,3 +0,0 @@
-pipenv install
-
-pipenv run python app.py nvidia
diff --git a/app.py b/app.py
index 8dc8ea53..8788e9f5 100644
--- a/app.py
+++ b/app.py
@@ -48,8 +48,23 @@ def sha256sum(filename):
exit(0)
-from cli import cli
+def notfound_message(exception):
+ print(exception)
+ print(
+ f"Missing '{exception.name}' module. If you ran 'pipenv install', try 'pipenv install {exception.name}'"
+ )
+ print("Exiting...")
+try:
+ from cli import cli
+except ModuleNotFoundError as e:
+ notfound_message(e)
+ exit(0)
+
if __name__ == "__main__":
- cli.main()
+ try:
+ cli.main()
+ except ModuleNotFoundError as e:
+ notfound_message(e)
+ exit(0)
diff --git a/cli/cli.py b/cli/cli.py
index 8c0e1818..39b764e8 100644
--- a/cli/cli.py
+++ b/cli/cli.py
@@ -18,33 +18,26 @@
# https://github.com/Hari-Nagarajan/fairgame
import os
+import platform
import shutil
+import time
from datetime import datetime
from functools import wraps
from pathlib import Path
-from signal import signal, SIGINT
-
-LICENSE_PATH = os.path.join(
- "cli",
- "license",
-)
-
-
-try:
- import click
-except ModuleNotFoundError as e:
- print(e)
- print("Install the missing module noted above.")
- exit(0)
-import time
+from signal import SIGINT, signal
+import click
+from common.globalconfig import AMAZON_CREDENTIAL_FILE, GlobalConfig
from notifications.notifications import NotificationHandler, TIME_FORMAT
+from stores.amazon import Amazon
from utils.logger import log
-from common.globalconfig import GlobalConfig, AMAZON_CREDENTIAL_FILE
from utils.version import is_latest, version
-from stores.amazon import Amazon
-from stores.bestbuy import BestBuyHandler
+
+LICENSE_PATH = os.path.join(
+ "cli",
+ "license",
+)
def get_folder_size(folder):
@@ -59,8 +52,9 @@ def sizeof_fmt(num, suffix="B"):
return "%.1f%s%s" % (num, "Yi", suffix)
-def handler(signal, frame):
- log.info("Caught the stop, exiting.")
+# see https://docs.python.org/3/library/signal.html
+def interrupt_handler(signal_num, frame):
+ log.info(f"Caught the interrupt signal. Exiting.")
exit(0)
@@ -71,7 +65,7 @@ def decorator(*args, **kwargs):
func(*args, **kwargs)
except KeyboardInterrupt:
pass
- except:
+ else:
notification_handler.send_notification(f"FairGame has crashed.")
raise
@@ -80,7 +74,6 @@ def decorator(*args, **kwargs):
@click.group()
def main():
-
pass
@@ -196,6 +189,12 @@ def main():
default=False,
help="Directly hit the offers page. Preferred, but deprecated by Amazon.",
)
+@click.option(
+ "--captcha-wait",
+ is_flag=True,
+ default=False,
+ help="Wait if captcha could not be solved. Only occurs if enters captcha handler during checkout.",
+)
@notify_on_crash
def amazon(
no_image,
@@ -216,6 +215,7 @@ def amazon(
clean_profile,
clean_credentials,
alt_offers,
+ captcha_wait,
):
notification_handler.sound_enabled = not disable_sound
if not notification_handler.sound_enabled:
@@ -248,6 +248,7 @@ def amazon(
log_stock_check=log_stock_check,
shipping_bypass=shipping_bypass,
alt_offers=alt_offers,
+ wait_on_captcha_fail=captcha_wait,
)
try:
amzn_obj.run(delay=delay, test=test)
@@ -257,17 +258,6 @@ def amazon(
time.sleep(5)
-@click.command()
-@click.option("--sku", type=str, required=True)
-@click.option("--headless", is_flag=True)
-@notify_on_crash
-def bestbuy(sku, headless):
- bb = BestBuyHandler(
- sku, notification_handler=notification_handler, headless=headless
- )
- bb.run_item()
-
-
@click.option(
"--disable-sound",
is_flag=True,
@@ -302,6 +292,7 @@ def test_notifications(disable_sound):
@click.option("--w", is_flag=True)
@click.option("--c", is_flag=True)
def show(w, c):
+ show_file = "show_c.txt"
if w and c:
print("Choose one option. Program Quitting")
exit(0)
@@ -328,12 +319,101 @@ def show(w, c):
exit(0)
-signal(SIGINT, handler)
+@click.command()
+@click.option(
+ "--domain",
+ help="Specify the domain you want to find endpoints for (e.g. www.amazon.de, www.amazon.com, smile.amazon.com.",
+)
+def find_endpoints(domain):
+ import dns.resolver
+
+ if not domain:
+ log.error("You must specify a domain to resolve for endpoints with --domain.")
+ exit(0)
+ log.info(f"Attempting to resolve '{domain}'")
+ # Default
+ my_resolver = dns.resolver.Resolver()
+ try:
+ resolved = my_resolver.resolve(domain)
+ for rdata in resolved:
+ log.info(f"Your computer resolves {domain} to {rdata.address}")
+ except Exception as e:
+ log.error(f"Failed to use local resolver due to: {e}")
+ exit(1)
+
+ # Find endpoints from various DNS servers
+ endpoints, resolutions = resolve_domain(domain)
+ log.info(
+ f"{domain} resolves to at least {len(endpoints)} distinct IP addresses across {resolutions} lookups:"
+ )
+ endpoints = sorted(endpoints)
+ for endpoint in endpoints:
+ log.info(f" {endpoint}")
+
+ return endpoints
+
+
+def resolve_domain(domain):
+ import dns.resolver
+
+ public_dns_servers = global_config.get_fairgame_config().get("public_dns_servers")
+ resolutions = 0
+ endpoints = set()
+
+ # Resolve the domain for each DNS server to find out how many end points we have
+ for provider in public_dns_servers:
+ # Provider is Google, Verisign, etc.
+ log.info(f"Testing {provider}")
+ for server in public_dns_servers[provider]:
+ # Server is 8.8.8.8 or 1.1.1.1
+ my_resolver = dns.resolver.Resolver()
+ my_resolver.nameservers = [server]
+
+ try:
+ resolved = my_resolver.resolve(domain)
+ except Exception as e:
+ log.warning(
+ f"Unable to resolve using {provider} server {server} due to: {e}"
+ )
+ continue
+ for rdata in resolved:
+ ipv4_address = rdata.address
+ endpoints.add(ipv4_address)
+ resolutions += 1
+ log.debug(f"{domain} resolves to {ipv4_address} via {server}")
+ return endpoints, resolutions
+
+
+@click.command()
+@click.option(
+ "--domain", help="Specify the domain you want to generate traceroute commands for."
+)
+def show_traceroutes(domain):
+ if not domain:
+ log.error("You must specify a domain to test routes using --domain.")
+ exit(0)
+
+ # Get the endpoints to test
+ endpoints, resolutions = resolve_domain(domain=domain)
+
+ if platform.system() == "Windows":
+ trace_command = "tracert -d "
+ else:
+ trace_command = "traceroute -n "
+
+ # Spitball test routes via Python's traceroute
+ for endpoint in endpoints:
+ log.info(f" {trace_command}{endpoint}")
+
+
+# Register Signal Handler for Interrupt
+signal(SIGINT, interrupt_handler)
main.add_command(amazon)
-main.add_command(bestbuy)
main.add_command(test_notifications)
main.add_command(show)
+main.add_command(find_endpoints)
+main.add_command(show_traceroutes)
# Global scope stuff here
if is_latest():
diff --git a/common/globalconfig.py b/common/globalconfig.py
index 5b021988..addc3d4f 100644
--- a/common/globalconfig.py
+++ b/common/globalconfig.py
@@ -69,6 +69,9 @@ def get_amazon_config(self, encryption_pass=None):
)
return amazon_config
+ def get_fairgame_config(self):
+ return self.fairgame_config
+
def get_browser_profile_path(self):
if not self.profile_path:
self.profile_path = os.path.join(
@@ -76,3 +79,8 @@ def get_browser_profile_path(self):
self.global_config["FAIRGAME"].get("profile_name", ".profile-amz"),
)
return self.profile_path
+
+ def get_property(self, property_name):
+ if property_name not in self.global_config.keys(): # we don't want KeyError
+ return None # just return None if not found
+ return self.global_config[property_name]
diff --git a/config/fairgame.conf b/config/fairgame.conf
index a42fb7d9..946fb238 100644
--- a/config/fairgame.conf
+++ b/config/fairgame.conf
@@ -1,6 +1,41 @@
{
"FAIRGAME": {
- "profile_name": ".profile-amz"
+ "profile_name": ".profile-amz",
+ "public_dns_servers": {
+ "Cloudflare": [
+ "1.1.1.1",
+ "1.0.0.1"
+ ],
+ "Comodo": [
+ "8.26.56.26",
+ "8.20.247.20"
+ ],
+ "Google": [
+ "8.8.8.8",
+ "8.8.4.4"
+ ],
+ "Level3": [
+ "4.2.2.1",
+ "4.2.2.3",
+ "4.2.2.5"
+ ],
+ "NTT": [
+ "129.250.35.250",
+ "129.250.35.251"
+ ],
+ "OpenDNS": [
+ "208.67.222.222",
+ "208.67.220.220"
+ ],
+ "Quad 9": [
+ "9.9.9.9",
+ "149.112.112.112"
+ ],
+ "Verisign": [
+ "64.6.64.6",
+ "64.6.65.6"
+ ]
+ }
},
"AMAZON": {
"SIGN_IN_TEXT": [
@@ -10,7 +45,8 @@
"Bonjour, Identifiez-vous",
"Ciao, Accedi",
"Hallo, Anmelden",
- "Hallo, Inloggen"
+ "Hallo, Inloggen",
+ "Witamy, Zaloguj się"
],
"SIGN_IN_TITLES": [
"Amazon Sign In",
@@ -19,11 +55,14 @@
"Iniciar sesión en Amazon",
"Connexion Amazon",
"Amazon Accedi",
- "Inloggen bij Amazon"
+ "Inloggen bij Amazon",
+ "Amazon Zaloguj się"
],
"CAPTCHA_PAGE_TITLES": [
"Robot Check",
- "Server Busy"
+ "Server Busy",
+ "Amazon.com",
+ "Amazon.ca"
],
"HOME_PAGE_TITLES": [
"Amazon.com: Online Shopping for Electronics, Apparel, Computers, Books, DVDs & more",
@@ -36,7 +75,10 @@
"Amazon.de: Günstige Preise für Elektronik & Foto, Filme, Musik, Bücher, Games, Spielzeug & mehr",
"Amazon.fr : livres, DVD, jeux vidéo, musique, high-tech, informatique, jouets, vêtements, chaussures, sport, bricolage, maison, beauté, puériculture, épicerie et plus encore !",
"Amazon.it: elettronica, libri, musica, fashion, videogiochi, DVD e tanto altro",
- "Amazon.nl: Groot aanbod, kleine prijzen in o.a. Elektronica, boeken, sport en meer"
+ "Amazon.nl: Groot aanbod, kleine prijzen in o.a. Elektronica, boeken, sport en meer",
+ "Your AmazonSmile",
+ "Amazon.com.tr Elektronik, bilgisayar, akıllı telefon, kitap, oyuncak, yapı market, ev, mutfak, oyun konsolları ürünleri ve daha fazlası için internet alışveriş sitesi",
+ "Amazon.pl: zakupy internetowe elektroniki, odzieży, komputerów, książek, płyt DVD i innych"
],
"SHOPPING_CART_TITLES": [
"Amazon.com Shopping Cart",
@@ -50,7 +92,9 @@
"Carrello Amazon.it",
"AmazonSmile Shopping Cart",
"AmazonSmile Shopping Basket",
- "Amazon.nl-winkelwagen"
+ "Amazon.nl-winkelwagen",
+ "Amazon.com.tr Alışveriş Sepeti",
+ "Amazon.pl: koszyk"
],
"CHECKOUT_TITLES": [
"Amazon.com Checkout",
@@ -74,7 +118,11 @@
"Preparing your order",
"Ihre Bestellung wird vorbereitet",
"Pagamento Amazon.it",
- "Ordine in preparazione"
+ "Ordine in preparazione",
+ "Amazon.sg Checkout",
+ "A preparar o seu pedido",
+ "Amazon.com.tr Alışverişi Tamamla",
+ "Kup teraz"
],
"ORDER_COMPLETE_TITLES": [
"Amazon.com Thanks You",
@@ -88,7 +136,10 @@
"Grazie da Amazon.it",
"Hartelijk dank",
"Thank You",
- "Amazon.de Vielen Dank"
+ "Amazon.de Vielen Dank",
+ "Obrigado, o seu pedido foi efetuado",
+ "Teşekkür Ederiz",
+ "Amazon.pl Dziękujemy"
],
"BUSINESS_PO_TITLES": [
"Business order information"
@@ -124,12 +175,43 @@
"FRI FRAKT",
"GRATIS-LIEFERUNG",
"PRIME FREE DELIVERY",
- "SPEDIZIONE SENZA COSTI AGGIUNTIVI CON PRIME"
+ "DELIVERY AT NO EXTRA COST FOR PRIME MEMBERS",
+ "GRATIS LIEFERUNG FÜR PRIME-MITGLIEDER",
+ "FREE DELIVERY:",
+ "SPEDIZIONE SENZA COSTI AGGIUNTIVI CON PRIME",
+ "DARMOWA DOSTAWA"
],
"ADDRESS_SELECT": [
"Select a delivery address",
"Ordine in preparazione",
"Select a shipping address"
- ]
+ ],
+ "XPATHS": {
+ "ADDRESS_SELECT": [
+ "//*[contains(@class,'ship-to-this-address a-button a-button-primary a-button-span12 a-spacing-medium')]",
+ "//input[@aria-labelledby='orderSummaryPrimaryActionBtn-announce']",
+ "//input[@data-testid='Address_selectShipToThisAddress']"
+ ],
+ "PRIME_NO_THANKS": [
+ "//*[contains(@class, 'no-thanks-button')]",
+ "//*[contains(@class, 'prime-nothanks-button')]",
+ "//*[contains(@class, 'prime-no-button')]"
+ ],
+ "CART": [
+ "//*[@id='nav-cart-count']"
+ ],
+ "CART_BUTTON": [
+ "//*[@id='nav-cart']"
+ ],
+ "PTC": [
+ "//*[@id='hlb-ptc-btn-native']",
+ "//input[@name='proceedToRetailCheckout']",
+ "//*[@id='hlb-ptc-btn']",
+ "//*[@id='sc-buy-box-ptc-button']"
+ ],
+ "ATC": [
+ "//div[@id='aod-pinned-offer' or @id='aod-offer' or @id='olpOfferList']//input[@name='submit.addToCart']"
+ ]
+ }
}
}
diff --git a/stores/amazon.py b/stores/amazon.py
index 29371d0c..c6383783 100644
--- a/stores/amazon.py
+++ b/stores/amazon.py
@@ -26,6 +26,7 @@
from contextlib import contextmanager
from datetime import datetime
from enum import Enum
+from typing import List
import psutil
from amazoncaptcha import AmazonCaptcha
@@ -88,7 +89,7 @@
DEFAULT_MAX_TIMEOUT = 10
DEFAULT_MAX_URL_FAIL = 5
-amazon_config = None
+amazon_config = {}
class Amazon:
@@ -108,6 +109,7 @@ def __init__(
log_stock_check=False,
shipping_bypass=False,
alt_offers=False,
+ wait_on_captcha_fail=False,
):
self.notification_handler = notification_handler
self.asin_list = []
@@ -137,6 +139,7 @@ def __init__(
self.shipping_bypass = shipping_bypass
self.unknown_title_notification_sent = False
self.alt_offers = alt_offers
+ self.wait_on_captcha_fail = wait_on_captcha_fail
presence.enabled = not disable_presence
@@ -242,11 +245,12 @@ def run(self, delay=DEFAULT_REFRESH_DELAY, test=False):
time.sleep(30)
return
- keep_going = True
+ continue_stock_check = True
log.info("Checking stock for items.")
- while keep_going:
+ while continue_stock_check:
+ self.unknown_title_notification_sent = False
asin = self.run_asins(delay)
# found something in stock and under reserve
# initialize loop limiter variables
@@ -281,7 +285,7 @@ def run(self, delay=DEFAULT_REFRESH_DELAY, test=False):
self.try_to_checkout = False
# if no items left it list, let loop end
if not self.asin_list:
- keep_going = False
+ continue_stock_check = False
runtime = time.time() - self.start_time
log.info(f"FairGame bot ran for {runtime} seconds.")
time.sleep(10) # add a delay to shut stuff done
@@ -459,7 +463,7 @@ def check_stock(self, asin, reserve_min, reserve_max, retry=0):
)
else:
# Force the flyout by default
- f = furl(self.ACTIVE_OFFER_URL + asin + "/#aod")
+ f = furl(self.ACTIVE_OFFER_URL + asin + "?aod=1")
fail_counter = 0
presence.searching_update()
@@ -509,15 +513,16 @@ def check_stock(self, asin, reserve_min, reserve_max, retry=0):
return False
timeout = self.get_timeout()
+ atc_buttons = None
while True:
# Sanity check to see if we have any offers
try:
# Wait for the page to load before determining what's in it by looking for the footer
- footer: WebElement = WebDriverWait(
+ footer: List[WebElement] = WebDriverWait(
self.driver, timeout=DEFAULT_MAX_TIMEOUT
).until(
lambda d: d.find_elements_by_xpath(
- "//div[@class='nav-footer-line'] | //img[@alt='Dogs of Amazon']"
+ "//div[@class='nav-footer-line'] | //div[@id='navFooter'] | //img[@alt='Dogs of Amazon']"
)
)
if footer and footer[0].tag_name == "img":
@@ -556,9 +561,15 @@ def check_stock(self, asin, reserve_min, reserve_max, retry=0):
elif offers.get_attribute("data-action") == "show-all-offers-display":
# PDP Page
# Find the offers link first, just to burn some cycles in case the flyout is loading
- open_offers_link: WebElement = self.driver.find_element_by_xpath(
- "//span[@data-action='show-all-offers-display']//a"
- )
+ open_offers_link = None
+ try:
+ open_offers_link: WebElement = (
+ self.driver.find_element_by_xpath(
+ "//span[@data-action='show-all-offers-display']//a"
+ )
+ )
+ except sel_exceptions.NoSuchElementException:
+ pass
# Now check to see if we're already loading the flyout...
flyout = self.driver.find_elements_by_xpath(
@@ -578,23 +589,38 @@ def check_stock(self, asin, reserve_min, reserve_max, retry=0):
)
continue
- log.debug("Attempting to click the open offers link...")
- open_offers_link.click()
- try:
- # Now wait for the flyout to load
- log.debug("Waiting for flyout...")
- WebDriverWait(self.driver, timeout=DEFAULT_MAX_TIMEOUT).until(
- lambda d: d.find_element_by_xpath(
- "//div[@id='aod-container'] | //div[@id='olpOfferList']"
+ if open_offers_link:
+ log.debug("Attempting to click the open offers link...")
+ try:
+ open_offers_link.click()
+ except sel_exceptions.WebDriverException as e:
+ log.error("Problem clicking open offers link")
+ log.error(
+ "May have issue with rest of this ASIN check cycle"
)
- )
- log.debug("Flyout should be open and populated.")
- except sel_exceptions.TimeoutException as te:
- log.error(
- "Timed out waiting for the flyout to open and populate. Is the "
- "connection slow? Do you see the flyout populate?"
- )
- continue
+ log.debug(e)
+ filename = "open-offers-link-error"
+ self.save_screenshot(filename)
+ self.save_page_source(filename)
+ try:
+ # Now wait for the flyout to load
+ log.debug("Waiting for flyout...")
+ WebDriverWait(
+ self.driver, timeout=DEFAULT_MAX_TIMEOUT
+ ).until(
+ lambda d: d.find_element_by_xpath(
+ "//div[@id='aod-container'] | //div[@id='olpOfferList']"
+ )
+ )
+ log.debug("Flyout should be open and populated.")
+ except sel_exceptions.TimeoutException as te:
+ log.error(
+ "Timed out waiting for the flyout to open and populate. Is the "
+ "connection slow? Do you see the flyout populate?"
+ )
+ continue
+ else:
+ log.error("Could not open offers link")
elif (
offers.get_attribute("aria-labelledby")
== "submit.add-to-cart-announce"
@@ -642,9 +668,7 @@ def check_stock(self, asin, reserve_min, reserve_max, retry=0):
)
continue
- atc_buttons: WebElement = self.driver.find_elements_by_xpath(
- "//div[@id='aod-pinned-offer' or @id='aod-offer' or @id='olpOfferList']//input[@name='submit.addToCart']"
- )
+ atc_buttons = self.get_amazon_elements(key="ATC")
# if not atc_buttons:
# # Sanity check to see if we have a valid page, but no offers:
# offer_count = WebDriverWait(self.driver, timeout=25).until(
@@ -698,61 +722,68 @@ def check_stock(self, asin, reserve_min, reserve_max, retry=0):
return False
shipping = []
shipping_prices = []
- if self.checkshipping:
- timeout = self.get_timeout()
- while True:
- if not flyout_mode:
- shipping = self.driver.find_elements_by_xpath(
- '//*[@class="a-color-secondary"]'
- )
- if shipping:
- # Convert to prices just in case
- for idx, shipping_node in enumerate(shipping):
- log.debug(f"Processing shipping node {idx}")
- if self.checkshipping:
- if amazon_config["SHIPPING_ONLY_IF"] in shipping_node.text:
- shipping_prices.append(parse_price("0"))
- else:
- shipping_prices.append(parse_price(shipping_node.text))
- else:
+
+ timeout = self.get_timeout()
+ while True:
+ if not flyout_mode:
+ shipping = self.driver.find_elements_by_xpath(
+ '//*[@class="a-color-secondary"]'
+ )
+ if shipping:
+ # Convert to prices just in case
+ for idx, shipping_node in enumerate(shipping):
+ log.debug(f"Processing shipping node {idx}")
+ if self.checkshipping:
+ if amazon_config["SHIPPING_ONLY_IF"] in shipping_node.text:
shipping_prices.append(parse_price("0"))
- else:
- # Check for offers
- offers = self.driver.find_elements_by_xpath(
- "//div[@id='aod-pinned-offer' or @id='aod-offer']"
+ else:
+ shipping_prices.append(parse_price(shipping_node.text))
+ else:
+ shipping_prices.append(parse_price("0"))
+ else:
+ # Check for offers
+ # offer_xpath = "//div[@id='aod-pinned-offer' or @id='aod-offer']"
+ offer_xpath = (
+ "//div[@id='aod-offer' and .//input[@name='submit.addToCart']] | "
+ "//div[@id='aod-pinned-offer' and .//input[@name='submit.addToCart']]"
+ )
+ offers = self.driver.find_elements_by_xpath(offer_xpath)
+ for idx, offer in enumerate(offers):
+ tree = html.fromstring(offer.get_attribute("innerHTML"))
+ shipping_prices.append(
+ get_shipping_costs(tree, amazon_config["FREE_SHIPPING"])
)
- for idx, offer in enumerate(offers):
- tree = html.fromstring(offer.get_attribute("innerHTML"))
- shipping_prices.append(
- get_shipping_costs(tree, amazon_config["FREE_SHIPPING"])
- )
- if shipping_prices:
- break
+ if shipping_prices:
+ break
- if time.time() > timeout:
- log.info(f"failed to load shipping for {asin}, going to next ASIN")
- return False
+ if time.time() > timeout:
+ log.info(f"failed to load shipping for {asin}, going to next ASIN")
+ return False
in_stock = False
for shipping_price in shipping_prices:
log.debug(f"\tShipping Price: {shipping_price}")
for idx, atc_button in enumerate(atc_buttons):
+ # If the user has specified that they only want free items, we can skip any items
+ # that have any shipping cost and early out
+ if not self.checkshipping and shipping_prices[idx].amount_float > 0.00:
+ continue
+
# Condition check first, using the button to find the form that will divulge the item's condition
if flyout_mode:
- condition: WebElement = atc_button.find_elements_by_xpath(
+ condition: List[WebElement] = atc_button.find_elements_by_xpath(
"./ancestor::form[@method='post']"
)
-
if condition:
atc_form_action = condition[0].get_attribute("action")
- item_condition = get_item_condition(atc_form_action)
+ seller_item_condition = get_item_condition(atc_form_action)
# Lower condition value imply newer
- if item_condition.value > self.condition.value:
+ if seller_item_condition.value > self.condition.value:
# Item is below our standards, so skip it
log.debug(
f"Skipping item because its condition is below the requested level: "
- f"{item_condition} is below {self.condition}"
+ f"{seller_item_condition} is below {self.condition}"
)
continue
@@ -764,19 +795,13 @@ def check_stock(self, asin, reserve_min, reserve_max, retry=0):
except IndexError:
log.debug("Price index error")
return False
- try:
- if self.checkshipping:
- ship_price = shipping_prices[idx]
- else:
- ship_price = parse_price("0")
- except IndexError:
- log.debug("shipping index error")
- return False
+ # Include the price, even if it's zero for comparison
+ ship_price = shipping_prices[idx]
ship_float = ship_price.amount
price_float = price.amount
if price_float is None:
return False
- if ship_float is None or not self.checkshipping:
+ if ship_float is None:
ship_float = 0
if (
@@ -787,6 +812,10 @@ def check_stock(self, asin, reserve_min, reserve_max, retry=0):
or math.isclose((price_float + ship_float), reserve_min, abs_tol=0.01)
):
log.info("Item in stock and in reserve range!")
+ log.info(f"{price_float} + {ship_float} shipping <= {reserve_max}")
+ log.debug(
+ f"{reserve_min} <= {price_float} + {ship_float} shipping <= {reserve_max}"
+ )
log.info("Adding to cart")
# Get the offering ID
offering_id_elements = atc_button.find_elements_by_xpath(
@@ -863,22 +892,32 @@ def attempt_atc(self, offering_id, max_atc_retries=DEFAULT_MAX_ATC_TRIES):
atc_attempts = 0
while atc_attempts < max_atc_retries:
with self.wait_for_page_content_change(timeout=5):
- self.driver.get(f)
- xpath = "//input[@value='add' and @name='add']"
- if wait_for_element_by_xpath(self.driver, xpath):
- try:
- with self.wait_for_page_content_change(timeout=10):
- self.driver.find_element_by_xpath(xpath).click()
- except sel_exceptions.NoSuchElementException:
- log.error("Continue button not present on page")
- else:
- log.error("Continue button not present on page")
+ try:
+ self.driver.get(f)
+ except sel_exceptions.TimeoutException:
+ log.error("Failed to get page")
+ atc_attempts += 1
+ continue
+ xpath = "//input[@value='add' and @name='add']"
+ continue_btn = None
+ if wait_for_element_by_xpath(self.driver, xpath):
+ try:
+ continue_btn = WebDriverWait(self.driver, timeout=5).until(
+ EC.element_to_be_clickable((By.XPATH, xpath))
+ )
+ except sel_exceptions.TimeoutException:
+ log.error("No continue button found")
+ if continue_btn:
+ if self.do_button_click(
+ button=continue_btn, fail_text="Could not click continue button"
+ ):
+ if self.get_cart_count() != 0:
+ return True
+ else:
+ log.info("Nothing added to cart, trying again")
- # verify cart is non-zero
- if self.get_cart_count() != 0:
- return True
- else:
- atc_attempts = atc_attempts + 1
+ atc_attempts = atc_attempts + 1
+ log.error("reached maximum ATC attempts, returning to stock check")
return False
# search lists of asin lists, and remove the first list that matches provided asin
@@ -894,11 +933,8 @@ def remove_asin_list(self, asin):
# checkout page navigator
@debug
def navigate_pages(self, test):
- # delay to wait for page load
- # time.sleep(self.page_wait_delay())
-
title = self.driver.title
- log.info(f"Navigating page title: '{title}'")
+ log.debug(f"Navigating page title: '{title}'")
# see if this resolves blank page title issue?
if title == "":
timeout_seconds = DEFAULT_MAX_TIMEOUT
@@ -936,23 +972,15 @@ def navigate_pages(self, test):
elif title in amazon_config["BUSINESS_PO_TITLES"]:
self.handle_business_po()
elif title in amazon_config["ADDRESS_SELECT"]:
- if not self.unknown_title_notification_sent:
- self.notification_handler.play_alarm_sound()
- self.send_notification(
- "User interaction required for checkout!",
- title,
- self.take_screenshots,
+ if self.shipping_bypass:
+ self.handle_shipping_page()
+ else:
+ log.warning(
+ "Landed on address selection screen. Fairgame will NOT select an address for you. "
+ "Please select necessary options to arrive at the Review Order Page before the next "
+ "refresh, or complete checkout manually. You have 30 seconds."
)
- self.unknown_title_notification_sent = True
- log.warning(
- "Landed on address selection screen. Fairgame will NOT select an address for you. "
- "Please select necessary options to arrive at the Review Order Page before the next "
- "refresh, or complete checkout manually. You have 30 seconds."
- )
- for i in range(30, 0, -1):
- log.warning(f"{i}...")
- time.sleep(1)
- return
+ self.handle_unknown_title(title)
else:
log.debug(f"title is: [{title}]")
# see if we can handle blank titles here
@@ -994,51 +1022,22 @@ def navigate_pages(self, test):
element = None
# Prime offer page?
try:
- element = self.driver.find_element_by_xpath(
- '//*[contains(@class, "no-thanks-button") or contains(@class, "prime-nothanks-button") or contains(@class, "prime-no-button")]'
- )
+ element = self.get_amazon_element(key="PRIME_NO_THANKS")
except sel_exceptions.NoSuchElementException:
pass
if element:
- try:
- log.info(
- "FairGame thinks it is seeing a Prime Offer, attempting to click No Thanks"
- )
- element.click()
- self.wait_for_page_change(page_title=title)
- # if we were able to click, return to program flow
+ if self.do_button_click(
+ button=element,
+ clicking_text="FairGame thinks it is seeing a Prime Offer, attempting to click No Thanks",
+ fail_text="FairGame could not click No Thanks button",
+ log_debug=True,
+ ):
return
- except sel_exceptions.ElementNotInteractableException:
- log.debug("FairGame could not click No Thanks button")
-
+ # see if a use this address (or similar) button is on page (based on known xpaths). Only check if
+ # user has set the shipping_bypass flag
if self.shipping_bypass:
- element = None
- try:
- element = self.driver.find_element_by_xpath(
- '//*[@class="ship-to-this-address a-button a-button-primary a-button-span12 a-spacing-medium "]'
- )
- except sel_exceptions.NoSuchElementException:
- pass
- if element:
- log.warning("FairGame thinks it needs to pick a shipping address.")
- log.warning(
- "It will click whichever ship to this address button it found."
- )
- log.warning(
- "If this works, VERIFY THE ADDRESS IT SHIPPED TO IMMEDIATELY!"
- )
- self.send_notification(
- message="Clicking ship to address, hopefully this works. VERIFY ASAP!",
- page_name="choose-shipping",
- take_screenshot=self.take_screenshots,
- )
- try:
- element.click()
- log.info("Clicked button.")
- self.wait_for_page_change(page_title=title)
- return
- except sel_exceptions.WebDriverException:
- log.error("Could not click ship to address button")
+ if self.handle_shipping_page():
+ return
if self.get_cart_count() == 0:
log.info("It appears you have nothing in your cart.")
@@ -1056,23 +1055,29 @@ def navigate_pages(self, test):
log.error(
f"'{title}' is not a known page title. Please create issue indicating the title with a screenshot of page"
)
- self.send_notification(
- f"Encountered Unknown Page Title: `{title}",
- "unknown-title",
- self.take_screenshots,
- )
- self.save_page_source("unknown-title")
- log.info("going to try and redirect to cart page")
+ # give user 30 seconds to respond
+ self.handle_unknown_title(title=title)
+ # check if page title changed, if not, then continue doing other checks:
+ if self.driver.title != title:
+ log.info(
+ "FairGame thinks user intervened in time, will now continue running"
+ )
+ return
+ else:
+ log.warning(
+ "FairGame does not think the user intervened in time, will attempt other methods to continue"
+ )
+ log.info("Going to try and redirect to cart page")
try:
- self.driver.get(AMAZON_URLS["CART_URL"])
+ with self.wait_for_page_content_change(timeout=10):
+ self.driver.get(AMAZON_URLS["CART_URL"])
except sel_exceptions.WebDriverException:
log.error(
"failed to load cart URL, refreshing and returning to handler"
)
- self.driver.refresh()
- time.sleep(3)
+ with self.wait_for_page_content_change(timeout=10):
+ self.driver.refresh()
return
- self.wait_for_page_change(page_title=title)
time.sleep(1) # wait a second for page to load
# verify cart quantity is not zero
# note, not using greater than 0, in case there is an error,
@@ -1085,41 +1090,94 @@ def navigate_pages(self, test):
log.info("trying to click proceed to checkout")
timeout = self.get_timeout()
- button = []
while True:
try:
- button = self.driver.find_element_by_xpath(
- '//*[@id="sc-buy-box-ptc-button"]'
- )
+ button = self.get_amazon_element(key="PTC")
break
except sel_exceptions.NoSuchElementException:
- pass
+ button = None
if time.time() > timeout:
- log.error(
- "Could not find and click button, refreshing and returning to handler"
- )
- self.driver.refresh()
- time.sleep(3)
+ log.error("Could not find and click button")
break
if button:
- try:
- current_title = self.driver.title
- log.info("Found ptc button, attempting to click.")
- button.click()
- log.info("Clicked ptc button")
- self.wait_for_page_change(page_title=current_title)
- except sel_exceptions.WebDriverException:
- log.info(
- "Could not click button - refreshing and returning to checkout handler"
- )
- self.driver.refresh()
- time.sleep(3)
+ if self.do_button_click(
+ button=button,
+ clicking_text="Found ptc button, attempting to click.",
+ clicked_text="Clicked ptc button",
+ fail_text="Could not click button",
+ ):
+ return
+ else:
+ with self.wait_for_page_content_change():
+ self.driver.refresh()
+ return
+
+ # if we made it this far, all attempts to handle page failed, get current page info and return to handler
+ log.error(
+ "FairGame could not navigate current page, refreshing and returning to handler"
+ )
+ self.save_page_source(page="unknown")
+ self.save_screenshot(page="unknown")
+ with self.wait_for_page_content_change():
+ self.driver.refresh()
+ return
+
+ def handle_unknown_title(self, title):
+ if not self.unknown_title_notification_sent:
+ self.notification_handler.play_alarm_sound()
+ self.send_notification(
+ "User interaction required for checkout! You have 30 seconds!",
+ title,
+ self.take_screenshots,
+ )
+ self.unknown_title_notification_sent = True
+ for i in range(30, 0, -1):
+ log.warning(f"{i}...")
+ time.sleep(1)
+ return
+
+ # Method to try and click the handle shipping page
+ def handle_shipping_page(self):
+ element = None
+ try:
+ element = self.get_amazon_element(key="ADDRESS_SELECT")
+ except sel_exceptions.NoSuchElementException:
+ pass
+ if element:
+ log.warning("FairGame thinks it needs to pick a shipping address.")
+ log.warning("It will click whichever ship to this address button it found.")
+ log.warning("If this works, VERIFY THE ADDRESS IT SHIPPED TO IMMEDIATELY!")
+ self.send_notification(
+ message="Clicking ship to address, hopefully this works. VERIFY ASAP!",
+ page_name="choose-shipping",
+ take_screenshot=self.take_screenshots,
+ )
+ if self.do_button_click(
+ button=element, fail_text="Could not click ship to address button"
+ ):
+ return True
+
+ # if we make it this far, it failed to click button
+ log.error("FairGame cannot find a button to click on the shipping page")
+ self.save_screenshot(page="shipping-select-error")
+ self.save_page_source(page="shipping-select-error")
+ return False
+
+ def get_amazon_element(self, key):
+ return self.driver.find_element_by_xpath(
+ join_xpaths(amazon_config["XPATHS"][key])
+ )
+
+ def get_amazon_elements(self, key):
+ return self.driver.find_elements_by_xpath(
+ join_xpaths(amazon_config["XPATHS"][key])
+ )
# returns negative number if cart element does not exist, returns number if cart exists
def get_cart_count(self):
# check if cart number is on the page, if cart items = 0
try:
- element = self.driver.find_element_by_xpath('//*[@id="nav-cart-count"]')
+ element = self.get_amazon_element(key="CART")
except sel_exceptions.NoSuchElementException:
return -1
if element:
@@ -1138,10 +1196,7 @@ def handle_prime_signup(self):
) # just doing manual wait, sign up for prime if you don't want to deal with this
button = None
try:
- button = self.driver.find_element_by_xpath(
- # '//*[@class="a-button a-button-base no-thanks-button"]'
- '//*[contains(@class, "no-thanks-button") or contains(@class, "prime-nothanks-button") or contains(@class, "prime-no-button")]'
- )
+ button = self.get_amazon_element(key="PRIME_NO_THANKS")
except sel_exceptions.NoSuchElementException:
log.error("could not find button")
log.info("sign up for Prime and this won't happen anymore")
@@ -1152,52 +1207,86 @@ def handle_prime_signup(self):
self.take_screenshots,
)
if button:
- current_page = self.driver.title
- button.click()
- self.wait_for_page_change(current_page)
- else:
- log.error("Prime offer page popped up, user intervention required")
- self.notification_handler.play_alarm_sound()
- self.notification_handler.send_notification(
- "Prime offer page popped up, user intervention required"
- )
- timeout = self.get_timeout(timeout=60)
- while self.driver.title in amazon_config["PRIME_TITLES"]:
- if time.time() > timeout:
- log.info(
- "user did not intervene in time, will try and refresh page"
- )
+ if self.do_button_click(
+ button=button,
+ clicking_text="Attempting to click No Thanks button on Prime Signup Page",
+ fail_text="Failed to click No Thanks button on Prime Signup Page",
+ ):
+ return
+
+ # If we get to this point, there was either no button, or we couldn't click it (exception hit above)
+ log.error("Prime offer page popped up, user intervention required")
+ self.notification_handler.play_alarm_sound()
+ self.notification_handler.send_notification(
+ "Prime offer page popped up, user intervention required"
+ )
+ timeout = self.get_timeout(timeout=60)
+ while self.driver.title in amazon_config["PRIME_TITLES"]:
+ if time.time() > timeout:
+ log.info("user did not intervene in time, will try and refresh page")
+ with self.wait_for_page_content_change():
self.driver.refresh()
- time.sleep(DEFAULT_MAX_WEIRD_PAGE_DELAY)
- break
+ break
+ time.sleep(0.5)
+
+ def do_button_click(
+ self,
+ button,
+ clicking_text="Clicking button",
+ clicked_text="Button clicked",
+ fail_text="Could not click button",
+ log_debug=False,
+ ):
+ try:
+ with self.wait_for_page_content_change():
+ log.info(clicking_text)
+ button.click()
+ log.info(clicked_text)
+ return True
+ except sel_exceptions.WebDriverException as e:
+ if log_debug:
+ log.debug(fail_text)
+ log.debug(e)
+ else:
+ log.error(fail_text)
+ log.error(e)
+ return False
@debug
def handle_home_page(self):
log.info("On home page, trying to get back to checkout")
button = None
- try:
- button = self.driver.find_element_by_xpath('//*[@id="nav-cart"]')
- except sel_exceptions.NoSuchElementException:
- log.info("Could not find cart button")
+ tries = 0
+ maxTries = 10
+ while not button and tries < maxTries:
+ try:
+ button = self.get_amazon_element("CART_BUTTON")
+ except sel_exceptions.NoSuchElementException:
+ pass
+ tries += 1
+ time.sleep(0.5)
current_page = self.driver.title
if button:
- button.click()
- self.wait_for_page_change(current_page)
+ if self.do_button_click(button=button):
+ return
+ else:
+ log.info("Failed to click on cart button")
else:
- self.send_notification(
- "Could not click cart button, user intervention required",
- "home-page-error",
- self.take_screenshots,
- )
- timeout = self.get_timeout(timeout=300)
- while self.driver.title == current_page:
- time.sleep(0.25)
- if time.time() > timeout:
- log.info(
- "user failed to intervene in time, returning to stock check"
- )
- self.try_to_checkout = False
- break
+ log.info("Could not find cart button after " + str(maxTries) + " tries")
+
+ # no button found or could not interact with the button
+ self.send_notification(
+ "Could not click cart button, user intervention required",
+ "home-page-error",
+ self.take_screenshots,
+ )
+ timeout = self.get_timeout(timeout=300)
+ while self.driver.title == current_page:
+ time.sleep(0.25)
+ if time.time() > timeout:
+ log.info("user failed to intervene in time, returning to stock check")
+ self.try_to_checkout = False
+ break
@debug
def handle_cart(self):
@@ -1211,16 +1300,20 @@ def handle_cart(self):
button = None
while True:
try:
- button = self.driver.find_element_by_xpath(
- '//*[@id="hlb-ptc-btn-native"] | //input[@name="proceedToRetailCheckout"]'
- )
+ button = self.get_amazon_element(key="PTC")
break
except sel_exceptions.NoSuchElementException:
- try:
- button = self.driver.find_element_by_xpath('//*[@id="hlb-ptc-btn"]')
- break
- except sel_exceptions.NoSuchElementException:
- pass
+ if self.shipping_bypass:
+ try:
+ button = self.get_amazon_element(key="ADDRESS_SELECT")
+ break
+ except sel_exceptions.NoSuchElementException:
+ pass
+ if self.get_cart_count() == 0:
+ log.info("You have no items in cart. Going back to stock check.")
+ self.try_to_checkout = False
+ break
+
if time.time() > timeout:
log.info("couldn't find buttons to proceed to checkout")
self.save_page_source("ptc-error")
@@ -1229,20 +1322,20 @@ def handle_cart(self):
"ptc-error",
self.take_screenshots,
)
- if self.get_cart_count() == 0:
- log.info("It appears this is because you have no items in cart.")
- log.info(
- "It is likely that the product went out of stock before you could checkout"
- )
- log.info("Going back to stock check.")
- self.try_to_checkout = False
- else:
- log.info("Refreshing page to try again")
+ # if self.get_cart_count() == 0:
+ # log.info("It appears this is because you have no items in cart.")
+ # log.info(
+ # "It is likely that the product went out of stock before you could checkout"
+ # )
+ # log.info("Going back to stock check.")
+ # self.try_to_checkout = False
+ # else:
+ log.info("Refreshing page to try again")
+ with self.wait_for_page_content_change():
self.driver.refresh()
- self.checkout_retry += 1
+ self.checkout_retry += 1
return
- current_page = self.driver.title
if button:
log.info("Found Checkout Button")
if self.detailed:
@@ -1251,15 +1344,13 @@ def handle_cart(self):
page_name="ptc",
take_screenshot=self.take_screenshots,
)
- try:
- button.click()
- log.info("Clicked Proceed to Checkout Button")
- self.wait_for_page_change(page_title=current_page, timeout=7)
- except sel_exceptions.WebDriverException:
+ if self.do_button_click(button=button):
+ return
+ else:
log.error("Problem clicking Proceed to Checkout button.")
log.info("Refreshing page to try again")
- self.driver.refresh()
- self.wait_for_page_change(page_title=current_page)
+ with self.wait_for_page_content_change():
+ self.driver.refresh()
self.checkout_retry += 1
@debug
@@ -1271,17 +1362,21 @@ def handle_checkout(self, test):
try:
button = self.driver.find_element_by_xpath(self.button_xpaths[0])
except sel_exceptions.NoSuchElementException:
- pass
- self.button_xpaths.append(self.button_xpaths.pop(0))
+ if self.shipping_bypass:
+ try:
+ button = self.get_amazon_element(key="ADDRESS_SELECT")
+ except sel_exceptions.NoSuchElementException:
+ pass
+ self.button_xpaths.append(self.button_xpaths.pop(0))
if button:
if button.is_enabled() and button.is_displayed():
break
if time.time() > timeout:
- log.error("couldn't find buttons to proceed to checkout")
- self.save_page_source("ptc-error")
+ log.error("couldn't find button to place order")
+ self.save_page_source("pyo-error")
self.send_notification(
- "Error in checkout. Please check browser window.",
- "ptc-error",
+ "Error in placing order. Please check browser window.",
+ "pyo-error",
self.take_screenshots,
)
log.info("Refreshing page to try again")
@@ -1299,13 +1394,13 @@ def handle_checkout(self, test):
self.asin_list = []
else:
log.info(f"Clicking Button {button.text} to place order")
- button.click()
- self.wait_for_page_change(page_title=previous_title)
+ self.do_button_click(button=button)
@debug
def handle_order_complete(self):
log.info("Order Placed.")
self.send_notification("Order placed.", "order-placed", self.take_screenshots)
+ self.notification_handler.play_purchase_sound()
self.great_success = True
if self.single_shot:
self.asin_list = []
@@ -1345,26 +1440,66 @@ def handle_captcha(self, check_presence=True):
log.info(
f"Failed to solve {captcha.image_link}, lets reload and get a new captcha."
)
- self.driver.refresh()
- time.sleep(3)
+ if self.wait_on_captcha_fail:
+ log.info(
+ "Will wait up to 60 seconds for user to solve captcha"
+ )
+ self.send(
+ "User Intervention Required - captcha check",
+ "captcha",
+ self.take_screenshots,
+ )
+ with self.wait_for_page_content_change():
+ timeout = self.get_timeout(timeout=60)
+ while (
+ time.time() < timeout
+ and self.driver.title == current_page
+ ):
+ time.sleep(0.5)
+ # check above is not true, then we must have passed captcha, return back to nav handler
+ # Otherwise refresh page to try again - either way, returning to nav page handler
+ if (
+ time.time() > timeout
+ and self.driver.title == current_page
+ ):
+ log.info(
+ "User intervention did not occur in time - will attempt to refresh page and try again"
+ )
+ self.driver.refresh()
+ return False
+ else:
+ return True
+ # Solved (!?)
else:
- self.send_notification(
- "Solving catpcha", "captcha", self.take_screenshots
- )
- self.driver.find_element_by_xpath(
- '//*[@id="captchacharacters"]'
- ).send_keys(solution + Keys.RETURN)
- self.wait_for_page_change(page_title=current_page)
+ # take screenshot if user asked for detailed
+ if self.detailed:
+ self.send_notification(
+ "Solving catpcha", "captcha", self.take_screenshots
+ )
+ try:
+ captcha_field = self.driver.find_element_by_xpath(
+ '//*[@id="captchacharacters"]'
+ )
+ except sel_exceptions.NoSuchElementException:
+ log.debug("Could not locate captcha")
+ captcha_field = None
+ if captcha_field:
+ with self.wait_for_page_content_change():
+ captcha_field.send_keys(solution + Keys.RETURN)
+ return True
+ else:
+ return False
except Exception as e:
log.debug(e)
- log.info("Error trying to solve captcha. Refresh and retry.")
- self.driver.refresh()
- time.sleep(3)
+ log.debug("Error trying to solve captcha. Refresh and retry.")
+ with self.wait_for_page_content_change():
+ self.driver.refresh()
+ return False
except sel_exceptions.NoSuchElementException:
- log.error("captcha page does not contain captcha element")
- log.error("refreshing")
- self.driver.refresh()
- time.sleep(3)
+ log.debug("captcha page does not contain captcha element")
+ with self.wait_for_page_content_change():
+ self.driver.refresh()
+ return False
@debug
def handle_business_po(self):
@@ -1416,14 +1551,22 @@ def save_page_source(self, page):
f.write(page_source)
@contextmanager
- def wait_for_page_content_change(self, timeout=30):
+ def wait_for_page_content_change(self, timeout=5):
"""Utility to help manage selenium waiting for a page to load after an action, like a click"""
old_page = self.driver.find_element_by_tag_name("html")
yield
- WebDriverWait(self.driver, timeout).until(EC.staleness_of(old_page))
- WebDriverWait(self.driver, timeout).until(
- EC.presence_of_element_located((By.XPATH, "//title"))
- )
+ try:
+ WebDriverWait(self.driver, timeout).until(EC.staleness_of(old_page))
+ WebDriverWait(self.driver, timeout).until(
+ EC.presence_of_element_located((By.XPATH, "//title"))
+ )
+ except sel_exceptions.TimeoutException:
+ log.info("Timed out reloading page, trying to continue anyway")
+ pass
+ except Exception as e:
+ log.error(f"Trying to recover from error: {e}")
+ pass
+ return None
def wait_for_page_change(self, page_title, timeout=3):
time_to_end = self.get_timeout(timeout=timeout)
@@ -1630,18 +1773,50 @@ def get_timestamp_filename(name, extension):
return name + "_" + date + "." + extension
-def get_shipping_costs(tree, free_shipping_string) -> Price:
+def get_shipping_costs(tree, free_shipping_string):
+ # This version expects to find the shipping pricing within a div with the explicit ID 'delivery-message'
+ shipping_xpath = ".//div[@id='delivery-message']"
+ shipping_nodes = tree.xpath(shipping_xpath)
+ count = len(shipping_nodes)
+ if count > 0:
+ # Get the text out of the div and evaluate it
+ shipping_node = shipping_nodes[0]
+ if shipping_node.text:
+ shipping_span_text = shipping_node.text.strip()
+ if any(
+ shipping_span_text.upper() in free_message
+ for free_message in amazon_config["FREE_SHIPPING"]
+ ):
+ # We found some version of "free" inside the span.. but this relies on a match
+ log.info(
+ f"Assuming free shipping based on this message: '{shipping_span_text}'"
+ )
+ return FREE_SHIPPING_PRICE
+ else:
+ # will it parse?
+ shipping_cost: Price = parse_price(shipping_span_text)
+ if shipping_cost.currency is not None:
+ log.debug(
+ f"Found parseable price with currency symbol: {shipping_cost.currency}"
+ )
+ return shipping_cost
+ # Try the alternative method...
+ return get_alt_shipping_costs(tree, free_shipping_string)
+
+
+def get_alt_shipping_costs(tree, free_shipping_string) -> Price:
# Assume Free Shipping and change otherwise
# Shipping collection xpath:
# .//div[starts-with(@id, 'aod-bottlingDepositFee-')]/following-sibling::span
- shipping_nodes = tree.xpath(
+ shipping_xpath = (
".//div[starts-with(@id, 'aod-bottlingDepositFee-')]/following-sibling::*[1]"
)
+ shipping_nodes = tree.xpath(shipping_xpath)
count = len(shipping_nodes)
log.debug(f"Found {count} shipping nodes.")
if count == 0:
- log.warning("No shipping nodes found. Assuming zero.")
+ log.warning("No shipping nodes (standard or alt) found. Assuming zero.")
return FREE_SHIPPING_PRICE
elif count > 1:
log.warning("Found multiple shipping nodes. Using the first.")
@@ -1793,3 +1968,7 @@ def wait_for_element_by_xpath(d, xpath, timeout=10):
return False
return True
+
+
+def join_xpaths(xpath_list, separator=" | "):
+ return separator.join(xpath_list)
diff --git a/stores/bestbuy.py b/stores/bestbuy.py
deleted file mode 100644
index a0a7cddd..00000000
--- a/stores/bestbuy.py
+++ /dev/null
@@ -1,373 +0,0 @@
-# FairGame - Automated Purchasing Program
-# Copyright (C) 2021 Hari Nagarajan
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-#
-# The author may be contacted through the project's GitHub, at:
-# https://github.com/Hari-Nagarajan/fairgame
-
-import json
-import webbrowser
-from time import sleep
-
-from chromedriver_py import binary_path # this will get you the path variable
-from selenium import webdriver
-from selenium.webdriver.chrome.options import Options
-from selenium.webdriver.support.ui import WebDriverWait
-
-try:
- from Crypto.PublicKey import RSA
- from Crypto.Cipher import PKCS1_OAEP
-except:
- from Cryptodome.PublicKey import RSA
- from Cryptodome.Cipher import PKCS1_OAEP
-
-import requests
-from requests.adapters import HTTPAdapter
-from requests.packages.urllib3.util.retry import Retry
-
-from utils.json_utils import find_values
-from utils.logger import log
-from utils.selenium_utils import enable_headless
-
-BEST_BUY_PDP_URL = "https://api.bestbuy.com/click/5592e2b895800000/{sku}/pdp"
-BEST_BUY_CART_URL = "https://api.bestbuy.com/click/5592e2b895800000/{sku}/cart"
-
-BEST_BUY_ADD_TO_CART_API_URL = "https://www.bestbuy.com/cart/api/v1/addToCart"
-BEST_BUY_CHECKOUT_URL = "https://www.bestbuy.com/checkout/c/orders/{order_id}/"
-
-DEFAULT_HEADERS = {
- "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
- "accept-encoding": "gzip, deflate, br",
- "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
- "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36",
- "origin": "https://www.bestbuy.com",
-}
-
-options = Options()
-options.page_load_strategy = "eager"
-options.add_experimental_option("excludeSwitches", ["enable-automation"])
-options.add_experimental_option("useAutomationExtension", False)
-prefs = {"profile.managed_default_content_settings.images": 2}
-options.add_experimental_option("prefs", prefs)
-options.add_argument("user-data-dir=.profile-bb")
-
-
-class BestBuyHandler:
- def __init__(self, sku_id, notification_handler, headless=False):
- self.notification_handler = notification_handler
- self.sku_id = sku_id
- self.session = requests.Session()
- self.auto_buy = False
- self.account = {"username": "", "password": ""}
-
- adapter = HTTPAdapter(
- max_retries=Retry(
- total=3,
- backoff_factor=1,
- status_forcelist=[429, 500, 502, 503, 504],
- method_whitelist=["HEAD", "GET", "OPTIONS", "POST"],
- )
- )
- self.session.mount("https://", adapter)
- self.session.mount("http://", adapter)
-
- response = self.session.get(
- BEST_BUY_PDP_URL.format(sku=self.sku_id), headers=DEFAULT_HEADERS
- )
- log.info(f"PDP Request: {response.status_code}")
- self.product_url = response.url
- log.info(f"Product URL: {self.product_url}")
-
- self.session.get(self.product_url)
- log.info(f"Product URL Request: {response.status_code}")
-
- if self.auto_buy:
- log.info("Loading headless driver.")
- if headless:
- enable_headless() # TODO - check if this still messes up the cookies.
- options.add_argument(
- "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36"
- )
-
- self.driver = webdriver.Chrome(
- executable_path=binary_path,
- options=options,
- )
- log.info("Loading https://www.bestbuy.com.")
- self.login()
-
- self.driver.get(self.product_url)
- cookies = self.driver.get_cookies()
-
- [
- self.session.cookies.set_cookie(
- requests.cookies.create_cookie(
- domain=cookie["domain"],
- name=cookie["name"],
- value=cookie["value"],
- )
- )
- for cookie in cookies
- ]
-
- # self.driver.quit()
-
- log.info("Calling location/v1/US/approximate")
- log.info(
- self.session.get(
- "https://www.bestbuy.com/location/v1/US/approximate",
- headers=DEFAULT_HEADERS,
- ).status_code
- )
-
- log.info("Calling basket/v1/basketCount")
- log.info(
- self.session.get(
- "https://www.bestbuy.com/basket/v1/basketCount",
- headers={
- "x-client-id": "browse",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36",
- "Accept": "application/json",
- },
- ).status_code
- )
-
- def login(self):
- self.driver.get("https://www.bestbuy.com/identity/global/signin")
- self.driver.find_element_by_xpath('//*[@id="fld-e"]').send_keys(
- self.account["username"]
- )
- self.driver.find_element_by_xpath('//*[@id="fld-p1"]').send_keys(
- self.account["password"]
- )
- self.driver.find_element_by_xpath(
- "/html/body/div[1]/div/section/main/div[1]/div/div/div/div/form/div[3]/div/label/div/i"
- ).click()
- self.driver.find_element_by_xpath(
- "/html/body/div[1]/div/section/main/div[1]/div/div/div/div/form/div[4]/button"
- ).click()
- WebDriverWait(self.driver, 10).until(
- lambda x: "Official Online Store" in self.driver.title
- )
-
- def run_item(self):
- while not self.in_stock():
- sleep(5)
- log.info(f"Item {self.sku_id} is in stock!")
- if self.auto_buy:
- self.auto_checkout()
- else:
- cart_url = self.add_to_cart()
- self.notification_handler.send_notification(
- f"SKU: {self.sku_id} in stock: {cart_url}"
- )
- sleep(5)
-
- def in_stock(self):
- log.info("Checking stock")
- url = "https://www.bestbuy.com/api/tcfb/model.json?paths=%5B%5B%22shop%22%2C%22scds%22%2C%22v2%22%2C%22page%22%2C%22tenants%22%2C%22bbypres%22%2C%22pages%22%2C%22globalnavigationv5sv%22%2C%22header%22%5D%2C%5B%22shop%22%2C%22buttonstate%22%2C%22v5%22%2C%22item%22%2C%22skus%22%2C{}%2C%22conditions%22%2C%22NONE%22%2C%22destinationZipCode%22%2C%22%2520%22%2C%22storeId%22%2C%22%2520%22%2C%22context%22%2C%22cyp%22%2C%22addAll%22%2C%22false%22%5D%5D&method=get".format(
- self.sku_id
- )
- response = self.session.get(url, headers=DEFAULT_HEADERS)
- log.info(f"Stock check response code: {response.status_code}")
- try:
- response_json = response.json()
- item_json = find_values(
- json.dumps(response_json), "buttonStateResponseInfos"
- )
- item_state = item_json[0][0]["buttonState"]
- log.info(f"Item state is: {item_state}")
- if item_json[0][0]["skuId"] == self.sku_id and item_state in [
- "ADD_TO_CART",
- "PRE_ORDER",
- ]:
- return True
- else:
- return False
- except Exception as e:
- log.warning("Error parsing json. Using string search to determine state.")
- log.info(response_json)
- log.error(e)
- if "ADD_TO_CART" in response.text:
- log.info("Item is in stock!")
- return True
- else:
- log.info("Item is out of stock")
- return False
-
- def add_to_cart(self):
- webbrowser.open_new(BEST_BUY_CART_URL.format(sku=self.sku_id))
- return BEST_BUY_CART_URL.format(sku=self.sku_id)
-
- def auto_checkout(self):
- self.auto_add_to_cart()
- self.start_checkout()
- self.driver.get("https://www.bestbuy.com/checkout/c/r/fast-track")
-
- def auto_add_to_cart(self):
- log.info("Attempting to auto add to cart...")
-
- body = {"items": [{"skuId": self.sku_id}]}
- headers = {
- "Accept": "application/json",
- "authority": "www.bestbuy.com",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36",
- "Content-Type": "application/json; charset=UTF-8",
- "Sec-Fetch-Site": "same-origin",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Dest": "empty",
- "origin": "https://www.bestbuy.com",
- "referer": self.product_url,
- "Content-Length": str(len(json.dumps(body))),
- }
- # [
- # log.info({'name': c.name, 'value': c.value, 'domain': c.domain, 'path': c.path})
- # for c in self.session.cookies
- # ]
- log.info("Making request")
- response = self.session.post(
- BEST_BUY_ADD_TO_CART_API_URL, json=body, headers=headers, timeout=5
- )
- log.info(response.status_code)
- if (
- response.status_code == 200
- and response.json()["cartCount"] > 0
- and self.sku_id in response.text
- ):
- log.info(f"Added {self.sku_id} to cart!")
- log.info(response.json())
- else:
- log.info(response.status_code)
- log.info(response.json())
-
- def start_checkout(self):
- headers = {
- "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
- "accept-encoding": "gzip, deflate, br",
- "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
- "upgrade-insecure-requests": "1",
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.92 Safari/537.36",
- }
- while True:
- log.info("Starting Checkout")
- response = self.session.post(
- "https://www.bestbuy.com/cart/d/checkout", headers=headers, timeout=5
- )
- if response.status_code == 200:
- response_json = response.json()
- log.info(response_json)
- self.order_id = response_json["updateData"]["order"]["id"]
- self.item_id = response_json["updateData"]["order"]["lineItems"][0][
- "id"
- ]
- log.info(f"Started Checkout for order id: {self.order_id}")
- log.info(response_json)
- if response_json["updateData"]["redirectUrl"]:
- self.session.get(
- response_json["updateData"]["redirectUrl"], headers=headers
- )
- return
- log.info("Error Starting Checkout")
- sleep(5)
-
- def submit_shipping(self):
- log.info("Starting Checkout")
- headers = {
- "accept": "application/json, text/javascript, */*; q=0.01",
- "accept-encoding": "gzip, deflate, br",
- "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
- "content-type": "application/json",
- "origin": "https://www.bestbuy.com",
- "referer": "https://www.bestbuy.com/cart",
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.92 Safari/537.36",
- "x-user-interface": "DotCom-Optimized",
- "x-order-id": self.order_id,
- }
- while True:
- log.info("Submitting Shipping")
- body = {"selected": "SHIPPING"}
- response = self.session.put(
- "https://www.bestbuy.com/cart/item/{item_id}/fulfillment".format(
- item_id=self.item_id
- ),
- headers=headers,
- json=body,
- )
- response_json = response.json()
- log.info(response.status_code)
- log.info(response_json)
- if (
- response.status_code == 200
- and response_json["order"]["id"] == self.order_id
- ):
- log.info("Submitted Shipping")
- return True
- else:
- log.info("Error Submitting Shipping")
-
- def submit_payment(self, tas_data):
- body = {
- "items": [
- {
- "id": self.item_id,
- "type": "DEFAULT",
- "selectedFulfillment": {"shipping": {"address": {}}},
- "giftMessageSelected": False,
- }
- ]
- }
- headers = {
- "accept": "application/com.bestbuy.order+json",
- "accept-encoding": "gzip, deflate, br",
- "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
- "content-type": "application/json",
- "origin": "https://www.bestbuy.com",
- "referer": "https://www.bestbuy.com/checkout/r/fulfillment",
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.92 Safari/537.36",
- "x-user-interface": "DotCom-Optimized",
- }
- r = self.session.patch(
- "https://www.bestbuy.com/checkout/d/orders/{}/".format(self.order_id),
- json=body,
- headers=headers,
- )
- [
- log.info(
- {"name": c.name, "value": c.value, "domain": c.domain, "path": c.path}
- )
- for c in self.session.cookies
- ]
- log.info(r.status_code)
- log.info(r.text)
-
- def get_tas_data(self):
- headers = {
- "accept": "*/*",
- "accept-encoding": "gzip, deflate, br",
- "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
- "content-type": "application/json",
- "referer": "https://www.bestbuy.com/checkout/r/payment",
- "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.92 Safari/537.36",
- }
- while True:
- try:
- log.info("Getting TAS Data")
- r = requests.get(
- "https://www.bestbuy.com/api/csiservice/v2/key/tas", headers=headers
- )
- log.info("Got TAS Data")
- return json.loads(r.text)
- except Exception as e:
- sleep(5)
diff --git a/stores/nvidia.py b/stores/nvidia.py
deleted file mode 100644
index 7ac1137b..00000000
--- a/stores/nvidia.py
+++ /dev/null
@@ -1,271 +0,0 @@
-# FairGame - Automated Purchasing Program
-# Copyright (C) 2021 Hari Nagarajan
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-#
-# The author may be contacted through the project's GitHub, at:
-# https://github.com/Hari-Nagarajan/fairgame
-
-import concurrent
-import json
-import webbrowser
-from concurrent.futures.thread import ThreadPoolExecutor
-from datetime import datetime
-from time import sleep
-
-import browser_cookie3
-import requests
-from spinlog import Spinner
-
-from utils.http import TimeoutHTTPAdapter
-from utils.logger import log
-
-NVIDIA_CART_URL = (
- "https://store.nvidia.com/store?Action=DisplayHGOP2LandingPage&SiteID=nvidia"
-)
-NVIDIA_TOKEN_URL = "https://store.nvidia.com/store/nvidia/SessionToken"
-NVIDIA_STOCK_API = "https://api-prod.nvidia.com/direct-sales-shop/DR/products/{locale}/{currency}/{product_id}"
-NVIDIA_ADD_TO_CART_API = "https://api-prod.nvidia.com/direct-sales-shop/DR/add-to-cart"
-
-GPU_DISPLAY_NAMES = {
- "2060S": "NVIDIA GEFORCE RTX 2060 SUPER",
- "3080": "NVIDIA GEFORCE RTX 3080",
- "3090": "NVIDIA GEFORCE RTX 3090",
-}
-
-CURRENCY_LOCALE_MAP = {
- "en_us": "USD",
- "en_gb": "GBP",
- "de_de": "EUR",
- "fr_fr": "EUR",
- "it_it": "EUR",
- "es_es": "EUR",
- "nl_nl": "EUR",
- "sv_se": "SEK",
- "de_at": "EUR",
- "fr_be": "EUR",
- "da_dk": "DKK",
- "cs_cz": "CZK",
-}
-
-DEFAULT_HEADERS = {
- "Accept": "application/json",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36",
-}
-CART_SUCCESS_CODES = {201, requests.codes.ok}
-
-
-class ProductIDChangedException(Exception):
- def __init__(self):
- super().__init__("Product IDS changed. We need to re run.")
-
-
-PRODUCT_IDS_FILE = "stores/store_data/nvidia_product_ids.json"
-PRODUCT_IDS = json.load(open(PRODUCT_IDS_FILE))
-
-
-class NvidiaBuyer:
- def __init__(
- self, gpu, notification_handler, locale="en_us", test=False, interval=5
- ):
- self.product_ids = set([])
- self.cli_locale = locale.lower()
- self.locale = self.map_locales()
- self.session = requests.Session()
- self.gpu = gpu
- self.enabled = True
- self.auto_buy_enabled = False
- self.attempt = 0
- self.started_at = datetime.now()
- self.test = test
- self.interval = interval
-
- self.gpu_long_name = GPU_DISPLAY_NAMES[gpu]
-
- self.cj = browser_cookie3.load(".nvidia.com")
- self.session.cookies = self.cj
-
- # Disable auto_buy_enabled if the user does not provide a bool.
- if type(self.auto_buy_enabled) != bool:
- self.auto_buy_enabled = False
-
- adapter = TimeoutHTTPAdapter()
- self.session.mount("https://", adapter)
- self.session.mount("http://", adapter)
- self.notification_handler = notification_handler
-
- self.get_product_ids()
-
- def map_locales(self):
- if self.cli_locale == "de_at":
- return "de_de"
- if self.cli_locale == "fr_be":
- return "fr_fr"
- if self.cli_locale == "da_dk":
- return "en_gb"
- if self.cli_locale == "cs_cz":
- return "en_gb"
- return self.cli_locale
-
- def get_product_ids(self):
- if isinstance(PRODUCT_IDS[self.cli_locale][self.gpu], list):
- self.product_ids = PRODUCT_IDS[self.cli_locale][self.gpu]
- if isinstance(PRODUCT_IDS[self.cli_locale][self.gpu], str):
- self.product_ids = [PRODUCT_IDS[self.cli_locale][self.gpu]]
-
- def run_items(self):
- log.info(
- f"We have {len(self.product_ids)} product IDs for {self.gpu_long_name}"
- )
- log.info(f"Product IDs: {self.product_ids}")
- try:
- with ThreadPoolExecutor(max_workers=len(self.product_ids)) as executor:
- product_futures = [
- executor.submit(self.buy, product_id)
- for product_id in self.product_ids
- ]
- concurrent.futures.wait(product_futures)
- for fut in product_futures:
- log.debug(f"Future Result: {fut.result()}")
- except ProductIDChangedException as ex:
- log.warning("Product IDs changed.")
- self.product_ids = set([])
- self.get_product_ids()
- self.run_items()
-
- def buy(self, product_id):
- try:
- log.info(f"Stock Check {product_id} at {self.interval} second intervals.")
- while not self.is_in_stock(product_id):
- self.attempt = self.attempt + 1
- time_delta = str(datetime.now() - self.started_at).split(".")[0]
- with Spinner.get(
- f"Stock Check ({self.attempt}, have been running for {time_delta})..."
- ) as s:
- sleep(self.interval)
- if self.enabled:
- cart_success = self.add_to_cart(product_id)
- if cart_success:
- log.info(f"{self.gpu_long_name} added to cart.")
- self.enabled = False
- webbrowser.open(NVIDIA_CART_URL)
- self.notification_handler.send_notification(
- f" {self.gpu_long_name} with product ID: {product_id} in "
- f"stock: {NVIDIA_CART_URL}"
- )
- else:
- self.notification_handler.send_notification(
- f" ERROR: Attempted to add {self.gpu_long_name} to cart but couldn't, check manually!"
- )
- self.buy(product_id)
- except requests.exceptions.RequestException as e:
- log.warning("Connection error while calling Nvidia API. API may be down.")
- log.info(
- f"Got an unexpected reply from the server, API may be down, nothing we can do but try again"
- )
- self.buy(product_id)
-
- def is_in_stock(self, product_id):
- try:
- response = self.session.get(
- NVIDIA_STOCK_API.format(
- product_id=product_id,
- locale=self.locale,
- currency=CURRENCY_LOCALE_MAP.get(self.locale, "USD"),
- cookies=self.cj,
- ),
- headers=DEFAULT_HEADERS,
- )
- log.debug(f"Stock check response code: {response.status_code}")
- if response.status_code != 200:
- log.debug(response.text)
- if "PRODUCT_INVENTORY_IN_STOCK" in response.text:
- return True
- else:
- return False
- except requests.exceptions.RequestException as e:
- log.info(
- f"Got an unexpected reply from the server, API may be down, nothing we can do but try again"
- )
- return False
-
- def add_to_cart(self, product_id):
- try:
- success, token = self.get_session_token()
- if not success:
- return False
- log.info(f"Session token: {token}")
-
- data = {"products": [{"productId": product_id, "quantity": 1}]}
- headers = DEFAULT_HEADERS.copy()
- headers["locale"] = self.locale
- headers["nvidia_shop_id"] = token
- headers["Content-Type"] = "application/json"
- response = self.session.post(
- url=NVIDIA_ADD_TO_CART_API,
- headers=headers,
- data=json.dumps(data),
- cookies=self.cj,
- )
- if response.status_code == 200:
- response_json = response.json()
- print(response_json)
- if "successfully" in response_json["message"]:
- return True
- else:
- log.error(response.text)
- log.error(
- f"Add to cart failed with {response.status_code}. This is likely an error with nvidia's API."
- )
- return False
- except requests.exceptions.RequestException as e:
- log.info(e)
- log.info(
- f"Got an unexpected reply from the server, API may be down, nothing we can do but try again"
- )
- return False
-
- def get_session_token(self):
- """
- Ok now this works, but I dont know when the cookies expire so might be unstable.
- :return:
- """
-
- params = {"format": "json", "locale": self.locale}
- headers = DEFAULT_HEADERS.copy()
- headers["locale"] = self.locale
- headers["cookie"] = "; ".join(
- [f"{cookie.name}={cookie.value}" for cookie in self.session.cookies]
- )
-
- try:
- response = self.session.get(
- NVIDIA_TOKEN_URL,
- headers=headers,
- params=params,
- cookies=self.cj,
- )
- if response.status_code == 200:
- response_json = response.json()
- if "session_token" not in response_json:
- log.error("Error getting session token.")
- return False, ""
- return True, response_json["session_token"]
- else:
- log.debug(f"Get Session Token: {response.status_code}")
- except requests.exceptions.RequestException as e:
- log.info(
- f"Got an unexpected reply from the server, API may be down, nothing we can do but try again"
- )
- return False
diff --git a/utils/encryption.py b/utils/encryption.py
index 92516cda..046d296a 100644
--- a/utils/encryption.py
+++ b/utils/encryption.py
@@ -20,11 +20,13 @@
import getpass as getpass
import stdiomask
import json
+import math
import os
from base64 import b64encode, b64decode
from Crypto.Cipher import ChaCha20_Poly1305
from Crypto.Random import get_random_bytes
from Crypto.Protocol.KDF import scrypt
+from psutil import virtual_memory
from utils.logger import log
@@ -33,7 +35,7 @@ def encrypt(pt, password):
"""Encryption function to securely store user credentials, uses ChaCha_Poly1305
with a user defined SCrypt key."""
salt = get_random_bytes(32)
- key = scrypt(password, salt, key_len=32, N=2 ** 20, r=8, p=1)
+ key = scrypt(password, salt, key_len=32, N=get_scrypt_cost_factor(), r=8, p=1)
nonce = get_random_bytes(12)
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
ct, tag = cipher.encrypt_and_digest(pt)
@@ -50,8 +52,9 @@ def decrypt(ct, password):
b64Ct = json.loads(ct)
json_k = ["nonce", "salt", "ct", "tag"]
json_v = {k: b64decode(b64Ct[k]) for k in json_k}
-
- key = scrypt(password, json_v["salt"], key_len=32, N=2 ** 20, r=8, p=1)
+ key = scrypt(
+ password, json_v["salt"], key_len=32, N=get_scrypt_cost_factor(), r=8, p=1
+ )
cipher = ChaCha20_Poly1305.new(key=key, nonce=json_v["nonce"])
ptData = cipher.decrypt_and_verify(json_v["ct"], json_v["tag"])
@@ -109,6 +112,15 @@ def load_encrypted_config(config_path, encrypted_pass=None):
)
+def get_scrypt_cost_factor(mem_percentage=0.5):
+ # Returns scrypt cost factor 'N' param based off of system memory
+ # Max value is 2 ** 20
+ mem = math.floor(virtual_memory().total * mem_percentage / 1024)
+ # Value must be a power of 2
+ exponent = math.floor(math.log(mem, 2))
+ return min(2 ** 20, 2 ** exponent)
+
+
# def main():
#
# password = getpass.getpass(prompt="Password: ")
diff --git a/utils/version.py b/utils/version.py
index 02bf3ab6..8a320d0f 100644
--- a/utils/version.py
+++ b/utils/version.py
@@ -27,7 +27,7 @@
# See https://www.python.org/dev/peps/pep-0440/ for specification
# See https://www.python.org/dev/peps/pep-0440/#examples-of-compliant-version-schemes for examples
-__VERSION = "0.6.0"
+__VERSION = "0.6.1"
version = Version(__VERSION)
@@ -44,7 +44,7 @@ def is_latest():
def get_latest_version():
try:
- r = requests.get(_LATEST_URL, timeout=5)
+ r = requests.get(_LATEST_URL)
data = r.json()
latest_version = parse(str(data["tag_name"]))
except InvalidVersion: