fetch.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. #!/usr/bin/env python3
  2. """Fetch holidays from gov.cn """
  3. import argparse
  4. import json
  5. import re
  6. from datetime import date, timedelta
  7. from itertools import chain
  8. from typing import Iterator, List, Optional, Tuple
  9. import bs4
  10. import requests
# Papers that match the search query but must NOT be parsed
# (e.g. superseded or otherwise irrelevant notices).
PAPER_EXCLUDE = [
    "http://www.gov.cn/zhengce/zhengceku/2014-09/29/content_9102.htm",
    "http://www.gov.cn/zhengce/zhengceku/2015-02/09/content_9466.htm",
]
# Papers the search API misses, keyed by year; always appended to results.
PAPER_INCLUDE = {
    2015: ["http://www.gov.cn/zhengce/zhengceku/2015-05/13/content_9742.htm"]
}
# Hand-parsed results for papers whose wording the automatic parser cannot
# handle.  Maps paper url -> list of day dicts with keys
# "name" (holiday name), "date" (datetime.date), "isOffDay" (bool).
PRE_PARSED_PAPERS = {
    "http://www.gov.cn/zhengce/zhengceku/2015-05/13/content_9742.htm": [
        {
            "name": "抗日战争暨世界反法西斯战争胜利70周年纪念日",
            "date": date(2015, 9, 3),
            "isOffDay": True,
        },
        {
            "name": "抗日战争暨世界反法西斯战争胜利70周年纪念日",
            "date": date(2015, 9, 4),
            "isOffDay": True,
        },
        {
            "name": "抗日战争暨世界反法西斯战争胜利70周年纪念日",
            "date": date(2015, 9, 5),
            "isOffDay": True,
        },
        {
            "name": "抗日战争暨世界反法西斯战争胜利70周年纪念日",
            "date": date(2015, 9, 6),
            "isOffDay": False,
        },
    ],
    "http://www.gov.cn/zhengce/zhengceku/2020-01/27/content_5472352.htm": [
        {
            "name": "春节",
            "date": date(2020, 1, 31),
            "isOffDay": True,
        },
        {
            "name": "春节",
            "date": date(2020, 2, 1),
            "isOffDay": True,
        },
        {
            "name": "春节",
            "date": date(2020, 2, 2),
            "isOffDay": True,
        },
        {
            "name": "春节",
            "date": date(2020, 2, 3),
            "isOffDay": False,
        },
    ],
}
  64. def _raise_for_status_200(resp: requests.Response):
  65. resp.raise_for_status()
  66. if resp.status_code != 200:
  67. raise requests.HTTPError(
  68. "request failed: %d: %s" % (resp.status_code, resp.request.url),
  69. response=resp,
  70. )
  71. def _get_paper_urls(year: int) -> Iterator[str]:
  72. has_next_page = True
  73. page_index = 0
  74. while has_next_page:
  75. resp = requests.get(
  76. "https://sousuo.www.gov.cn/search-gov/data",
  77. params={
  78. "t": "zhengcelibrary_gw",
  79. "p": page_index,
  80. "n": 5,
  81. "q": "假期 %d" % (year,),
  82. "pcodeJiguan": "国办发明电",
  83. "puborg": "国务院办公厅",
  84. "filetype": "通知",
  85. "sort": "pubtime",
  86. },
  87. )
  88. _raise_for_status_200(resp)
  89. data = resp.json()
  90. if data["code"] == 1001:
  91. # no match
  92. return
  93. assert data["code"] == 200, "%s: %s: %s" % (
  94. resp.url,
  95. data["code"],
  96. data["msg"],
  97. )
  98. for i in data["searchVO"]["listVO"]:
  99. if str(year) in i["title"]:
  100. yield i["url"]
  101. page_index += 1
  102. has_next_page = page_index < data["searchVO"]["totalpage"]
  103. def get_paper_urls(year: int) -> List[str]:
  104. """Find year related paper urls.
  105. Args:
  106. year (int): eg. 2018
  107. Returns:
  108. List[str]: Urls, sort by publish time.
  109. """
  110. ret = [i for i in _get_paper_urls(year) if i not in PAPER_EXCLUDE]
  111. ret += PAPER_INCLUDE.get(year, [])
  112. ret.sort()
  113. if not ret and date.today().year >= year:
  114. raise RuntimeError("could not found papers for %d" % (year,))
  115. return ret
  116. def get_paper(url: str) -> str:
  117. """Extract paper text from url.
  118. Args:
  119. url (str): Paper url.
  120. Returns:
  121. str: Extracted paper text.
  122. """
  123. response = requests.get(url)
  124. _raise_for_status_200(response)
  125. response.encoding = "utf-8"
  126. soup = bs4.BeautifulSoup(response.text, features="html.parser")
  127. container = soup.find(id="UCAP-CONTENT")
  128. assert isinstance(
  129. container, bs4.Tag
  130. ), f"Can not get paper container from url: {url}"
  131. p = bs4.BeautifulSoup(
  132. container.decode().replace("<br/>", "</p><p>"), features="html.parser"
  133. ).find_all("p")
  134. ret = "\n".join((i.get_text().strip() for i in p))
  135. assert ret, f"can not get paper content from url: {url}"
  136. return ret
  137. def get_rules(paper: str) -> Iterator[Tuple[str, str]]:
  138. """Extract rules from paper.
  139. Args:
  140. paper (str): Paper text
  141. Raises:
  142. NotImplementedError: When find no rules.
  143. Returns:
  144. Iterator[Tuple[str, str]]: (name, description)
  145. """
  146. lines: list = paper.splitlines()
  147. lines = sorted(set(lines), key=lines.index)
  148. count = 0
  149. for i in chain(get_normal_rules(lines), get_patch_rules(lines)):
  150. count += 1
  151. yield i
  152. if not count:
  153. raise NotImplementedError(lines)
  154. def get_normal_rules(lines: Iterator[str]) -> Iterator[Tuple[str, str]]:
  155. """Get normal holiday rule for a year
  156. Args:
  157. lines (Iterator[str]): paper content
  158. Returns:
  159. Iterator[Tuple[str, str]]: (name, description)
  160. """
  161. for i in lines:
  162. match = re.match(r"[一二三四五六七八九十]、(.+?):(.+)", i)
  163. if match:
  164. yield match.groups()
  165. def get_patch_rules(lines: Iterator[str]) -> Iterator[Tuple[str, str]]:
  166. """Get holiday patch rule for existed holiday
  167. Args:
  168. lines (Iterator[str]): paper content
  169. Returns:
  170. Iterator[Tuple[str, str]]: (name, description)
  171. """
  172. name = None
  173. for i in lines:
  174. match = re.match(r".*\d+年([^和、]{2,})(?:假期|放假).*安排", i)
  175. if match:
  176. name = match.group(1)
  177. if not name:
  178. continue
  179. match = re.match(r"^[一二三四五六七八九十]、(.+)$", i)
  180. if not match:
  181. continue
  182. description = match.group(1)
  183. if re.match(r".*\d+月\d+日.*", description):
  184. yield name, description
  185. def _cast_int(value):
  186. return int(value) if value else None
  187. class DescriptionParser:
  188. """Parser for holiday shift description."""
  189. def __init__(self, description: str, year: int):
  190. self.description = description
  191. self.year = year
  192. self.date_history = list()
  193. def parse(self) -> Iterator[dict]:
  194. """Generator for description parsing result.
  195. Args:
  196. year (int): Context year
  197. """
  198. del self.date_history[:]
  199. for i in re.split("[,。;]", self.description):
  200. for j in SentenceParser(self, i).parse():
  201. yield j
  202. if not self.date_history:
  203. raise NotImplementedError(self.description)
  204. def get_date(self, year: Optional[int], month: Optional[int], day: int) -> date:
  205. """Get date in context.
  206. Args:
  207. year (Optional[int]): year
  208. month (int): month
  209. day (int): day
  210. Returns:
  211. date: Date result
  212. """
  213. assert day, "No day specified"
  214. # Special case: month inherit
  215. if month is None:
  216. month = self.date_history[-1].month
  217. # Special case: 12 month may mean previous year
  218. if (
  219. year is None
  220. and month == 12
  221. and self.date_history
  222. and max(self.date_history) < date(year=self.year, month=2, day=1)
  223. ):
  224. year = self.year - 1
  225. year = year or self.year
  226. return date(year=year, month=month, day=day)
class SentenceParser:
    """Parser for holiday shift description sentence."""

    def __init__(self, parent: DescriptionParser, sentence):
        # Parent parser supplies year context and the shared date history.
        self.parent = parent
        self.sentence = sentence

    def extract_dates(self, text: str) -> Iterator[date]:
        """Extract date from text.

        Args:
            text (str): Text to extract

        Raises:
            NotImplementedError: When no extraction method finds any date.

        Returns:
            Iterator[date]: Extracted dates, duplicates suppressed.
        """
        count = 0
        # Normalize fullwidth parentheses to ASCII before matching.
        text = text.replace("(", "(").replace(")", ")")
        for i in chain(
            *(method(self, text) for method in self.date_extraction_methods)
        ):
            count += 1
            is_seen = i in self.parent.date_history
            # Record every hit (even duplicates) so later sentences can
            # inherit month/year context from it.
            self.parent.date_history.append(i)
            if is_seen:
                continue
            yield i
        if not count:
            raise NotImplementedError(text)

    def _extract_dates_1(self, value: str) -> Iterator[date]:
        # Single dates like "[YYYY年][M月]D日"; year/month may be omitted.
        match = re.findall(r"(?:(\d+)年)?(?:(\d+)月)?(\d+)日", value)
        for groups in match:
            groups = [_cast_int(i) for i in groups]
            assert len(groups) == 3, groups
            yield self.parent.get_date(year=groups[0], month=groups[1], day=groups[2])

    def _extract_dates_2(self, value: str) -> Iterator[date]:
        # Strip parenthesized remarks so they do not break range matching.
        value = re.sub(r"(.+?)", "", value)
        # Date ranges like "1月1日至1月3日" (also "-" / "—" separators);
        # expand to every day in the inclusive range.
        match = re.findall(
            r"(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:至|-|—)(?:(\d+)年)?(?:(\d+)月)?(\d+)日", value
        )
        for groups in match:
            groups = [_cast_int(i) for i in groups]
            assert len(groups) == 6, groups
            start = self.parent.get_date(year=groups[0], month=groups[1], day=groups[2])
            end = self.parent.get_date(year=groups[3], month=groups[4], day=groups[5])
            for i in range((end - start).days + 1):
                yield start + timedelta(days=i)

    def _extract_dates_3(self, value: str) -> Iterator[date]:
        # Strip parenthesized remarks, as in _extract_dates_2.
        value = re.sub(r"(.+?)", "", value)
        # Enumerations like "1月26日(周日)、2月8日(周六)" joined by "、".
        match = re.findall(
            r"(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:([^)]+))?"
            r"(?:、(?:(\d+)年)?(?:(\d+)月)?(\d+)日(?:([^)]+))?)+",
            value,
        )
        for groups in match:
            groups = [_cast_int(i) for i in groups]
            assert not (len(groups) % 3), groups
            for i in range(0, len(groups), 3):
                yield self.parent.get_date(
                    year=groups[i], month=groups[i + 1], day=groups[i + 2]
                )

    # Tried in order; extract_dates chains all of their results.
    date_extraction_methods = [_extract_dates_1, _extract_dates_2, _extract_dates_3]

    def parse(self) -> Iterator[dict]:
        """Parse this sentence with every parsing method.

        Returns:
            Iterator[dict]: Days without name field ({"date", "isOffDay"}).
        """
        for method in self.parsing_methods:
            for i in method(self):
                yield i

    def _parse_rest_1(self):
        # "...放假/补休/调休/公休[N天]" -> the listed dates are days off.
        match = re.match(r"(.+)(放假|补休|调休|公休)+(?:\d+天)?$", self.sentence)
        if match:
            for i in self.extract_dates(match.group(1)):
                yield {"date": i, "isOffDay": True}

    def _parse_work_1(self):
        # "...上班" -> the listed dates are working days.
        match = re.match("(.+)上班$", self.sentence)
        if match:
            for i in self.extract_dates(match.group(1)):
                yield {"date": i, "isOffDay": False}

    def _parse_shift_1(self):
        # "A调至B" -> work on the dates in A, rest on the dates in B.
        match = re.match("(.+)调至(.+)", self.sentence)
        if match:
            for i in self.extract_dates(match.group(1)):
                yield {"date": i, "isOffDay": False}
            for i in self.extract_dates(match.group(2)):
                yield {"date": i, "isOffDay": True}

    # All recognized sentence shapes; parse() applies each in turn.
    parsing_methods = [
        _parse_rest_1,
        _parse_work_1,
        _parse_shift_1,
    ]
  317. def parse_paper(year: int, url: str) -> Iterator[dict]:
  318. """Parse one paper
  319. Args:
  320. year (int): Year
  321. url (str): Paper url
  322. Returns:
  323. Iterator[dict]: Days
  324. """
  325. if url in PRE_PARSED_PAPERS:
  326. yield from PRE_PARSED_PAPERS[url]
  327. return
  328. paper = get_paper(url)
  329. rules = get_rules(paper)
  330. ret = (
  331. {"name": name, **i}
  332. for name, description in rules
  333. for i in DescriptionParser(description, year).parse()
  334. )
  335. try:
  336. for i in ret:
  337. yield i
  338. except NotImplementedError as ex:
  339. raise RuntimeError("Can not parse paper", url) from ex
  340. def fetch_holiday(year: int):
  341. """Fetch holiday data."""
  342. papers = get_paper_urls(year)
  343. days = dict()
  344. for k in (j for i in papers for j in parse_paper(year, i)):
  345. days[k["date"]] = k
  346. return {
  347. "year": year,
  348. "papers": papers,
  349. "days": sorted(days.values(), key=lambda x: x["date"]),
  350. }
  351. def main():
  352. parser = argparse.ArgumentParser()
  353. parser.add_argument("year", type=int)
  354. args = parser.parse_args()
  355. year = args.year
  356. print(
  357. json.dumps(
  358. fetch_holiday(year), indent=4, ensure_ascii=False, cls=CustomJSONEncoder
  359. )
  360. )
  361. class CustomJSONEncoder(json.JSONEncoder):
  362. """Custom json encoder."""
  363. def default(self, o):
  364. # pylint:disable=method-hidden
  365. if isinstance(o, date):
  366. return o.isoformat()
  367. return super().default(o)
if __name__ == "__main__":
    # Allow running directly as a script: ``python fetch.py 2024``.
    main()