MLS

github.com

class CellObservation(CellReport, Report, BaseObservation):
    """
    携帯電話基地局観測データを扱うクラス
    """

    _valid_schema = ValidCellObservationSchema()
    _fields = CellReport._fields + Report._fields

    @classmethod
    def _from_json_value(cls, dct):
        """
        JSONデータから観測データを作成する際に呼ばれるメソッド
        radioフィールドの値がRadio型でない場合は、Radio型に変換する
        """
        if (
            "radio" in dct
            and dct["radio"] is not None
            and not type(dct["radio"]) == Radio
        ):
            dct["radio"] = Radio(dct["radio"])
        return super(CellObservation, cls)._from_json_value(dct)

    def _to_json_value(self):
        """
        観測データをJSONに変換する際に呼ばれるメソッド
        radioフィールドの値がRadio型の場合は、整数値に変換する
        """
        dct = super(CellObservation, self)._to_json_value()
        if "radio" in dct and type(dct["radio"]) == Radio:
            dct["radio"] = int(dct["radio"])
        return dct

    @property
    def weight(self):
        """
        観測データの重み付け計算を行うプロパティ
        携帯電話の無線アクセス技術(GSM、WCDMA、LTE)ごとに、信号強度をもとに重み付けを行う
        最終的に、base_weight(Report側で計算された基本の重み)に信号強度の重みを掛けた値を返す
        """
        offsets = {
            # GSM median signal is -95
            # Map -113: 0.52, -95: 1.0, -79: 2.0, -51: 10.2
            Radio.gsm: (-95, -5.0),
            # WCDMA median signal is -100
            # Map -121: 0.47, -100: 1.0, -80: 2.4, -50: 16, -25: 256
            Radio.wcdma: (-100, 0.0),
            # LTE median signal is -105
            # Map -140: 0.3, -105: 1.0, -89: 2.0, -55: 16.0, -43: 48.0
            Radio.lte: (-105, 5.0),
        }
        default, offset = offsets.get(self.radio, (None, 0.0))
        signal = self.signal if self.signal is not None else default
        signal_weight = 1.0
        if signal is not None:
            signal_weight = ((1.0 / (signal + offset) ** 2) * 10000) ** 2
        return signal_weight * self.base_weight

CellObservationクラスは、携帯電話基地局の観測データを表すクラスです。CellReport、Report、BaseObservationを継承しています。

  • _valid_schemaには、ValidCellObservationSchemaが設定されています。
  • _fieldsには、CellReportとReportのフィールドが含まれます。
  • _from_json_value()は、JSONからデータを読み込む際に呼ばれます。radioフィールドの値がRadioの列挙値でない場合は、Radio型に変換します。
  • _to_json_value()は、JSONへの変換時に呼ばれます。radioフィールドの値がRadio型の場合は、整数値に変換します。

weightプロパティでは、観測データの重み付け計算が行われています。携帯電話の無線アクセス技術(GSM、WCDMA、LTE)ごとに、信号強度をもとに重み付けを行っています。最終的に、base_weight(Report側で計算された基本の重み)に信号強度の重みを掛けた値が返されます。 このように、CellObservationクラスでは、携帯電話基地局の観測データを表現するための機能と、そのデータの重み付け計算ロジックが実装されています。

class Report(BaseReport):
    """
    レポートデータを扱うクラス
    accuracy_weight、age_weight、speed_weightなどの重み付け計算ロジックが含まれる
    """

    _max_observation_accuracy = constants.MAX_OBSERVATION_ACCURACY
    _valid_schema = ValidReportSchema()
    _fields = (
        "lat",
        "lon",
        "accuracy",
        "altitude",
        "altitude_accuracy",
        "heading",
        "pressure",
        "speed",
        "source",
        "timestamp",
    )

    @classmethod
    def combine(cls, *reports):
        """
        複数のレポートを1つのレポートに結合するメソッド
        """
        values = {}
        for report in reports:
            values.update(report.__dict__)
        return cls(**values)

    @property
    def base_weight(self):
        """
        レポートの基本的な重み付けを計算するプロパティ
        accuracy_weight、age_weight、speed_weightを掛け合わせた値を返す
        """
        return self.accuracy_weight * self.age_weight * self.speed_weight

    @property
    def accuracy_weight(self):
        """
        レポートの精度に基づいた重み付けを計算するプロパティ
        精度が高いほど重み付けが高くなる
        """
        # Default to 10.0 meters for unknown accuracy
        accuracy = self.accuracy is not None and abs(self.accuracy) or 10.0
        accuracy = max(accuracy, 10.0)
        # Don't differentiate values below 10 meters
        # Maps 10: 1, 20: 0.7, 40: 0.5, 80: 0.35, 100: 0.32, 200: 0.22
        if accuracy > self._max_observation_accuracy:
            return 0.0
        return math.sqrt(10 / accuracy)

    @property
    def age_weight(self):
        """
        レポートの年齢に基づいた重み付けを計算するプロパティ
        年齢が新しいほど重み付けが高くなる
        """
        # Default to 2000 ms for unknown age. Use positive numbers as
        # we only care about relative age difference.
        age = self.age is not None and abs(self.age) or 2000.0
        age = max(age, 2000.0)
        # Maps 0: 1.0, 2000: 1.0, 4000: 0.7: 8000: 0.5, 18000: 0.33
        if age > constants.MAX_OBSERVATION_AGE:
            return 0.0
        return min(math.sqrt(2000.0 / age), 1.0)

    @property
    def speed_weight(self):
        """
        レポートの速度に基づいた重み付けを計算するプロパティ
        速度が遅いほど重み付けが高くなる
        """
        # Default to 1 meter / second for unknown speed.
        speed = self.speed is not None and abs(self.speed) or 1.0
        speed = max(speed, 1.0)
        # Maps: 0: 1.0, 5.0: 1.0, 10.0: 0.7, 20.0: 0.5, 45.0: 0.33
        if speed > constants.MAX_OBSERVATION_SPEED:
            return 0.0
        return min(math.sqrt(5.0 / speed), 1.0)

Report クラスは、レポートデータを扱うクラスです。accuracy_weight、age_weight、speed_weight などの重み付け計算ロジックが含まれています。 combine メソッドは、複数のレポートを 1 つのレポートに結合するメソッドです。 base_weight プロパティは、レポートの基本的な重み付けを計算します。accuracy_weight、age_weight、speed_weight を掛け合わせた値を返します。 accuracy_weight プロパティは、レポートの精度に基づいた重み付けを計算します。精度が高いほど重み付けが高くなります。 age_weight プロパティは、レポートの年齢に基づいた重み付けを計算します。年齢が新しいほど重み付けが高くなります。 speed_weight プロパティは、レポートの速度に基づいた重み付けを計算します。速度が遅いほど重み付けが高くなります。 このように、Report クラスでは、レポートデータの重み付け計算ロジックが実装されています。accuracy_weight、age_weight、speed_weight などのプロパティで、それぞれの要素に応じた重み付けを行っています。

@property
def accuracy_weight(self):
    """
    レポートの精度に基づいた重み付けを計算するプロパティ
    精度が高いほど重み付けが高くなる
    """
    # Default to 10.0 meters for unknown accuracy
    accuracy = self.accuracy is not None and abs(self.accuracy) or 10.0
    accuracy = max(accuracy, 10.0)
    # Don't differentiate values below 10 meters
    # Maps 10: 1, 20: 0.7, 40: 0.5, 80: 0.35, 100: 0.32, 200: 0.22
    if accuracy > self._max_observation_accuracy:
        return 0.0
    return math.sqrt(10 / accuracy)

accuracy_weightプロパティは、レポートの精度に基づいた重み付けを計算するプロパティです。精度が高いほど、重み付けが高くなります。

具体的な計算方法は次のとおりです。

レポートの精度(accuracy)が不明な場合は、デフォルトで10.0メートルとみなします。 accuracyの値が10メートル未満の場合は、10メートルとみなします。 accuracyの値がself._max_observation_accuracy(デフォルトでは300メートル)を超える場合は、重み付けを0.0とします。 それ以外の場合は、sqrt(10 / accuracy)を計算し、その値を重み付けとします。 accuracyの値と重み付けの関係は以下のようになります。

10m: 1.0 20m: 0.7 40m: 0.5 80m: 0.35 100m: 0.32 200m: 0.22 つまり、精度が高いほど重み付けが高くなり、精度が低くなるにつれて重み付けが小さくなります。精度がself._max_observation_accuracyを超えると、その観測データは無視されます。

このaccuracy_weightは、Report.base_weightの計算に使用されます。base_weightは、accuracy_weight、age_weight、speed_weightを掛け合わせた値になります。

class ValidCellReportSchema(ValidCellKeySchema):
    """
    携帯電話基地局レポートの特定のフィールドを検証するスキーマ
    """

    age = DefaultNode(
        colander.Integer(),
        missing=None,
        validator=colander.Range(constants.MIN_AGE, constants.MAX_AGE),
    )

    asu = DefaultNode(
        colander.Integer(),
        missing=None,
        validator=colander.Range(
            min(constants.MIN_CELL_ASU.values()), max(constants.MAX_CELL_ASU.values())
        ),
    )

    signal = DefaultNode(
        colander.Integer(),
        missing=None,
        validator=colander.Range(
            min(constants.MIN_CELL_SIGNAL.values()),
            max(constants.MAX_CELL_SIGNAL.values()),
        ),
    )

    ta = DefaultNode(
        colander.Integer(),
        missing=None,
        validator=colander.Range(constants.MIN_CELL_TA, constants.MAX_CELL_TA),
    )

    def _signal_from_asu(self, radio, value):
        """
        ASU (Arbitrary Strength Unit) 値から信号強度 (dBm) を計算するメソッド
        無線アクセス技術 (GSM, WCDMA, LTE) ごとに異なる計算式を使用する
        """
        if radio is Radio.gsm:
            return (value * 2) - 113
        if radio is Radio.wcdma:
            return value - 116
        if radio is Radio.lte:
            return value - 140

    def deserialize(self, data):
        """
        デシリアライズ時の追加処理を行うメソッド
        ASU と信号強度の値がスワップされている場合の修正や
        ASU から信号強度への変換などを行う
        """
        if data:
            # Sometimes the asu and signal fields are swapped
            if (
                data.get("asu") is not None
                and data.get("asu", 0) < -5
                and (data.get("signal") is None or data.get("signal", 0) >= 0)
            ):
                # shallow copy
                data = dict(data)
                data["signal"] = data["asu"]
                data["asu"] = None

        data = super(ValidCellReportSchema, self).deserialize(data)

        if isinstance(data.get("radio"), Radio):
            radio = data["radio"]

            # Radio type specific checks for ASU field
            if data.get("asu") is not None:
                if not (
                    constants.MIN_CELL_ASU[radio]
                    <= data["asu"]
                    <= constants.MAX_CELL_ASU[radio]
                ):
                    data = dict(data)
                    data["asu"] = None

            # Radio type specific checks for signal field
            if data.get("signal") is not None:
                if not (
                    constants.MIN_CELL_SIGNAL[radio]
                    <= data["signal"]
                    <= constants.MAX_CELL_SIGNAL[radio]
                ):
                    data = dict(data)
                    data["signal"] = None

            # Radio type specific checks for TA field
            if data.get("ta") is not None and radio is Radio.wcdma:
                data = dict(data)
                data["ta"] = None

            # Calculate signal from ASU field
            if data.get("asu") is not None and data.get("signal") is None:
                if (
                    constants.MIN_CELL_ASU[radio]
                    <= data["asu"]
                    <= constants.MAX_CELL_ASU[radio]
                ):
                    data = dict(data)
                    data["signal"] = self._signal_from_asu(radio, data["asu"])

        return data

    def validator(self, node, cstruct):
        """
        検証ロジックを実行するメソッド
        基底クラスの validator を呼び出した上で、特定のフィールドが必須であることを検証する
        """
        super(ValidCellReportSchema, self).validator(node, cstruct)
        for field in ("radio", "mcc", "mnc", "lac", "cid"):
            if cstruct[field] is None or cstruct[field] is colander.null:
                raise colander.Invalid(node, "Cell %s is required." % field)

ValidCellReportSchemaは、携帯電話基地局のレポートデータの特定のフィールドを検証するスキーマです。ValidCellKeySchemaを継承しています。 age、asu、signal、taフィールドのノードが定義されており、それぞれの値の範囲が検証されます。 _signal_from_asuメソッドは、ASU (Arbitrary Strength Unit) 値から信号強度 (dBm) を計算するメソッドです。無線アクセス技術 (GSM, WCDMA, LTE) ごとに異なる計算式を使用します。 deserializeメソッドは、デシリアライズ時の追加処理を行うメソッドです。ASUと信号強度の値がスワップされている場合の修正や、ASUから信号強度への変換などを行います。 validatorメソッドは、検証ロジックを実行するメソッドです。基底クラスのvalidatorを呼び出した上で、特定のフィールド(radio、mcc、mnc、lac、cid)が必須であることを検証します。 このスキーマでは、携帯電話基地局のレポートデータの特定のフィールドの値の範囲や必須性を検証するロジックが実装されています。また、ASUと信号強度の値の処理や変換も行われています。

github.com

このコードは、Ichnaeaプロジェクトの一部で、モバイルデバイスからの観測データを処理し、BluetoothWi-Fi、携帯電話基地局の位置情報を更新するためのロジックを含んでいます。

主な機能は以下の通りです:

StationStateクラス: 観測データと既存の基地局情報を比較し、基地局の位置情報を更新するかどうかを判断するロジックが含まれています。 MacState、BlueState、WifiState、CellStateクラス: それぞれBluetoothWi-Fi、携帯電話基地局の位置情報を扱うための具体的なロジックが実装されています。 StationUpdaterクラス: 観測データをシャードごとに分割し、各シャードの観測データを処理するためのロジックが含まれています。また、更新された位置情報に基づいて関連する地域情報を更新するためのロジックも含まれています。 MacUpdater、BlueUpdater、WifiUpdater、CellUpdaterクラス: それぞれBluetoothWi-Fi、携帯電話基地局の位置情報を更新するための具体的な実装が含まれています。 全体的には、モバイルデバイスからの観測データを受け取り、それらを既存の位置情報と比較して、基地局の位置情報を更新するプロセスが実装されています。また、更新された位置情報に基づいて関連する地域情報も更新されます。このコードはデータベースとのやり取りや、観測データの重み付け、位置情報の計算などの複雑なロジックを含んでいます。