#!/usr/bin/python3
"""Crawl newegg.com for storage prices.

Fetches category listing pages (via Playwright to get past bot detection),
caches the raw HTML under data/<date>/, parses out per-item prices, records
them in a SQLite database, and renders static HTML reports with Jinja2.
"""
import collections
import configparser
import decimal
import sqlite3
import logging
import os
import random
import re
import sys
import time
import typing
import urllib.parse
from collections import defaultdict
from datetime import date
from decimal import Decimal
from typing import Optional

import daiquiri
import lxml.html
from jinja2 import Environment, FileSystemLoader
from playwright.sync_api import sync_playwright

daiquiri.setup(level=logging.INFO)
logger = daiquiri.getLogger(__name__)

# Impersonate a current desktop Chrome to reduce bot-detection friction.
user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"

# "Page 3/12" -> captures the total page count.
re_page = re.compile(r"Page \d+/(\d+)")
# Capacity such as "4TB" / "500 GB"; (?!/s) skips transfer rates like "6Gb/s".
re_size1 = re.compile(r"\b([0-9.]+) ?([TGtg])[Bb]\b(?!/s)")
# Looser fallback: trailing "B" may be missing ("2T").
re_size2 = re.compile(r"\b([0-9.]+) ?([TGtg])[Bb]?\b(?!/s)")
re_pack = re.compile(r"\b([0-9]+) Pack", re.I)

root_dir = os.path.dirname(sys.argv[0])
data_root = os.path.join(root_dir, "data")


def exists_or_create_dir(d: str) -> None:
    """Create a directory (and parents) if it doesn't already exist."""
    # exist_ok avoids the check-then-create race of the os.path.exists idiom.
    os.makedirs(d, exist_ok=True)


def random_sleep() -> None:
    """Sleep for a random amount of time between 20 and 90 seconds.

    Used between page downloads to avoid hammering the site.
    """
    time.sleep(random.randint(20, 90))


def get_product_list(n: str, page: Optional[int] = None) -> str:
    """Get product list using Playwright to handle bot detection.

    :param n: the space-separated "N" filter parameter (category/facet IDs).
    :param page: 1-based results page number; omitted for the first page.
    :return: the rendered page HTML.
    """
    params: dict[str, str] = {
        "N": n,
        "IsNodeId": "1",
        "Order": "RELEASE",
        "PageSize": "96",
    }
    if page is not None:
        params["page"] = str(page)
    url = "https://www.newegg.com/p/pl?" + urllib.parse.urlencode(params)
    logger.info("fetching", url=url)
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"],
        )
        context = browser.new_context(
            user_agent=user_agent,
            viewport={"width": 1280, "height": 800},
        )
        # Hide the webdriver flag that headless Chromium exposes.
        context.add_init_script(
            "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        )
        pg = context.new_page()
        pg.goto(url, wait_until="networkidle", timeout=60000)
        if "areyouahuman" in pg.url:
            # Landed on the challenge page: fake human-looking mouse motion,
            # then try clicking the reCAPTCHA checkbox.
            logger.info("bot detection triggered, simulating mouse movement...")
            for i in range(30):
                x = 100 + i * 30 + random.randint(-5, 5)
                y = 300 + random.randint(-20, 20)
                pg.mouse.move(x, y)
                time.sleep(0.04 + random.random() * 0.04)
            try:
                frame = pg.frame_locator('iframe[title*="reCAPTCHA"]').first
                frame.locator("#recaptcha-anchor").click(timeout=5000)
                pg.wait_for_url(lambda u: "areyouahuman" not in u, timeout=30000)
            except Exception:
                # Best effort only; the caller still gets whatever HTML loaded.
                logger.warning("CAPTCHA bypass failed")
        html = pg.content()
        browser.close()
    return html


# RSS URL: https://www.newegg.com/Product/RSS.aspx?Submit=ENE&N=8000%204814%20600003489&IsNodeId=1&ShowDeactivatedMark=False&Order=RELEASE
#   ^ can include order=RELEASE
# seller = newegg: 8000
# condition = new: 4814
# form factor = 2.5": 600003490
# form factor = 3.5": 600003489
# Desktop Internal Hard Drives: 100167523
# Laptop Internal Hard Drives: 100167524
# Desktop External Hard Drives: 100167525
# Portable External Hard Drives: 100167526
# Internal SSD: 100011693
# form factor = 2.5" SSD: 600038463 601330896
# form factor = M.2: 601193224 601193225 601292490
# SATA: 600038506 600038510 600038519
# PCI Express: 600640786 601296941 601301243

# (name, human-readable label, "N" filter parameter) per tracked category.
filter_params = [
    ("internal_35", '3.5" Internal HDD', "100167523 8000 4814 600003489"),
    ("portable_25", '2.5" Portable HDD', "100167526 8000 4818 600003490"),
    # ('portable_35', '3.5" portable drives', '100167526 8000 4818 600003489'),
    ("external_35", '3.5" External HDD', "100167525 8000 4818 600003489"),
    (
        "ssd_sata",
        "SATA SSD",
        "100011693 8000 4814 600038506 600038510 600038519",
    ),
    (
        "ssd_pcie",
        "NVMe SSD",
        "100011693 8000 4814 600640786 601296941 601301243",
    ),
]


def page_filename(d: str, name: str, page: int) -> str:
    """Return the cache filename for a category page, e.g. internal_35_page01.html."""
    return os.path.join(d, f"{name}_page{page:02d}.html")


def save_page(html: str, d: str, name: str, page: int) -> None:
    """Save page HTML to the cache directory."""
    # with-block ensures the file handle is closed; explicit UTF-8 avoids
    # depending on the locale's default encoding.
    with open(page_filename(d, name, page), "w", encoding="utf-8") as f:
        f.write(html)


def get_pages() -> None:
    """Download (or reuse cached) listing pages for every category into data/<today>/."""
    today_dir = os.path.join(data_root, str(date.today()))
    exists_or_create_dir(today_dir)
    # Only sleep between actual downloads, never before the first one or
    # after a cache hit.
    download = False
    for name, label, filter_param in filter_params:
        filename = page_filename(today_dir, name, 1)
        print(filename)
        if os.path.exists(filename):
            with open(filename, encoding="utf-8") as f:
                page_content = f.read()
        else:
            logger.info(f"get {name}", label=label, page=1)
            if download:
                random_sleep()
            page_content = get_product_list(filter_param)
            download = True
            save_page(page_content, today_dir, name, 1)
        page_count = get_page_count(page_content)
        logger.info(f"{name} page count: {page_count}")
        for page_num in range(2, page_count + 1):
            filename = page_filename(today_dir, name, page_num)
            if os.path.exists(filename):
                continue
            logger.info(f"get {name}", label=label, page=page_num)
            if download:
                random_sleep()
            html = get_product_list(filter_param, page=page_num)
            download = True
            save_page(html, today_dir, name, page_num)


def get_page_count(html: str) -> int:
    """Extract the total page count from a listing page ("Page 1/N")."""
    m = re_page.search(html)
    assert m
    return int(m.group(1))


def hidden_price(item: lxml.html.HtmlElement) -> bool:
    """True if the item's price is hidden behind a cart/checkout step."""
    price_action = item.find('.//li[@class="price-map"]/a')
    hidden = ["See price in cart", "See Price after Checkout"]
    return price_action is not None and price_action.text in hidden


def out_of_stock(item: lxml.html.HtmlElement) -> bool:
    """True if the item is listed as out of stock."""
    cur_price = item.find('.//li[@class="price-current"]')
    if cur_price is None:
        # Newegg sometimes emits a trailing space in the class attribute.
        cur_price = item.find('.//li[@class="price-current "]')
    promo = item.find('.//p[@class="item-promo"]')
    btn_message = item.find('.//span[@class="btn btn-message "]')
    if cur_price is None:
        # Dump the unexpected markup before the assert fails, for debugging.
        print(lxml.html.tostring(item, pretty_print=True, encoding="unicode"))
    assert cur_price is not None
    # Out of stock when there's no current price AND the promo banner says so,
    # or when the buy-button itself says so.
    return (
        len(cur_price) == 0
        and (promo is not None and promo.text_content() == "OUT OF STOCK")
        or (btn_message is not None and btn_message.text == "Out Of Stock")
    )


class Item(typing.TypedDict):
    """A single parsed product listing."""

    price: Decimal       # current price in dollars
    title: str           # full product title
    size: str            # human-readable capacity, e.g. "4TB"
    size_gb: Decimal     # capacity normalized to gigabytes
    number: str          # Newegg item number (unique ID)
    price_per_tb: Decimal  # price normalized per terabyte


def parse_page(filename: str) -> list[Item]:
    """Parse one cached listing page into a list of priced items.

    Items with hidden prices, out-of-stock items, "COMING SOON" items, and
    items whose title contains no recognizable capacity are skipped.
    """
    root = lxml.html.parse(filename).getroot()
    items: list[Item] = []
    for item in root.xpath("//div[contains(@class, 'item-container')]"):
        title_link = item.find('.//a[@class="item-title"]')
        href = title_link.get("href")
        # Item number is embedded in the link: ...?Item=<number>
        item_number = href[href.find("Item=") + 5 :]
        title = title_link.text_content()
        # compare = item.find('.//div[@class="item-compare-box"]//input')
        # if compare is None:
        #     continue
        # item_number = compare.get('neg-itemnumber')
        if not item_number:
            print(lxml.html.tostring(item, pretty_print=True, encoding="unicode"))
        assert item_number
        if hidden_price(item) or out_of_stock(item):
            continue
        dollars = item.find('.//li[@class="price-current"]/strong')
        if dollars is not None and dollars.text == "COMING SOON":
            continue
        if dollars is None:
            # Same trailing-space class variant as in out_of_stock().
            dollars = item.find('.//li[@class="price-current "]/strong')
        if dollars is None:
            # Listings showing only a "was" price have no current price.
            price_was = item.find('.//span[@class="price-was-data"]')
            if price_was is not None:
                continue
        if dollars is None:
            # Unexpected markup: dump it for debugging, then fail loudly
            # rather than crash with an opaque AttributeError below.
            print(item_number, "//", title)
            print(lxml.html.tostring(item, pretty_print=True, encoding="unicode"))
        assert dollars is not None
        # Price is split across <strong>dollars</strong><sup>.cents</sup>.
        cents = dollars.getnext()
        price_str = dollars.text + ("" if cents is None else cents.text)
        try:
            price = Decimal(price_str.replace(",", ""))
        except decimal.InvalidOperation:
            print(repr(price_str))
            raise
        m = re_size1.search(title)
        if not m:
            m = re_size2.search(title)
        if not m:
            continue
        size = m.group(1) + m.group(2) + "B"
        size_gb = Decimal(m.group(1))
        if m.group(2) == "T":
            size_gb *= 1000
        items.append(
            {
                "price": price,
                "title": title,
                "size": size,
                "size_gb": size_gb,
                "number": item_number,
                "price_per_tb": (price / size_gb) * 1000,
            }
        )
    return items


def build_file_map(data_dir: str) -> dict[str, list[tuple[int, str]]]:
    """Map category name -> sorted list of (page number, filename) in data_dir."""
    files = defaultdict(list)
    for f in sorted(os.listdir(data_dir)):
        # Filenames look like "<name>_pageNN.html" (see page_filename).
        pos = f.rfind("_page")
        name = f[:pos]
        page = int(f[pos + 5 : pos + 7])
        files[name].append((page, f))
    return files


def get_data_dir(today: date) -> str:
    """Return today's data directory, falling back to the most recent one."""
    data_dir = os.path.join(data_root, str(today))
    if not os.path.exists(data_dir):
        # Date-named dirs all start with a digit (e.g. "2024-05-01"), which
        # excludes files like prices.db.
        alt = max(x for x in os.listdir(data_root) if x[0].isdigit())
        print(f"Today dir ({today}) doesn't exist. Using most recent data ({alt}).")
        print()
        data_dir = os.path.join(data_root, alt)
    return data_dir


class Grouped(typing.TypedDict):
    """Items grouped by category."""

    name: str      # category key, e.g. "internal_35"
    label: str     # human-readable label
    items: list[Item]


def group_items(
    today: date,
) -> collections.abc.Iterator[Grouped]:
    """Yield one Grouped record per category, deduplicated and sorted by $/TB."""
    data_dir = get_data_dir(today)
    files = build_file_map(data_dir)
    for name, label, filter_param in filter_params:
        logger.info(f"category: {label} ({name})")
        seen = set()
        items = []
        for page_num, f in files[name]:
            for item in parse_page(os.path.join(data_dir, f)):
                # The same item can appear on multiple pages; keep the first.
                if item["number"] in seen:
                    logger.info("duplicate", item_number=item["number"])
                    continue
                seen.add(item["number"])
                items.append(item)
        items.sort(key=lambda i: i["price_per_tb"])
        yield {"name": name, "label": label, "items": items}


db_path = os.path.join(data_root, "prices.db")
# Store Decimal values as their string representation.
sqlite3.register_adapter(Decimal, str)


def init_db(conn: sqlite3.Connection) -> None:
    """Create tables if they don't exist."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS price (
            item_number TEXT NOT NULL,
            title TEXT NOT NULL,
            size_gb NUMERIC NOT NULL,
            price NUMERIC NOT NULL,
            category TEXT NOT NULL,
            seen_at DATE NOT NULL,
            PRIMARY KEY (item_number, seen_at)
        )
    """)
    conn.commit()


def record_prices(data: list[Grouped], today: date) -> None:
    """Record today's prices to the database (one row per item per day)."""
    conn = sqlite3.connect(db_path)
    try:
        init_db(conn)
        # isoformat() matches what the default (deprecated in 3.12) date
        # adapter stored, so existing rows stay compatible.
        seen_at = today.isoformat()
        for cat in data:
            for item in cat["items"]:
                conn.execute(
                    """
                    INSERT OR REPLACE INTO price
                        (item_number, title, size_gb, price, category, seen_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (
                        item["number"],
                        item["title"],
                        item["size_gb"],
                        item["price"],
                        cat["name"],
                        seen_at,
                    ),
                )
        conn.commit()
    finally:
        conn.close()
    logger.info("prices recorded", db=db_path, date=today)


def get_build_root() -> str:
    """Read build_dir from ~/.config/newegg-hdd/config, fall back to output/."""
    config_path = os.path.expanduser("~/.config/newegg-hdd/config")
    if os.path.exists(config_path):
        config = configparser.ConfigParser()
        config.read(config_path)
        if config.has_option("newegg-hdd", "build_dir"):
            return config.get("newegg-hdd", "build_dir")
    return os.path.join(root_dir, "output")


def build() -> None:
    """Parse cached pages, record prices, and render the static HTML report."""
    build_root = get_build_root()
    exists_or_create_dir(build_root)
    today = date.today()
    templates_dir = os.path.join(root_dir, "templates")
    env = Environment(loader=FileSystemLoader(templates_dir))
    data = list(group_items(today))
    record_prices(data, today)
    index = os.path.join(build_root, "index.html")
    index_template = env.get_template("index.html")
    page = index_template.render(best=data, today=today)
    with open(index, "w", encoding="utf-8") as f:
        f.write(page)
    list_template = env.get_template("item_list.html")
    for cat in data:
        page = list_template.render(items=cat["items"], today=today, what=cat["label"])
        exists_or_create_dir(os.path.join(build_root, cat["name"]))
        filename = os.path.join(build_root, cat["name"], "index.html")
        with open(filename, "w", encoding="utf-8") as f:
            f.write(page)


if __name__ == "__main__":
    get_pages()
    build()