Skip to content

Commit

Permalink
greatly simplify syncing of indexed-but-moved dirs
Browse files Browse the repository at this point in the history
Personal benchmarks show a very significant performance gain when
calling get_path(). This is because we are no longer scanning all the
contents of every indexed-but-moved directory in _sync_all(), but rather
intelligently appending the new paths of the directories to check for
dirtiness to a deque used in the body of _sync_all().

Also fixes a few bugs (sorry, couldn't split this commit up):
- Indexing a symlink now always indexes the real path that the symlink refers to
- Fixes a bug in the way new paths were calculated in recursive moves
  • Loading branch information
dlqqq committed Aug 17, 2022
1 parent 8da5648 commit fb8cc40
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 86 deletions.
5 changes: 3 additions & 2 deletions jupyter_server/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,12 +475,13 @@ def delete_fid_db(fid_db_path):


@pytest.fixture
def fid_manager(fid_db_path, tmp_path):
def fid_manager(fid_db_path, jp_root_dir):
"""Fixture returning a test-configured instance of `FileIdManager`."""
fid_manager = FileIdManager(db_path=fid_db_path, root_dir=str(tmp_path))
fid_manager = FileIdManager(db_path=fid_db_path, root_dir=str(jp_root_dir))
# disable journal so no temp journal file is created under `tmp_path`.
# reduces test flakiness since sometimes journal file has same ino and
# crtime as a deleted file, so FID manager detects it wrongly as a move
# also makes tests run faster :)
fid_manager.con.execute("PRAGMA journal_mode = OFF")
return fid_manager

Expand Down
199 changes: 120 additions & 79 deletions jupyter_server/services/contents/fileidmanager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import sqlite3
import stat
from typing import Optional
from collections import deque
from typing import Deque, Optional

from jupyter_core.paths import jupyter_data_dir
from traitlets import TraitError, Unicode, validate
Expand All @@ -13,6 +14,7 @@ class StatStruct:
crtime: Optional[int]
mtime: int
is_dir: bool
is_symlink: bool


class FileIdManager(LoggingConfigurable):
Expand Down Expand Up @@ -100,83 +102,103 @@ def _sync_all(self):
-----
A dirty directory is a directory that is either:
- unindexed
- indexed but moved
- indexed but with different `mtime`
Dirty directories contain possibly indexed but moved files as children.
Hence we need to call _sync_file() on their contents via _sync_dir().
Indexed directories that are dirty solely because of mtime difference
are included in the below SELECT query. Unindexed or indexed-but-moved
dirty directories are not included in the query, and hence must be
handled in _sync_dir().
Indexed directories with mtime difference are handled in this method
body. Unindexed dirty directories are handled immediately when
encountered in _sync_dir().
sync_deque is an additional deque of directories that should be checked
for dirtiness, and is appended to whenever _sync_file() encounters an
indexed directory that was moved out-of-band. This is necessary because
the SELECT query is not guaranteed to include the new paths following
the move.
"""
sync_deque: Deque = deque()
cursor = self.con.execute("SELECT id, path, mtime FROM Files WHERE is_dir = 1")
for dir in cursor:
dir = cursor.fetchone()
while dir:
id, path, old_mtime = dir
stat_info = self._stat(path)

# ignore directories that no longer exist
if stat_info is None:
continue
# ignores directories that no longer exist
if stat_info is not None:
new_mtime = stat_info.mtime
dir_dirty = new_mtime != old_mtime
if dir_dirty:
self._sync_dir(path, sync_deque)
self._update(id, stat_info)

new_mtime = stat_info.mtime
dir_dirty = new_mtime != old_mtime
if dir_dirty:
self._sync_dir(path)
self._update(id, stat_info)
dir = sync_deque.popleft() if sync_deque else cursor.fetchone()

def _sync_dir(self, dir_path):
def _sync_dir(self, dir_path, sync_deque):
"""
Syncs the contents of a directory. If a child directory is dirty because
it is either unindexed or indexed-but-moved, then the contents of that
child directory are synced. See _sync_all() for more on dirty
directories.
it is unindexed, then the contents of that child directory are synced.
See _sync_all() for more on dirty directories.
Parameters
----------
dir_path : string
Path of the directory to sync contents of.
sync_deque: deque
Deque of directory records to be checked for dirtiness in
_sync_all().
"""
with os.scandir(dir_path) as scan_iter:
for entry in scan_iter:
stat_info = self._stat(entry.path)
id, is_dirty_dir = self._sync_file(entry.path, stat_info)
id = self._sync_file(entry.path, stat_info, sync_deque)

# if entry is unindexed directory, create new record
# if entry is unindexed directory, create new record and sync
# contents recursively.
if stat_info.is_dir and id is None:
self._create(entry.path, stat_info)

# sync dirty dir contents if it is either unindexed or
# indexed-but-moved
if is_dirty_dir:
self._sync_dir(entry.path)
self._sync_dir(entry.path, sync_deque)

scan_iter.close()

def _sync_file(self, path, stat_info):
def _sync_file(self, path, stat_info, sync_deque=None):
"""
Syncs the file at `path` with the Files table by detecting whether the
file was previously indexed but moved. Updates the record with the new
path. This ensures that the file at path is associated with the correct
file ID. This method does nothing if the file at `path` was not
previously indexed.
Parameters
----------
path : string
Path of the file to sync.
stat_info : StatStruct
Stat info of the file to sync.
sync_deque : deque, optional
Deque of directory records to be checked for dirtiness in
_sync_all(). If specified, this method appends to sync_deque any
moved indexed directory and all of its children recursively.
Returns
-------
Returns a two-tuple containing the elements
id : int, optional
ID of the file if it was previously indexed. None otherwise.
dir_dirty: bool
Whether the file is a dirty directory and should be traversed by
_sync_dir(). Not necessarily true even if the `mtime` differs, since
directories which are dirty only because of `mtime` difference are
included in the query run by _sync_all(). See _sync_all() for more
on dirty directories.
ID of the file if it is a real file (not a symlink) and it was
previously indexed. None otherwise.
"""
# if file is symlink, do nothing
if stat_info.is_symlink:
return None

src = self.con.execute(
"SELECT id, path, crtime, mtime FROM Files WHERE ino = ?", (stat_info.ino,)
).fetchone()

# if no record with matching ino, then return None
if not src:
return None, stat_info.is_dir
return None

id, old_path, src_crtime, src_mtime = src
src_timestamp = src_crtime if src_crtime is not None else src_mtime
Expand All @@ -185,13 +207,20 @@ def _sync_file(self, path, stat_info):
# if record has identical ino and crtime/mtime to an existing record,
# update it with new destination path and stat info, returning its id
if src_timestamp == dst_timestamp:
self._update(id, stat_info, path)
return id, stat_info.is_dir and old_path != path
self._update_with_path(id, stat_info, path)

# update paths of indexed children under moved directories
if stat_info.is_dir and old_path != path:
self._move_recursive(old_path, path, sync_deque)
if sync_deque is not None:
sync_deque.appendleft((id, path, src_mtime))

return id

# otherwise delete the existing record with identical `ino`, since inos
# must be unique. then return None
self.con.execute("DELETE FROM Files WHERE id = ?", (id,))
return None, stat_info.is_dir
return None

def _normalize_path(self, path):
"""Normalizes a given file path."""
Expand All @@ -217,14 +246,15 @@ def _parse_raw_stat(self, raw_stat):
)
stat_info.mtime = raw_stat.st_mtime_ns
stat_info.is_dir = stat.S_ISDIR(raw_stat.st_mode)
stat_info.is_symlink = stat.S_ISLNK(raw_stat.st_mode)

return stat_info

def _stat(self, path):
"""Returns stat info on a path in a StatStruct object. Returns None if
file does not exist at path."""
try:
raw_stat = os.stat(path)
raw_stat = os.lstat(path)
except OSError:
return None

Expand All @@ -240,23 +270,24 @@ def _create(self, path, stat_info):

return cursor.lastrowid

def _update(self, id, stat_info, path=None):
"""Updates a record given its file ID, stat info, and possibly path."""
if path is not None:
self.con.execute(
"UPDATE Files SET path = ?, ino = ?, crtime = ?, mtime = ? WHERE id = ?",
(path, stat_info.ino, stat_info.crtime, stat_info.mtime, id),
)
else:
self.con.execute(
# updating `ino` and `crtime` is a conscious design decision because
# this method is called by `move()`. these values are only preserved
# by fs moves done via the `rename()` syscall, like `mv`. we don't
# care how the contents manager moves a file; it could be deleting
# and creating a new file (which will change the stat info).
"UPDATE Files SET ino = ?, crtime = ?, mtime = ? WHERE id = ?",
(stat_info.ino, stat_info.crtime, stat_info.mtime, id),
)
def _update_with_path(self, id, stat_info, path):
    """Rewrites the `path`, `ino`, `crtime` and `mtime` columns of the Files
    record whose file ID is `id`, using `path` and the values held by
    `stat_info`."""
    query = "UPDATE Files SET path = ?, ino = ?, crtime = ?, mtime = ? WHERE id = ?"
    params = (path, stat_info.ino, stat_info.crtime, stat_info.mtime, id)
    self.con.execute(query, params)

def _update(self, id, stat_info):
    """Refreshes the stat columns (`ino`, `crtime`, `mtime`) of the Files
    record whose file ID is `id`. The stored path is left as-is."""
    # `ino` and `crtime` are rewritten on purpose, not just `mtime`: they
    # only survive moves performed via the `rename()` syscall (e.g. `mv`).
    # How the contents manager moves a file is not our concern; it may
    # delete the file and create a new one, which changes the stat info.
    query = "UPDATE Files SET ino = ?, crtime = ?, mtime = ? WHERE id = ?"
    params = (stat_info.ino, stat_info.crtime, stat_info.mtime, id)
    self.con.execute(query, params)

def index(self, path, stat_info=None, commit=True):
"""Returns the file ID for the file at `path`, creating a new file ID if
Expand All @@ -266,8 +297,12 @@ def index(self, path, stat_info=None, commit=True):
if not stat_info:
return None

# if file is symlink, then index the path it refers to instead
if stat_info.is_symlink:
return self.index(os.path.realpath(path))

# sync file at path and return file ID if it exists
id, _ = self._sync_file(path, stat_info)
id = self._sync_file(path, stat_info)
if id is not None:
return id

Expand All @@ -287,7 +322,7 @@ def get_id(self, path):
return None

# then sync file at path and retrieve id, if any
id, _ = self._sync_file(path, stat_info)
id = self._sync_file(path, stat_info)
self.con.commit()
return id

Expand All @@ -306,6 +341,27 @@ def get_path(self, id):

return path

def _move_recursive(self, old_path, new_path, sync_deque=None):
    """Rewrites the path of every indexed record located under `old_path`
    so that the `old_path` prefix is replaced with `new_path`.

    Parameters
    ----------
    old_path : string
        Directory path the records are currently stored under.
    new_path : string
        Directory path that replaces `old_path` in each record.
    sync_deque : deque, optional
        If specified, an `(id, new_path, mtime)` tuple is appended for
        every moved record that is a directory. `mtime` is the value that
        was stored in the record before this update.

    Notes
    -----
    Records whose new path cannot be stat'ed are left untouched.
    """
    # GLOB treats `*`, `?` and `[` as wildcards. Wrap each occurrence in
    # `old_path` in a one-character class so that a directory name such as
    # "data[1]" matches only itself instead of e.g. "data1".
    escaped_old_path = "".join(f"[{ch}]" if ch in "*?[" else ch for ch in old_path)
    old_path_glob = os.path.join(escaped_old_path, "*")
    records = self.con.execute(
        "SELECT id, path, mtime FROM Files WHERE path GLOB ?", (old_path_glob,)
    ).fetchall()

    for id, old_recpath, mtime in records:
        # keep the record's position relative to the moved directory
        new_recpath = os.path.join(new_path, os.path.relpath(old_recpath, start=old_path))
        stat_info = self._stat(new_recpath)
        if not stat_info:
            continue

        self._update_with_path(id, stat_info, new_recpath)

        if sync_deque is not None and stat_info.is_dir:
            sync_deque.append((id, new_recpath, mtime))

def move(self, old_path, new_path, recursive=False):
"""Handles file moves by updating the file path of the associated file
ID. Returns the file ID. Returns None if file does not exist at new_path."""
Expand All @@ -320,22 +376,7 @@ def move(self, old_path, new_path, recursive=False):
self.log.debug(f"FileIdManager : Moving file from ${old_path} to ${new_path}")

if recursive:
old_path_glob = os.path.join(old_path, "*")
records = self.con.execute(
"SELECT id, path FROM Files WHERE path GLOB ?", (old_path_glob,)
).fetchall()
for record in records:
if not record:
continue
id, old_recpath = record
new_recpath = os.path.join(new_path, os.path.basename(old_recpath))
rec_stat_info = self._stat(new_recpath)
if not rec_stat_info:
continue
self.con.execute(
"UPDATE Files SET path = ?, mtime = ? WHERE id = ?",
(new_recpath, rec_stat_info.mtime, id),
)
self._move_recursive(old_path, new_path)

# attempt to fetch ID associated with old path
# we avoid using get_id() here since that will always return None as file no longer exists at old path
Expand All @@ -349,7 +390,7 @@ def move(self, old_path, new_path, recursive=False):
# update existing record with new path and stat info
# TODO: make sure is_dir for existing record matches that of file at new_path
id = row[0]
self._update(id, stat_info, new_path)
self._update_with_path(id, stat_info, new_path)
self.con.commit()
return id

Expand Down Expand Up @@ -387,8 +428,8 @@ def copy(self, from_path, to_path, recursive=False):
),
)

self.index(from_path, commit=False)
# transaction committed in index()
self.index(from_path)
return self.index(to_path)

def delete(self, path, recursive=False):
Expand Down
Loading

0 comments on commit fb8cc40

Please sign in to comment.