| import os
|
| import io
|
| import gzip
|
| import zlib
|
| from typing import BinaryIO
|
| import itertools
|
| from dataclasses import dataclass
|
| from collections.abc import Callable
|
| from attodict.structclass import structclass, structfield
|
|
|
# Flag bits of the gzip header FLG byte (RFC 1952, section 2.3.1).
GZIP_FLAG_FEXTRA = 1 << 2

GZIP_FLAG_FNAME = 1 << 3

GZIP_FLAG_FCOMMENT = 1 << 4

# FHCRC: a CRC16 of the gzip header follows the optional name/comment.
GZIP_FLAG_CRC16 = 1 << 1

# Subfield ID (SI1, SI2) of the DictZip "Random Access" extra field,
# as a tuple of byte values: (ord('R'), ord('A')).
GZIP_SI_RANDOM_ACCESS = tuple(b'RA')


# Keep a reference to the builtin, since open() is redefined below.
_builtin_open = open
|
|
|
|
|
def open(filename, mode="rb", *, fallback=True):
    """Open a DictZip file. By default, it falls back to regular Gzip
    if the file is not valid DictZip.

    filename may be a path (str/bytes/os.PathLike) or an open binary
    file object.  Only mode 'rb' is supported.

    Raises ValueError for any other mode, TypeError for an unusable
    filename, and OSError when fallback=False and the file carries no
    valid DictZip random access table."""

    if mode != 'rb':
        raise ValueError("DictZip can only be opened in rb mode")

    if isinstance(filename, (str, bytes, os.PathLike)):
        fileobj = _builtin_open(filename, 'rb')
    elif hasattr(filename, 'read'):
        # Fixed: the original cleared filename *before* assigning it to
        # fileobj, so a passed file object was silently replaced by None.
        fileobj = filename
        filename = None
    else:
        raise TypeError('filename must be a str or bytes object, or a file')

    random_access = ZipRandomAccess.from_file(fileobj)
    if random_access.valid():
        return DictZip(filename, mode, fileobj, random_access)
    if not fallback:
        raise OSError("Not a valid DictZip file")

    # Fall back to plain gzip.  Reuse the already-open file object (the
    # original re-opened the path, leaking fileobj, and crashed for
    # file-object input where filename is None).
    fileobj.seek(0)
    return gzip.GzipFile(filename, mode, fileobj=fileobj)
|
|
|
|
|
@structclass(byte_order='<')
class GzipMeta:
    """The Gzip file metadata starting from the start of the file"""

    # The fixed 10-byte gzip header (RFC 1952), little-endian.
    id1: int = structfield('B', literal=0x1f)   # magic byte 1
    id2: int = structfield('B', literal=0x8b)   # magic byte 2
    compression: int = structfield('B')         # CM (8 = deflate per RFC 1952)
    flags: int = structfield('B')               # FLG bit field (GZIP_FLAG_* bits)
    modified: int = structfield('I')            # MTIME, Unix timestamp
    extra_flags: int = structfield('B')         # XFL
    os: int = structfield('B')                  # originating OS id
|
|
|
|
|
|
|
@structclass(byte_order='<', auto_parse_variable=False)
class GzipFieldExtraData:

    """Extra field data, immediately following GzipMeta if GZIP_FLAG_FEXTRA
    is present in flags"""

    # XLEN: total byte length of all subfields that follow.
    length: int = structfield('H')
    # Raw subfield bytes.  Not parsed automatically
    # (auto_parse_variable=False) so callers can instead iterate
    # GzipExtraField records directly from the file.
    data: bytes = structfield(bytesize='length')
|
|
|
|
|
@structclass(byte_order='<',auto_parse_variable=False)
class GzipExtraField:

    """The data of every individual field"""

    # Subfield ID pair; compared against GZIP_SI_RANDOM_ACCESS.
    si1: int = structfield('B')
    si2: int = structfield('B')
    # Byte length of this subfield's data.
    length: int = structfield('H')
    # Raw payload; left unparsed (auto_parse_variable=False) so it can be
    # skipped or parsed as RandomAccessMeta by the caller.
    data: bytes = structfield(bytesize='length')
|
|
|
|
|
@structclass(byte_order='<')
class RandomAccessMeta:

    """ The particular contents of the extra with si1/si2 of
    GZIP_SI_RANDOM_ACCESS, that defines random access chunk
    sizes."""

    version: int = structfield('H', literal=1)    # format version, expected 1
    chunk_length: int = structfield('H')          # uncompressed length of each chunk
    chunk_count: int = structfield('H')           # number of chunks
    # Compressed size of each chunk, one 'H' entry per chunk.
    chunk_sizes: list = structfield('H', count='chunk_count')
|
|
|
|
|
@dataclass
class ZipRandomAccess:

    """Table for random access of gzip in DictZip format
    using the Random Access table.

    The default field values describe an *invalid* table, so a
    default-constructed instance means "no DictZip random access
    information was found"."""

    # Compressed offset of the first byte after the whole gzip header.
    data_start: int = 0
    # Version of the Random Access extra field.
    version: int = 0
    # Uncompressed length of every chunk (the last one may be shorter).
    chunk_length: int = -1
    chunk_count: int = 1
    # Compressed size of each chunk.
    chunk_sizes: tuple = (-1, )
    # Cumulative compressed offsets of each chunk start, relative to
    # data_start; the final entry is the total compressed chunk size.
    table: tuple = (0, -1)
    # The raw parsed metadata, when built via from_metadata()/from_file().
    metadata: "RandomAccessMeta" = None

    def valid(self):
        """Return True if a usable random access table was parsed."""
        return self.table[-1] >= 0

    def lookup(self, offset: int) -> tuple:
        """Lookup the given offset in the uncompressed file.
        Return the index of the chunk, the offset to seek in the compressed
        file (relative to data_start), and then the offset to seek in the
        first uncompressed chunk."""

        chunk_num, chunk_off = divmod(offset, self.chunk_length)
        if chunk_num >= self.chunk_count:
            # Past the end: position at the very end of the last chunk.
            # (Fixed: the original referenced the undefined name `table`.)
            return self.chunk_count - 1, self.table[-2], self.chunk_length

        return chunk_num, self.table[chunk_num], chunk_off

    @classmethod
    def from_metadata(cls, data_start: int, meta: "RandomAccessMeta"):
        """Create from the parsed metadata."""
        return cls(data_start, meta.version,
                   meta.chunk_length, meta.chunk_count,
                   tuple(meta.chunk_sizes),
                   (0, ) + tuple(itertools.accumulate(meta.chunk_sizes)),
                   metadata=meta)

    @classmethod
    def from_file(cls, fileob: BinaryIO):
        """Parse the open file directly.

        Returns an invalid (default-valued) instance when the file has no
        Random Access extra field; fileob is left positioned somewhere in
        the header in that case."""
        gzip_meta = GzipMeta.parse_file(fileob)
        if not (gzip_meta.flags & GZIP_FLAG_FEXTRA):
            return cls()

        extra_meta = GzipFieldExtraData.parse_file(fileob)
        remaining = extra_meta.length

        access_meta = None
        while remaining > 0:
            extra_field = GzipExtraField.parse_file(fileob)
            if (extra_field.si1, extra_field.si2) != GZIP_SI_RANDOM_ACCESS:
                # Not ours: skip this subfield's payload entirely.
                extra_field.skip_varfields_file(fileob)
                remaining -= extra_field.consumed_size
                continue

            # consumed_size here covers only the 4-byte subfield header;
            # the payload is parsed as RandomAccessMeta below.
            remaining -= extra_field.consumed_size

            access_meta = RandomAccessMeta.parse_file(fileob)
            if extra_field.length > access_meta.consumed_size:
                # Skip any trailing payload bytes we did not parse.
                # (Fixed: the original seeked *backwards* here by
                # computing consumed_size - length.)
                fileob.seek(extra_field.length - access_meta.consumed_size,
                            os.SEEK_CUR)
            remaining -= extra_field.length

        if access_meta is None:
            return cls()

        # Skip the zero-terminated name/comment fields and the optional
        # header CRC16, so tell() lands on the first compressed byte.
        if gzip_meta.flags & GZIP_FLAG_FNAME:
            seek_to_byte(fileob, 0)

        if gzip_meta.flags & GZIP_FLAG_FCOMMENT:
            seek_to_byte(fileob, 0)
        if gzip_meta.flags & GZIP_FLAG_CRC16:
            # Fixed: the original referenced the undefined name `file`.
            fileob.seek(2, os.SEEK_CUR)

        return cls.from_metadata(fileob.tell(), access_meta)
|
|
|
|
|
def seek_to_byte(fileob: BinaryIO, byte: int=0,
                 bufsize: int=64*1024):
    """Position fileob just past the next occurrence of the given byte.

    Used to skip the zero-terminated FNAME/FCOMMENT gzip header fields,
    leaving the file positioned on the byte *after* the terminator.

    Raises EOFError if the byte never occurs before end of file.
    """
    offset = -1
    while offset < 0:
        pos = fileob.tell()
        buf = fileob.read(bufsize)
        if not buf:
            # Fixed: the original looped forever at EOF.
            raise EOFError("byte {!r} not found before end of file".format(byte))
        offset = buf.find(byte)
    # +1 so we end up after the found byte, not on it.  (The original
    # also crashed on the undefined name `file` before reaching here.)
    fileob.seek(pos + offset + 1)
|
|
|
|
|
class DictZip(io.BufferedIOBase):

    """Seekable, read-only file object over a gzip file in DictZip format.

    Random access works by looking up the compressed offset of the chunk
    containing the requested position in the Random Access table, then
    decompressing from that chunk boundary with a copy of a decompressor
    primed on the gzip header."""

    def __init__(self, path: str=None, mode: str='rb',
                 fileobj: BinaryIO=None,
                 random_access: ZipRandomAccess=None):
        """Open from a path or an already-open binary file object.

        random_access may be supplied when the table was parsed already
        (as open() does); otherwise it is read from the file.

        Raises ValueError for any mode other than 'rb', and OSError when
        the file has no valid Random Access table."""
        if mode != 'rb':
            raise ValueError("DictZip can only be opened in rb mode")

        if fileobj is None:
            fileobj = _builtin_open(path, 'rb')

        if random_access is None:
            random_access = ZipRandomAccess.from_file(fileobj)

        if not random_access.valid():
            raise OSError("Can't use DictZip on this file, use regular GzipFile")

        self.name = path
        self.fileobj = fileobj
        self.random_access = random_access
        # Compressed offset where the chunked deflate data begins.
        self.base_offset = random_access.data_start

        # Decoding state used by seek()/read1().
        self.position = 0
        self.chunk_index = 0
        self.skip_head = 0
        self.leftover_tail = b''

        # Prime a decompressor on the gzip header alone (wbits=31 selects
        # gzip framing); seek() then copies it to restart at chunk
        # boundaries without re-reading the header.
        self.fileobj.seek(0)
        self.decompressor_base = zlib.decompressobj(31)
        self.decompressor_base.decompress(self.fileobj.read(self.base_offset))
        self.decompressor = None
        self.seek(0)

    def seekable(self):
        return True

    def tell(self):
        return self.position

    def seek(self, offset, whence=os.SEEK_SET):
        """Seek to an uncompressed offset; return the new absolute position."""
        if whence == os.SEEK_CUR:
            offset += self.position
        elif whence == os.SEEK_END:
            # NOTE(review): table[-1] is the total *compressed* size of the
            # chunks, not the uncompressed length, so SEEK_END is only an
            # approximation -- TODO confirm intended behavior.
            offset = self.random_access.table[-1] + offset
        seek_index, seek_below, seek_chunk = self.random_access.lookup(offset)
        self.fileobj.seek(self.base_offset + seek_below)
        self.chunk_index = seek_index
        self.skip_head = seek_chunk
        self.leftover_tail = b''
        self.position = offset
        self.decompressor = self.decompressor_base.copy()
        # Fixed: io.IOBase.seek() must return the new absolute position;
        # the original returned None.
        return self.position

    def read1(self, size=-1):
        """This is actually buffered, but can't read arbitrary size."""
        if not self.leftover_tail:
            # NOTE: For decompression to work, we either need to *always* read
            # at deflate block boundaries, *or* copy the initial decompressor
            # only once to read at a deflate block boundary, then continue
            # at regular chunks with the same decompressor. Just to be sure,
            # this code uses both: The reads are at deflate block boundaries,
            # and the decompressor is copied during seek() *once*.
            #
            # Either can be changed, and this will still work.
            try:
                bufsize = self.random_access.chunk_sizes[self.chunk_index]
            except IndexError:
                bufsize = 65536

            buf = self.fileobj.read(bufsize)
            decompressed = self.decompressor.decompress(buf)
            if len(decompressed) < self.skip_head:
                raise OSError("Bad DictZip table.")
            # Drop the part of the chunk before the seek target.
            self.leftover_tail = decompressed[self.skip_head:]
            self.chunk_index += 1

            self.skip_head = 0

        if size >= 0:
            result = self.leftover_tail[:size]
            self.leftover_tail = self.leftover_tail[size:]
        else:
            result = self.leftover_tail
            self.leftover_tail = b''

        self.position += len(result)

        return result

    def read(self, size=-1):
        """Read up to size bytes (all remaining data when size < 0)."""
        result = []
        while size != 0:
            buf = self.read1(size)
            if size >= 0:
                size -= len(buf)
                assert size >= 0, "Oops, read1() returned more data"
            if not buf:
                break
            result.append(buf)
        return b''.join(result)
|
|
|
|
|
if __name__ == '__main__':
    # Smoke test: compare DictZip random-access reads against plain gzip.
    import random

    path = '/usr/share/dictd/freedict-deu-eng.dict.dz'
    big_number = os.stat(path).st_size

    f1 = gzip.open(path)
    f2 = open(path, fallback=False)

    print(f1)
    print(f2)

    for _ in range(30):
        pos = random.randrange(big_number)
        f1.seek(pos)
        f2.seek(pos)
        # Read a short and a long span in random order; both must match.
        if random.choice([True, False]):
            sizes = (30, 256 * 1024)
        else:
            sizes = (256 * 1024, 30)
        for amount in sizes:
            print(f1.read(amount) == f2.read(amount))
|
| import os
|
| import struct
|
| from typing import BinaryIO
|
| import dataclasses
|
| from operator import attrgetter, itemgetter
|
| from dataclasses import dataclass
|
| from collections.abc import Callable
|
|
|
|
|
def structfield(format=None, bytesize=None, count=None, literal=None, factory=None):
    """Describe one struct field for a structclass().

    A plain ``format`` yields a fixed-size field.  Passing ``bytesize``
    or ``count`` (an int, an attribute name, or a callable taking the
    instance) makes the field variable-sized.  ``factory`` optionally
    supplies the format (via its struct_format) and converts parsed
    elements.  ``literal`` is only recorded in the field metadata
    (presumably the expected constant value; it is not validated here)."""

    if format is None and factory is not None:
        format = factory.struct_format

    metadata = {'format': format, 'bytesize': bytesize, 'count': count,
                'literal': literal, 'factory': factory}

    # Resolve how many bytes the variable part occupies, as a callable
    # of the (partially parsed) instance.
    size_of = bytesize

    if count is not None:
        element_size = struct.calcsize(format)
        if callable(count):
            size_of = lambda obj: count(obj) * element_size
        elif isinstance(count, str):
            size_of = lambda obj: getattr(obj, count) * element_size
        else:
            size_of = lambda _obj: count * element_size
    elif size_of is not None and not callable(size_of):
        if isinstance(bytesize, str):
            size_of = attrgetter(bytesize)
        else:
            size_of = lambda _obj: bytesize

    metadata['get_bytesize'] = size_of

    # Fields with a struct format accumulate into a list while parsing;
    # raw byte fields start out as None and are assigned wholesale.
    if format is not None:
        return dataclasses.field(default_factory=list, metadata=metadata)
    return dataclasses.field(default=None, metadata=metadata)
|
|
|
|
|
@dataclass
class VariableField:
    """Internal description of one variable-size struct field.

    Built by structclass() from structfield() metadata and stored on the
    generated class as part of ``struct_variable``."""
    # Attribute name on the parsed instance.
    name: str
    # Callable(instance) -> total byte size of this field's data.
    get_total_size: Callable
    # struct format of one element, or None for raw bytes.
    format: str = None
    # struct.calcsize(format) for one element, or None for raw bytes.
    element_size: int = None
    # Optional converter applied to each unpacked element (or raw bytes).
    factory: Callable = None
|
|
|
|
|
def _parse_variable_field(vf: "VariableField",
                          instance: object, buf: bytes, bufsize: int):
    """Store the contents of one variable field onto *instance*.

    For fields with a struct format, each unpacked element (converted by
    the factory when given, otherwise the first unpacked item) extends
    the list attribute.  For raw-byte fields, the bytes (converted by
    the factory when given) are assigned to the attribute.

    Raises struct.error when *buf* is not exactly *bufsize* bytes, and
    advances instance.consumed_size by the bytes consumed."""

    if len(buf) != bufsize:
        raise struct.error('unpack requires a buffer of {} bytes'.format(bufsize))

    instance.consumed_size += len(buf)

    factory = vf.factory
    if vf.format is not None:
        if factory is None:
            factory = itemgetter(0)
        getattr(instance, vf.name).extend(map(factory, struct.iter_unpack(vf.format, buf)))
    else:
        if factory is None:
            setattr(instance, vf.name, buf)
        else:
            # Fixed: the factory must be *called* with the raw bytes; the
            # original stored the factory object itself on the instance.
            setattr(instance, vf.name, factory(buf))
|
|
|
|
|
def structclass(cls=None, byte_order='', auto_parse_variable: bool=True):

    """Create a dataclass parsing struct defined by structfield() on place
    of dataclass.field().

    If auto_parse_variable=False is passed, the variable data can be manually
    parsed, or skipped. Recommended if raw.

    The consumed_size on the object is set to the amount of read data
    up until this point.
    """

    def decorator(cls):
        cls = dataclass(cls)
        struct_format = []
        struct_variable = []

        # Split the fields into the fixed leading struct part and the
        # trailing variable-sized part; variable fields must come last.
        for f in dataclasses.fields(cls):
            fmt = f.metadata.get('format')
            if f.metadata['get_bytesize'] is not None:
                struct_variable.append(
                    VariableField(f.name, f.metadata['get_bytesize'],
                                  fmt,
                                  struct.calcsize(fmt) if fmt is not None else None,
                                  f.metadata['factory']))
                continue
            elif struct_variable:
                raise TypeError("variable fields can't follow regular fields")
            else:
                struct_format.append(fmt)

        cls.struct_format = byte_order + ''.join(struct_format)
        cls.struct_size = struct.calcsize(cls.struct_format)
        cls.struct_variable = tuple(struct_variable)

        @classmethod
        def parse_file(cls, file: BinaryIO,
                       auto_parse_variable: bool=auto_parse_variable):
            """Parse the given file object. You can explicitly say whether to
            automatically parse the variables, whether it is the default for this
            class or not."""

            buf = file.read(cls.struct_size)
            args = struct.unpack(cls.struct_format, buf)

            self = cls(*args)
            self.consumed_size = len(buf)

            if auto_parse_variable:
                self.parse_varfields_file(file)
            return self

        @classmethod
        def parse_bytes(cls, data: bytes,
                        auto_parse_variable: bool=auto_parse_variable,
                        partial: bool=False, offset: int=None):
            """Parse the given bytestring"""

            if cls.struct_variable or partial or offset is not None:
                args = struct.unpack_from(cls.struct_format, data,
                                          offset or 0)
            else:
                args = struct.unpack(cls.struct_format, data)
            self = cls(*args)
            # Fixed: was len(data), which counted bytes that were never
            # parsed and desynchronized the variable-field offset.
            self.consumed_size = cls.struct_size

            if auto_parse_variable:
                # Fixed: the original passed the undefined name `file`
                # here; also forward partial/offset so callers can parse
                # from the middle of a larger buffer.
                self.parse_varfields_bytes(
                    data, partial=partial,
                    offset=None if offset is None else offset + cls.struct_size)
            return self

        def variable_fields_size(self) -> int:
            """Return the size of the variable fields"""
            result = 0
            for vf in type(self).struct_variable:
                result += vf.get_total_size(self)
            return result

        def skip_varfields_file(self, file: BinaryIO):
            """Skip the variable part without parsing it in a file."""
            old_pos = file.tell()
            for vf in type(self).struct_variable:
                bufsize = vf.get_total_size(self)
                pos = file.seek(bufsize, os.SEEK_CUR)
                if pos - old_pos != bufsize:
                    raise struct.error('unpack requires a buffer of {} bytes'.format(bufsize))
                self.consumed_size += pos - old_pos
                old_pos = pos

        def skip_varfields_bytes(self, data: bytes,
                                 partial: bool=False, offset: int=None):
            """Skip the variable part without parsing it from a buffer."""
            # Fixed: the original referenced undefined names (`file`,
            # `buf`) and never advanced consumed_size.
            if offset is None:
                custom_offset = False
                offset = self.consumed_size
            else:
                custom_offset = True

            varfield_size = self.variable_fields_size()
            available = len(data) - offset

            if partial or custom_offset:
                if available < varfield_size:
                    raise struct.error('unpack requires a buffer of {} bytes'
                                       .format(varfield_size))
            else:
                if available != varfield_size:
                    raise struct.error('unpack requires a buffer of {} bytes'
                                       .format(varfield_size))

            # Mirror skip_varfields_file: account for the skipped bytes.
            self.consumed_size += varfield_size

        def parse_varfields_file(self, file: BinaryIO):
            """Parse the variable part manually from a file."""
            for vf in type(self).struct_variable:
                bufsize = vf.get_total_size(self)
                buf = file.read(bufsize)

                _parse_variable_field(vf, self, buf, bufsize)

        def parse_varfields_bytes(self, data: bytes,
                                  partial: bool=False, offset: int=None):
            """Parse the variable part manually from bytes."""

            if offset is None:
                custom_offset = False
                offset = self.consumed_size
            else:
                custom_offset = True

            variable_fields = type(self).struct_variable
            last_field = len(variable_fields) - 1

            for i, vf in enumerate(variable_fields):
                bufsize = vf.get_total_size(self)

                if i != last_field or partial or custom_offset:
                    # Fixed: the original sliced the undefined name `buf`.
                    buf = data[offset:offset + bufsize]
                else:
                    # The last field may take the rest of the buffer; the
                    # exact-size check in _parse_variable_field catches
                    # trailing garbage.
                    buf = data[offset:]

                _parse_variable_field(vf, self, buf, bufsize)
                # Fixed: advance past this field for the next iteration.
                offset += bufsize

        cls.parse_file = parse_file
        cls.parse_bytes = parse_bytes
        cls.variable_fields_size = variable_fields_size
        cls.parse_varfields_file = parse_varfields_file
        cls.parse_varfields_bytes = parse_varfields_bytes
        cls.skip_varfields_file = skip_varfields_file
        cls.skip_varfields_bytes = skip_varfields_bytes
        return cls

    if cls is not None:
        return decorator(cls)

    return decorator
|