兵庫県のコロナ

Pandasをつかわない縛りでまだ途中

from openpyxl import load_workbook, utils

import datetime
import json
import pathlib

from statistics import mean


def xlsx2table(filename, n=1):

    wb = load_workbook(filename=filename, data_only=True)
    ws = wb.worksheets[0]

    columns = []
    table = []

    for i, row in enumerate(ws.iter_rows()):

        data = [col.value for col in row]

        if i < n:
            columns.append(data)

        else:
            if any(data):

                table.append(data)

    header = ["".join(c).replace("\n", "").strip() for c in zip(*columns)]

    return table, header


def daterange(start, end):

    for n in range((end - start).days + 1):
        yield start + datetime.timedelta(n)


def notint2zero(data):

    result = [i if isinstance(i, int) else 0 for i in data]

    return result


def table2dic(table, index=0, remove=None):

    result = {}

    for i in table:

        if remove:
            del i[remove]

        key = i.pop(index)

        data = notint2zero(i)

        result[key] = data

    return result


def dumps_json(file_name, json_data, dir="data"):

    p = pathlib.Path(dir, file_name)
    p.parent.mkdir(parents=True, exist_ok=True)

    with p.open(mode="w") as fw:
        json.dump(json_data, fw, ensure_ascii=False, indent=4)


"""# 日付"""

JST = datetime.timezone(datetime.timedelta(hours=+9))
dt_now = datetime.datetime.now(JST)

last_update = dt_now.replace(hour=0, minute=0, second=0, microsecond=0)

"""+ age.json
+ age_summary.json
+ clusters.json
+ clusters_summary.json
+ patients.json

+ yousei.xlsx
    + main_summary.json

+ pcr.xlsx
    + inspections.json
    + inspections_summary.json
    + patients_summary.json
    + positive_or_negative.json
    + current_patients.json

# 陽性
"""


table, yousei_header = xlsx2table("yousei.xlsx")

yousei_data = table2dic(table, remove=1)

yousei_date_list = list(yousei_data.keys())

result = []
current_patients = {}

# 2020-03-09
d = [358, 16, 16, 12, 4, 0, 0, 0, 0, 0, 0]

for dt in daterange(yousei_date_list[0], yousei_date_list[-1]):

    v = yousei_data.get(dt, d)

    z = [a - b for a, b in zip(v, d)]

    current_patients[dt] = z[1] - z[9] - z[10]

    d = v[:]

    result.append({"date": dt, "data": v, "diff": z})

main_dict = {k: v for k, v in zip(yousei_header[2:], result[-1]["data"])}
main_dict

main_summary_dict = {
    "attr": "検査実施人数",
    "value": main_dict.get("検査実施人数(累計)"),
    "children": [
        {
            "attr": "陽性患者数",
            "value": main_dict.get("陽性者数(累計)"),
            "children": [
                {
                    "attr": "入院中",
                    "value": main_dict.get("入院中(合計)"),
                    "children": [
                        {"attr": "軽症・中等症", "value": main_dict.get("入院中(中等症以下)")},
                        {"attr": "重症", "value": main_dict.get("入院中(重症)")},
                    ],
                },
                {"attr": "宿泊療養", "value": main_dict.get("宿泊療養")},
                {
                    "attr": "入院・宿泊療養調整等",
                    "value": main_dict.get("入院・宿泊療養調整等"),
                    "children": [{"attr": "入院調整", "value": main_dict.get("うち入院調整")}],
                },
                {"attr": "その他医療機関福祉施設等", "value": main_dict.get("その他医療機関福祉施設等")},
                {"attr": "死亡", "value": main_dict.get("死亡(累計)")},
                {"attr": "退院", "value": main_dict.get("退院(累計)")},
            ],
        }
    ],
    "last_update": last_update.isoformat(),
}

dumps_json("main_summary.json", main_summary_dict)

"""# PCR"""


table, pcr_header = xlsx2table("pcr.xlsx")

pcr_data = table2dic(table)

patients_summary_lst = []
current_patients_lst = []
inspections_lst = []
posi_or_nega_lst = []

labels = []
official = []
unofficial = []
inspections = []
partients = []

pcr_date_list = list(pcr_data.keys())

for dt in daterange(pcr_date_list[0], pcr_date_list[-1]):

    dt_iso = dt.date().isoformat()
    v = pcr_data.get(dt, [0, 0, 0, 0, 0])

    patients_summary_lst.append(
        {
            "日付": dt.replace(tzinfo=JST).isoformat(),
            "小計": v[4],
        }
    )

    current_patients_lst.append(
        {
            "日付": dt.replace(tzinfo=JST).isoformat(),
            "小計": current_patients.get(dt, v[4]),
        }
    )

    inspections_lst.append(
        {
            "判明日": dt_iso,
            "地方衛生研究所等": v[1],
            "民間検査機関等": {"PCR検査": v[2], "抗原検査": v[3]},
            "陽性確認": v[4],
        }
    )

    labels.append(dt_iso)
    official.append(v[1])
    unofficial.append(v[2] + v[3])
    inspections.append(v[0])
    partients.append(v[4])

    try:
        poositive_average = (
            round(mean(partients[-7:]), 1) if len(partients) > 6 else None
        )
        poositive_rate = (
            round(sum(partients[-7:]) / sum(inspections[-7:]) * 100, 1)
            if len(inspections) > 6
            else None
        )

    except ZeroDivisionError:
        positive_average = 0.0
        positive_rate = 0.0

    posi_or_nega_lst.append(
        {
            "日付": dt.replace(tzinfo=JST).isoformat(),
            "陽性数": v[4],
            "陰性数": v[0] - v[4],
            "7日間平均陽性数": poositive_average,
            "7日間平均陽性率": poositive_rate,
        }
    )

patients_summary_dict = {
    "data": patients_summary_lst,
    "last_update": last_update.isoformat(),
}

dumps_json("patients_summary.json", patients_summary_dict)

inspections_summary_dict = {
    "data": {"地方衛生研究所等": official, "民間検査機関等": unofficial},
    "labels": labels,
    "last_update": last_update.isoformat(),
}

dumps_json("inspections_summary.json", inspections_summary_dict)

inspections_dict = {
    "data": inspections_lst,
    "last_update": last_update.isoformat(),
}

dumps_json("inspections.json", inspections_dict)

current_patients_dict = {
    "data": current_patients_lst,
    "last_update": last_update.isoformat(),
}

dumps_json("current_patients.json", current_patients_dict)

posi_or_nega_dict = {
    "data": posi_or_nega_lst,
    "last_update": last_update.isoformat(),
}

dumps_json("positive_or_negative.json", posi_or_nega_dict)

"""# 陽性患者"""


table, header = xlsx2table("kanja.xlsx", 0)

header = [
    "".join([i or "" for i in c]).replace("\n", "").strip() for c in zip(*table[2:4])
]
header

len(header)

temp = []
missing_number = []

for tbl in table[4:]:

    tbl[1] = int(tbl[1])

    if tbl[2] == "欠番":
        missing_number.append(tbl[1])
        continue

    tbl[2] = utils.datetime.from_excel(tbl[2]) if isinstance(tbl[2], int) else tbl[2]
    tbl[8] = utils.datetime.from_excel(tbl[8]) if isinstance(tbl[8], int) else tbl[8]

    tbl[3] = str(tbl[3])
    tbl[3] = tbl[3] + "代" if tbl[3].isdecimal() else tbl[3]

    tbl[11:] = [i is not None for i in tbl[11:]]

    temp.append(tbl)

# 転置
data = [i for i in zip(*temp) if any(i)]
len(data)

import collections

data[0] = [int(i) for i in data[0]]

# 発表日
collections.Counter(data[1])

# 年代
collections.Counter(data[2])

# 性別
collections.Counter(data[3])

collections.Counter(data[10])