Skip to content

Commit 961683c

Browse files
authored
fix-s3-categorical-integer-columns (#312)
1 parent 15b62f2 commit 961683c

File tree

4 files changed

+52
-4
lines changed

4 files changed

+52
-4
lines changed

.github/workflows/license-check.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ jobs:
5050
with:
5151
requirements: "backend/requirements-all.txt"
5252
fail: "Copyleft,Other,Error"
53-
exclude: '(psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2023\.11\.17|tqdm.*4\.66\.1|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|deepchecks.*)'
53+
exclude: '(envier.*0\.5\.0|psycopg2.*2\.9\.3|fqdn.*1\.5\.1|pyzmq.*25\.1\.2|debugpy.*1\.6\.7|certifi.*2023\.11\.17|tqdm.*4\.66\.1|webencodings.*0\.5\.1|torch.*1\.10\.2.*|torch.*1\.11\.0.*|pytorch-ignite.*0\.4\.10.*|torchaudio.*0\.11\.0.*|torchvision.*0\.12\.0.*|terminado.*0\.15\.0|qudida.*0\.0\.4|expiringdict.*1\.2\.2|botocore.*1\.29\.80|orderedmultidict.*1\.0\.1|deepchecks.*)'
5454
# psycopg2 is LGPL 2
5555
# pyzmq is Revised BSD https://github.com/zeromq/pyzmq/blob/main/examples/LICENSE
5656
# debugpy is MIT https://github.com/microsoft/debugpy/blob/main/LICENSE

backend/deepchecks_monitoring/ee/bgtasks/object_storage_ingestor.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@
2828
from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
2929
from deepchecks_monitoring.resources import ResourcesProvider
3030
from deepchecks_monitoring.schema_models import Model, ModelVersion
31-
from deepchecks_monitoring.schema_models.column_type import SAMPLE_ID_COL, SAMPLE_TS_COL
31+
from deepchecks_monitoring.schema_models.column_type import SAMPLE_ID_COL, SAMPLE_TS_COL, ColumnType
3232
from deepchecks_monitoring.schema_models.data_sources import DataSource
3333
from deepchecks_monitoring.utils import database
34+
from deepchecks_monitoring.utils.other import datetime_formatter, string_formatter
3435

3536
__all__ = ['ObjectStorageIngestor']
3637

@@ -147,6 +148,12 @@ async def run(self, task: 'Task', session: AsyncSession, resources_provider: Res
147148
for prefix in version_prefixes:
148149
for df, time in self.ingest_prefix(s3, bucket, f'{version_path}/{prefix}', version.latest_file_time,
149150
errors, version.model_id, version.id, need_ts=True):
151+
for name, kind in version.features_columns.items():
152+
if kind == ColumnType.CATEGORICAL and name in df.columns:
153+
df[name] = df[name].apply(string_formatter)
154+
elif kind == ColumnType.DATETIME and name in df.columns:
155+
df[name] = df[name].apply(datetime_formatter)
156+
150157
# For each file, set lock expiry to 360 seconds from now
151158
await lock.extend(360, replace_ttl=True)
152159
await self.ingestion_backend.log_samples(version, df, session, organization_id, new_scan_time)

backend/deepchecks_monitoring/utils/other.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
"""Represent global utility functions."""
22
import typing as t
3+
from datetime import datetime
34
from ssl import SSLContext
45

6+
import numpy as np
7+
import pandas as pd
58
import pendulum as pdl
9+
import rfc3339_validator
610
from aiokafka.admin import AIOKafkaAdminClient
711
from aiokafka.admin import __version__ as aiokafka_version
812
from aiokafka.client import AIOKafkaClient
13+
from pendulum.datetime import DateTime as PendulumDateTime
914
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
1015

1116
from deepchecks_monitoring.public_models import Organization, User, UserOAuthDTO
1217
from deepchecks_monitoring.schema_models.model_version import ModelVersion
1318

14-
__all__ = ['generate_random_user', 'generate_test_user', 'datetime_sample_formatter']
19+
__all__ = ['generate_random_user', 'generate_test_user', 'datetime_sample_formatter',
20+
'datetime_formatter', 'string_formatter', 'parse_timestamp']
1521

1622

1723
class ExtendedAIOKafkaAdminClient(AIOKafkaAdminClient): # pylint: disable=missing-class-docstring
@@ -108,3 +114,37 @@ def datetime_sample_formatter(sample: t.Dict, model_version: ModelVersion):
108114
continue
109115
if model_columns[col_name].get('format') == 'date-time':
110116
sample[col_name] = pdl.parse(val)
117+
118+
119+
def datetime_formatter(datetime_obj):
    """Convert a datetime-like cell value to an ISO 8601 string.

    Parameters
    ----------
    datetime_obj
        The value to convert. ``pd.Period`` and ``np.datetime64`` values are
        first converted to ``pd.Timestamp``; anything else is handed to
        ``parse_timestamp`` unchanged.

    Returns
    -------
    str or None
        The ISO 8601 representation produced by ``to_iso8601_string()``, or
        ``None`` when the value is missing (``None``, ``NaT``, ``NaN``).

    Raises
    ------
    ValueError
        Propagated from ``parse_timestamp`` for unsupported types or formats.
    """
    if datetime_obj is None:
        return None
    # Missing values read from a DataFrame usually arrive as NaT/NaN rather
    # than None; treat them like string_formatter does. The is_scalar guard
    # keeps pd.isna from returning an array for container inputs.
    if pd.api.types.is_scalar(datetime_obj) and pd.isna(datetime_obj):
        return None
    if isinstance(datetime_obj, pd.Period):
        datetime_obj = datetime_obj.to_timestamp()
    elif isinstance(datetime_obj, np.datetime64):
        # np.datetime64 has no ``to_timestamp`` method; pd.Timestamp accepts
        # the numpy scalar directly.
        datetime_obj = pd.Timestamp(datetime_obj)
    return parse_timestamp(datetime_obj).to_iso8601_string()
127+
128+
129+
def string_formatter(some_obj):
    """Return ``str(some_obj)``, or ``None`` when ``pd.isna`` reports it missing."""
    return None if pd.isna(some_obj) else str(some_obj)
133+
134+
135+
def parse_timestamp(timestamp: t.Union[int, datetime, str]) -> 'PendulumDateTime':
    """Parse timestamp to a pendulum datetime object.

    Handling by input type, checked in this order:

    * Python ``int`` or NumPy integer -- passed to ``pdl.from_timestamp``
      together with the local timezone.
    * pendulum ``DateTime`` -- returned as is.
    * ``datetime`` -- passed to ``pdl.instance`` together with the local
      timezone.
      NOTE(review): how a naive datetime is interpreted depends on
      ``pdl.instance`` semantics -- confirm against pendulum's docs.
    * ``str`` -- must pass RFC 3339 validation, then parsed by ``pdl.parse``.

    Raises
    ------
    ValueError
        If the string is not valid RFC 3339, or the type is not supported.
    """
    is_integer = isinstance(timestamp, int) or np.issubdtype(type(timestamp), np.integer)
    if is_integer:
        return pdl.from_timestamp(timestamp, pdl.local_timezone())

    # Must be tested before the plain ``datetime`` check below, so that an
    # existing pendulum instance is returned untouched.
    if isinstance(timestamp, PendulumDateTime):
        return timestamp

    if isinstance(timestamp, datetime):
        return pdl.instance(timestamp, pdl.local_timezone())

    if not isinstance(timestamp, str):
        raise ValueError(f'Not supported timestamp type: {type(timestamp)}')

    if not rfc3339_validator.validate_rfc3339(timestamp):
        raise ValueError(f'Not supported timestamp format for: {timestamp}')
    return pdl.parse(timestamp)

backend/requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,5 @@ ddtrace>=1.0,<2
3838
python-json-logger>=2.0.7
3939
mixpanel==4.10.0
4040
typing_extensions==4.7.1
41-
pyarrow==14.0.1
41+
pyarrow==14.0.1
42+
rfc3339-validator==0.1.4 # Used to validate date-time format in jsonschema

0 commit comments

Comments
 (0)