git.phdru.name Git - bittorrent.git/commitdiff
Feat: Check size/hashes of files listed in a torrent metafile
author Oleg Broytman <phd@phdru.name>
Sat, 23 Aug 2025 20:49:08 +0000 (23:49 +0300)
committer Oleg Broytman <phd@phdru.name>
Sun, 31 Aug 2025 20:49:39 +0000 (23:49 +0300)
check-file-hashes [new file with mode: 0755]
check-file-sizes [new file with mode: 0755]
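
Both scripts take a torrent metafile and the directory that holds the
downloaded data, matching their built-in Usage message. A possible
invocation (the file and directory names below are only illustrative):

    ./check-file-sizes example.torrent ~/Downloads/example
    ./check-file-hashes example.torrent ~/Downloads/example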

diff --git a/check-file-hashes b/check-file-hashes
new file mode 100755 (executable)
index 0000000..df48f69
--- /dev/null
@@ -0,0 +1,85 @@
+#! /usr/bin/env python3
+
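+"""Check the SHA1 piece hashes of the files listed in a torrent metafile."""
+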
+from hashlib import sha1
+import os.path
+import sys
+
+from eff_bdecode import eff_bdecode
+
+
+try:
+    torrent_fname = sys.argv[1]
+    data_directory = sys.argv[2]
+except IndexError:
+    sys.exit('Usage: %s torrent_file data_directory' % sys.argv[0])
+
+with open(torrent_fname, 'rb') as torrent_file:
+    data = torrent_file.read()
+
+data = eff_bdecode(data)
+
+info = data[b'info']
+name = info[b'name']
+files = info.get(b'files', None)
+
+block_size = info[b'piece length']
+block_hashes_bytes = info[b'pieces']
+
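+# `pieces` is one flat byte string: the concatenated 20-byte SHA1
+# digests of all pieces, in order; split it into per-piece hashes.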
+chunk_size = 20  # 20 bytes == 160 bits == size of sha1 digest
+block_hashes = [
+    block_hashes_bytes[i:i+chunk_size]
+    for i in range(0, len(block_hashes_bytes), chunk_size)
+]
+
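+# A piece may span file boundaries, so the partially filled piece buffer
+# (`block`) and the index of the next expected hash are kept as globals.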
+block = b''
+last_hash = 0
+root_dir = os.path.abspath(data_directory).encode()
+
+
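+# Compare the accumulated `block` against the next expected piece hash;
+# on a match reset the buffer, otherwise stop with an error.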
+def check_block_hash(block_no: int) -> None:
+    global block, last_hash
+    digest = sha1(block).digest()
+    block_hash = block_hashes[last_hash]
+    last_hash += 1
+    if digest == block_hash:
+        print('    block %d: Ok' % block_no)
+        block = b''
+    else:
+        sys.exit('    block %d: Error, digest does not match'
+                 % block_no)
+
+
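+# Feed one file into the shared piece buffer, verifying every complete
+# piece; return the number of full blocks checked for this file.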
+def check_file_hash(path: bytes) -> int:
+    global block
+    full_path = os.path.join(root_dir, path)
+    if not os.path.exists(full_path):
+        sys.exit('Error: file does not exist: "%s"' % full_path.decode())
+    if not os.path.isfile(full_path):
+        sys.exit('Error: name is not a regular file: "%s"'
+                 % full_path.decode())
+    block_no = 0
+    print('File "%s"' % full_path.decode())
+    with open(full_path, 'rb') as fp:
+        while True:
+            block += fp.read(block_size - len(block))
+            if len(block) < block_size:
+                break  # Read next file
+            assert len(block) == block_size, 'Internal error!'
+            check_block_hash(block_no)
+            block_no += 1
+    assert 0 <= len(block) < block_size, 'Internal error!'
+    return block_no
+
+
+if files is None:  # `name` is the single file
+    block_no = check_file_hash(name)
+    if block:  # The last unchecked block
+        check_block_hash(block_no)
+else:
+    root_dir = os.path.join(root_dir, name)
+    for file_info in info[b'files']:
+        path = b'/'.join(file_info[b'path'])
+        block_no = check_file_hash(path)
+    if block:  # The last unchecked block
+        check_block_hash(block_no)
diff --git a/check-file-sizes b/check-file-sizes
new file mode 100755 (executable)
index 0000000..ab61e23
--- /dev/null
@@ -0,0 +1,52 @@
+#! /usr/bin/env python3
+
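+"""Check the sizes of the files listed in a torrent metafile."""
+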
+import os.path
+import sys
+
+from eff_bdecode import eff_bdecode
+
+
+try:
+    torrent_fname = sys.argv[1]
+    data_directory = sys.argv[2]
+except IndexError:
+    sys.exit('Usage: %s torrent_file data_directory' % sys.argv[0])
+
+with open(torrent_fname, 'rb') as torrent_file:
+    data = torrent_file.read()
+
+data = eff_bdecode(data)
+
+info = data[b'info']
+name = info[b'name']
+files = info.get(b'files', None)
+
+root_dir = os.path.abspath(data_directory).encode()
+
+
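+# Compare the on-disk size of `path` (relative to root_dir) with the
+# size recorded in the metafile; report a mismatch on stderr and go on.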
+def check_file_size(path: bytes, size: int) -> None:
+    full_path = os.path.join(root_dir, path)
+    if not os.path.exists(full_path):
+        sys.exit('Error: file does not exist: "%s"' % full_path.decode())
+    if not os.path.isfile(full_path):
+        sys.exit('Error: name is not a regular file: "%s"'
+                 % full_path.decode())
+    file_size = os.path.getsize(full_path)
+    if file_size == size:
+        print('File Ok: "%s", size %d' % (full_path.decode(), size))
+    else:
+        print('Error: file size does not match: "%s", expected %d, got %d'
+              % (full_path.decode(), size, file_size),
+              file=sys.stderr)
+
+
+if files is None:  # `name` is the single file
+    size = info[b'length']
+    check_file_size(name, size)
+else:
+    root_dir = os.path.join(root_dir, name)
+    for file_info in info[b'files']:
+        size = file_info[b'length']
+        path = b'/'.join(file_info[b'path'])
+        check_file_size(path, size)