Skip to content

Commit 4a487e3

Browse files
author
mat
committed
amb typing to match amb_ (add Future)
Curry flip amb Typing on fromfuture tests Doc: Remove 3rd observable in amb since it takes only 2 Doc: show that completion follows winner
1 parent fe265ef commit 4a487e3

File tree

3 files changed

+87
-88
lines changed

3 files changed

+87
-88
lines changed

reactivex/operators/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,8 @@ def amb(right_source: Observable[_T]) -> Callable[[Observable[_T]], Observable[_
102102
.. marble::
103103
:alt: amb
104104
105-
---8--6--9-----------|
105+
---8--6--9---------|
106106
--1--2--3---5--------|
107-
----------10-20-30---|
108107
[ amb() ]
109108
--1--2--3---5--------|
110109

reactivex/operators/_amb.py

Lines changed: 73 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,92 +1,91 @@
11
from asyncio import Future
2-
from typing import Callable, List, Optional, TypeVar, Union
2+
from typing import List, Optional, TypeVar, Union
33

44
from reactivex import Observable, abc, from_future
5+
from reactivex.curry import curry_flip
56
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
67

78
_T = TypeVar("_T")
89

910

11+
@curry_flip(1)
1012
def amb_(
11-
right_source: Union[Observable[_T], "Future[_T]"]
12-
) -> Callable[[Observable[_T]], Observable[_T]]:
13+
left_source: Observable[_T], right_source: Union[Observable[_T], "Future[_T]"]
14+
) -> Observable[_T]:
1315

1416
if isinstance(right_source, Future):
1517
obs: Observable[_T] = from_future(right_source)
1618
else:
1719
obs = right_source
1820

19-
def amb(left_source: Observable[_T]) -> Observable[_T]:
20-
def subscribe(
21-
observer: abc.ObserverBase[_T],
22-
scheduler: Optional[abc.SchedulerBase] = None,
23-
) -> abc.DisposableBase:
24-
choice: List[Optional[str]] = [None]
25-
left_choice = "L"
26-
right_choice = "R"
27-
left_subscription = SingleAssignmentDisposable()
28-
right_subscription = SingleAssignmentDisposable()
29-
30-
def choice_left():
31-
if not choice[0]:
32-
choice[0] = left_choice
33-
right_subscription.dispose()
34-
35-
def choice_right():
36-
if not choice[0]:
37-
choice[0] = right_choice
38-
left_subscription.dispose()
39-
40-
def on_next_left(value: _T) -> None:
41-
with left_source.lock:
42-
choice_left()
43-
if choice[0] == left_choice:
44-
observer.on_next(value)
45-
46-
def on_error_left(err: Exception) -> None:
47-
with left_source.lock:
48-
choice_left()
49-
if choice[0] == left_choice:
50-
observer.on_error(err)
51-
52-
def on_completed_left() -> None:
53-
with left_source.lock:
54-
choice_left()
55-
if choice[0] == left_choice:
56-
observer.on_completed()
57-
58-
left_d = left_source.subscribe(
59-
on_next_left, on_error_left, on_completed_left, scheduler=scheduler
60-
)
61-
left_subscription.disposable = left_d
62-
63-
def send_right(value: _T) -> None:
64-
with left_source.lock:
65-
choice_right()
66-
if choice[0] == right_choice:
67-
observer.on_next(value)
68-
69-
def on_error_right(err: Exception) -> None:
70-
with left_source.lock:
71-
choice_right()
72-
if choice[0] == right_choice:
73-
observer.on_error(err)
74-
75-
def on_completed_right() -> None:
76-
with left_source.lock:
77-
choice_right()
78-
if choice[0] == right_choice:
79-
observer.on_completed()
80-
81-
right_d = obs.subscribe(
82-
send_right, on_error_right, on_completed_right, scheduler=scheduler
83-
)
84-
right_subscription.disposable = right_d
85-
return CompositeDisposable(left_subscription, right_subscription)
86-
87-
return Observable(subscribe)
88-
89-
return amb
21+
def subscribe(
22+
observer: abc.ObserverBase[_T],
23+
scheduler: Optional[abc.SchedulerBase] = None,
24+
) -> abc.DisposableBase:
25+
choice: List[Optional[str]] = [None]
26+
left_choice = "L"
27+
right_choice = "R"
28+
left_subscription = SingleAssignmentDisposable()
29+
right_subscription = SingleAssignmentDisposable()
30+
31+
def choice_left():
32+
if not choice[0]:
33+
choice[0] = left_choice
34+
right_subscription.dispose()
35+
36+
def choice_right():
37+
if not choice[0]:
38+
choice[0] = right_choice
39+
left_subscription.dispose()
40+
41+
def on_next_left(value: _T) -> None:
42+
with left_source.lock:
43+
choice_left()
44+
if choice[0] == left_choice:
45+
observer.on_next(value)
46+
47+
def on_error_left(err: Exception) -> None:
48+
with left_source.lock:
49+
choice_left()
50+
if choice[0] == left_choice:
51+
observer.on_error(err)
52+
53+
def on_completed_left() -> None:
54+
with left_source.lock:
55+
choice_left()
56+
if choice[0] == left_choice:
57+
observer.on_completed()
58+
59+
left_d = left_source.subscribe(
60+
on_next_left, on_error_left, on_completed_left, scheduler=scheduler
61+
)
62+
left_subscription.disposable = left_d
63+
64+
def send_right(value: _T) -> None:
65+
with left_source.lock:
66+
choice_right()
67+
if choice[0] == right_choice:
68+
observer.on_next(value)
69+
70+
def on_error_right(err: Exception) -> None:
71+
with left_source.lock:
72+
choice_right()
73+
if choice[0] == right_choice:
74+
observer.on_error(err)
75+
76+
def on_completed_right() -> None:
77+
with left_source.lock:
78+
choice_right()
79+
if choice[0] == right_choice:
80+
observer.on_completed()
81+
82+
right_d = obs.subscribe(
83+
send_right, on_error_right, on_completed_right, scheduler=scheduler
84+
)
85+
right_subscription.disposable = right_d
86+
return CompositeDisposable(left_subscription, right_subscription)
87+
88+
return Observable(subscribe)
9089

9190

9291
__all__ = ["amb_"]

tests/test_observable/test_fromfuture.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import unittest
33
from asyncio import Future
4+
from typing import Any
45

56
import reactivex
67

@@ -11,15 +12,15 @@ def test_future_success(self):
1112
success = [False, True, False]
1213

1314
async def go():
14-
future = Future()
15+
future: Future[int] = Future()
1516
future.set_result(42)
1617

1718
source = reactivex.from_future(future)
1819

19-
def on_next(x):
20+
def on_next(x: int):
2021
success[0] = x == 42
2122

22-
def on_error(err):
23+
def on_error(_err: Exception):
2324
success[1] = False
2425

2526
def on_completed():
@@ -37,15 +38,15 @@ def test_future_failure(self):
3738
async def go():
3839
error = Exception("woops")
3940

40-
future = Future()
41+
future: Future[Any] = Future()
4142
future.set_exception(error)
4243

4344
source = reactivex.from_future(future)
4445

45-
def on_next(x):
46+
def on_next(x: Any):
4647
success[0] = False
4748

48-
def on_error(err):
49+
def on_error(err: Exception):
4950
success[1] = str(err) == str(error)
5051

5152
def on_completed():
@@ -61,13 +62,13 @@ def test_future_cancel(self):
6162
success = [True, False, True]
6263

6364
async def go():
64-
future = Future()
65+
future: Future[Any] = Future()
6566
source = reactivex.from_future(future)
6667

67-
def on_next(x):
68+
def on_next(x: Any):
6869
success[0] = False
6970

70-
def on_error(err):
71+
def on_error(err: Any):
7172
success[1] = type(err) == asyncio.CancelledError
7273

7374
def on_completed():
@@ -84,15 +85,15 @@ def test_future_dispose(self):
8485
success = [True, True, True]
8586

8687
async def go():
87-
future = Future()
88+
future: Future[int] = Future()
8889
future.set_result(42)
8990

9091
source = reactivex.from_future(future)
9192

92-
def on_next(x):
93+
def on_next(x: int):
9394
success[0] = False
9495

95-
def on_error(err):
96+
def on_error(err: Exception):
9697
success[1] = False
9798

9899
def on_completed():

0 commit comments

Comments
 (0)