Skip to content

Commit 1151e05

Browse files
Release Python Connector 1.3.3 (#740)
* fix python lint and reformat scripts (#668) * Reduce memory usage in parquet format code path (#737) * refactor: Resolve lint errors in python release script. (#660) * Reduce memory usage in delta format code paths (#723) * Update Python connector version to 1.3.3 --------- Co-authored-by: Kyle Chui <48545987+kylechui@users.noreply.github.com>
1 parent 1115317 commit 1151e05

File tree

18 files changed

+842
-543
lines changed

18 files changed

+842
-543
lines changed

client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
179179
Table(name = "table_with_cm_id", schema = "default", share = "share8"),
180180
Table(name = "deletion_vectors_with_dvs_dv_property_on", schema = "default", share = "share8"),
181181
Table(name = "dv_and_cm_table", schema = "default", share = "share8"),
182-
Table(name = "timestampntz_cdf_table", schema = "default", share = "share8")
182+
Table(name = "timestampntz_cdf_table", schema = "default", share = "share8"),
183+
Table(name = "12k_rows", schema = "default", share = "share8")
183184
)
184185
assert(expected == client.listAllTables().toSet)
185186
} finally {

dev/python_release.sh

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,32 @@
1-
#!/bin/bash -e -pipe
1+
#!/usr/bin/env bash
2+
set -euo pipefail
23

34
# Switch to the project root directory
4-
cd $( dirname $0 )
5-
cd ..
5+
pushd "$(dirname "$0")/.."
66

77
# Clean existing artifacts
8-
cd python
8+
pushd python
99
python3 setup.py clean --all
1010
rm -rf delta_sharing.egg-info dist
11-
cd ..
11+
popd
1212

1313
printf "Please type the python release version: "
14-
read VERSION
15-
echo $VERSION
14+
read -r VERSION
15+
echo "$VERSION"
1616

1717
# Update the Python connector version
18-
sed -i '' "s/__version__ = \".*\"/__version__ = \"$VERSION\"/g" python/delta_sharing/version.py
19-
git add python/delta_sharing/version.py
18+
sed -i'' "s/^__version__ = \".*\"$/__version__ = \"$VERSION\"/" ./python/delta_sharing/version.py
19+
git add ./python/delta_sharing/version.py
2020
# Use --allow-empty so that we can re-run this script even if the Python connector version has been updated
2121
git commit -m "Update Python connector version to $VERSION" --allow-empty
2222

2323
# This creates a lightweight tag that points to the current commit.
24-
git tag py-v$VERSION
24+
git tag "py-v$VERSION"
2525

2626
# Generate Python artifacts
27-
cd python/
27+
pushd python
2828
python3 setup.py sdist bdist_wheel
29-
cd ..
29+
popd
3030

3131
echo "=== Generated all release artifacts ==="
32+
popd

python/delta_sharing/_internal_auth.py

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,16 @@
3535

3636

3737
class AuthConfig:
38-
def __init__(self, token_exchange_max_retries=5,
39-
token_exchange_max_retry_duration_in_seconds=60,
40-
token_renewal_threshold_in_seconds=600):
38+
def __init__(
39+
self,
40+
token_exchange_max_retries=5,
41+
token_exchange_max_retry_duration_in_seconds=60,
42+
token_renewal_threshold_in_seconds=600,
43+
):
4144
self.token_exchange_max_retries = token_exchange_max_retries
4245
self.token_exchange_max_retry_duration_in_seconds = (
43-
token_exchange_max_retry_duration_in_seconds)
46+
token_exchange_max_retry_duration_in_seconds
47+
)
4448
self.token_renewal_threshold_in_seconds = token_renewal_threshold_in_seconds
4549

4650

@@ -90,7 +94,10 @@ def __init__(self, endpoint: str, username: str, password: str):
9094

9195
def add_auth_header(self, session: requests.Session) -> None:
9296
session.auth = (self.username, self.password)
93-
session.post(self.endpoint, data={"grant_type": "client_credentials"},)
97+
session.post(
98+
self.endpoint,
99+
data={"grant_type": "client_credentials"},
100+
)
94101

95102
def is_expired(self) -> bool:
96103
return False
@@ -107,23 +114,22 @@ def __init__(self, access_token: str, expires_in: int, creation_timestamp: int):
107114

108115

109116
class OAuthClient:
110-
def __init__(self,
111-
token_endpoint: str,
112-
client_id: str,
113-
client_secret: str,
114-
scope: Optional[str] = None):
117+
def __init__(
118+
self, token_endpoint: str, client_id: str, client_secret: str, scope: Optional[str] = None
119+
):
115120
self.token_endpoint = token_endpoint
116121
self.client_id = client_id
117122
self.client_secret = client_secret
118123
self.scope = scope
119124

120125
def client_credentials(self) -> OAuthClientCredentials:
121126
credentials = base64.b64encode(
122-
f"{self.client_id}:{self.client_secret}".encode('utf-8')).decode('utf-8')
127+
f"{self.client_id}:{self.client_secret}".encode("utf-8")
128+
).decode("utf-8")
123129
headers = {
124-
'accept': 'application/json',
125-
'authorization': f'Basic {credentials}',
126-
'content-type': 'application/x-www-form-urlencoded'
130+
"accept": "application/json",
131+
"authorization": f"Basic {credentials}",
132+
"content-type": "application/x-www-form-urlencoded",
127133
}
128134
body = f"grant_type=client_credentials{f'&scope={self.scope}' if self.scope else ''}"
129135
response = requests.post(self.token_endpoint, headers=headers, data=body)
@@ -136,9 +142,9 @@ def parse_oauth_token_response(self, response: str) -> OAuthClientCredentials:
136142
# Parsing the response per oauth spec
137143
# https://datatracker.ietf.org/doc/html/rfc6749#section-5.1
138144
json_node = json.loads(response)
139-
if 'access_token' not in json_node or not isinstance(json_node['access_token'], str):
145+
if "access_token" not in json_node or not isinstance(json_node["access_token"], str):
140146
raise RuntimeError("Missing 'access_token' field in OAuth token response")
141-
if 'expires_in' not in json_node:
147+
if "expires_in" not in json_node:
142148
raise RuntimeError("Missing 'expires_in' field in OAuth token response")
143149
try:
144150
# OAuth spec requires 'expires_in' to be an integer, e.g., 3600.
@@ -153,15 +159,13 @@ def parse_oauth_token_response(self, response: str) -> OAuthClientCredentials:
153159
# -d "client_id=$CLIENT_ID" \
154160
# -d "client_secret=$CLIENT_SECRET" \
155161
# -d "scope=https://graph.microsoft.com/.default"
156-
expires_in = int(json_node['expires_in']) # Convert to int if it's a string
162+
expires_in = int(json_node["expires_in"]) # Convert to int if it's a string
157163
except ValueError:
158164
raise RuntimeError(
159165
"'expires_in' field must be an integer or a string convertible to integer"
160166
)
161167
return OAuthClientCredentials(
162-
json_node['access_token'],
163-
expires_in,
164-
int(datetime.now().timestamp())
168+
json_node["access_token"], expires_in, int(datetime.now().timestamp())
165169
)
166170

167171

@@ -172,7 +176,7 @@ def __init__(self, oauth_client: OAuthClient, auth_config: AuthConfig = AuthConf
172176
self.current_token: Optional[OAuthClientCredentials] = None
173177
self.lock = threading.RLock()
174178

175-
def add_auth_header(self,session: requests.Session) -> None:
179+
def add_auth_header(self, session: requests.Session) -> None:
176180
token = self.maybe_refresh_token()
177181
with self.lock:
178182
session.headers.update(
@@ -199,9 +203,7 @@ def get_expiration_time(self) -> Optional[str]:
199203

200204

201205
class AuthCredentialProviderFactory:
202-
__oauth_auth_provider_cache : Dict[
203-
DeltaSharingProfile,
204-
OAuthClientCredentialsAuthProvider] = {}
206+
__oauth_auth_provider_cache: Dict[DeltaSharingProfile, OAuthClientCredentialsAuthProvider] = {}
205207

206208
@staticmethod
207209
def create_auth_credential_provider(profile: DeltaSharingProfile):
@@ -210,14 +212,17 @@ def create_auth_credential_provider(profile: DeltaSharingProfile):
210212
return AuthCredentialProviderFactory.__oauth_client_credentials(profile)
211213
elif profile.type == "basic":
212214
return AuthCredentialProviderFactory.__auth_basic(profile)
213-
elif (profile.share_credentials_version == 1 and
214-
(profile.type is None or profile.type == "bearer_token")):
215+
elif profile.share_credentials_version == 1 and (
216+
profile.type is None or profile.type == "bearer_token"
217+
):
215218
return AuthCredentialProviderFactory.__auth_bearer_token(profile)
216219

217220
# any other scenario is unsupported
218-
raise RuntimeError(f"unsupported profile.type: {profile.type}"
219-
f" profile.share_credentials_version"
220-
f" {profile.share_credentials_version}")
221+
raise RuntimeError(
222+
f"unsupported profile.type: {profile.type}"
223+
f" profile.share_credentials_version"
224+
f" {profile.share_credentials_version}"
225+
)
221226

222227
@staticmethod
223228
def __oauth_client_credentials(profile):
@@ -235,11 +240,10 @@ def __oauth_client_credentials(profile):
235240
token_endpoint=profile.token_endpoint,
236241
client_id=profile.client_id,
237242
client_secret=profile.client_secret,
238-
scope=profile.scope
243+
scope=profile.scope,
239244
)
240245
provider = OAuthClientCredentialsAuthProvider(
241-
oauth_client=oauth_client,
242-
auth_config=AuthConfig()
246+
oauth_client=oauth_client, auth_config=AuthConfig()
243247
)
244248
AuthCredentialProviderFactory.__oauth_auth_provider_cache[profile] = provider
245249
return provider

python/delta_sharing/delta_sharing.py

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,7 @@ def _parse_url(url: str) -> Tuple[str, str, str, str]:
5151
return (profile, share, schema, table)
5252

5353

54-
def get_table_version(
55-
url: str,
56-
starting_timestamp: Optional[str] = None
57-
) -> int:
54+
def get_table_version(url: str, starting_timestamp: Optional[str] = None) -> int:
5855
"""
5956
Get the shared table version using the given url.
6057
@@ -66,8 +63,7 @@ def get_table_version(
6663
profile = DeltaSharingProfile.read_from_file(profile_json)
6764
rest_client = DataSharingRestClient(profile)
6865
response = rest_client.query_table_version(
69-
Table(name=table, share=share, schema=schema),
70-
starting_timestamp
66+
Table(name=table, share=share, schema=schema), starting_timestamp
7167
)
7268
return response.delta_table_version
7369

@@ -117,6 +113,7 @@ def load_as_pandas(
117113
timestamp: Optional[str] = None,
118114
jsonPredicateHints: Optional[str] = None,
119115
use_delta_format: Optional[bool] = None,
116+
convert_in_batches: bool = False,
120117
) -> pd.DataFrame:
121118
"""
122119
Load the shared table using the given url as a pandas DataFrame.
@@ -128,6 +125,12 @@ def load_as_pandas(
128125
:param version: an optional non-negative int. Load the snapshot of table at version
129126
:param jsonPredicateHints: Predicate hints to be applied to the table. For more details see:
130127
https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#json-predicates-for-filtering
128+
:param use_delta_format: Whether to use delta format or parquet format. If not set, table
129+
metadata will be used to determine whether to use delta or parquet format.
130+
:param convert_in_batches: Whether to convert the parquet files to pandas one batch at a time
131+
rather than one file at a time. This may reduce memory consumption at the cost of taking
132+
longer or downloading more data, with parquet format queries being more likely to see
133+
improvements.
131134
:return: A pandas DataFrame representing the shared table.
132135
"""
133136
profile_json, share, schema, table = _parse_url(url)
@@ -139,14 +142,13 @@ def load_as_pandas(
139142
limit=limit,
140143
version=version,
141144
timestamp=timestamp,
142-
use_delta_format=use_delta_format
145+
use_delta_format=use_delta_format,
146+
convert_in_batches=convert_in_batches,
143147
).to_pandas()
144148

145149

146150
def load_as_spark(
147-
url: str,
148-
version: Optional[int] = None,
149-
timestamp: Optional[str] = None
151+
url: str, version: Optional[int] = None, timestamp: Optional[str] = None
150152
) -> "PySparkDataFrame": # noqa: F821
151153
"""
152154
Load the shared table using the given url as a Spark DataFrame. `PySpark` must be installed,
@@ -182,7 +184,7 @@ def load_table_changes_as_spark(
182184
starting_version: Optional[int] = None,
183185
ending_version: Optional[int] = None,
184186
starting_timestamp: Optional[str] = None,
185-
ending_timestamp: Optional[str] = None
187+
ending_timestamp: Optional[str] = None,
186188
) -> "PySparkDataFrame": # noqa: F821
187189
"""
188190
Load the table changes of a shared table as a Spark DataFrame using the given url.
@@ -203,7 +205,8 @@ def load_table_changes_as_spark(
203205
from pyspark.sql import SparkSession
204206
except ImportError:
205207
raise ImportError(
206-
"Unable to import pyspark. `load_table_changes_as_spark` requires PySpark.")
208+
"Unable to import pyspark. `load_table_changes_as_spark` requires PySpark."
209+
)
207210

208211
spark = SparkSession.getActiveSession()
209212
assert spark is not None, (
@@ -228,7 +231,8 @@ def load_table_changes_as_pandas(
228231
ending_version: Optional[int] = None,
229232
starting_timestamp: Optional[str] = None,
230233
ending_timestamp: Optional[str] = None,
231-
use_delta_format: Optional[bool] = None
234+
use_delta_format: Optional[bool] = None,
235+
convert_in_batches: bool = False,
232236
) -> pd.DataFrame:
233237
"""
234238
Load the table changes of shared table as a pandas DataFrame using the given url.
@@ -241,23 +245,32 @@ def load_table_changes_as_pandas(
241245
:param ending_version: The ending version of table changes.
242246
:param starting_timestamp: The starting timestamp of table changes.
243247
:param ending_timestamp: The ending timestamp of table changes.
248+
:param use_delta_format: Whether to use delta format or parquet format. Default is parquet
249+
format.
250+
:param convert_in_batches: Whether to convert the parquet files to pandas one batch at a time
251+
rather than one file at a time. This may reduce memory consumption at the cost of taking
252+
longer or downloading more data, with parquet format queries being more likely to see
253+
improvements.
244254
:return: A pandas DataFrame representing the shared table.
245255
"""
246256
profile_json, share, schema, table = _parse_url(url)
247257
profile = DeltaSharingProfile.read_from_file(profile_json)
248258
return DeltaSharingReader(
249259
table=Table(name=table, share=share, schema=schema),
250260
rest_client=DataSharingRestClient(profile),
251-
use_delta_format=use_delta_format
252-
).table_changes_to_pandas(CdfOptions(
253-
starting_version=starting_version,
254-
ending_version=ending_version,
255-
starting_timestamp=starting_timestamp,
256-
ending_timestamp=ending_timestamp,
257-
# when using delta format, we need to get metadata changes and
258-
# handle them properly when replaying the delta log
259-
include_historical_metadata=use_delta_format
260-
))
261+
use_delta_format=use_delta_format,
262+
convert_in_batches=convert_in_batches,
263+
).table_changes_to_pandas(
264+
CdfOptions(
265+
starting_version=starting_version,
266+
ending_version=ending_version,
267+
starting_timestamp=starting_timestamp,
268+
ending_timestamp=ending_timestamp,
269+
# when using delta format, we need to get metadata changes and
270+
# handle them properly when replaying the delta log
271+
include_historical_metadata=use_delta_format,
272+
)
273+
)
261274

262275

263276
class SharingClient:

python/delta_sharing/protocol.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def from_json(json) -> "DeltaSharingProfile":
9797
type=type,
9898
endpoint=endpoint,
9999
bearer_token=json["bearerToken"],
100-
expiration_time=json.get("expirationTime")
100+
expiration_time=json.get("expirationTime"),
101101
)
102102
elif type == "basic":
103103
return DeltaSharingProfile(
@@ -109,8 +109,8 @@ def from_json(json) -> "DeltaSharingProfile":
109109
)
110110
else:
111111
raise ValueError(
112-
f"The current release does not supports {type} type. "
113-
"Please check type.")
112+
f"The current release does not supports {type} type. " "Please check type."
113+
)
114114
else:
115115
raise ValueError(
116116
"'shareCredentialsVersion' in the profile is "
@@ -153,8 +153,7 @@ class Table:
153153
def from_json(json) -> "Table":
154154
if isinstance(json, (str, bytes, bytearray)):
155155
json = loads(json)
156-
return Table(name=json["name"], share=json["share"],
157-
schema=json["schema"])
156+
return Table(name=json["name"], share=json["share"], schema=json["schema"])
158157

159158

160159
@dataclass(frozen=True)
@@ -234,7 +233,7 @@ def from_json(json) -> "Metadata":
234233
version=json.get("version", None),
235234
size=json.get("size", None),
236235
num_files=json.get("numFiles", None),
237-
created_time=delta_metadata.get("createdTime", None)
236+
created_time=delta_metadata.get("createdTime", None),
238237
)
239238
else:
240239
configuration = json.get("configuration", {})
@@ -248,7 +247,7 @@ def from_json(json) -> "Metadata":
248247
partition_columns=json["partitionColumns"],
249248
version=json.get("version", None),
250249
size=json.get("size", None),
251-
num_files=json.get("numFiles", None)
250+
num_files=json.get("numFiles", None),
252251
)
253252

254253

0 commit comments

Comments
 (0)