-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhandler.py
More file actions
308 lines (260 loc) · 12.8 KB
/
Copy pathhandler.py
File metadata and controls
308 lines (260 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""
handler.py — Live event handler
Plugs into watchdog's Observer and responds to file system events in real time.
Responsibilities:
- Filter events by extension whitelist and ignore_prefixes
- Detect actual moves vs. phantom delete+create pairs
- Update the snapshot and log every event to the database
Utility functions (compute_hash, get_file_info, classify_path_change)
live in utils.py and are shared with diff.py to avoid duplication.
Performance optimisations:
- 64KB hash chunk size — faster I/O on modern drives
- Single background sweep thread instead of one thread per deleted file
- Batch snapshot upserts in move detection
"""
import os
import time
import threading
from watchdog.events import FileSystemEventHandler
from logger import get_logger
from utils import compute_hash, get_file_info, classify_path_change
# Module-level logger shared by everything in this file; obtained from the
# project's logger helper using this module's name.
log = get_logger(__name__)
class FileWatchHandler(FileSystemEventHandler):
    """
    Subclass of watchdog's FileSystemEventHandler.

    watchdog calls on_created / on_modified / on_deleted / on_moved
    automatically whenever the OS fires a matching file system event.
    Accepted events are written with ``db.log_event`` and the per-file
    snapshot is kept in step with ``db.upsert_snapshot`` /
    ``db.delete_snapshot``.

    Move detection pairs a DELETE with a CREATE of identical content that
    arrives within ``move_window`` seconds, in either order. Entries that are
    never paired are flushed by a single background sweep thread rather than
    one thread per deletion, which prevents thread explosion when many files
    are deleted in a batch.
    """

    # How often on_created retries hashing a file that is not readable yet,
    # and how long it waits between attempts (seconds).
    _HASH_RETRY_ATTEMPTS = 5
    _HASH_RETRY_DELAY = 0.5
    # Seconds between two passes of the background sweep.
    _SWEEP_INTERVAL = 1.0

    def __init__(self, db, config):
        """
        Read filter / watcher settings and start the sweep thread.

        Args:
            db: Object providing log_event, upsert_snapshot, delete_snapshot
                and get_snapshot_hash.
            config: Mapping of sections; reads ``filters.watch_extensions``,
                ``filters.ignore_prefixes``, ``snapshot.hash_algorithm``,
                ``watcher.move_window`` (default 2.0) and
                ``watcher.watch_directory``.
        """
        super().__init__()

        # Set of lowercase extensions for fast O(1) lookup. _should_watch()
        # lowercases the file's extension, so entries must be lowercased too;
        # empty entries are dropped so extension-less files never match.
        self.watch_extensions = {
            ext.lower()
            for ext in self._split_csv(config["filters"]["watch_extensions"])
        }

        # List of filename prefixes to ignore (e.g. ~$, .~)
        self.ignore_prefixes = self._split_csv(
            config["filters"]["ignore_prefixes"]
        )

        self.hash_algorithm = config["snapshot"]["hash_algorithm"]
        self.db = db

        # Move detection:
        #   { (normcase_path, file_hash): (original_path, deadline_timestamp) }
        # Composite key prevents two distinct collision bugs:
        #   1. Files with no snapshot entry all return hash=None — a hash-only
        #      key collapses every hashless deletion onto the same None key,
        #      so each overwrites the last.
        #   2. Two files with identical content (e.g. both empty) share the
        #      same hash — a hash-only key causes the second deletion to
        #      silently clobber the first.
        self.pending_deletes: dict[tuple, tuple] = {}

        # pending_creates: { file_hash: (new_path, deadline_timestamp) }
        # Handles CREATE-before-DELETE move ordering on UNC network shares.
        # ReadDirectoryChangesW over SMB can fire the CREATE event before the
        # DELETE event for the same move operation — sometimes seconds apart.
        # When on_created finds no matching pending delete, it parks the new
        # file here. on_deleted then checks pending_creates for a hash match
        # and resolves it as a MOVE instead of logging DELETED + CREATED.
        self.pending_creates: dict[str, tuple] = {}

        self.move_window: float = config["watcher"].getfloat("move_window", 2.0)
        self.watch_directory: str = config["watcher"]["watch_directory"]

        # Guards pending_deletes and pending_creates, which are touched by
        # both the watchdog callbacks and the sweep thread.
        self._lock = threading.Lock()

        # Start the single background sweep thread
        self._sweep_thread_running = True
        self._sweep_thread = threading.Thread(
            target=self._sweep_loop, daemon=True, name="move-sweep"
        )
        self._sweep_thread.start()
        log.info("Move-detection sweep thread started (window=%.1fs).",
                 self.move_window)

    @staticmethod
    def _split_csv(raw: str) -> list[str]:
        """Split a comma-separated config value, dropping spaces and empty items."""
        return [item for item in raw.replace(" ", "").split(",") if item]

    # ------------------------------------------------------------------
    # FILTER
    # ------------------------------------------------------------------
    def _should_watch(self, path: str) -> bool:
        """
        Returns True only if the file:
        1. Does NOT start with any ignored prefix
        2. Has an extension in our whitelist
        """
        filename = os.path.basename(path)
        if any(
            prefix and filename.startswith(prefix)
            for prefix in self.ignore_prefixes
        ):
            return False
        ext = os.path.splitext(filename)[1].lower()
        return ext in self.watch_extensions

    # ------------------------------------------------------------------
    # SINGLE BACKGROUND SWEEP THREAD
    # ------------------------------------------------------------------
    def _sweep_loop(self):
        """
        Runs continuously in a single daemon thread.

        Sleeps for the sweep interval, then flushes expired pending entries.
        Any exception from a sweep is logged and the loop keeps running.
        """
        while self._sweep_thread_running:
            time.sleep(self._SWEEP_INTERVAL)
            try:
                self._sweep_expired()
            except Exception as e:
                log.warning("Sweep thread error: %s", e, exc_info=True)

    def _sweep_expired(self):
        """
        Flush pending entries whose move window has elapsed.

        Expired pending deletes are logged as genuine DELETEs and their
        snapshot rows removed; expired pending creates are logged as genuine
        CREATEs.

        Identifying expired entries and popping/logging them happens inside a
        single lock acquisition. If they were split across two separate
        ``with self._lock`` blocks, a callback could claim an entry as a MOVE
        in between and the sweep would still log a phantom event for it.
        """
        now = time.time()
        with self._lock:
            expired_deletes = [
                (key, orig_path)
                for key, (orig_path, deadline) in self.pending_deletes.items()
                if now >= deadline
            ]
            for key, orig_path in expired_deletes:
                self.pending_deletes.pop(key, None)
                self.db.log_event("DELETED", orig_path)
                self.db.delete_snapshot(orig_path)

            expired_creates = [
                (file_hash, new_path)
                for file_hash, (new_path, deadline) in self.pending_creates.items()
                if now >= deadline
            ]
            for file_hash, new_path in expired_creates:
                self.pending_creates.pop(file_hash, None)
                self._log_parked_create(file_hash, new_path)

    def _log_parked_create(self, file_hash: str, new_path: str) -> None:
        """
        Log a parked CREATE as a genuine CREATED event.

        Must be called with self._lock already held. The snapshot row was
        already written when the create was parked, so only the event is
        logged here; the hash is the pending_creates key.
        """
        self.db.log_event("CREATED", new_path, md5_hash=file_hash)

    def _find_pending_delete_key(
        self, new_path: str, file_hash: str | None
    ) -> tuple | None:
        """
        Search pending_deletes for an entry whose hash matches file_hash
        and whose source path differs from new_path.

        Must be called with self._lock already held.
        Returns the matching key, or None if no match found (always None
        when file_hash is None).
        """
        if file_hash is None:
            return None
        for key, (orig_path, _deadline) in self.pending_deletes.items():
            _stored_path, stored_hash = key
            if stored_hash == file_hash and orig_path != new_path:
                return key
        return None

    def _hash_with_retry(self, path: str) -> tuple:
        """
        Hash ``path`` and read its size/mtime, retrying while the hash is None.

        On UNC network shares, the CREATE event fires before the file is
        fully written over SMB, so hashing is retried with a short delay to
        give the transfer time to complete. No sleep follows the last attempt.

        Returns:
            (file_hash, size, mtime) from the last attempt; file_hash is None
            if every attempt failed.
        """
        attempts = self._HASH_RETRY_ATTEMPTS
        file_hash = None
        size, mtime = None, None
        for attempt in range(1, attempts + 1):
            file_hash = compute_hash(path, self.hash_algorithm)
            size, mtime = get_file_info(path)
            if file_hash is not None or attempt == attempts:
                break
            log.debug("Hash not ready for %s (attempt %d/%d), retrying...",
                      path, attempt, attempts)
            time.sleep(self._HASH_RETRY_DELAY)
        return file_hash, size, mtime

    # ------------------------------------------------------------------
    # WATCHDOG CALLBACKS
    # ------------------------------------------------------------------
    def on_created(self, event):
        """
        Fires when a new file appears.

        Checks pending_deletes for a hash match before logging as CREATED —
        a match means the DELETE half of a move already arrived, so the pair
        is logged as a single move event. With no match and a usable hash the
        create is parked in pending_creates to wait for a late DELETE; with
        no hash at all it is logged as CREATED immediately. The new file is
        snapshotted in every case where its size could be read.
        """
        if event.is_directory or not self._should_watch(event.src_path):
            return
        path = event.src_path
        file_hash, size, mtime = self._hash_with_retry(path)

        with self._lock:
            match_key = self._find_pending_delete_key(path, file_hash)
            if match_key is not None:
                # DELETE already arrived — resolve as MOVE immediately
                old_path, _ = self.pending_deletes.pop(match_key)
                event_type = classify_path_change(old_path, path)
                self.db.log_event(event_type, old_path, dest_path=path,
                                  file_size=size, md5_hash=file_hash)
                self.db.delete_snapshot(old_path)
            elif file_hash is not None:
                # DELETE hasn't arrived yet — park this CREATE and wait.
                # pending_creates is keyed by hash only, so a different file
                # with identical content may already be parked; log that one
                # now instead of silently overwriting it.
                displaced = self.pending_creates.get(file_hash)
                if (
                    displaced is not None
                    and os.path.normcase(displaced[0]) != os.path.normcase(path)
                ):
                    self._log_parked_create(file_hash, displaced[0])
                deadline = time.time() + self.move_window
                self.pending_creates[file_hash] = (path, deadline)
            else:
                self.db.log_event("CREATED", path,
                                  file_size=size, md5_hash=file_hash)

            # Snapshot the new file so it's tracked regardless of outcome
            if size is not None:
                self.db.upsert_snapshot(path, size, mtime, file_hash)

    def on_modified(self, event):
        """
        Fires when an existing file's contents or metadata change.

        Only logs if the hash actually changed — skips metadata-only touches.
        prev_hash captures the before state for before/after comparison.
        If the file cannot be hashed right now the event is skipped, so the
        stored snapshot hash is never overwritten with None.

        NOTE: There is an inherent TOCTOU window between get_snapshot_hash()
        and compute_hash(). A second write in that gap means prev_hash reflects
        version N-1 while file_hash reflects N+1 — version N is never recorded.
        This is unfixable without holding a file lock across both calls, which
        would block writers. The OS also coalesces rapid writes into one event,
        so intermediate versions can be missed regardless of implementation.
        """
        if event.is_directory or not self._should_watch(event.src_path):
            return
        path = event.src_path
        prev_hash = self.db.get_snapshot_hash(path)
        file_hash = compute_hash(path, self.hash_algorithm)
        if file_hash is None:
            log.debug("Skipping MODIFIED for %s: file could not be hashed.",
                      path)
            return
        if file_hash == prev_hash:
            return
        size, mtime = get_file_info(path)
        self.db.log_event("MODIFIED", path, file_size=size,
                          md5_hash=file_hash, prev_hash=prev_hash)
        if size is not None:
            self.db.upsert_snapshot(path, size, mtime, file_hash)

    def on_deleted(self, event):
        """
        Fires when a file disappears.

        Looks up the file's last known hash in the snapshot. If a CREATE with
        the same hash is already parked for a *different* path, the pair is
        logged as a move. Otherwise the delete is parked in pending_deletes;
        the sweep thread logs it as a genuine DELETE on expiry unless
        on_created() claims it as a move first.
        """
        if event.is_directory or not self._should_watch(event.src_path):
            return
        path = event.src_path
        file_hash = self.db.get_snapshot_hash(path)
        log.debug("on_deleted hash: %s for %s", file_hash, path)
        norm_path = os.path.normcase(path)

        with self._lock:
            # Check if CREATE already arrived for this hash
            if file_hash and file_hash in self.pending_creates:
                new_path, _ = self.pending_creates.pop(file_hash)
                if os.path.normcase(new_path) != norm_path:
                    event_type = classify_path_change(path, new_path)
                    self.db.log_event(event_type, path, dest_path=new_path,
                                      md5_hash=file_hash)
                    self.db.delete_snapshot(path)
                    return
                # The parked CREATE is this very file (created, then deleted):
                # not a move. Log the create and park the delete below.
                self._log_parked_create(file_hash, new_path)

            # CREATE hasn't arrived yet — park this DELETE and wait.
            # Key is (normcase_path, hash) — unique per source file even when
            # hash is None or when multiple files share the same hash.
            deadline = time.time() + self.move_window
            self.pending_deletes[(norm_path, file_hash)] = (path, deadline)

    def on_moved(self, event):
        """
        Fires when both source and destination are inside the watched directory.

        When the source name passes the filter this is the clean case —
        watchdog sees both sides, no hash matching needed — and the move is
        logged and the snapshot re-pointed from src to dest.

        When only the destination passes the filter (e.g. a temp file renamed
        onto a watched name), the destination is recorded as CREATED if it
        has no snapshot hash, or MODIFIED if its hash changed. Moves where
        neither name passes the filter are ignored.
        """
        if event.is_directory:
            return
        src = event.src_path
        dest = event.dest_path
        src_watched = self._should_watch(src)
        if not src_watched and not self._should_watch(dest):
            return

        file_hash = compute_hash(dest, self.hash_algorithm)
        size, mtime = get_file_info(dest)

        if src_watched:
            event_type = classify_path_change(src, dest)
            self.db.log_event(event_type, src, dest_path=dest,
                              file_size=size, md5_hash=file_hash)
            self.db.delete_snapshot(src)
        else:
            prev_hash = self.db.get_snapshot_hash(dest)
            if prev_hash is None:
                self.db.log_event("CREATED", dest,
                                  file_size=size, md5_hash=file_hash)
            elif file_hash is None or file_hash == prev_hash:
                # Unreadable or unchanged content: nothing to record.
                return
            else:
                self.db.log_event("MODIFIED", dest, file_size=size,
                                  md5_hash=file_hash, prev_hash=prev_hash)

        if size is not None:
            self.db.upsert_snapshot(dest, size, mtime, file_hash)