gh-103847: fix cancellation safety of asyncio.create_subprocess_exec (#140805)
This commit is contained in:
@@ -26,6 +26,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
|
||||
self._pending_calls = collections.deque()
|
||||
self._pipes = {}
|
||||
self._finished = False
|
||||
self._pipes_connected = False
|
||||
|
||||
if stdin == subprocess.PIPE:
|
||||
self._pipes[0] = None
|
||||
@@ -213,6 +214,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
|
||||
else:
|
||||
if waiter is not None and not waiter.cancelled():
|
||||
waiter.set_result(None)
|
||||
self._pipes_connected = True
|
||||
|
||||
def _call(self, cb, *data):
|
||||
if self._pending_calls is not None:
|
||||
@@ -256,6 +258,15 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
|
||||
assert not self._finished
|
||||
if self._returncode is None:
|
||||
return
|
||||
if not self._pipes_connected:
|
||||
# self._pipes_connected can be False if not all pipes were connected
|
||||
# because either the process failed to start or the self._connect_pipes task
|
||||
# got cancelled. In this broken state we consider all pipes disconnected and
|
||||
# to avoid hanging forever in self._wait as otherwise _exit_waiters
|
||||
# would never be woken up, we wake them up here.
|
||||
for waiter in self._exit_waiters:
|
||||
if not waiter.cancelled():
|
||||
waiter.set_result(self._returncode)
|
||||
if all(p is not None and p.disconnected
|
||||
for p in self._pipes.values()):
|
||||
self._finished = True
|
||||
|
||||
@@ -11,7 +11,7 @@ from asyncio import base_subprocess
|
||||
from asyncio import subprocess
|
||||
from test.test_asyncio import utils as test_utils
|
||||
from test import support
|
||||
from test.support import os_helper
|
||||
from test.support import os_helper, warnings_helper, gc_collect
|
||||
|
||||
if not support.has_subprocess_support:
|
||||
raise unittest.SkipTest("test module requires subprocess")
|
||||
@@ -879,6 +879,44 @@ class SubprocessMixin:
|
||||
|
||||
self.loop.run_until_complete(main())
|
||||
|
||||
@warnings_helper.ignore_warnings(category=ResourceWarning)
|
||||
def test_subprocess_read_pipe_cancelled(self):
|
||||
async def main():
|
||||
loop = asyncio.get_running_loop()
|
||||
loop.connect_read_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, stderr=asyncio.subprocess.PIPE)
|
||||
|
||||
asyncio.run(main())
|
||||
gc_collect()
|
||||
|
||||
@warnings_helper.ignore_warnings(category=ResourceWarning)
|
||||
def test_subprocess_write_pipe_cancelled(self):
|
||||
async def main():
|
||||
loop = asyncio.get_running_loop()
|
||||
loop.connect_write_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED, stdin=asyncio.subprocess.PIPE)
|
||||
|
||||
asyncio.run(main())
|
||||
gc_collect()
|
||||
|
||||
@warnings_helper.ignore_warnings(category=ResourceWarning)
|
||||
def test_subprocess_read_write_pipe_cancelled(self):
|
||||
async def main():
|
||||
loop = asyncio.get_running_loop()
|
||||
loop.connect_read_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
|
||||
loop.connect_write_pipe = mock.AsyncMock(side_effect=asyncio.CancelledError)
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await asyncio.create_subprocess_exec(
|
||||
*PROGRAM_BLOCKED,
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
|
||||
asyncio.run(main())
|
||||
gc_collect()
|
||||
|
||||
if sys.platform != 'win32':
|
||||
# Unix
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
Fix hang when cancelling process created by :func:`asyncio.create_subprocess_exec` or :func:`asyncio.create_subprocess_shell`. Patch by Kumar Aditya.
|
||||
Reference in New Issue
Block a user