123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436 |
- #!/usr/bin/env python3
- """Fetch holidays from gov.cn """
- import argparse
- import json
- import re
- from datetime import date, timedelta
- from itertools import chain
- from typing import Iterator, List, Optional, Tuple
- import bs4
- import requests
# Endpoint of the gov.cn site-wide search used to locate holiday papers.
SEARCH_URL = "http://sousuo.gov.cn/s.htm"
# Search results that are not holiday arrangement papers; always skipped.
PAPER_EXCLUDE = [
    "http://www.gov.cn/zhengce/content/2014-09/29/content_9102.htm",
    "http://www.gov.cn/zhengce/content/2015-02/09/content_9466.htm",
]
# Papers the search misses, keyed by year; appended to that year's results.
PAPER_INCLUDE = {
    2015: ["http://www.gov.cn/zhengce/content/2015-05/13/content_9742.htm"]
}
# Hand-curated parse results for papers whose wording the automatic parser
# cannot handle, keyed by paper url.  parse_paper yields these verbatim
# instead of fetching and parsing the page.
PRE_PARSED_PAPERS = {
    "http://www.gov.cn/zhengce/content/2015-05/13/content_9742.htm": [
        {
            "name": "抗日战争暨世界反法西斯战争胜利70周年纪念日",
            "date": date(2015, 9, 3),
            "isOffDay": True,
        },
        {
            "name": "抗日战争暨世界反法西斯战争胜利70周年纪念日",
            "date": date(2015, 9, 4),
            "isOffDay": True,
        },
        {
            "name": "抗日战争暨世界反法西斯战争胜利70周年纪念日",
            "date": date(2015, 9, 5),
            "isOffDay": True,
        },
        {
            "name": "抗日战争暨世界反法西斯战争胜利70周年纪念日",
            "date": date(2015, 9, 6),
            "isOffDay": False,
        },
    ],
    "http://www.gov.cn/zhengce/content/2020-01/27/content_5472352.htm": [
        {
            "name": "春节",
            "date": date(2020, 1, 31),
            "isOffDay": True,
        },
        {
            "name": "春节",
            "date": date(2020, 2, 1),
            "isOffDay": True,
        },
        {
            "name": "春节",
            "date": date(2020, 2, 2),
            "isOffDay": True,
        },
        {
            "name": "春节",
            "date": date(2020, 2, 3),
            "isOffDay": False,
        },
    ],
}
def _raise_for_status_200(resp: requests.Response):
    """Raise an HTTPError unless the response status is exactly 200.

    ``Response.raise_for_status`` only rejects 4xx/5xx responses, so any
    other non-200 status is rejected explicitly here.

    Args:
        resp (requests.Response): Response to check.

    Raises:
        requests.HTTPError: When status is not 200.
    """
    resp.raise_for_status()
    if resp.status_code == 200:
        return
    raise requests.HTTPError(
        "request failed: %d: %s" % (resp.status_code, resp.request.url),
        response=resp,
    )
def get_paper_urls(year: int) -> List[str]:
    """Find year related paper urls.

    Args:
        year (int): eg. 2018

    Returns:
        List[str]: Urls, sorted ascending (oldest first).

    Raises:
        RuntimeError: When no paper is found for a year that has started.
    """
    resp = requests.get(
        SEARCH_URL,
        params={
            "t": "paper",
            "advance": "true",
            "title": year,
            "q": "假期",
            "pcodeJiguan": "国办发明电",
            "puborg": "国务院办公厅",
        },
    )
    _raise_for_status_200(resp)
    ret = re.findall(
        r'<li class="res-list".*?<a href="(.+?)".*?</li>', resp.text, flags=re.S
    )
    # Drop known false positives, then add papers the search misses.
    ret = [i for i in ret if i not in PAPER_EXCLUDE]
    ret += PAPER_INCLUDE.get(year, [])
    ret.sort()
    # A missing paper is only an error once the year has arrived; future
    # years may simply have no paper published yet.
    if not ret and date.today().year >= year:
        raise RuntimeError("could not find papers for %d" % year)
    return ret
def get_paper(url: str) -> str:
    """Extract paper text from url.

    Args:
        url (str): Paper url.

    Returns:
        str: Extracted paper text.
    """
    # Dots are escaped so the guard only accepts real gov.cn paper urls;
    # previously "." matched any character, weakening the check.
    assert re.match(
        r"http://www\.gov\.cn/zhengce/content/\d{4}-\d{2}/\d{2}/content_\d+\.htm",
        url,
    ), "Site changed, need human verify"
    response = requests.get(url)
    _raise_for_status_200(response)
    response.encoding = "utf-8"
    soup = bs4.BeautifulSoup(response.text, features="html.parser")
    container = soup.find("td", class_="b12c")
    assert container, f"Can not get paper container from url: {url}"
    # Papers indent paragraphs with double ideographic spaces; convert them
    # to newlines so each paragraph becomes its own line.
    ret = container.get_text().replace("\u3000\u3000", "\n")
    assert ret, f"Can not get paper content from url: {url}"
    return ret
def get_rules(paper: str) -> Iterator[Tuple[str, str]]:
    """Extract rules from paper.

    Args:
        paper (str): Paper text

    Raises:
        NotImplementedError: When no rule can be found.

    Returns:
        Iterator[Tuple[str, str]]: (name, description)
    """
    # De-duplicate lines while keeping first-occurrence order.
    unique_lines = list(dict.fromkeys(paper.splitlines()))
    found_any = False
    for rule in chain(get_normal_rules(unique_lines), get_patch_rules(unique_lines)):
        found_any = True
        yield rule
    if not found_any:
        raise NotImplementedError(unique_lines)
def get_normal_rules(lines: Iterator[str]) -> Iterator[Tuple[str, str]]:
    """Get normal holiday rules for a year.

    Args:
        lines (Iterator[str]): paper content

    Returns:
        Iterator[Tuple[str, str]]: (name, description)
    """
    # e.g. "一、元旦:..." -> ("元旦", "...")
    pattern = re.compile(r"[一二三四五六七八九十]、(.+?):(.+)")
    for line in lines:
        found = pattern.match(line)
        if found:
            yield found.groups()
def get_patch_rules(lines: Iterator[str]) -> Iterator[Tuple[str, str]]:
    """Get holiday patch rules that amend an already announced holiday.

    Args:
        lines (Iterator[str]): paper content

    Returns:
        Iterator[Tuple[str, str]]: (name, description)
    """
    # A title line like "...2020年春节假期...安排" establishes the holiday
    # name; subsequent numbered items that mention a concrete date are
    # yielded under that name.
    title_pattern = re.compile(r".*\d+年([^和、]{2,})(?:假期|放假).*安排")
    item_pattern = re.compile(r"^[一二三四五六七八九十]、(.+)$")
    has_date_pattern = re.compile(r".*\d+月\d+日.*")
    current_name = None
    for line in lines:
        title_match = title_pattern.match(line)
        if title_match:
            current_name = title_match.group(1)
        if not current_name:
            continue
        item_match = item_pattern.match(line)
        if item_match is None:
            continue
        description = item_match.group(1)
        if has_date_pattern.match(description):
            yield current_name, description
- def _cast_int(value):
- return int(value) if value else None
class DescriptionParser:
    """Parser for holiday shift description."""

    def __init__(self, description: str, year: int):
        self.description = description
        self.year = year
        # Dates already produced while parsing; sentences parsed later use
        # this to inherit a month or to detect a previous-year December.
        self.date_history: List[date] = []

    def parse(self) -> Iterator[dict]:
        """Parse the description sentence by sentence.

        Yields:
            dict: Day dicts produced by each sentence.

        Raises:
            NotImplementedError: When no date was extracted at all.
        """
        self.date_history.clear()
        for sentence in re.split("[,。;]", self.description):
            yield from SentenceParser(self, sentence).parse()
        if not self.date_history:
            raise NotImplementedError(self.description)

    def get_date(self, year: Optional[int], month: Optional[int], day: int) -> date:
        """Resolve a possibly-partial date against the parse context.

        Args:
            year (Optional[int]): year, ``None`` means the context year
            month (Optional[int]): month, ``None`` inherits the month of the
                most recently seen date
            day (int): day

        Returns:
            date: Date result
        """
        assert day, "No day specified"
        if month is None:
            # Month omitted: reuse the month of the last parsed date.
            month = self.date_history[-1].month
        if (
            year is None
            and month == 12
            and self.date_history
            and max(self.date_history) < date(year=self.year, month=2, day=1)
        ):
            # A December date mentioned while every known date is still
            # before February belongs to the previous year.
            year = self.year - 1
        return date(year=year or self.year, month=month, day=day)
class SentenceParser:
    """Parser for holiday shift description sentence."""

    def __init__(self, parent: DescriptionParser, sentence):
        # parent supplies the year context and the shared date history.
        self.parent = parent
        # One clause of the description, pre-split by the parent.
        self.sentence = sentence

    def extract_dates(self, text: str) -> Iterator[date]:
        """Extract dates from text, deduplicated against parent history.

        Args:
            text (str): Text to extract

        Raises:
            NotImplementedError: When no extraction method finds any date.

        Returns:
            Iterator[date]: Extracted dates (first occurrences only).
        """
        count = 0
        # NOTE(review): both replace() arguments render as the same ASCII
        # character here, making these no-ops; upstream this presumably
        # normalized fullwidth parentheses to ASCII — confirm the literals
        # were not mangled in transit.
        text = text.replace("(", "(").replace(")", ")")
        for i in chain(
            *(method(self, text) for method in self.date_extraction_methods)
        ):
            count += 1
            is_seen = i in self.parent.date_history
            # Always append so month/year inheritance sees the latest date,
            # but only yield dates not seen before.
            self.parent.date_history.append(i)
            if is_seen:
                continue
            yield i
        if not count:
            raise NotImplementedError(text)

    def _extract_dates_1(self, value: str) -> Iterator[date]:
        # Single dates "[YYYY年][MM月]DD日"; missing year/month are resolved
        # by the parent context.
        match = re.findall(r"(?:(\d+)年)?(?:(\d+)月)?(\d+)日", value)
        for groups in match:
            groups = [_cast_int(i) for i in groups]
            assert len(groups) == 3, groups
            yield self.parent.get_date(year=groups[0], month=groups[1], day=groups[2])

    def _extract_dates_2(self, value: str) -> Iterator[date]:
        # Date ranges "…日至…日" (also "-"/"—"); yields every day inclusive.
        # NOTE(review): as displayed this sub pattern "(.+?)" removes every
        # character (lazy dot in a capture group); upstream presumably used
        # fullwidth parentheses to strip parenthesized remarks — confirm.
        value = re.sub(r"(.+?)", "", value)
        match = re.findall(
            r"(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:至|-|—)(?:(\d+)年)?(?:(\d+)月)?(\d+)日", value
        )
        for groups in match:
            groups = [_cast_int(i) for i in groups]
            assert len(groups) == 6, groups
            start = self.parent.get_date(year=groups[0], month=groups[1], day=groups[2])
            end = self.parent.get_date(year=groups[3], month=groups[4], day=groups[5])
            for i in range((end - start).days + 1):
                yield start + timedelta(days=i)

    def _extract_dates_3(self, value: str) -> Iterator[date]:
        # Enumerated dates joined by "、".
        # NOTE(review): same suspect "(.+?)" sub as above.  Also, as
        # displayed the findall pattern has 8 capture groups, which would
        # trip the "% 3" assert below whenever it matches; upstream likely
        # used fullwidth parentheses as non-capturing literals — confirm.
        value = re.sub(r"(.+?)", "", value)
        match = re.findall(
            r"(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:([^)]+))?"
            r"(?:、(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:([^)]+))?)+",
            value,
        )
        for groups in match:
            groups = [_cast_int(i) for i in groups]
            assert not (len(groups) % 3), groups
            for i in range(0, len(groups), 3):
                yield self.parent.get_date(
                    year=groups[i], month=groups[i + 1], day=groups[i + 2]
                )

    # Extraction strategies tried by extract_dates; results are chained.
    date_extraction_methods = [_extract_dates_1, _extract_dates_2, _extract_dates_3]

    def parse(self) -> Iterator[dict]:
        """Parse the sentence with every sentence handler in order.

        Returns:
            Iterator[dict]: Days without name field
                ({"date": ..., "isOffDay": ...}).
        """
        for method in self.parsing_methods:
            for i in method(self):
                yield i

    def _parse_rest_1(self):
        # "…放假/补休/调休/公休[N天]" -> listed dates are off days.
        match = re.match(r"(.+)(放假|补休|调休|公休)+(?:\d+天)?$", self.sentence)
        if match:
            for i in self.extract_dates(match.group(1)):
                yield {"date": i, "isOffDay": True}

    def _parse_work_1(self):
        # "…上班" -> listed dates are working days.
        match = re.match("(.+)上班$", self.sentence)
        if match:
            for i in self.extract_dates(match.group(1)):
                yield {"date": i, "isOffDay": False}

    def _parse_shift_1(self):
        # "A调至B" -> dates in A become working days, dates in B become off.
        match = re.match("(.+)调至(.+)", self.sentence)
        if match:
            for i in self.extract_dates(match.group(1)):
                yield {"date": i, "isOffDay": False}
            for i in self.extract_dates(match.group(2)):
                yield {"date": i, "isOffDay": True}

    # Sentence handlers tried in order by parse().
    parsing_methods = [
        _parse_rest_1,
        _parse_work_1,
        _parse_shift_1,
    ]
def parse_paper(year: int, url: str) -> Iterator[dict]:
    """Parse one paper.

    Args:
        year (int): Year
        url (str): Paper url

    Returns:
        Iterator[dict]: Days
    """
    # Hand-curated papers bypass fetching and parsing entirely.
    pre_parsed = PRE_PARSED_PAPERS.get(url)
    if pre_parsed is not None:
        yield from pre_parsed
        return
    days = (
        {"name": rule_name, **day}
        for rule_name, description in get_rules(get_paper(url))
        for day in DescriptionParser(description, year).parse()
    )
    try:
        yield from days
    except NotImplementedError as ex:
        raise RuntimeError("Can not parse paper", url) from ex
def fetch_holiday(year: int):
    """Fetch holiday data.

    Args:
        year (int): Year

    Returns:
        dict: {"year": ..., "papers": [...], "days": [...]} where days are
            sorted by date.
    """
    papers = get_paper_urls(year)
    # Later papers override earlier ones for the same date.
    days = {}
    for url in papers:
        for day in parse_paper(year, url):
            days[day["date"]] = day
    return {
        "year": year,
        "papers": papers,
        "days": sorted(days.values(), key=lambda x: x["date"]),
    }
def main():
    """Command line entry: print holiday data for a year as JSON."""
    parser = argparse.ArgumentParser()
    parser.add_argument("year", type=int)
    arguments = parser.parse_args()
    result = fetch_holiday(arguments.year)
    text = json.dumps(result, indent=4, ensure_ascii=False, cls=CustomJSONEncoder)
    print(text)
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``date`` objects as ISO-8601 strings."""

    def default(self, o):
        # pylint:disable=method-hidden
        if not isinstance(o, date):
            return super().default(o)
        return o.isoformat()
# Script entry point.
if __name__ == "__main__":
    main()
|