Skip to content

Commit fe265ef

Browse files
author
mat
committed
Remove average extra check
Fix typing on timestamp
1 parent 5356830 commit fe265ef

File tree

5 files changed

+68
-73
lines changed

5 files changed

+68
-73
lines changed

reactivex/operators/_average.py

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from dataclasses import dataclass
2-
from typing import Any, Callable, Optional, TypeVar, cast
2+
from typing import Any, Optional, TypeVar, cast
33

44
from reactivex import Observable, operators, typing
5+
from reactivex.curry import curry_flip
56

67
_T = TypeVar("_T")
78

@@ -12,51 +13,47 @@ class AverageValue:
1213
count: int
1314

1415

16+
@curry_flip(1)
1517
def average_(
18+
source: Observable[Any],
1619
key_mapper: Optional[typing.Mapper[_T, float]] = None,
17-
) -> Callable[[Observable[_T]], Observable[float]]:
18-
def average(source: Observable[Any]) -> Observable[float]:
19-
"""Partially applied average operator.
20+
) -> Observable[float]:
21+
"""Partially applied average operator.
2022
21-
Computes the average of an observable sequence of values that
22-
are in the sequence or obtained by invoking a transform
23-
function on each element of the input sequence if present.
23+
Computes the average of an observable sequence of values that
24+
are in the sequence or obtained by invoking a transform
25+
function on each element of the input sequence if present.
2426
25-
Examples:
26-
>>> res = average(source)
27+
Examples:
28+
>>> res = average(source)
2729
28-
Args:
29-
source: Source observable to average.
30+
Args:
31+
source: Source observable to average.
3032
31-
Returns:
32-
An observable sequence containing a single element with the
33-
average of the sequence of values.
34-
"""
33+
Returns:
34+
An observable sequence containing a single element with the
35+
average of the sequence of values.
36+
"""
3537

36-
key_mapper_: typing.Mapper[_T, float] = key_mapper or (
37-
lambda x: float(cast(Any, x))
38-
)
38+
key_mapper_: typing.Mapper[_T, float] = key_mapper or (
39+
lambda x: float(cast(Any, x))
40+
)
3941

40-
def accumulator(prev: AverageValue, cur: float) -> AverageValue:
41-
return AverageValue(sum=prev.sum + cur, count=prev.count + 1)
42+
def accumulator(prev: AverageValue, cur: float) -> AverageValue:
43+
return AverageValue(sum=prev.sum + cur, count=prev.count + 1)
4244

43-
def mapper(s: AverageValue) -> float:
44-
if s.count == 0:
45-
raise Exception("The input sequence was empty")
45+
def mapper(s: AverageValue) -> float:
46+
return s.sum / float(s.count)
4647

47-
return s.sum / float(s.count)
48+
seed = AverageValue(sum=0, count=0)
4849

49-
seed = AverageValue(sum=0, count=0)
50-
51-
ret = source.pipe(
52-
operators.map(key_mapper_),
53-
operators.scan(accumulator, seed),
54-
operators.last(),
55-
operators.map(mapper),
56-
)
57-
return ret
58-
59-
return average
50+
ret = source.pipe(
51+
operators.map(key_mapper_),
52+
operators.scan(accumulator, seed),
53+
operators.last(),
54+
operators.map(mapper),
55+
)
56+
return ret
6057

6158

6259
__all__ = ["average_"]

reactivex/operators/_take.py

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,52 @@
1-
from typing import Callable, Optional, TypeVar
1+
from typing import Optional, TypeVar, cast
22

33
from reactivex import Observable, abc, empty
4+
from reactivex.curry import curry_flip
45
from reactivex.internal import ArgumentOutOfRangeException
56

67
_T = TypeVar("_T")
78

89

9-
def take_(count: int) -> Callable[[Observable[_T]], Observable[_T]]:
10+
@curry_flip(1)
11+
def take_(source: Observable[_T], count: int) -> Observable[_T]:
1012
if count < 0:
1113
raise ArgumentOutOfRangeException()
1214

13-
def take(source: Observable[_T]) -> Observable[_T]:
14-
"""Returns a specified number of contiguous elements from the start of
15-
an observable sequence.
15+
"""Returns a specified number of contiguous elements from the start of
16+
an observable sequence.
1617
17-
>>> take(source)
18+
>>> take(source)
1819
19-
Keyword arguments:
20-
count -- The number of elements to return.
20+
Keyword arguments:
21+
count -- The number of elements to return.
2122
22-
Returns an observable sequence that contains the specified number of
23-
elements from the start of the input sequence.
24-
"""
23+
Returns an observable sequence that contains the specified number of
24+
elements from the start of the input sequence.
25+
"""
2526

26-
if not count:
27-
return empty()
27+
if not count:
28+
return cast(Observable[_T], empty())
2829

29-
def subscribe(
30-
observer: abc.ObserverBase[_T],
31-
scheduler: Optional[abc.SchedulerBase] = None,
32-
):
33-
remaining = count
30+
def subscribe(
31+
observer: abc.ObserverBase[_T],
32+
scheduler: Optional[abc.SchedulerBase] = None,
33+
):
34+
remaining = count
3435

35-
def on_next(value: _T) -> None:
36-
nonlocal remaining
36+
def on_next(value: _T) -> None:
37+
nonlocal remaining
3738

38-
if remaining > 0:
39-
remaining -= 1
40-
observer.on_next(value)
41-
if not remaining:
42-
observer.on_completed()
39+
if remaining > 0:
40+
remaining -= 1
41+
observer.on_next(value)
42+
if not remaining:
43+
observer.on_completed()
4344

44-
return source.subscribe(
45-
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
46-
)
45+
return source.subscribe(
46+
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
47+
)
4748

48-
return Observable(subscribe)
49-
50-
return take
49+
return Observable(subscribe)
5150

5251

5352
__all__ = ["take_"]

reactivex/operators/_throttlefirst.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import Callable, Optional, TypeVar
2+
from typing import Optional, TypeVar
33

44
from reactivex import Observable, abc, typing
55
from reactivex.curry import curry_flip
@@ -13,7 +13,7 @@ def throttle_first_(
1313
source: Observable[_T],
1414
window_duration: typing.RelativeTime,
1515
scheduler: Optional[abc.SchedulerBase] = None,
16-
) -> Callable[[Observable[_T]], Observable[_T]]:
16+
) -> Observable[_T]:
1717
"""Returns an observable that emits only the first item emitted
1818
by the source Observable during sequential time windows of a
1919
specified duration.

reactivex/operators/_timestamp.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from dataclasses import dataclass
22
from datetime import datetime
3-
from typing import Callable, Generic, Optional, TypeVar
3+
from typing import Generic, Optional, TypeVar
44

55
from reactivex import Observable, abc, defer, operators
66
from reactivex.curry import curry_flip
@@ -19,7 +19,7 @@ class Timestamp(Generic[_T]):
1919
def timestamp_(
2020
source: Observable[_T],
2121
scheduler: Optional[abc.SchedulerBase] = None,
22-
) -> Callable[[Observable[_T]], Observable[Timestamp[_T]]]:
22+
) -> Observable[Timestamp[_T]]:
2323
"""Records the timestamp for each value in an observable sequence.
2424
2525
Examples:

tests/test_observable/test_average.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import unittest
22

33
from reactivex import operators as _
4+
from reactivex.internal.exceptions import SequenceContainsNoElementsError
45
from reactivex.testing import ReactiveTest, TestScheduler
56

67
on_next = ReactiveTest.on_next
@@ -19,9 +20,7 @@ def test_average_int32_empty(self):
1920
xs = scheduler.create_hot_observable(msgs)
2021
res = scheduler.start(create=lambda: xs.pipe(_.average())).messages
2122

22-
assert len(res) == 1
23-
assert res[0].value.kind == "E" and res[0].value.exception != None
24-
assert res[0].time == 250
23+
assert res == [on_error(250, SequenceContainsNoElementsError())]
2524

2625
def test_average_int32_return(self):
2726
scheduler = TestScheduler()

0 commit comments

Comments
 (0)