Skip to content

Commit a5ff5ef

Browse files
murilommen and Anthony Naddeo
authored
Refac: Add generic process status check for ProcessRollingLogger (#1443)
## Description The `status` method declared on ProcessRollingLogger needs to be moved one structure up, so that any ProcessActor can benefit from it. - [x] I have reviewed the [Guidelines for Contributing](CONTRIBUTING.md) and the [Code of Conduct](CODE_OF_CONDUCT.md). --------- Co-authored-by: Anthony Naddeo <[email protected]>
1 parent 3502a63 commit a5ff5ef

File tree

9 files changed

+218
-167
lines changed

9 files changed

+218
-167
lines changed

python/Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pre-commit: proto
2828
poetry run pyright
2929

3030
.PHONY: dist clean clean-test help format lint test install coverage docs livedocs default proto proto-v0 github release
31-
.PHONY: test-system-python format-fix blackd jupyter-kernel docs livedocs
31+
.PHONY: test-system-python format-fix blackd jupyter-kernel docs livedocs fix
3232

3333
ifeq ($(shell which poetry), )
3434
$(error "Can't find poetry on the path. Install it at https://python-poetry.org/docs.")
@@ -133,6 +133,8 @@ lint-fix: ## Automatically fix linting issues.
133133
@$(call i, Running the linter)
134134
poetry run autoflake --in-place --remove-all-unused-imports --remove-unused-variables $(src.python) $(tst.python)
135135

136+
fix: lint-fix format-fix ## Automatically fix linting and formatting issues.
137+
136138
format: ## Check style formatting.
137139
@$(call w, format is deprecated running pre-commit instead)
138140
poetry run pre-commit run --all-files

python/tests/api/logger/experimental/logger/actor/test_actor_loggers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ def test_process_throws_after_killed(actor: Tuple[DataLogger, FakeWriter]) -> No
402402
won't have time to start returning false. This tests the case where something kills the process
403403
while something else is trying to use it.
404404
"""
405-
logger, writer = actor
405+
logger, _ = actor
406406
if isinstance(logger, ProcessRollingLogger):
407407
logger = cast(ProcessRollingLogger, logger) # type: ignore
408408
ms = 1689881671000
@@ -423,7 +423,7 @@ def test_process_throws_after_killed_delay(actor: Tuple[DataLogger, FakeWriter])
423423
Very similar to test_process_throws_after_killed but there is a delay after the process is killed
424424
before logging so the log() call will throw before doing any actual work with a clear error message.
425425
"""
426-
logger, writer = actor
426+
logger, _ = actor
427427
if isinstance(logger, ProcessRollingLogger):
428428
logger = cast(ProcessRollingLogger, logger) # type: ignore
429429
ms = 1689881671000

python/tests/api/logger/experimental/logger/actor/test_actors.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import multiprocessing as mp
22
import os
33
from dataclasses import dataclass
4-
from typing import List, Type, Union
4+
from typing import List, Type, TypeVar, Union
55

66
import pytest
77

@@ -23,6 +23,9 @@ class Message2:
2323
pass
2424

2525

26+
StatusType = TypeVar("StatusType")
27+
28+
2629
Messages = Union[Message1, Message2]
2730

2831

@@ -52,7 +55,7 @@ def process_batch(
5255
raise Exception(f"Unknown batch type: {batch_type}")
5356

5457

55-
class CountingMPProcessActor(ProcessActor[Messages]):
58+
class CountingMPProcessActor(ProcessActor[Messages, StatusType]):
5659
def __init__(self, queue_config: QueueConfig = QueueConfig()) -> None:
5760
super().__init__(queue_config, queue_type=QueueType.MP)
5861
self.counter = Counter()
@@ -63,7 +66,7 @@ def process_batch(
6366
self.counter.process_batch(batch, batch_type)
6467

6568

66-
class CountingFasterFifoProcessActor(ProcessActor[Messages]):
69+
class CountingFasterFifoProcessActor(ProcessActor[Messages, StatusType]):
6770
def __init__(self, queue_config: QueueConfig = QueueConfig()) -> None:
6871
super().__init__(queue_config, queue_type=QueueType.FASTER_FIFO)
6972
self.counter = Counter()
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# flake8: noqa
22
from whylogs.api.usage_stats import emit_usage
33

4-
from .profiler import fugue_profile
4+
# This import has a side effect
5+
from .profiler import fugue_profile # type: ignore
56

67
emit_usage("fugue")

python/whylogs/api/logger/experimental/logger/actor/actor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,11 @@ def _load_messages(self) -> Optional[List[Union[MessageType, CloseMessage]]]:
184184

185185
while self._polling_condition(len(batch), max, last_message_time, self._queue.size()):
186186
try:
187-
batch += self._queue.get_many(timeout=self._queue_config.message_poll_wait, max=max)
188-
self._logger.info(f"Got {len(batch)} messages. {self._queue.size()} remaining")
187+
next_batch = self._queue.get_many(timeout=self._queue_config.message_poll_wait, max=max)
188+
batch += next_batch
189+
self._logger.debug(
190+
f"Adding {len(next_batch)} to poll batch of length {len(batch)}. {self._queue.size()} remaining"
191+
)
189192
except queue.Empty:
190193
if self.is_closed() and self.close_message_handled():
191194
self._logger.info("Queue closed and no more messages to process.")
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import logging
2+
import threading as th
3+
from concurrent.futures import Future
4+
from queue import Empty
5+
from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar
6+
7+
from whylogs.api.logger.experimental.logger.actor.actor import QueueConfig
8+
from whylogs.api.logger.experimental.logger.actor.faster_fifo_queue_wrapper import (
9+
FasterQueueWrapper,
10+
)
11+
12+
T = TypeVar("T")
13+
14+
15+
class PipeSignaler(th.Thread, Generic[T]):
    """
    A thread that listens on a pipe for messages and signals the corresponding futures.

    This class is used in the process logger to enable synchronous logging requests across processes.
    It's essentially a dictionary of futures that are registered by the main process and signaled by the
    child process. A lot of the behavior is implicit because it involves properties of processes, so it's
    worth documenting here.

    - This thread has to be started from the main process, which means it has to be started right before the
      process logger is started (before the os.fork under the hood). It has to be started from the main process
      because the main process will be registering futures on it, and those can't cross the process boundary.
    - The parent and child process each have references to the pipes and they each need to close their references,
      which means close_child has to be called from the child process and close has to be called from the parent.
      Calling close_child in the main processing code will have the right effect.
    - The process actor does message batching so multiple ids may be signaled even though a single batch was processed
      because that batch could have contained multiple messages.
    - The signaler uses Events under the hood to know when to stop working. They can be th.Events even though this
      is being used in a multiprocessing environment because nothing the child does can affect them. Keep in mind
      that introducing any behavior on the child side that depends on knowing whether those events are set won't work
      though, they would have to be switched to mp.Events for that.

    This class should really never be used by anyone in most cases. It will just slow down the main process by making
    it wait for logging to complete, but it enables a lot of testing and debugging.
    """

    def __init__(self) -> None:
        super().__init__()
        # Daemon thread so an abandoned signaler can't keep the interpreter alive at exit.
        self.daemon = True
        self._logger = logging.getLogger(__name__)
        self._queue_config = QueueConfig()
        # Each item is a tuple of (message id, optional exception, optional result payload).
        self.queue: FasterQueueWrapper[Tuple[str, Optional[Exception], Optional[T]]] = FasterQueueWrapper(
            self._queue_config
        )
        # Futures registered by the parent process, keyed by message id; popped when signaled.
        self.futures: Dict[str, "Future[Any]"] = {}
        # Set by close() (parent side) to ask the polling loop to stop.
        self._end_polling = th.Event()
        # Set by the polling loop when it has fully exited; close() waits on it.
        self._done = th.Event()

    def signal(self, result: Tuple[str, Optional[Exception], Optional[T]]) -> None:
        """
        Signal that a message was handled by sending a tuple of (message id, exception, data).
        data and exception can be None.
        This should be called from the child process.
        """
        self.queue.send(result)

    def signal_many(self, results: List[Tuple[str, Optional[Exception], Optional[T]]]) -> None:
        # Batch variant of signal(). This should also be called from the child process.
        self.queue.send_many(results)

    def register(self, future: "Future[T]", message_id: str) -> None:
        """
        Register a future to be signaled when the message id is received.
        This should be called from the parent process.
        """
        self._logger.debug(f"Received register request for id {message_id}")
        self.futures[message_id] = future

    def _start_poll_conn(self) -> None:
        # Drain the queue in batches until the parent requests shutdown via close().
        while not self._end_polling.is_set():
            try:
                messages = self.queue.get_many(
                    timeout=self._queue_config.message_poll_wait,
                    max=self._queue_config.max_batch_size,
                )

                for message_id, exception, data in messages:
                    self._logger.debug(f"Received message id {message_id}")
                    # A missing entry means nothing registered for this id (or it was already
                    # signaled); that's fine, the result is simply dropped.
                    future: Optional["Future[Any]"] = self.futures.pop(message_id, None)
                    if future is not None:
                        self._logger.debug(f"Setting result for message id {message_id} {exception}")
                        if exception is not None:
                            future.set_exception(exception)
                        else:
                            # Doing this in the else avoids testing `data is None`, which would be
                            # wrong because data can potentially be None if T is Optional
                            future.set_result(data)

            except Empty:
                # Timed out with nothing to read; loop again so we notice _end_polling promptly.
                continue
            except Exception as e:
                # Unexpected queue failure: log and stop polling rather than spin on errors.
                self._logger.exception(f"Error in queue {e}")
                break

        self._done.set()

    def run(self) -> None:
        # Thread entry point: just run the polling loop until shutdown.
        self._start_poll_conn()

    def close_child(self) -> None:
        """
        This method is no longer needed as queues do not require manual closing of file descriptors.
        """
        # Intentionally a no-op; kept for backward compatibility with the pipe-based API.

    def close(self) -> None:
        """
        Closes the thread and all resources. This should be
        called from the parent side.
        """
        self._end_polling.set()
        # Wait for the polling loop to observe the flag and finish before closing the queue.
        self._done.wait()
        self.queue.close()  # type: ignore[reportUnknownMemberType]
        self.join()

python/whylogs/api/logger/experimental/logger/actor/process_actor.py

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,46 @@
11
import multiprocessing as mp
22
import signal
33
import sys
4+
from concurrent.futures import Future
45
from enum import Enum
5-
from typing import Generic, TypeVar
6+
from typing import Generic, Optional, Sequence, TypeVar, Union
67

78
from whylogs.api.logger.experimental.logger.actor.actor import (
89
Actor,
910
QueueConfig,
1011
QueueWrapper,
1112
)
13+
from whylogs.api.logger.experimental.logger.actor.future_util import wait_result_while
14+
from whylogs.api.logger.experimental.logger.actor.pipe_signaler import PipeSignaler
15+
from whylogs.api.logger.experimental.logger.actor.process_rolling_logger_messages import (
16+
ProcessStatusMessage,
17+
SyncMessage,
18+
)
1219
from whylogs.api.logger.experimental.logger.actor.signal_util import suspended_signals
1320

21+
StatusType = TypeVar("StatusType")
22+
ProcessMessageType = TypeVar("ProcessMessageType")
23+
1424

1525
class QueueType(Enum):
1626
MP = "MP"
1727
FASTER_FIFO = "FASTER_FIFO"
1828

1929

20-
ProcessMessageType = TypeVar("ProcessMessageType")
21-
22-
23-
class ProcessActor(Actor[ProcessMessageType], mp.Process, Generic[ProcessMessageType]):
30+
class ProcessActor(
31+
Actor[Union[ProcessMessageType, ProcessStatusMessage]], mp.Process, Generic[ProcessMessageType, StatusType]
32+
):
2433
"""
2534
Subclass of Actor that uses a process to process messages.
2635
"""
2736

2837
_wrapper: QueueWrapper[ProcessMessageType]
2938

3039
def __init__(
31-
self, queue_config: QueueConfig = QueueConfig(), queue_type: QueueType = QueueType.FASTER_FIFO
40+
self,
41+
queue_config: QueueConfig = QueueConfig(),
42+
queue_type: QueueType = QueueType.FASTER_FIFO,
43+
sync_enabled: bool = False,
3244
) -> None:
3345
if queue_type == QueueType.MP:
3446
from whylogs.api.logger.experimental.logger.actor.mp_queue_wrapper import (
@@ -45,6 +57,8 @@ def __init__(
4557
else:
4658
raise ValueError(f"Unknown queue type: {queue_type}")
4759

60+
self._sync_enabled = sync_enabled
61+
self._pipe_signaler: Optional[PipeSignaler[StatusType]] = PipeSignaler() if self._sync_enabled is True else None
4862
self._event = mp.Event()
4963
self._is_closed = mp.Event()
5064
self._close_handled = mp.Event()
@@ -87,6 +101,31 @@ def close(self) -> None:
87101
super().close()
88102
self._wrapper.close()
89103

104+
def _signal(self, messages: Sequence[SyncMessage] = (), error: Optional[Exception] = None) -> None:
    """
    Signal completion (or failure) of each synchronous message in a processed batch.

    Called from the child process after a batch is handled so that futures registered
    by the parent (via the pipe signaler) are resolved.

    Args:
        messages: Messages just processed; only those with sync set are signaled.
        error: Exception to deliver to the waiting futures, or None on success.
    """
    # Fixed: the default was a mutable `[]`; use an immutable empty tuple instead.
    # Behavior is unchanged because `messages` is only iterated, never mutated.
    if self._pipe_signaler is None:
        # Sync mode disabled (sync_enabled=False): nothing is waiting on these messages.
        return

    for message in messages:
        if message.sync:
            # Result payload is None here; status() results flow through a separate path.
            self._pipe_signaler.signal((message.id, error, None))
111+
112+
def status(self, timeout: Optional[float] = 1.0) -> StatusType:
    """
    Get the internal status of the Process Actor. Used for diagnostics and debugging.
    This is always synchronous and requires the ProcessActor to be created with sync_enabled=True.

    Raises:
        Exception: if the actor was created without sync_enabled=True.
    """
    # NOTE(review): `timeout` is not used in this body — wait_result_while only takes the
    # future and a liveness predicate. Confirm whether it should be forwarded or removed.
    if self._pipe_signaler is None:
        raise Exception(
            "Can't log synchronously without a pipe signaler. Initialize the process logger with sync_enabled=True."
        )

    # Register the future before sending so the child's signal can't race the registration.
    message = ProcessStatusMessage()
    future: "Future[StatusType]" = Future()
    self._pipe_signaler.register(future, message.id)
    self.send(message)
    # Block until the child signals this message id, bailing out if the process dies.
    return wait_result_while(future, self.is_alive)
128+
90129
def run(self) -> None:
91130
try:
92131
with suspended_signals(signal.SIGINT, signal.SIGTERM):

0 commit comments

Comments
 (0)