diff --git a/google_stocks/__init__.py b/google_stocks/__init__.py
index 24a6cbb..444bacd 100644
--- a/google_stocks/__init__.py
+++ b/google_stocks/__init__.py
@@ -9,6 +9,7 @@ from datetime import datetime
 
 import lxml.html
 from playwright.sync_api import Page, Playwright, expect, sync_playwright
+
 
 auth_file = os.path.expanduser("~/lib/auth/google.json")
 data_loc = os.path.expanduser("~/lib/google_stocks")
@@ -95,9 +96,10 @@ class Index:
         accept_cookies(page)
         stay_signed_out(page)
 
-        expect(page.get_by_text("Market Summary")).to_be_visible()
+        expect(page.locator("#center_col")).to_be_visible()
 
         html = page.content()
+        context.storage_state(path=auth_file)
         filename = data_filename("serp")
         with open(filename, "w") as out:
             out.write(html)
@@ -113,36 +115,112 @@ class Index:
     def parse_html(self, html: str) -> None:
         """Parse HTML."""
         root = lxml.html.fromstring(html)
+        self._set_mapped_attributes(root)
 
-        re_percent_change = re.compile(r" *\(([0-9.]+)%\) *")
+        tag = self._find_price_tag(root)
+        tag_text = " ".join(tag.text_content().split())
+        self._set_price_values(tag, tag_text)
+        self._set_state_and_timestamp(tag_text)
 
+    def _set_mapped_attributes(self, root: lxml.html.HtmlElement) -> None:
+        """Store the text of each element whose data-attrid is in attr_map."""
         for attrid_tag in root.findall(".//*[@data-attrid]"):
             attrid = attrid_tag.get("data-attrid")
             if attrid not in attr_map:
                 continue
             setattr(self, attr_map[attrid], attrid_tag.text_content())
 
+    def _find_price_tag(self, root: lxml.html.HtmlElement) -> lxml.html.HtmlElement:
+        """Return the first element with data-attrid "Price"; assert it exists."""
         tag = root.find('.//*[@data-attrid="Price"]')
         assert tag is not None
+        return tag
 
-        assert tag[0] is not None and tag[1] is not None
-        self.price = decimal.Decimal(tag[0].text_content().replace(",", "").strip())
-        percent_change_str = tag[1][0].text_content().strip()
-        percent_change_str = percent_change_str.replace("\N{MINUS SIGN}", "-")
+    @staticmethod
+    def _clean_num(s: str) -> str:
+        """Drop commas, turn U+2212 MINUS SIGN into "-" and strip whitespace."""
+        return s.replace(",", "").replace("\N{MINUS SIGN}", "-").strip()
 
-        assert tag[2][0][0].text and tag[2][0][1].text
+    def _extract_price_text(self, tag: lxml.html.HtmlElement, tag_text: str) -> str:
+        """Return the raw price string found inside the price element.
+
+        Uses the first decimal number in a text node directly under an
+        aria-live="polite" element; failing that, the leading number of a
+        "price change (percent%)" triplet in tag_text. Asserts one is found.
+        """
+        re_number = re.compile(r"[0-9][0-9,]*\.[0-9]+")
+        re_quote_triplet = re.compile(
+            r"([0-9][0-9,]*\.[0-9]+)\s*([+\-\N{MINUS SIGN}][0-9][0-9,]*\.[0-9]+)\s*\(([+\-\N{MINUS SIGN}]?[0-9.]+)%\)"
+        )
 
-        self.state = tag[2][0][0].text.rstrip(": ").lower()
-        self.timestamp = tag[2][0][1].text
+        for node_text in tag.xpath('.//*[@aria-live="polite"]/text()'):
+            m = re_number.search(node_text)
+            if m:
+                return m.group(0)
 
-        self.day_change = decimal.Decimal(percent_change_str)
-        m = re_percent_change.match(tag[1][1].text_content())
+        m = re_quote_triplet.search(tag_text)
         assert m
-        percent_change = decimal.Decimal(m.group(1))
-        if percent_change_str[0] == "-":
+        return m.group(1)
+
+    def _set_price_values(self, tag: lxml.html.HtmlElement, tag_text: str) -> None:
+        """Set self.price, self.day_change and self.percent_change.
+
+        day_change is the first explicitly signed decimal in tag_text and
+        percent_change the first parenthesised percentage; both are asserted.
+        """
+        re_day_change = re.compile(r"([+\-\N{MINUS SIGN}][0-9][0-9,]*\.[0-9]+)")
+        re_percent = re.compile(r"\(([+\-\N{MINUS SIGN}]?[0-9.]+)%\)")
+
+        price_text = self._extract_price_text(tag, tag_text)
+        self.price = decimal.Decimal(self._clean_num(price_text))
+
+        m = re_day_change.search(tag_text)
+        assert m
+        self.day_change = decimal.Decimal(self._clean_num(m.group(1)))
+
+        m = re_percent.search(tag_text)
+        assert m
+        percent_change_text = self._clean_num(m.group(1))
+        percent_change = decimal.Decimal(percent_change_text)
+        # An unsigned percentage takes its sign from a negative day change.
+        if not percent_change_text.startswith(("+", "-")) and self.day_change < 0:
             percent_change = -percent_change
         self.percent_change = percent_change
 
+    def _set_state_and_timestamp(self, tag_text: str) -> None:
+        """Set self.state and self.timestamp from the flattened price text.
+
+        Prefers an explicit "<state>: <timestamp>" pair. Otherwise uses a bare
+        timestamp and looks for a known state label in the text before it.
+        Assigns nothing when no timestamp is found, and only self.timestamp
+        when no state label precedes it.
+        """
+        re_timestamp = re.compile(r"([0-9]{1,2}\s+[A-Za-z]{3},\s+[0-9]{1,2}:[0-9]{2}\s+[A-Z]{2,5})")
+        re_state_time = re.compile(
+            r"\b(Closed|Open|Market open|Market closed|Pre-market|After hours):\s*([0-9]{1,2}\s+[A-Za-z]{3},\s+[0-9]{1,2}:[0-9]{2}\s+[A-Z]{2,5})",
+            re.IGNORECASE,
+        )
+
+        m = re_state_time.search(tag_text)
+        if m:
+            self.state = m.group(1).strip().lower()
+            self.timestamp = m.group(2).strip()
+            return
+
+        tm = re_timestamp.search(tag_text)
+        if not tm:
+            return
+
+        self.timestamp = tm.group(1).strip()
+        # Match case-insensitively, as re_state_time does. Longer labels are
+        # listed first so "Market closed" is not reported as plain "closed".
+        prefix = tag_text[: tm.start()].lower()
+        states = ("market closed", "market open", "pre-market", "after hours", "closed", "open")
+        for state in states:
+            if state in prefix:
+                self.state = state
+                break
+
     @property
     def one_line(self) -> str:
         """Index name, price and price change."""