長野県のコロナウイルス感染症の状況をPDFからスクレイピング

!apt install python3-tk ghostscript
!pip install camelot-py[cv]

!pip install jaconv
import datetime
import pathlib
import re
from urllib.parse import urljoin

import camelot
import jaconv
import requests
from bs4 import BeautifulSoup

def fetch_file(url, dir="."):
    """Download *url* and save it under *dir*, returning the saved Path.

    Raises requests.HTTPError when the server responds with an error status.
    """
    response = requests.get(url)
    response.raise_for_status()

    dest = pathlib.Path(dir, pathlib.PurePath(url).name)
    dest.parent.mkdir(parents=True, exist_ok=True)

    dest.write_bytes(response.content)
    return dest

# Page listing the Nagano prefecture COVID-19 situation reports.
url = "https://www.pref.nagano.lg.jp/hoken-shippei/kenko/kenko/kansensho/joho/corona-doko.html"

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko"
}

r = requests.get(url, headers=headers)
r.raise_for_status()

soup = BeautifulSoup(r.content, "html.parser")

# ``string=`` replaces the deprecated ``text=`` keyword of bs4's find();
# the dot in the href pattern is escaped so it matches a literal ".pdf"
# rather than any character followed by "pdf".
tag = soup.find("a", string=re.compile("^グラフPDFデータ"), href=re.compile(r"\.pdf"))
link = urljoin(url, tag.get("href"))

p = fetch_file(link)

# First table of page 1; "stream" suits PDFs without ruled table lines.
df = camelot.read_pdf(str(p), pages="1", flavor="stream")[0].df

df

# Drop annotation rows that start with a "・" bullet.
df1 = df[~(df[0] + df[1]).str.startswith("・")]

temp = []

# ``items()`` replaces ``iteritems()``, which was removed in pandas 2.0.
for _, item in df1.iloc[2:].items():
    # Concatenate the column's cells and strip all whitespace.
    s = "".join(item.str.cat(sep="").split())
    # Normalize full-width kana to half-width (default z2h mode).
    temp.append(jaconv.z2h(s))

text = "".join(temp)

data = {}

# Extract "<label><number>人" pairs from the flattened table text.
# NOTE(review): if the parentheses in 陽性者数(累積) below are ASCII they
# create an extra capture group and group(2) would not be the number —
# verify against the actual PDF text before changing the pattern.
for i in re.finditer(r"(検査実施人数|陰性|陽性者数(累積)|入院中|重症|退院等|死亡)([0-9,]+)人", text):
    data[i.group(1)] = int(i.group(2).replace(",", ""))

# Asymptomatic carriers appear as "うち<number>名".
m = re.search("うち([0-9,]+)名", text)

if m:
    data["無症状病原体保有者"] = int(m.group(1).replace(",", ""))

# The update timestamp lives in the second row of the raw table.
txt = jaconv.z2h(df.iloc[1].str.cat(sep=""), kana=False, digit=True, ascii=True)

# Raw string avoids the invalid "\d" escape-sequence warning.
m_up = re.search(r"(\d{1,2})月(\d{1,2})日 *(\d{1,2})時現在", txt)

if m_up:
    month, day, hour = map(int, m_up.groups())
    # The page omits the year; 2020 is hard-coded — TODO confirm for reuse.
    dt_now = datetime.datetime(2020, month, day, hour)
else:
    dt_now = datetime.datetime.now()

data["更新日時"] = dt_now.isoformat()

data

愛媛県内の状況のPDFから検査陽性者の状況をスクレイピング

!apt install python3-tk ghostscript
!pip install camelot-py[cv]

!pip install jaconv
import datetime
import re
import pathlib

import camelot
import jaconv
import requests

def fetch_file(url, dir="."):
    """Fetch *url* over HTTP and store the file inside *dir*.

    Returns the pathlib.Path of the saved file; raises for HTTP errors.
    """
    res = requests.get(url)
    res.raise_for_status()

    out_path = pathlib.Path(dir) / pathlib.PurePath(url).name
    out_path.parent.mkdir(parents=True, exist_ok=True)

    out_path.write_bytes(res.content)
    return out_path

p = fetch_file("https://www.pref.ehime.jp/h25500/kansen/documents/kennai_link.pdf")

# Page 2 holds the status table; transpose so each row is one item.
df2 = camelot.read_pdf(str(p), pages="2", flavor="stream")[0].df.T

data = {}

for _, item in df2.iterrows():
    # Label: join the middle cells and remove all whitespace.
    k = "".join(item.iloc[2:-1].str.cat(sep="").split())
    # Value: last cell, e.g. "12人" -> 12; tolerate thousands separators.
    v = item.iloc[-1].rstrip("人")
    data[k] = int(v.replace(",", ""))

txt = jaconv.z2h(df2[1].str.cat(sep=""), kana=False, digit=True, ascii=True)

# Raw string avoids the invalid "\d" escape-sequence warning.
m = re.search(r"(\d{1,2})月(\d{1,2})日 *(\d{1,2})時現在", txt)

if m:
    month, day, hour = map(int, m.groups())
    # Year is not printed on the sheet; 2020 is hard-coded.
    dt_now = datetime.datetime(2020, month, day, hour)
else:
    dt_now = datetime.datetime.now()

data["更新日時"] = dt_now.isoformat()

data

PythonからPostでGASからスプレッドシートにデータ追加

スプレッドシート

ウェブアプリケーションとして導入

実行:自分

アクセスできるユーザー:全員

/**
 * Web-app POST endpoint: parse the JSON payload and append its
 * "data" array as a new row on the sheet named "シート1".
 */
function doPost(e) {
  var payload = JSON.parse(e.postData.contents);
  var sheet = SpreadsheetApp.getActiveSpreadsheet().getSheetByName('シート1');

  // Append the values as the last row of the sheet
  sheet.appendRow(payload.data);
}

Python

import json

# URL of the deployed Apps Script web app (fill in after deployment).
url = ""

headers = {"Content-Type": "application/json"}

# ``data`` is a dict; concatenating list + dict raises TypeError, so send
# the dict's values after the timestamp instead.
json_data = json.dumps({"data": [dt_now.isoformat()] + list(data.values())})

requests.post(url, json_data, headers=headers)

愛知県の検査陽性者の状況のjpegからOCRでスクレイピング(表抽出・縦線除去)

!add-apt-repository ppa:alex-p/tesseract-ocr -y

!apt update

!apt install tesseract-ocr
!apt install libtesseract-dev

!tesseract -v

!apt install tesseract-ocr-jpn  tesseract-ocr-jpn-vert
!apt install tesseract-ocr-script-jpan tesseract-ocr-script-jpan-vert

!tesseract --list-langs
!pip install pytesseract
import pathlib
import re
from urllib.parse import urljoin

import numpy as np
import requests
from bs4 import BeautifulSoup

import cv2
import pytesseract
from google.colab.patches import cv2_imshow

# Scrape the Aichi prefecture COVID-19 page for the status image.
url = "https://www.pref.aichi.jp/site/covid19-aichi/"

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
}

r = requests.get(url, headers=headers)
r.raise_for_status()

soup = BeautifulSoup(r.content, "html5lib")
# The summary table is published as an image whose alt text ends with "検査陽性者".
src = soup.find("img", alt=re.compile("検査陽性者$")).get("src")

link = urljoin(url, src)
print(link)

# Download helper
def get_file(url, dir="."):
    """Download *url* into *dir* and return the saved Path.

    Raises requests.HTTPError on a non-2xx response instead of silently
    saving an error page (consistent with fetch_file in the other cells).
    """
    r = requests.get(url)
    r.raise_for_status()

    p = pathlib.Path(dir, pathlib.PurePath(url).name)
    p.parent.mkdir(parents=True, exist_ok=True)

    with p.open(mode="wb") as fw:
        fw.write(r.content)

    return p

jpg_path = get_file(link)

# Latest file: trim the 2px border from every edge.
src = cv2.imread(str(jpg_path))[2:-2, 2:-2]

# For past data: a locally saved snapshot.
# NOTE(review): this line clobbers the freshly downloaded image above —
# comment out whichever of the two assignments is not wanted.
src = cv2.imread("2020082918.jpg")[2:-2, 2:-2]

# Grayscale conversion.
gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)

kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
dilated = cv2.dilate(gray, kernel, iterations=1)
# Difference between the dilated image and the original highlights edges.
diff = cv2.absdiff(dilated, gray)

contour = cv2.bitwise_not(diff)

# Flatten light JPEG compression noise to pure white.
contour[contour > 200] = 255
# contour[contour < 100] = 0

# Resize to a fixed width of 1200 px, keeping the aspect ratio.
h, w = contour.shape[:2]
wide = int(1200 / w * h)

large = cv2.resize(contour, (1200, wide))

# Elliptical kernel.
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

# Morphological gradient (object outlines).
grad = cv2.morphologyEx(large, cv2.MORPH_GRADIENT, kernel)

# Binarize with Otsu's threshold.
_, bw = cv2.threshold(grad, 0.0, 255.0, cv2.THRESH_BINARY | cv2.THRESH_OTSU)

# Wide, short rectangular kernel to connect characters horizontally.
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (9, 1))

# Close small gaps / suppress noise.
connected = cv2.morphologyEx(bw, cv2.MORPH_CLOSE, kernel)

# cv2.RETR_EXTERNAL: outermost contours only.
# cv2.CHAIN_APPROX_NONE: keep every point of each contour.

contours, hierarchy = cv2.findContours(
    connected.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE
)

# Mask buffer used to measure how "filled" each bounding box is.
mask = np.zeros(bw.shape, dtype=np.uint8)

rects = []

for idx in range(len(contours)):
    # Bounding rectangle of the contour.
    x, y, w, h = cv2.boundingRect(contours[idx])

    mask[y : y + h, x : x + w] = 0

    # Draw the filled contour into the mask.
    cv2.drawContours(mask, contours, idx, (255, 255, 255), -1)

    # Fraction of the bounding box covered by the contour.
    r = float(cv2.countNonZero(mask[y : y + h, x : x + w])) / (w * h)

    # Keep only table-sized regions (dense, wide, and tall enough).
    if r > 0.45 and w > 850 and h > 100:
        rects.append((x, x + w, y, y + h))

# Sort by bottom edge (Y), then by left edge (X).
rects = sorted(rects, key=lambda x: (x[3], x[0]))

# Number of detected tables.
print(len(rects))

print(rects)

# Coordinates of the topmost table.
x1, x2, y1, y2 = rects[0]

# Crop the date strip above the first table.
dst = large[0:y1, 700:-1].copy()
cv2_imshow(dst)

# OCR the strip; strip separators so digits parse cleanly later.
txt = (
    pytesseract.image_to_string(dst, lang="jpn", config="--psm 6")
    .strip()
    .replace(".", "")
    .replace(",", "")
    .replace(" ", "")
)
print(txt)

# Bottom 23% of the topmost table (the totals row).

y_crop = int((y2 - y1) * 0.23)

# Crop with a 5px margin on the sides and bottom.
dst = large[y1:y2, x1:x2][-y_crop:-5, 5:-5].copy()

cv2_imshow(dst)

edges = cv2.Canny(dst, 100, 200, apertureSize=3)
# pi/2 theta step restricts detected lines to horizontal/vertical.
lines = cv2.HoughLines(edges, 1, np.pi / 2, 25)

# Erase vertical ruled lines so OCR reads the row as one line of text.
# theta is the line angle; theta == 0 means a vertical line.
for line in lines:
    for rho, theta in line:

        if theta == 0:

            a = np.cos(theta)
            # b = np.sin(theta)

            x0 = int(a * rho)

            # NOTE(review): this overwrites the table coordinates
            # x1/x2/y1/y2 from the earlier cell; they are re-read from
            # ``rects`` further down, so it happens to be harmless here.
            x1, x2 = x0, x0
            y1, y2 = 100, -100

            cv2.line(dst, (x1, y1), (x2, y2), (255, 255, 255), 3)

def data_check(text):
    """Print the numbers found in *text* and sanity-check the totals.

    Expects exactly 12 numbers. The three inpatient breakdown counts must
    sum to the hospitalized total (index 2), and all categories must sum
    to the positive-case total (index 1). Prints the verdict; returns None.
    """
    print(text)

    # Raw string avoids the invalid "\d" escape-sequence warning.
    data = list(map(int, re.findall(r"\d+", text)))
    print(data)

    # Guard clauses replace the original nested if/else pyramid.
    if len(data) != 12:
        print("データ数が足りません")
        return
    if data[2] != data[3] + data[4] + data[5]:
        print("入院の集計があいません")
        return
    if data[1] != data[2] + data[6] + data[7] + data[8] + data[9] + data[10] + data[11]:
        print("陽性者数の集計があいません")
        return
    print("OK")

# OCR attempt with page segmentation mode 3 (fully automatic layout).
txt = (
    pytesseract.image_to_string(dst, lang="jpn", config="--psm 3")
    .strip()
    .replace(".", "")
    .replace(",", "")
)
data_check(txt)

# Retry with psm 6 (assume a single uniform block of text).
txt = (
    pytesseract.image_to_string(dst, lang="jpn", config="--psm 6")
    .strip()
    .replace(".", "")
    .replace(",", "")
)
data_check(txt)

cv2_imshow(dst)

# Retry with psm 11 (sparse text).
txt = (
    pytesseract.image_to_string(dst, lang="jpn", config="--psm 11")
    .strip()
    .replace(".", "")
    .replace(",", "")
)
data_check(txt)

cv2.imwrite("main.png", dst)

# Coordinates of the second and third detected tables.
x1, x2, y1, y2 = rects[1]
x3, x4, y3, y4 = rects[2]

# Crop the strip between the two tables and OCR it.
dst = large[y2:y3, 0:-1].copy()
cv2_imshow(dst)

txt = (
    pytesseract.image_to_string(dst, lang="jpn", config="--psm 6")
    .strip()
    .replace(".", "")
    .replace(",", "")
    .replace(" ", "")
)
print(txt)

# Visualize every detected table rectangle on the resized original.
img = cv2.resize(src, (1200, wide))

for idx, rect in enumerate(rects):

    x1, x2, y1, y2 = rect

    # Random color per rectangle, index drawn at its bottom-right corner.
    color = np.random.randint(0, 255, 3).tolist()
    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
    cv2.putText(img, str(idx), (x2, y2), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 3)
    
cv2_imshow(img)

マイナンバーカード交付状況のExcelファイルをデータラングリング

csv変換時に小数点がおかしくなるので

float_format="%.1f"

で小数点一桁にするが、文字列扱いになるため

quoting=csv.QUOTE_MINIMAL

で必要な場合のみダブルクォーテーションを付ける設定にし、数値が引用符で囲まれて出力されるのを防ぐ

import csv
import datetime

import pandas as pd

def df_conv(df, col_name, col_1="人口", col_2="交付枚数"):
    """Rename columns, extract the reference dates, and drop the footer row.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw sheet section; its last row holds the reference dates as
        Timestamps — TODO confirm against the Excel layout.
    col_name : list of str
        New column names, one per existing column.
    col_1, col_2 : str
        Columns whose last-row values are the population / delivery
        reference dates.

    Returns
    -------
    pandas.DataFrame
        The data rows (footer removed) with the three date columns added.
    """
    # ``set_axis(..., inplace=True)`` was deprecated and then removed in
    # pandas 2.0; assign the returned frame instead.
    df = df.set_axis(col_name, axis=1)

    # Single-step indexing instead of chained ``iloc[-1][col]`` lookups.
    population_date = df[col_1].iloc[-1].strftime("%Y/%m/%d")
    delivery_date = df[col_2].iloc[-1].strftime("%Y/%m/%d")

    df["人口算出基準日"] = population_date
    df["交付枚数算出基準日"] = delivery_date
    df.insert(0, "算出基準日", delivery_date)

    return df.iloc[:-1]

# Sheet 2 of the MIC Excel file, reversed so groupby keys come out ordered.
df0 = pd.read_excel(
    "https://www.soumu.go.jp/main_content/000703058.xlsx", sheet_name=1
).sort_index(ascending=False)

df0

# Each section of the sheet starts with a "時点" marker row; the cumulative
# sum of that flag splits the frame into numbered section groups.
dfg = df0.groupby((df0["Unnamed: 0"] == "時点").cumsum())

# Per-municipality table

df1 = df_conv(
    dfg.get_group(1)
    .sort_index()
    .dropna(axis=1, how="all")
    .dropna(how="all")
    .iloc[2:]
    .reset_index(drop=True),
    ["都道府県名", "市区町村名", "総数(人口)", "交付枚数", "人口に対する交付枚数率"],
    "総数(人口)",
    "交付枚数",
)

# Ratio arrives as a fraction; convert to a percentage.
df1["人口に対する交付枚数率"] = df1["人口に対する交付枚数率"].astype(float).round(3) * 100

# Normalize municipality names (strip whitespace, fix renamed/variant names).
df1["市区町村名"] = df1["市区町村名"].replace(r"\s", "", regex=True)
df1["市区町村名"] = df1["市区町村名"].mask(df1["都道府県名"] + df1["市区町村名"] == "兵庫県篠山市", "丹波篠山市")
df1["市区町村名"] = df1["市区町村名"].mask(df1["都道府県名"] + df1["市区町村名"] == "高知県高岡郡梼原町", "高岡郡檮原町")
df1["市区町村名"] = df1["市区町村名"].mask(df1["都道府県名"] + df1["市区町村名"] == "福岡県糟屋郡須惠町", "糟屋郡須恵町")

# Nakagawa became a city on 2018-10-01; pick the name matching the data date.
# Comparing a Timestamp against datetime.date raises TypeError in pandas,
# so compare against a datetime instead.
if pd.Timestamp(df1.iloc[0]["算出基準日"]) < datetime.datetime(2018, 10, 1):
    df1["市区町村名"] = df1["市区町村名"].mask(
        df1["都道府県名"] + df1["市区町村名"] == "福岡県那珂川市", "筑紫郡那珂川町"
    )
else:
    df1["市区町村名"] = df1["市区町村名"].mask(
        df1["都道府県名"] + df1["市区町村名"] == "福岡県筑紫郡那珂川町", "那珂川市"
    )

# Local-government code master; join key is prefecture + municipality.
df_code = pd.read_csv(
    "https://docs.google.com/spreadsheets/d/e/2PACX-1vSseDxB5f3nS-YQ1NOkuFKZ7rTNfPLHqTKaSag-qaK25EWLcSL0klbFBZm1b6JDKGtHTk6iMUxsXpxt/pub?gid=0&single=true&output=csv",
    dtype={"団体コード": int, "都道府県名": str, "郡名": str, "市区町村名": str},
)

# Prepend the county name so the key matches the Excel's municipality names.
df_code["市区町村名"] = df_code["郡名"].fillna("") + df_code["市区町村名"]
df_code.drop("郡名", axis=1, inplace=True)

df1 = pd.merge(df1, df_code, on=["都道府県名", "市区町村名"], how="left")
# Nullable integer dtype tolerates unmatched rows (NaN codes).
df1["団体コード"] = df1["団体コード"].astype("Int64")

df1.to_csv(
    "all_localgovs.csv",
    index=False,
    quoting=csv.QUOTE_MINIMAL,
    float_format="%.1f",
    encoding="utf_8_sig",
)

df1

# Table by sex and age group

df2 = df_conv(
    dfg.get_group(2)
    .sort_index()
    .dropna(axis=1, how="all")
    .dropna(how="all")
    .iloc[3:]
    .reset_index(drop=True),
    [
        "年齢",
        "人口(男)",
        "人口(女)",
        "人口(計)",
        "交付件数(男)",
        "交付件数(女)",
        "交付件数(計)",
        "交付率(男)",
        "交付率(女)",
        "交付率(計)",
        "全体に対する交付件数割合(男)",
        "全体に対する交付件数割合(女)",
        "全体に対する交付件数割合(計)",
    ],
    "人口(男)",
    "交付件数(男)",
)

# Convert every ratio column from a fraction to a percentage.
for ratio_col in [
    "交付率(男)",
    "交付率(女)",
    "交付率(計)",
    "全体に対する交付件数割合(男)",
    "全体に対する交付件数割合(女)",
    "全体に対する交付件数割合(計)",
]:
    df2[ratio_col] = df2[ratio_col].astype(float).round(3) * 100

df2.to_csv(
    "demographics.csv",
    index=False,
    quoting=csv.QUOTE_MINIMAL,
    float_format="%.1f",
    encoding="utf_8_sig",
)

df2

# Per-prefecture table

df3 = df_conv(
    dfg.get_group(3)
    .sort_index()
    .dropna(axis=1, how="all")
    .dropna(how="all")
    .iloc[2:]
    .reset_index(drop=True),
    ["都道府県名", "総数(人口)", "交付枚数", "人口に対する交付枚数率"],
    "総数(人口)",
)

# Fraction -> percentage.
df3["人口に対する交付枚数率"] = df3["人口に対する交付枚数率"].astype(float).round(3) * 100

df3.to_csv(
    "all_prefectures.csv",
    index=False,
    quoting=csv.QUOTE_MINIMAL,
    float_format="%.1f",
    encoding="utf_8_sig",
)

df3

# Table by municipality type

df4 = df_conv(
    dfg.get_group(6)
    .sort_index()
    .dropna(axis=1, how="all")
    .dropna(how="all")
    .iloc[2:]
    .reset_index(drop=True),
    ["区分", "人口", "交付枚数", "人口に対する交付枚数率"],
)

# Fraction -> percentage.
df4["人口に対する交付枚数率"] = df4["人口に対する交付枚数率"].astype(float).round(3) * 100

# Round to one decimal place for the CSV output.
df4 = df4.round({"人口に対する交付枚数率": 1})

df4.to_csv(
    "summary_by_types.csv",
    index=False,
    quoting=csv.QUOTE_MINIMAL,
    float_format="%.1f",
    encoding="utf_8_sig",
)

df4