commit 67cf7151f918bab8d861e01c25630fe9f07b2f74 Author: Edward Betts Date: Fri Oct 6 18:32:20 2023 +0100 Initial commit diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..379270c --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 Edward Betts + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a444f08 --- /dev/null +++ b/README.md @@ -0,0 +1,82 @@ +# Newegg hard drive price tracker + +## Overview + +The Newegg hard drive price tracker is a Python tool designed to fetch the prices of various types of hard drives from Newegg's online store. It sorts and displays these drives based on their price per terabyte, making it easier for users to find the most cost-effective storage solutions. The output is available at the [Price per TB](https://edwardbetts.com/price_per_tb/) page. + +## Requirements + +- Python 3.x +- `requests` +- `lxml` +- `jinja2` +- `daiquiri` + +## Files in the Repository + +- `crawl.py`: The main script that crawls Newegg's product listings and extracts relevant data. +- `README.md`: This file, which provides an overview and instructions for the tool. + +## Installation + +1. Clone the repository: + + ```bash + git clone https://git.4angle.com/edward/newegg-hdd.git + ``` + +2. Change into the project directory: + + ```bash + cd newegg-hdd + ``` + +3. Install the required Python packages: + + ```bash + pip install -r requirements.txt + ``` + +## Usage + +### Automated Crawling + +You can set up a cron job to automate the data crawling process. The cron job will execute the `crawl.py` script daily at 08:50. Below is the crontab configuration: + +``` +50 08 * * * chronic ~/src/2019/newegg-hdd/crawl.py +``` + +### Manual Crawling + +Run the `crawl.py` script manually: + +```bash +python crawl.py +``` + +### Output + +After the script runs, you can view the sorted listings at the [Price per TB](https://edwardbetts.com/price_per_tb/) page. + +## Functionality + +The tool performs the following operations: + +- Crawls the Newegg product listings based on predefined filter parameters. +- Downloads and saves HTML pages for later parsing. +- Extracts relevant details, such as price, size, and product number. +- Calculates the price per terabyte for each hard drive. +- Outputs a sorted list of hard drives based on their price per terabyte. + +## License + +This tool is released under the [MIT License](LICENSE). + +## Contributing + +If you find any bugs or have suggestions for improvements, please open an issue on the repository. + +## Contact + +For any queries or issues, please visit the repository at [https://git.4angle.com/edward/newegg-hdd](https://git.4angle.com/edward/newegg-hdd). diff --git a/crawl.py b/crawl.py new file mode 100755 index 0000000..c057eff --- /dev/null +++ b/crawl.py @@ -0,0 +1,339 @@ +#!/usr/bin/python3 +"""Crawl newegg.com for storage prices.""" + +import collections +import decimal +import logging +import os +import random +import re +import sys +import time +import typing +from collections import defaultdict +from datetime import date +from decimal import Decimal +from typing import Optional + +import daiquiri +import lxml.html +import requests +from jinja2 import Environment, FileSystemLoader + +daiquiri.setup(level=logging.INFO) +logger = daiquiri.getLogger(__name__) + +# user_agent = 'Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/31.0' +user_agent = "UniversalFeedParser/5.2.0 +http://feedparser.org/" +product_list_url = "https://www.newegg.com/Product/ProductList.aspx" + +re_page = re.compile(r"Page \d+/(\d+)") +re_size1 = re.compile(r"\b([0-9.]+) ?([TGtg])[Bb]\b(?!/s)") +re_size2 = re.compile(r"\b([0-9.]+) ?([TGtg])[Bb]?\b(?!/s)") +re_pack = re.compile(r"\b([0-9]+) Pack", re.I) + +root_dir = os.path.dirname(sys.argv[0]) +data_root = os.path.join(root_dir, "data") + + +def exists_or_create_dir(d: str) -> None: + """Create a directory if it doesn't already exist.""" + if not os.path.exists(d): + os.mkdir(d) + + +def random_sleep() -> None: + """Sleep for a random amount of time between 20 and 90 seconds.""" + time.sleep(random.randint(20, 90)) + + +def get_product_list(n: str, page: Optional[int] = None) -> requests.Response: + """Get product list.""" + params: dict[str, str | int] = { + "Submit": "ENE", + "N": n, + "IsNodeId": 1, + "ActiveSearchResult": "True", + "Order": "RELEASE", + "PageSize": 96, + } + if page is not None: + params["page"] = page + r = requests.get( + product_list_url, + # allow_redirects=False, + params=params, + headers={"User-Agent": user_agent}, + ) + print(r.url) + logger.debug("request", url=r.url) + return r + + +# RSS URL: https://www.newegg.com/Product/RSS.aspx?Submit=ENE&N=8000%204814%20600003489&IsNodeId=1&ShowDeactivatedMark=False&Order=RELEASE +# ^ can include order=RELEASE + +# seller = newegg: 8000 +# condition = new: 4814 +# form factor = 2.5": 600003490 +# form factor = 3.5": 600003489 +# Desktop Internal Hard Drives: 100167523 +# Laptop Internal Hard Drives: 100167524 +# Desktop External Hard Drives: 100167525 +# Portable External Hard Drives: 100167526 +# Internal SSD: 100011693 +# form factor = 2.5" SSD: 600038463 601330896 +# form factor = M.2: 601193224 601193225 601292490 +# SATA: 600038506 600038510 600038519 +# PCI Express: 600640786 601296941 601301243 + + +filter_params = [ + ("internal_35", '3.5" internal drives', "100167523 8000 4814 600003489"), + ("internal_25", '2.5" internal drives', "100167523 8000 4814 600003490"), + ("laptop_25", '2.5" laptop drives', "100167524 8000 4814 600003490"), + ("portable_25", '2.5" portable drives', "100167526 8000 4818 600003490"), + # ('portable_35', '3.5" portable drives', '100167526 8000 4818 600003489'), + ("external_35", '3.5" external drives', "100167525 8000 4818 600003489"), + ( + "ssd_sata", + "SSD with SATA interface", + "100011693 8000 4814 600038506 600038510 600038519", + ), + ( + "ssd_pcie", + "SSD with PCIe interface", + "100011693 8000 4814 600640786 601296941 601301243", + ), +] + + +def page_filename(d: str, name: str, page: int) -> str: + """Get page filename.""" + return os.path.join(d, f"{name}_page{page:02d}.html") + + +def save_page(r: requests.models.Response, d: str, name: str, page: int) -> None: + """Save page.""" + open(page_filename(d, name, page), "w").write(r.text) + + +def get_pages() -> None: + """Get pages.""" + today_dir = os.path.join(data_root, str(date.today())) + exists_or_create_dir(today_dir) + + download = False + for name, label, filter_param in filter_params: + filename = page_filename(today_dir, name, 1) + print(filename) + if os.path.exists(filename): + page_content = open(filename).read() + else: + logger.info(f"get {name}", label=label, page=1) + if download: + random_sleep() + page1 = get_product_list(filter_param) + download = True + page_content = page1.text + save_page(page1, today_dir, name, 1) + page_content = page_content.replace("", "") + page_count = get_page_count(page_content) + logger.info(f"{name} page count: {page_count}") + for page_num in range(2, page_count + 1): + filename = page_filename(today_dir, name, page_num) + if os.path.exists(filename): + continue + logger.info(f"get {name}", label=label, page=page_num) + if download: + random_sleep() + r = get_product_list(filter_param, page=page_num) + download = True + save_page(r, today_dir, name, page_num) + + +def get_page_count(html: str) -> int: + """Get page count.""" + m = re_page.search(html) + assert m + return int(m.group(1)) + + +def hidden_price(item: lxml.html.HtmlElement) -> bool: + """Hidden price.""" + price_action = item.find('.//li[@class="price-map"]/a') + hidden = ["See price in cart", "See Price after Checkout"] + return price_action is not None and price_action.text in hidden + + +def out_of_stock(item: lxml.html.HtmlElement) -> bool: + """Item is out of stock.""" + cur_price = item.find('.//li[@class="price-current"]') + if cur_price is None: + cur_price = item.find('.//li[@class="price-current "]') + promo = item.find('.//p[@class="item-promo"]') + btn_message = item.find('.//span[@class="btn btn-message "]') + if cur_price is None: + print(lxml.html.tostring(item, pretty_print=True, encoding="unicode")) + assert cur_price is not None + return ( + len(cur_price) == 0 + and (promo is not None and promo.text_content() == "OUT OF STOCK") + or (btn_message is not None and btn_message.text == "Out Of Stock") + ) + + +class Item(typing.TypedDict): + """Item.""" + + price: Decimal + title: str + size: str + size_gb: Decimal + number: str + price_per_tb: Decimal + + +def parse_page(filename: str) -> list[Item]: + """Parse page.""" + root = lxml.html.parse(filename).getroot() + + items: list[Item] = [] + for item in root.xpath("//div[contains(@class, 'item-container')]"): + title_link = item.find('.//a[@class="item-title"]') + href = title_link.get("href") + item_number = href[href.find("Item=") + 5 :] + title = title_link.text_content() + + # compare = item.find('.//div[@class="item-compare-box"]//input') + # if compare is None: + # continue + # item_number = compare.get('neg-itemnumber') + if not item_number: + print(lxml.html.tostring(item, pretty_print=True, encoding="unicode")) + assert item_number + + if hidden_price(item) or out_of_stock(item): + continue + dollars = item.find('.//li[@class="price-current"]/strong') + if dollars is not None and dollars.text == "COMING SOON": + continue + if dollars is None: + dollars = item.find('.//li[@class="price-current "]/strong') + if dollars is None: + price_was = item.find('.//span[@class="price-was-data"]') + if price_was is not None: + continue + + if dollars is None: + print(item_number, "//", title) + print(lxml.html.tostring(item, pretty_print=True, encoding="unicode")) + cents = dollars.getnext() + price_str = dollars.text + ("" if cents is None else cents.text) + try: + price = Decimal(price_str.replace(",", "")) + except decimal.InvalidOperation: + print(repr(price_str)) + raise + m = re_size1.search(title) + if not m: + m = re_size2.search(title) + if not m: + continue + size = m.group(1) + m.group(2) + "B" + size_gb = Decimal(m.group(1)) + if m.group(2) == "T": + size_gb *= 1000 + + items.append( + { + "price": price, + "title": title, + "size": size, + "size_gb": size_gb, + "number": item_number, + "price_per_tb": (price / size_gb) * 1000, + } + ) + + return items + + +def build_file_map(data_dir: str) -> dict[str, list[tuple[int, str]]]: + """Build file map.""" + files = defaultdict(list) + for f in sorted(os.listdir(data_dir)): + pos = f.rfind("_page") + name = f[:pos] + page = int(f[pos + 5 : pos + 7]) + files[name].append((page, f)) + return files + + +def get_data_dir(today: date) -> str: + """Get data dir.""" + data_dir = os.path.join(data_root, str(today)) + if not os.path.exists(data_dir): + alt = max(x for x in os.listdir(data_root) if x[0].isdigit()) + print(f"Today dir ({today}) doesn't exist. Using most recent data ({alt}).") + print() + data_dir = os.path.join(data_root, alt) + return data_dir + + +class Grouped(typing.TypedDict): + """Grouped items.""" + + name: str + label: str + items: list[Item] + + +def group_items( + today: date, +) -> collections.abc.Iterator[Grouped]: + """Group items.""" + data_dir = get_data_dir(today) + files = build_file_map(data_dir) + + for name, label, filter_param in filter_params: + logger.info(f"category: {label} ({name})") + seen = set() + items = [] + for page_num, f in files[name]: + for item in parse_page(os.path.join(data_dir, f)): + if item["number"] in seen: + logger.info("duplicate", item_number=item["number"]) + continue + seen.add(item["number"]) + items.append(item) + + items.sort(key=lambda i: i["price_per_tb"]) + yield {"name": name, "label": label, "items": items} + + +def build() -> None: + """Build.""" + build_root = "/var/www/edward/docs/price_per_tb" + today = date.today() + + templates_dir = os.path.join(root_dir, "templates") + env = Environment(loader=FileSystemLoader(templates_dir)) + + data = list(group_items(today)) + index = os.path.join(build_root, "index.html") + index_template = env.get_template("index.html") + page = index_template.render(best=data, today=today) + open(index, "w").write(page) + + list_template = env.get_template("item_list.html") + for cat in data: + page = list_template.render(items=cat["items"], today=today, what=cat["label"]) + exists_or_create_dir(os.path.join(build_root, cat["name"])) + filename = os.path.join(build_root, cat["name"], "index.html") + open(filename, "w").write(page) + + +if __name__ == "__main__": + get_pages() + build() diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..398de8f --- /dev/null +++ b/templates/index.html @@ -0,0 +1,36 @@ +{% extends "page.html" %} + +{% block title %}Price per TB{% endblock %} + +{% block content %} +

{{self.title()}}

+ +List of hard drives available for sale from Newegg.com, sorted by the price per TB.

+ +Built by Edward Betts. + +Comments welcome: edward@4angle.com + +

Last updated: {{ today.strftime('%d %B %Y') }}.

+ + +{% for cat in best %} + + + + + + + +{% for hdd in cat['items'][:16] %} + + + + + + +{% endfor %} + +{% endfor %} +

{{ cat.label }}

Price
per TB
PriceSizeDrive
${{ '%.2f' | format(hdd.price_per_tb) }}${{ hdd.price }}{{ hdd.size }}{{ hdd.title }}
more
+{% endblock %} diff --git a/templates/item_list.html b/templates/item_list.html new file mode 100644 index 0000000..f67446f --- /dev/null +++ b/templates/item_list.html @@ -0,0 +1,35 @@ +{% extends "page.html" %} + +{% block title %}{{what}} sorted by price/TB{% endblock %} + +{% block content %} +

{{self.title()}}

+ +

List of {{ what }} available for sale from Newegg.com, sorted by the price per TB.

+ +

Built by Edward Betts. + +Comments welcome: edward@4angle.com

+ +

back to index

+ +

Last updated: {{ today.strftime('%d %B %Y') }}.

+ + + + + + + + +{% for hdd in items %} + + + + + + +{% endfor %} +
Price
per TB
PriceSizeDrive
${{ '%.2f' | format(hdd.price_per_tb) }}{{ hdd.price }}{{ hdd.size }}{{ hdd.title }} +
+{% endblock %} diff --git a/templates/page.html b/templates/page.html new file mode 100644 index 0000000..31fa3e4 --- /dev/null +++ b/templates/page.html @@ -0,0 +1,42 @@ + + + + + +{% block title %}{% endblock %} + + + + + + + +{% block content %}{% endblock %} + + +