From bcb50840b15a618a549d6236d91819dac6aa8272 Mon Sep 17 00:00:00 2001
From: Oleg Broytman
Date: Sat, 23 Aug 2025 23:49:08 +0300
Subject: [PATCH] Feat: Check size/hashes of files listed in a torrent metafile

---
 check-file-hashes | 85 +++++++++++++++++++++++++++++++++++++++++++++++
 check-file-sizes  | 52 +++++++++++++++++++++++++++++
 2 files changed, 137 insertions(+)
 create mode 100755 check-file-hashes
 create mode 100755 check-file-sizes

diff --git a/check-file-hashes b/check-file-hashes
new file mode 100755
index 0000000..df48f69
--- /dev/null
+++ b/check-file-hashes
@@ -0,0 +1,85 @@
+#! /usr/bin/env python3
+
+from hashlib import sha1
+import os.path
+import sys
+
+from eff_bdecode import eff_bdecode
+
+
+try:
+    torrent_fname = sys.argv[1]
+    data_directory = sys.argv[2]
+except IndexError:
+    sys.exit('Usage: %s torrent_file data_directory' % sys.argv[0])
+
+torrent_file = open(torrent_fname, 'rb')
+data = torrent_file.read()
+torrent_file.close()
+
+data = eff_bdecode(data)
+
+info = data[b'info']
+name = info[b'name']
+files = info.get(b'files', None)
+
+block_size = info[b'piece length']
+block_hashes_bytes = info[b'pieces']
+
+chunk_size = 20  # 20 bytes == 160 bits == size of sha1 digest
+block_hashes = [
+    block_hashes_bytes[i:i+chunk_size]
+    for i in range(0, len(block_hashes_bytes), chunk_size)
+]
+
+block = b''
+last_hash = 0
+root_dir = os.path.abspath(data_directory).encode()
+
+
+def check_block_hash(block_no: int) -> None:
+    global block, last_hash
+    digest = sha1(block).digest()
+    block_hash = block_hashes[last_hash]
+    last_hash += 1
+    if digest == block_hash:
+        print(' block %d: Ok' % block_no)
+        block = b''
+    else:
+        sys.exit(' block %d: Error, digest does not match'
+                 % block_no)
+
+
+def check_file_hash(path: bytes) -> int:
+    global block
+    full_path = os.path.join(root_dir, path)
+    if not os.path.exists(full_path):
+        sys.exit('Error: file does not exist: "%s"' % full_path.decode())
+    if not os.path.isfile(full_path):
+        sys.exit('Error: name is not a regular file: "%s"'
+                 % full_path.decode())
+    block_no = 0
+    print('File "%s"' % full_path.decode())
+    with open(full_path, 'rb') as fp:
+        while True:
+            block += fp.read(block_size - len(block))
+            if len(block) < block_size:
+                break  # Read next file
+            assert len(block) == block_size, 'Internal error!'
+            check_block_hash(block_no)
+            block_no += 1
+    assert 0 <= len(block) < block_size, 'Internal error!'
+    return block_no
+
+
+if files is None:  # `name` is the single file
+    block_no = check_file_hash(name)
+    if block:  # The last unchecked block
+        check_block_hash(block_no)
+else:
+    root_dir = os.path.join(root_dir, name)
+    for file_info in info[b'files']:
+        path = b'/'.join(file_info[b'path'])
+        block_no = check_file_hash(path)
+    if block:  # The last unchecked block
+        check_block_hash(block_no)
diff --git a/check-file-sizes b/check-file-sizes
new file mode 100755
index 0000000..ab61e23
--- /dev/null
+++ b/check-file-sizes
@@ -0,0 +1,52 @@
+#! /usr/bin/env python3
+
+import os.path
+import sys
+
+from eff_bdecode import eff_bdecode
+
+
+try:
+    torrent_fname = sys.argv[1]
+    data_directory = sys.argv[2]
+except IndexError:
+    sys.exit('Usage: %s torrent_file data_directory' % sys.argv[0])
+
+torrent_file = open(torrent_fname, 'rb')
+data = torrent_file.read()
+torrent_file.close()
+
+data = eff_bdecode(data)
+
+info = data[b'info']
+name = info[b'name']
+files = info.get(b'files', None)
+
+root_dir = os.path.abspath(data_directory).encode()
+
+
+def check_file_size(path: bytes, size: int) -> None:
+    full_path = os.path.join(root_dir, path)
+    if not os.path.exists(full_path):
+        sys.exit('Error: file does not exist: "%s"' % full_path.decode())
+    if not os.path.isfile(full_path):
+        sys.exit('Error: name is not a regular file: "%s"'
+                 % full_path.decode())
+    file_size = os.path.getsize(full_path)
+    if file_size == size:
+        print('File Ok: "%s", size %d' % (full_path.decode(), size))
+    else:
+        print('Error: file size does not match: "%s", expected %d, got %d'
+              % (full_path.decode(), size, file_size),
+              file=sys.stderr)
+
+
+if files is None:  # `name` is the single file
+    size = info[b'length']
+    check_file_size(name, size)
+else:
+    root_dir = os.path.join(root_dir, name)
+    for file_info in info[b'files']:
+        size = file_info[b'length']
+        path = b'/'.join(file_info[b'path'])
+        check_file_size(path, size)
-- 
2.39.5
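
Note (not part of the patch): both scripts import an eff_bdecode module that the patch does not add, so it presumably already lives in this repository. For readers who want to try the scripts stand-alone, below is a minimal bencode-decoder sketch that only matches the interface the scripts assume: raw metafile bytes in, nested dicts/lists with bytes keys (b'info', b'name', b'piece length', b'pieces', b'files') out. The real eff_bdecode used here may be implemented quite differently; this is an illustration, not that module.

# bdecode_sketch.py -- illustration of the assumed eff_bdecode interface only.

def _decode(data: bytes, pos: int = 0):
    """Decode one bencoded value starting at `pos`; return (value, next_pos)."""
    c = data[pos:pos + 1]
    if c == b'i':                      # integer: i<digits>e
        end = data.index(b'e', pos)
        return int(data[pos + 1:end]), end + 1
    if c == b'l':                      # list: l<item><item>...e
        items, pos = [], pos + 1
        while data[pos:pos + 1] != b'e':
            item, pos = _decode(data, pos)
            items.append(item)
        return items, pos + 1
    if c == b'd':                      # dict: d<key><value>...e, keys are byte strings
        result, pos = {}, pos + 1
        while data[pos:pos + 1] != b'e':
            key, pos = _decode(data, pos)
            result[key], pos = _decode(data, pos)
        return result, pos + 1
    colon = data.index(b':', pos)      # byte string: <length>:<bytes>
    length = int(data[pos:colon])
    return data[colon + 1:colon + 1 + length], colon + 1 + length


def eff_bdecode(data: bytes):
    """Whole metafile in, top-level dict out -- the shape the scripts rely on."""
    value, _pos = _decode(data)
    return value

With the module in place, both scripts take the same two arguments, e.g. (paths here are only placeholders): ./check-file-sizes example.torrent /path/to/downloads, then ./check-file-hashes example.torrent /path/to/downloads for the much slower full hash check.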