"""Look up stock index on Google."""

import decimal
import os
import re
import urllib.parse
from datetime import datetime, timezone

import lxml.html
from playwright.sync_api import Page, Playwright, expect, sync_playwright

auth_file = os.path.expanduser("~/lib/auth/google.json")
data_loc = os.path.expanduser("~/lib/google_stocks")

# Maps the ``data-attrid`` value found in the results page to the name of the
# ``Index`` attribute that receives that element's text.
attr_map = {
    "52-wk high": "price_52_wk_high",
    "52-wk low": "price_52_wk_low",
    "Company Name": "company_name",
    "High": "price_high",
    "Low": "price_low",
    "Open": "price_open",
    "Prev close": "price_prev_close",
    "day change": "day_change",
    "title": "title",
    "subtitle": "subtitle",
}

# Patterns are compiled once at import time instead of on every method call.
_TIMESTAMP_PATTERN = r"[0-9]{1,2}\s+[A-Za-z]{3},\s+[0-9]{1,2}:[0-9]{2}\s+[A-Z]{2,5}"
_RE_NUMBER = re.compile(r"[0-9][0-9,]*\.[0-9]+")
_RE_QUOTE_TRIPLET = re.compile(
    r"([0-9][0-9,]*\.[0-9]+)\s*"
    r"([+\-\N{MINUS SIGN}][0-9][0-9,]*\.[0-9]+)\s*"
    r"\(([+\-\N{MINUS SIGN}]?[0-9.]+)%\)"
)
_RE_DAY_CHANGE = re.compile(r"([+\-\N{MINUS SIGN}][0-9][0-9,]*\.[0-9]+)")
_RE_PERCENT = re.compile(r"\(([+\-\N{MINUS SIGN}]?[0-9.]+)%\)")
_RE_TIMESTAMP = re.compile("(" + _TIMESTAMP_PATTERN + ")")
_RE_STATE_TIME = re.compile(
    r"\b(Closed|Open|Market open|Market closed|Pre-market|After hours):\s*"
    "(" + _TIMESTAMP_PATTERN + ")",
    re.IGNORECASE,
)

# Fallback state labels, tried in this order against the text that precedes
# the timestamp (case-sensitive substring match).
_STATES = (
    "Market closed",
    "Market open",
    "Pre-market",
    "After hours",
    "Closed",
    "Open",
)


class StockParseError(AssertionError):
    """The results page did not contain the expected quote markup.

    Subclasses ``AssertionError`` so callers that caught the failures of the
    ``assert`` statements previously used here keep working, while the checks
    are no longer stripped when Python runs with ``-O``.
    """


def data_filename(page_type: str, ext: str = "html") -> str:
    """Filename to use for saving data.

    The name is ``<UTC timestamp>_<page_type>.<ext>`` inside ``data_loc``.
    """
    # Aware UTC "now"; formats identically to the deprecated utcnow().
    now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H%M%S")
    return os.path.join(data_loc, f"{now_str}_{page_type}.{ext}")


def _click_if_visible(page: Page, selector: str) -> None:
    """Click the element matching *selector* if it is currently visible."""
    button = page.locator(selector)
    if button.is_visible():
        button.click()


def accept_cookies(page: Page) -> None:
    """Check for the 'Accept all' button and click it if found."""
    _click_if_visible(page, "button:has-text('Accept all')")


def stay_signed_out(page: Page) -> None:
    """Check for the 'Stay signed out' button and click it if found."""
    _click_if_visible(page, "text=Stay signed out")


class Index:
    """Stock market index.

    Constructing an instance starts Playwright, loads the Google results page
    for *name* and fills in the attributes below from that page.
    """

    # Set by _set_mapped_attributes; each holds the element text as scraped.
    price_52_wk_high: str
    price_52_wk_low: str
    company_name: str
    price_high: str
    price_low: str
    price_open: str
    price_prev_close: str
    subtitle: str
    title: str

    # Parsed out of the "Price" element by _set_price_values.
    day_change: decimal.Decimal
    percent_change: decimal.Decimal
    price: decimal.Decimal

    # Set by _set_state_and_timestamp, only when found in the page text.
    state: str
    timestamp: str

    def __init__(self, name: str):
        """Init.

        Args:
            name: Search query for the index; stored as ``self.name``.
        """
        self.name = name
        with sync_playwright() as playwright:
            self.run(playwright)

    @property
    def search_url(self) -> str:
        """Search URL."""
        q = urllib.parse.quote_plus(self.name)
        return f"https://www.google.com/search?q={q}&gl=uk"

    def run(self, playwright: Playwright) -> None:
        """Run playwright.

        Loads the search page headlessly, dismisses the cookie and sign-in
        prompts, and waits for the price element. If a CAPTCHA is detected the
        headless browser is replaced by a visible one so it can be solved by
        hand. The page HTML is saved under ``data_loc`` and parsed, then the
        browser storage state is written back to ``auth_file``.
        """
        browser = playwright.chromium.launch(headless=True)
        context = browser.new_context(storage_state=auth_file)
        page = context.new_page()
        page.goto(self.search_url, wait_until="domcontentloaded")
        accept_cookies(page)
        stay_signed_out(page)

        if page.locator("#recaptcha, #captcha-form").count() > 0:
            # Reopen the CAPTCHA page in a headed browser for manual solving.
            captcha_url = page.url
            context.close()
            browser.close()
            print(
                "Google is showing a CAPTCHA. Solve it in the browser window...",
                flush=True,
            )
            browser = playwright.chromium.launch(headless=False)
            context = browser.new_context(storage_state=auth_file)
            page = context.new_page()
            page.goto(captcha_url, wait_until="domcontentloaded")

        # Up to two minutes, which also covers the manual CAPTCHA case.
        page.wait_for_selector('[data-attrid="Price"]', timeout=120000)
        html = page.content()

        filename = data_filename("serp")
        os.makedirs(data_loc, exist_ok=True)
        # Explicit UTF-8: the page contains non-ASCII characters that the
        # platform default encoding may be unable to represent.
        with open(filename, "w", encoding="utf-8") as out:
            out.write(html)

        self.parse_html(html)
        context.storage_state(path=auth_file)
        context.close()
        browser.close()

    def parse_html(self, html: str) -> None:
        """Parse HTML.

        Raises:
            StockParseError: If the price element or its numbers are missing.
        """
        root = lxml.html.fromstring(html)
        self._set_mapped_attributes(root)
        tag = self._find_price_tag(root)
        # Collapse every run of whitespace to a single space.
        tag_text = " ".join(tag.text_content().split())
        self._set_price_values(tag, tag_text)
        self._set_state_and_timestamp(tag_text)

    def _set_mapped_attributes(self, root: lxml.html.HtmlElement) -> None:
        """Copy the text of every element listed in ``attr_map`` onto self."""
        for attrid_tag in root.findall(".//*[@data-attrid]"):
            attr_name = attr_map.get(attrid_tag.get("data-attrid"))
            if attr_name is not None:
                setattr(self, attr_name, attrid_tag.text_content())

    def _find_price_tag(self, root: lxml.html.HtmlElement) -> lxml.html.HtmlElement:
        """Return the first element whose ``data-attrid`` is ``Price``.

        Raises:
            StockParseError: If no such element exists.
        """
        tag = root.find('.//*[@data-attrid="Price"]')
        if tag is None:
            raise StockParseError('no element with data-attrid="Price" found')
        return tag

    @staticmethod
    def _clean_num(s: str) -> str:
        """Strip commas and whitespace and turn U+2212 into an ASCII minus."""
        return s.replace(",", "").replace("\N{MINUS SIGN}", "-").strip()

    def _extract_price_text(self, tag: lxml.html.HtmlElement, tag_text: str) -> str:
        """Return the price as text.

        The first number found in a text node under an ``aria-live="polite"``
        element wins; otherwise the first group of a
        "price change (percent%)" match in *tag_text* is used.

        Raises:
            StockParseError: If neither source yields a price.
        """
        for node_text in tag.xpath('.//*[@aria-live="polite"]/text()'):
            m = _RE_NUMBER.search(node_text)
            if m:
                return m.group(0)

        m = _RE_QUOTE_TRIPLET.search(tag_text)
        if m is None:
            raise StockParseError(f"no price found in {tag_text!r}")
        return m.group(1)

    def _set_price_values(self, tag: lxml.html.HtmlElement, tag_text: str) -> None:
        """Set ``price``, ``day_change`` and ``percent_change``.

        Raises:
            StockParseError: If the price, day change or percentage is missing.
        """
        price_text = self._extract_price_text(tag, tag_text)
        self.price = decimal.Decimal(self._clean_num(price_text))

        m = _RE_DAY_CHANGE.search(tag_text)
        if m is None:
            raise StockParseError(f"no day change found in {tag_text!r}")
        self.day_change = decimal.Decimal(self._clean_num(m.group(1)))

        m = _RE_PERCENT.search(tag_text)
        if m is None:
            raise StockParseError(f"no percent change found in {tag_text!r}")
        percent_change_text = self._clean_num(m.group(1))
        percent_change = decimal.Decimal(percent_change_text)
        # An unsigned percentage takes its sign from the day change.
        if not percent_change_text.startswith(("+", "-")) and self.day_change < 0:
            percent_change = -percent_change
        self.percent_change = percent_change

    def _set_state_and_timestamp(self, tag_text: str) -> None:
        """Set ``state`` and ``timestamp`` from the price element text.

        A "<state>: <timestamp>" match sets both. Failing that, a bare
        timestamp is stored and the text before it is searched for a known
        state label. Attributes that cannot be determined are left unset.
        """
        m = _RE_STATE_TIME.search(tag_text)
        if m:
            self.state = m.group(1).strip().lower()
            self.timestamp = m.group(2).strip()
            return

        tm = _RE_TIMESTAMP.search(tag_text)
        if not tm:
            return

        self.timestamp = tm.group(1).strip()
        prefix = tag_text[: tm.start()]
        for state in _STATES:
            if state in prefix:
                self.state = state.lower()
                break

    @property
    def one_line(self) -> str:
        """Index name, price and price change."""
        return f"{self.title}: {self.price} ({self.percent_change}%) {self.timestamp}"