Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fork deadlock #299

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 69 additions & 36 deletions diskcache/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,44 +707,48 @@ def transact(self, retry=False):

@cl.contextmanager
def _transact(self, retry=False, filename=None):
sql = self._sql
filenames = []
_disk_remove = self._disk.remove
tid = threading.get_ident()
txn_id = self._txn_id
_acquireLock()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried that acquiring the lock at the start of _transact and releasing it at the end is holding it for too long. The yield in the middle of the context manager means the lock could be held for a long while. I haven't thought of a specific scenario that makes this problematic but I'm thinking about it.

The fork system call copies all memory but not all threads. Only the currently executing thread is copied to the forked process. So if one thread holds the SQLite transaction lock and another thread forks ... what's the problem? I don't see how the SQLite transaction remains forever locked.

try:
sql = self._sql
filenames = []
_disk_remove = self._disk.remove
tid = threading.get_ident()
txn_id = self._txn_id

if tid == txn_id:
begin = False
else:
while True:
try:
sql('BEGIN IMMEDIATE')
begin = True
self._txn_id = tid
break
except sqlite3.OperationalError:
if retry:
continue
if filename is not None:
_disk_remove(filename)
raise Timeout from None
if tid == txn_id:
begin = False
else:
while True:
try:
sql('BEGIN IMMEDIATE')
begin = True
self._txn_id = tid
break
except sqlite3.OperationalError:
if retry:
continue
if filename is not None:
_disk_remove(filename)
raise Timeout from None

try:
yield sql, filenames.append
except BaseException:
if begin:
assert self._txn_id == tid
self._txn_id = None
sql('ROLLBACK')
raise
else:
if begin:
assert self._txn_id == tid
self._txn_id = None
sql('COMMIT')
for name in filenames:
if name is not None:
_disk_remove(name)
try:
yield sql, filenames.append
except BaseException:
if begin:
assert self._txn_id == tid
self._txn_id = None
sql('ROLLBACK')
raise
else:
if begin:
assert self._txn_id == tid
self._txn_id = None
sql('COMMIT')
for name in filenames:
if name is not None:
_disk_remove(name)
finally:
_releaseLock()

def set(self, key, value, expire=None, read=False, tag=None, retry=False):
"""Set `key` and `value` item in cache.
Expand Down Expand Up @@ -2450,3 +2454,32 @@ def reset(self, key, value=ENOVAL, update=True):

setattr(self, key, value)
return value

if hasattr(os, 'register_at_fork'):
_lock = threading.RLock()

def _acquireLock():
global _lock
try:
_lock.acquire()
except BaseException:
_lock.release()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could an exception occur here in which the lock is released twice?

raise

def _releaseLock():
global _lock
_lock.release()

def _after_at_fork_child_reinit_locks():
global _lock
_lock = threading.RLock()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why re-init the lock in the child? I would expect the child to already have a copy of the lock.


os.register_at_fork(before=_acquireLock,
after_in_child=_after_at_fork_child_reinit_locks,
after_in_parent=_releaseLock)
else:
def _acquireLock():
pass

def _releaseLock():
pass
71 changes: 71 additions & 0 deletions tests/test_fork_multithreading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
Test diskcache.core.Cache behaviour when process is forking.
Make sure it does not deadlock on the sqlite3 transaction lock if
forked while the lock is in use.
"""

import errno
import hashlib
import io
import os
import os.path as op
import sys
import pathlib
import pickle
import shutil
import sqlite3
import subprocess as sp
import tempfile
import threading
import time
import warnings
from threading import Thread
from unittest import mock

if sys.platform != "win32":
import signal

import pytest

import diskcache as dc

REPEATS = 1000

@pytest.fixture
def cache():
with dc.Cache() as cache:
yield cache
shutil.rmtree(cache.directory, ignore_errors=True)

def _test_thread_imp(cache):
for i in range(REPEATS * 10):
cache.set(i, i)

def _test_wait_pid(pid):
_, status = os.waitpid(pid, 0)
assert status == 0, "Child died unexpectedly"

@pytest.mark.skipif(sys.platform == "win32", reason="no fork on Windows")
def test_fork_multithreading(cache):
thread = Thread(target=_test_thread_imp, args=(cache,))
thread.start()
try:
for i in range(REPEATS):
pid = os.fork()
if pid == 0:
cache.set(i, 0)
os._exit(0)
else:
thread = Thread(target=_test_wait_pid, args=(pid,))
thread.start()
thread.join(timeout=10)
if thread.is_alive():
os.kill(pid, signal.SIGKILL)
thread.join()
assert False, "Deadlock detected."
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some more comments to explain how this test works? I appreciate the test a lot but am not sure how to follow it.

except OSError as e:
if e.errno != errno.EINTR:
raise

thread.join()