富山県
"""Build data.json for Toyama prefecture from local COVID-19 CSV files."""
import datetime
import json

import pandas as pd

COUNTS_FILE = "toyama_counts.csv"
PATIENTS_FILE = "toyama_patients.csv"


def dumps_json(file_name, json_data):
    """Write json_data to file_name as UTF-8 JSON (indent=4)."""
    with open(file_name, "w") as fw:
        json.dump(json_data, fw, ensure_ascii=False, indent=4)


JST = datetime.timezone(datetime.timedelta(hours=+9), "JST")

# Current time (JST), shared by every "date" field below
dt_now = datetime.datetime.now(JST).strftime("%Y/%m/%d %H:%M")

data = {"lastUpdate": dt_now}

# Load the daily counts CSV
df = pd.read_csv(COUNTS_FILE)

# Number of persons tested
# BUG FIX: orient was misspelled "recodes"; pandas requires "records"
# (the old first-letter abbreviation fallback was deprecated and removed).
df_insp = df.loc[:, ("年月日", "検査実施人数")].copy()
df_insp.rename(columns={"年月日": "日付", "検査実施人数": "小計"}, inplace=True)
data["inspection_persons"] = {"date": dt_now, "data": df_insp.to_dict(orient="records")}

# Number of positive patients
df_pats = df.loc[:, ("年月日", "陽性人数")].copy()
df_pats.rename(columns={"年月日": "日付", "陽性人数": "小計"}, inplace=True)
data["patients_summary"] = {"date": dt_now, "data": df_pats.to_dict(orient="records")}

# General consultation calls
df_contacts = df.loc[:, ("年月日", "一般相談件数")].copy()
df_contacts.rename(columns={"年月日": "日付", "一般相談件数": "小計"}, inplace=True)
data["contacts"] = {"date": dt_now, "data": df_contacts.to_dict(orient="records")}

# Returnee / contact consultation calls
df_querents = df.loc[:, ("年月日", "帰国者相談件数")].copy()
df_querents.rename(columns={"年月日": "日付", "帰国者相談件数": "小計"}, inplace=True)
data["querents"] = {"date": dt_now, "data": df_querents.to_dict(orient="records")}

# Attributes of positive patients
df_kanjya = pd.read_csv(PATIENTS_FILE, index_col="No", dtype={"年代": "object"})
df_kanjya.rename(columns={"年月日": "date"}, inplace=True)
df_patients = df_kanjya.loc[:, ("date", "居住地", "年代", "性別")]
data["patients"] = {"date": dt_now, "data": df_patients.to_dict(orient="records")}

# Create data.json
dumps_json("data.json", data)
愛知県のクラスタをスクレイピング・JSON化
"""Scrape the Aichi prefecture COVID-19 cluster table and export data_cluster.json."""
import datetime
import json
import re

import pandas as pd
import requests
from bs4 import BeautifulSoup


def dumps_json(file_name, json_data):
    """Write json_data to file_name as UTF-8 JSON (indent=2)."""
    with open(file_name, "w") as fw:
        json.dump(json_data, fw, ensure_ascii=False, indent=2)


url = "https://www.pref.aichi.jp/site/covid19-aichi/kansensya-kensa.html"

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"
}

# Scrape the page
r = requests.get(url, headers=headers)
r.raise_for_status()

soup = BeautifulSoup(r.content, "html.parser")

table = soup.find("table", summary="愛知県内の発生状況")
caption = table.find("caption").get_text(strip=True)

# Today
dt_now = datetime.datetime.now()

# Convert the publication date found in the caption into a datetime
y = dt_now.year
m, d, h = map(int, re.findall(r"[0-9]{1,2}", caption))

# NOTE: adjust the time component as needed
last_update = datetime.datetime(y, m, d, h, 0)

df = pd.read_html(table.prettify())[0].drop("人数.1", axis=1)
df.dropna(how="all", inplace=True)

df["人数"] = df["人数"].str.rstrip("人").fillna(0).astype(int)
df["うち入院"] = df["うち入院"].str.rstrip("人").fillna(0).astype(int)

df.rename(columns={"Unnamed: 0": "クラスタ"}, inplace=True)

# Drop total rows (names ending in 計)
df_cluster = df.loc[~df["クラスタ"].str.endswith("計"), :].copy()

# BUG FIX: the pattern is a regular expression; pass regex=True explicitly,
# since pandas >= 2.0 defaults Series.str.replace to regex=False and would
# silently stop stripping the parenthesized note.
# NOTE(review): the pattern looks intended to remove a parenthesized suffix —
# confirm the parentheses in the pattern are the full-width characters used on
# the page, otherwise "(.+)" would blank the whole cell.
df_cluster["クラスタ"] = df_cluster["クラスタ"].str.replace(r"(.+)", "", regex=True)
df_cluster["クラスタ"] = df_cluster["クラスタ"].str.lstrip("○")
df_cluster.set_index("クラスタ", inplace=True)

cluster = {
    "cluster": {
        "data": df_cluster.to_dict(orient="dict"),
        "date": last_update.strftime("%Y/%m/%d %H:%M"),
    }
}

dumps_json("data_cluster.json", cluster)
愛知県の新型コロナ情報をスクレイピングしてdata.jsonを作成
"""Scrape Aichi prefecture COVID-19 information and build data.json."""
import datetime
import json
import re
from urllib.parse import urljoin

import pandas as pd
import requests
from bs4 import BeautifulSoup
import camelot

url = "https://www.pref.aichi.jp/site/covid19-aichi/kansensya-kensa.html"

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
}

JST = datetime.timezone(datetime.timedelta(hours=+9), "JST")


class CovidDataManager:
    """Collects Aichi COVID-19 figures from the prefecture page and a
    published spreadsheet, accumulating everything in ``self.data``."""

    def __init__(self):
        # One JST snapshot of "now", reused for lastUpdate and year inference.
        dt_now = datetime.datetime.now(JST)
        self.data = {"lastUpdate": dt_now.strftime("%Y/%m/%d %H:%M")}
        self.dt_now = dt_now
        r = requests.get(url, headers=headers)
        r.raise_for_status()
        self.soup = BeautifulSoup(r.content, "html.parser")

    def main_summary(self):
        """Build the nested main_summary tree from the published spreadsheet."""
        df_main = pd.read_csv(
            "https://docs.google.com/spreadsheets/d/1DdluQBSQSiACG1CaIg4K3K-HVeGGThyecRHSA84lL6I/export?format=csv&gid=0",
            index_col=0,
            header=None,
        )
        # BUG FIX: orient was misspelled "recodes"; pandas requires "records".
        main_sum = df_main.T.to_dict(orient="records")[0]
        self.data["main_summary"] = {
            "attr": "検査実施人数",
            "value": main_sum["検査実施人数"],
            "children": [
                {
                    "attr": "陽性患者数",
                    "value": main_sum["陽性患者数"],
                    "children": [
                        {
                            "attr": "入院中",
                            "value": main_sum["入院中"],
                            "children": [
                                {"attr": "軽症・中等症", "value": main_sum["軽症・中等症"]},
                                {"attr": "重症", "value": main_sum["重症"]},
                            ],
                        },
                        {"attr": "退院", "value": main_sum["退院"]},
                        {"attr": "転院", "value": main_sum["転院"]},
                        {"attr": "死亡", "value": main_sum["死亡"]},
                    ],
                }
            ],
        }

    # Gene-testing counts for the novel coronavirus
    def inspections_summary(self):
        """Parse the testing-count table into self.data["inspections_summary"]."""
        table = self.soup.find(
            "table", summary="愛知県衛生研究所及び名古屋市衛生研究所における新型コロナウイルス遺伝子検査件数"
        )
        caption = table.find("caption").get_text(strip=True)
        # Convert the publication date in the caption to a datetime
        y = self.dt_now.year
        m, d = map(int, re.findall(r"[0-9]{1,2}", caption))
        # NOTE: adjust the time component as needed
        last_update = datetime.datetime(y, m, d, 23, 59)
        df_tmp = pd.read_html(table.prettify())[0]
        df = df_tmp[df_tmp["検査日"] != "計"].copy()
        # Rows spanning multiple days (containing "~") are kept as remarks
        df["備考"] = df["検査日"].where(df["検査日"].str.contains("~"))
        df_date = df["検査日"].str.extract(
            r"([0-9]{1,2})月([0-9]{1,2})日((.)曜日)$", expand=True
        )
        df_date.rename(columns={0: "月", 1: "日", 2: "曜日"}, inplace=True)
        df_date["月"] = df_date["月"].astype(int)
        df_date["日"] = df_date["日"].astype(int)
        # CONSISTENCY FIX: use the JST snapshot's year (self.dt_now) instead of
        # a fresh naive datetime.datetime.now(), matching the rest of the class.
        df_date["date"] = df_date.apply(
            lambda x: pd.Timestamp(year=self.dt_now.year, month=x["月"], day=x["日"]),
            axis=1,
        )
        df["検査日"] = df_date["date"].dt.strftime("%Y-%m-%d")
        df_insp = df.loc[:, ("検査日", "検査件数(件)")].copy()
        df_insp.rename(columns={"検査日": "日付", "検査件数(件)": "小計"}, inplace=True)
        # BUG FIX: orient misspelled "recodes" -> "records"
        self.data["inspections_summary"] = {
            "data": df_insp.to_dict(orient="records"),
            "date": last_update.strftime("%Y/%m/%d %H:%M"),
        }

    # List of cases reported within the prefecture
    def patients(self):
        """Download the case-list PDF and build patients / patients_summary."""
        tag = self.soup.find("a", text=re.compile(r"^県内発生事例一覧"))
        y = self.dt_now.year
        m, d, _ = map(int, re.findall(r"[0-9]+", tag.get_text(strip=True)))
        last_update = datetime.datetime(y, m, d, 23, 59)
        link = urljoin(url, tag.get("href"))
        tables = camelot.read_pdf(
            link, pages="all", split_text=True, strip_text="\n", line_scale=40
        )
        df_csv = pd.concat([table.df for table in tables])
        df_csv.to_csv("data.csv", index=None, header=None)

        def my_parser(s):
            # Dates in the PDF omit the year; assume the current (JST) year.
            # NOTE(review): breaks across a year boundary — confirm acceptable.
            y = self.dt_now.year
            m, d = map(int, re.findall(r"[0-9]{1,2}", s))
            return pd.Timestamp(year=y, month=m, day=d)

        df_patient = pd.read_csv("data.csv", parse_dates=["発表日"], date_parser=my_parser)

        # patients_summary: daily counts, with missing days filled with 0
        df_pts = (
            df_patient["発表日"]
            .value_counts()
            .sort_index()
            .asfreq("D", fill_value=0)
            .reset_index()
        )
        df_pts["日付"] = df_pts["index"].dt.strftime("%Y-%m-%d")
        df_pts.rename(columns={"発表日": "小計"}, inplace=True)
        df_pts.drop("index", axis=1, inplace=True)
        # NOTE(review): this entry uses "last_update"/"%Y-%m-%d %H:%M" while the
        # other entries use "date"/"%Y/%m/%d %H:%M" — confirm which schema the
        # consumer expects before unifying.
        self.data["patients_summary"] = {
            "data": df_pts.to_dict(orient="records"),
            "last_update": last_update.strftime("%Y-%m-%d %H:%M"),
        }

        # patients: one record per case, with derived display columns
        df_patient.set_index("発表日", inplace=True)
        df_patient["date"] = df_patient.index.strftime("%Y-%m-%d")
        df_patient["short_date"] = df_patient.index.strftime("%m/%d")
        # w: day of week with Sunday = 0
        df_patient["w"] = (df_patient.index.dayofweek + 1) % 7
        df_patient["発表日"] = df_patient.index.strftime("%Y/%m/%d %H:%M")
        df_patient.fillna("", inplace=True)
        # Remove if unnecessary: overwrite short_date with an escaped-slash
        # variant (kept here to preserve the original column order).
        df_patient["short_date"] = df_patient.index.strftime("%m\\/%d")
        df_patient["No"] = df_patient["No"].astype(str)
        df_patient["w"] = df_patient["w"].astype(str)
        # BUG FIX: orient misspelled "recodes" -> "records"
        self.data["patients"] = {
            "data": df_patient.to_dict(orient="records"),
            "date": last_update.strftime("%Y/%m/%d %H:%M"),
        }

    def export_jsons(self):
        """Write the accumulated data to data.json."""
        with open("data.json", "w") as fw:
            json.dump(self.data, fw, ensure_ascii=False, indent=2)


if __name__ == "__main__":
    dm = CovidDataManager()
    print("---main_summary---")
    dm.main_summary()
    print("---inspections_summary---")
    dm.inspections_summary()
    print("---patients---")
    dm.patients()
    print("---export jsons---")
    dm.export_jsons()
    print("---done---")
愛知県の感染状況のPDFデータをCSVに変換
PDFのURLのスクレイピングは省略し、固定URLから直接ダウンロードする
PDFファイルをダウンロード
# Download the infection-status PDF and save it as data.pdf
wget https://www.pref.aichi.jp/uploaded/attachment/328890.pdf -O data.pdf
# Install system packages camelot depends on (Tkinter and Ghostscript)
!apt install python3-tk ghostscript
# Install camelot with the OpenCV extra for PDF table extraction
!pip install camelot-py[cv]
"""Convert the infection-status PDF (data.pdf) into data.csv and load it."""
import datetime
import re

import pandas as pd
import camelot

# Extract every table from every page of the PDF
tables = camelot.read_pdf(
    "data.pdf", pages="1-end", split_text=True, strip_text="\n", line_scale=40
)
df_csv = pd.concat([table.df for table in tables])
df_csv.to_csv("data.csv", index=None, header=None)


def my_parser(s):
    """Parse a year-less Japanese date string (e.g. 'M月D日') into a Timestamp.

    Assumes the current year; NOTE(review): this misdates entries parsed just
    after a year boundary — confirm acceptable for this dataset.
    """
    y = datetime.datetime.now().year
    # raw string for the regex pattern (idiom fix; pattern bytes unchanged)
    m, d = map(int, re.findall(r"[0-9]{1,2}", s))
    return pd.Timestamp(year=y, month=m, day=d)


df = pd.read_csv("data.csv", index_col=0, parse_dates=["発表日"], date_parser=my_parser)

df