This paste expires on 2023-05-22 21:37:13.297242. Repaste, or download this paste. Pasted through web.

import os
import io
import gzip
import zlib
from typing import BinaryIO
import itertools
from dataclasses import dataclass
from collections.abc import Callable
from attodict.structclass import structclass, structfield
# Gzip FLG header bits (RFC 1952, section 2.3.1).
GZIP_FLAG_FEXTRA = 1 << 2    # FEXTRA: an "extra field" block follows the header
GZIP_FLAG_FNAME = 1 << 3     # FNAME: zero-terminated original file name follows
GZIP_FLAG_FCOMMENT = 1 << 4  # FCOMMENT: zero-terminated comment follows
GZIP_FLAG_CRC16 = 1 << 1     # FHCRC: CRC16 of the gzip header follows
# Subfield ID of the DictZip "Random Access" extra field: si1='R', si2='A'.
GZIP_SI_RANDOM_ACCESS = tuple(b'RA')
# Keep a handle on the builtin before this module's open() shadows it.
_builtin_open = open
def open(filename, mode="rb", *, fallback=True):
    """Open a DictZip file. By default, it falls back to regular Gzip
    if the file is not valid DictZip.

    filename may be a path (str/bytes/os.PathLike) or an already-open
    binary file object. Only 'rb' mode is supported; raises ValueError
    otherwise, TypeError for unsupported filename types, and OSError
    when fallback=False and the file has no random-access table.
    """
    if mode != 'rb':
        raise ValueError("DictZip can only be opened in rb mode")
    if isinstance(filename, (str, bytes, os.PathLike)):
        fileobj = _builtin_open(filename, 'rb')
        opened_here = True
    elif hasattr(filename, 'read'):
        # BUG FIX: the original cleared `filename` *before* copying it into
        # `fileobj`, so file-object callers always got fileobj=None.
        fileobj = filename
        filename = None
        opened_here = False
    else:
        raise TypeError('filename must be a str or bytes object, or a file')
    random_access = ZipRandomAccess.from_file(fileobj)
    if random_access.valid():
        return DictZip(filename, mode, fileobj, random_access)
    elif not fallback:
        if opened_here:
            # Don't leak the file we opened ourselves.
            fileobj.close()
        raise OSError("Not a valid DictZip file")
    else:
        # Reuse the already-open file instead of reopening it by name:
        # this also works when the caller passed a file object
        # (filename is None here) and avoids leaking fileobj.
        fileobj.seek(0)
        return gzip.GzipFile(filename, mode, fileobj=fileobj)
@structclass(byte_order='<')
class GzipMeta:
    """The Gzip file metadata starting from the start of the file"""
    # Fixed 10-byte gzip header (RFC 1952), little-endian.
    id1: int = structfield('B', literal=0x1f)  # magic byte 1
    id2: int = structfield('B', literal=0x8b)  # magic byte 2
    compression: int = structfield('B')  # CM; 8 means deflate
    flags: int = structfield('B')        # FLG bitfield, tested with GZIP_FLAG_*
    modified: int = structfield('I')     # MTIME, Unix timestamp
    extra_flags: int = structfield('B')  # XFL
    os: int = structfield('B')           # originating OS id
@structclass(byte_order='<', auto_parse_variable=False)
class GzipFieldExtraData:
    """Extra field data, immediately following GzipMeta if GZIP_FLAG_FEXTRA
    is present in flags"""
    length: int = structfield('H')  # XLEN: total size of all subfields
    data: bytes = structfield(bytesize='length')  # raw subfield bytes (parsed manually)
@structclass(byte_order='<',auto_parse_variable=False)
class GzipExtraField:
    """The data of every individual field"""
    # One subfield inside the gzip extra area: 2-byte ID then `length`
    # bytes of payload, which the caller parses or skips manually.
    si1: int = structfield('B')     # subfield ID byte 1
    si2: int = structfield('B')     # subfield ID byte 2
    length: int = structfield('H')  # payload size in bytes
    data: bytes = structfield(bytesize='length')  # payload (variable-size)
@structclass(byte_order='<')
class RandomAccessMeta:
    """ The particular contents of the extra with si1/si2 of
    GZIP_SI_RANDOM_ACCESS, that defines random access chunk
    sizes."""
    version: int = structfield('H', literal=1)  # DictZip RA format version
    chunk_length: int = structfield('H')  # uncompressed bytes per chunk
    chunk_count: int = structfield('H')   # number of chunks in the file
    chunk_sizes: list = structfield('H', count='chunk_count')  # compressed size of each chunk
@dataclass
class ZipRandomAccess:
    """Table for random access of gzip in DictZip format
    using the Random Access table."""
    data_start: int = 0          # file offset of the first compressed data byte
    version: int = 0             # RA extra-field version
    chunk_length: int = -1       # uncompressed bytes per chunk
    chunk_count: int = 1         # number of chunks
    chunk_sizes: tuple = (-1, )  # compressed size of each chunk
    table: tuple = (0, -1)       # table[i] = compressed offset of chunk i
    metadata: "RandomAccessMeta" = None

    def valid(self):
        """True when a usable random-access table was parsed."""
        return self.table[-1] >= 0

    def lookup(self, offset: int) -> tuple:
        """Lookup the given offset in the uncompressed file.
        Return the chunk index, the offset to seek in the compressed
        file, and the offset to seek within the first decompressed
        chunk."""
        chunk_num, chunk_off = divmod(offset, self.chunk_length)
        if chunk_num >= self.chunk_count:
            # Past the end: clamp to the last chunk and skip all of it.
            # BUG FIX: `table` was referenced without `self.` (NameError).
            return self.chunk_count - 1, self.table[-2], self.chunk_length
        return chunk_num, self.table[chunk_num], chunk_off

    @classmethod
    def from_metadata(cls, data_start: int, meta: "RandomAccessMeta"):
        """Create from the parsed metadata."""
        return cls(data_start, meta.version,
                   meta.chunk_length, meta.chunk_count,
                   tuple(meta.chunk_sizes),
                   (0, ) + tuple(itertools.accumulate(meta.chunk_sizes)),
                   metadata=meta)

    @classmethod
    def from_file(cls, fileob: BinaryIO):
        """Parse the open file directly. Returns an invalid (fallback)
        instance when no random-access extra field is present."""
        gzip_meta = GzipMeta.parse_file(fileob)
        if not (gzip_meta.flags & GZIP_FLAG_FEXTRA):
            return cls()
        extra_meta = GzipFieldExtraData.parse_file(fileob)
        remaining = extra_meta.length
        access_meta = None
        while remaining > 0:
            extra_field = GzipExtraField.parse_file(fileob)
            if (extra_field.si1, extra_field.si2) != GZIP_SI_RANDOM_ACCESS:
                extra_field.skip_varfields_file(fileob)
                remaining -= extra_field.consumed_size
                continue
            remaining -= extra_field.consumed_size
            access_meta = RandomAccessMeta.parse_file(fileob)
            if extra_field.length > access_meta.consumed_size:
                # BUG FIX: the original seeked by (consumed - length),
                # a negative amount, i.e. *backwards*; skip forward over
                # the unparsed remainder of the subfield instead.
                fileob.seek(extra_field.length - access_meta.consumed_size,
                            os.SEEK_CUR)
            remaining -= extra_field.length
        if access_meta is None:
            return cls()
        if gzip_meta.flags & GZIP_FLAG_FNAME:
            # Skip the zero-terminated original file name.
            seek_to_byte(fileob, 0)
        if gzip_meta.flags & GZIP_FLAG_FCOMMENT:
            # Skip the zero-terminated comment.
            seek_to_byte(fileob, 0)
        if gzip_meta.flags & GZIP_FLAG_CRC16:
            # BUG FIX: was `file.seek(...)` -- NameError.
            fileob.seek(2, os.SEEK_CUR)
        return cls.from_metadata(fileob.tell(), access_meta)
def seek_to_byte(fileob: BinaryIO, byte: int=0,
                 bufsize: int=64*1024):
    """Advance fileob to just past the next occurrence of the given byte
    value (default 0, the NUL terminating gzip FNAME/FCOMMENT strings),
    scanning forward in bufsize-sized chunks.

    Raises EOFError if the byte does not occur before end of file.
    """
    offset = -1
    while offset < 0:
        # BUG FIX: the original referenced the undefined name `file` here
        # and in the final seek, and it looped forever once EOF was hit
        # (read() keeps returning b'' and find() keeps returning -1).
        pos = fileob.tell()
        buf = fileob.read(bufsize)
        if not buf:
            raise EOFError('byte {!r} not found before EOF'.format(byte))
        offset = buf.find(byte)
    # Position just *past* the byte so a zero-terminated string is fully
    # consumed; the original stopped on the terminator itself, leaving
    # subsequent header parsing off by one.
    fileob.seek(pos + offset + 1)
class DictZip(io.BufferedIOBase):
    """Random-access reader for DictZip (.dz) files.

    DictZip is ordinary gzip whose extra field carries a table of
    compressed chunk sizes, letting seek() jump close to the target
    offset and decompress a single chunk instead of the whole stream.
    """

    def __init__(self, path: str=None, mode: str='rb',
                       fileobj: BinaryIO=None,
                       random_access: "ZipRandomAccess"=None):
        """Open from a path or an already-open binary file object.

        Raises ValueError for any mode other than 'rb', and OSError when
        the file carries no usable random-access table.
        """
        if mode != 'rb':
            raise ValueError("DictZip can only be opened in rb mode")
        # Remember whether we own the file so close() does not close a
        # caller-provided object (the original never closed it at all).
        self._close_fileobj = fileobj is None
        if fileobj is None:
            fileobj = _builtin_open(path, 'rb')
        if random_access is None:
            random_access = ZipRandomAccess.from_file(fileobj)
        if not random_access.valid():
            raise OSError("Can't use DictZip on this file, use regular GzipFile")
        self.name = path
        self.fileobj = fileobj
        self.random_access = random_access
        self.base_offset = random_access.data_start  # first deflate data byte
        self.position = 0        # current position in the uncompressed stream
        self.chunk_index = 0     # next chunk to read from the table
        self.skip_head = 0       # bytes to drop from the next decompressed chunk
        self.leftover_tail = b''  # decompressed data not yet handed out
        self.fileobj.seek(0)
        # Prime a gzip-aware decompressor (wbits=31) with the header bytes;
        # seek() clones it so every chunk starts from a consistent state.
        self.decompressor_base = zlib.decompressobj(31)
        self.decompressor_base.decompress(self.fileobj.read(self.base_offset))
        self.decompressor = None
        self.seek(0)

    def readable(self):
        """DictZip objects support read()."""
        return True

    def seekable(self):
        return True

    def close(self):
        """Close the reader, closing the underlying file only if this
        object opened it itself."""
        try:
            # getattr guards: __del__ may call close() on a partially
            # initialized instance.
            if getattr(self, '_close_fileobj', False) and \
                    getattr(self, 'fileobj', None) is not None:
                self.fileobj.close()
        finally:
            self.fileobj = None
            super().close()

    def tell(self):
        """Return the current position in the uncompressed stream."""
        return self.position

    def seek(self, offset, whence=os.SEEK_SET):
        """Seek in the uncompressed stream. Cheap: only repositions the
        compressed file and resets decompression state; no data is read
        until the next read."""
        if whence == os.SEEK_CUR:
            offset += self.position
        elif whence == os.SEEK_END:
            offset = self.random_access.table[-1] + offset
        seek_index, seek_below, seek_chunk = self.random_access.lookup(offset)
        self.fileobj.seek(self.base_offset + seek_below)
        self.chunk_index = seek_index
        self.skip_head = seek_chunk
        self.leftover_tail = b''
        self.position = offset
        # Clone the header-primed decompressor so decompression restarts
        # cleanly at the chunk's deflate block boundary.
        self.decompressor = self.decompressor_base.copy()
        # io convention: seek() returns the new absolute position
        # (the original returned None).
        return offset

    def read1(self, size=-1):
        """Read up to size bytes doing at most one chunk decompression.
        This is actually buffered, but can't read arbitrary size."""
        if not self.leftover_tail:
            # NOTE: For decompression to work, we either need to *always* read
            # at deflate block boundaries, *or* copy the initial decompressor
            # only once to read at a deflate block boundary, then continue
            # at regular chunks with the same decompressor. Just to be sure,
            # this code uses both: The reads are at deflate block boundaries,
            # and the decompressor is copied during seek() *once*.
            #
            # Either can be changed, and this will still work.
            try:
                bufsize = self.random_access.chunk_sizes[self.chunk_index]
            except IndexError:
                # Past the table: read whatever is left (incl. gzip trailer).
                bufsize = 65536
            buf = self.fileobj.read(bufsize)
            decompressed = self.decompressor.decompress(buf)
            if len(decompressed) < self.skip_head:
                raise OSError("Bad DictZip table.")
            self.leftover_tail = decompressed[self.skip_head:]
            self.chunk_index += 1
        self.skip_head = 0
        if size >= 0:
            result = self.leftover_tail[:size]
            self.leftover_tail = self.leftover_tail[size:]
        else:
            result = self.leftover_tail
            self.leftover_tail = b''
        self.position += len(result)
        return result

    def read(self, size=-1):
        """Read up to size bytes (all remaining data when negative)."""
        result = []
        while size != 0:
            buf = self.read1(size)
            if size >= 0:
                size -= len(buf)
                assert size >= 0, "Oops, read1() returned more data"
            if not buf:
                break
            result.append(buf)
        return b''.join(result)
if __name__ == '__main__':
    import random

    # Smoke test: compare DictZip reads against plain gzip at random
    # offsets, with both small and large read sizes in either order.
    path = '/usr/share/dictd/freedict-deu-eng.dict.dz'
    big_number = os.stat(path).st_size
    reference = gzip.open(path)
    subject = open(path, fallback=False)
    print(reference)
    print(subject)
    for _ in range(30):
        pos = random.randrange(big_number)
        reference.seek(pos)
        subject.seek(pos)
        if random.choice([True, False]):
            sizes = (30, 256 * 1024)
        else:
            sizes = (256 * 1024, 30)
        for amount in sizes:
            print(reference.read(amount) == subject.read(amount))
Filename: None. Size: 10kb. View raw, hex, or download this file.
import os
import struct
from typing import BinaryIO
import dataclasses
from operator import attrgetter, itemgetter
from dataclasses import dataclass
from collections.abc import Callable
def structfield(format=None, bytesize=None, count=None, literal=None, factory=None):
    """Describe one field of a struct-backed dataclass.

    A plain ``format`` makes a fixed field.  Supplying ``bytesize`` or
    ``count`` (a constant, an attribute name, or a callable) makes the
    field variable-size: it then defaults to a list when ``format`` is
    given, or to None (raw bytes) otherwise.  All arguments are stored
    in the dataclass field metadata for structclass() to pick up.
    """
    if format is None and factory is not None:
        # Nested struct: borrow the element format from the factory class.
        format = factory.struct_format
    metadata = {'format': format, 'bytesize': bytesize, 'count': count,
                'literal': literal, 'factory': factory}
    field_kwargs = {'metadata': metadata}
    get_bytesize = bytesize
    if count is not None:
        element_size = struct.calcsize(format)
        if callable(count):
            get_bytesize = lambda instance: count(instance) * element_size
        elif isinstance(count, str):
            get_bytesize = lambda instance: getattr(instance, count) * element_size
        else:
            get_bytesize = lambda _instance: count * element_size
    if get_bytesize is not None:
        # Variable-size field: normalize get_bytesize to a callable and
        # pick a sensible dataclass default for the parsed value.
        if not callable(get_bytesize):
            if isinstance(bytesize, str):
                get_bytesize = attrgetter(bytesize)
            else:
                get_bytesize = lambda _instance: bytesize
        if format is not None:
            field_kwargs['default_factory'] = list
        else:
            field_kwargs['default'] = None
    metadata['get_bytesize'] = get_bytesize
    return dataclasses.field(**field_kwargs)
@dataclass
class VariableField:
    """Internal description of one variable-size struct field,
    collected by structclass() from structfield() metadata."""
    name: str                 # attribute name on the parsed instance
    get_total_size: Callable  # instance -> total byte size of this field
    format: str = None        # per-element struct format, or None for raw bytes
    element_size: int = None  # struct.calcsize(format), or None when format is None
    factory: Callable = None  # optional converter applied to the decoded value(s)
def _parse_variable_field(vf: "VariableField",
                          instance: object, buf: bytes, bufsize: int):
    """Decode one variable-size field from buf into instance.

    buf must be exactly bufsize bytes (raises struct.error otherwise)
    and instance.consumed_size is advanced by the bytes consumed.
    With a struct format, elements are iter-unpacked and appended to the
    field's list (through the factory, if any); without a format, the
    value becomes the raw bytes, or factory(buf) when a factory is set.
    """
    if len(buf) != bufsize:
        raise struct.error('unpack requires a buffer of {} bytes'.format(bufsize))
    instance.consumed_size += len(buf)
    factory = vf.factory
    if vf.format is not None:
        if factory is None:
            # iter_unpack yields 1-tuples; unwrap the single element.
            factory = itemgetter(0)
        getattr(instance, vf.name).extend(map(factory, struct.iter_unpack(vf.format, buf)))
    else:
        if factory is None:
            setattr(instance, vf.name, buf)
        else:
            # BUG FIX: the original stored the factory object itself
            # instead of applying it to the buffer.
            setattr(instance, vf.name, factory(buf))
def structclass(cls=None, byte_order='', auto_parse_variable: bool=True):
    """Create a dataclass parsing struct defined by structfield() on place
    of dataclass.field().

    If auto_parse_variable=False is passed, the variable data can be manually
    parsed, or skipped. Recommended if raw.

    The consumed_size on the object is set to the amount of read data
    up until this point.
    """
    def decorator(cls):
        cls = dataclass(cls)
        struct_format = []
        struct_variable = []
        for f in dataclasses.fields(cls):
            fmt = f.metadata.get('format')
            if f.metadata['get_bytesize'] is not None:
                # Variable-size field: parsed separately after the fixed part.
                struct_variable.append(
                    VariableField(f.name, f.metadata['get_bytesize'],
                                  fmt,
                                  struct.calcsize(fmt) if fmt is not None else None,
                                  f.metadata['factory']))
                continue
            elif struct_variable:
                raise TypeError("variable fields can't follow regular fields")
            else:
                struct_format.append(fmt)
        cls.struct_format = byte_order + ''.join(struct_format)
        cls.struct_size = struct.calcsize(cls.struct_format)
        cls.struct_variable = tuple(struct_variable)

        @classmethod
        def parse_file(cls, file: BinaryIO,
                            auto_parse_variable: bool=auto_parse_variable):
            """Parse the given file object. You can explicitly say whether to
            automatically parse the variables, whether it is the default for this
            class or not."""
            buf = file.read(cls.struct_size)
            args = struct.unpack(cls.struct_format, buf)
            self = cls(*args)
            self.consumed_size = len(buf)
            if auto_parse_variable:
                self.parse_varfields_file(file)
            return self

        @classmethod
        def parse_bytes(cls, data: bytes,
                             auto_parse_variable: bool=auto_parse_variable,
                             partial: bool=False, offset: int=None):
            """Parse the given bytestring"""
            if cls.struct_variable or partial or offset is not None:
                args = struct.unpack_from(cls.struct_format, data,
                                          offset or 0)
            else:
                args = struct.unpack(cls.struct_format, data)
            self = cls(*args)
            # BUG FIX: was len(data), which made the default variable-field
            # offset point past the end of the buffer and double-counted the
            # variable data (parse_file only counts the fixed part here).
            self.consumed_size = cls.struct_size
            if auto_parse_variable:
                # BUG FIX: was parse_varfields_bytes(file) -- a NameError.
                self.parse_varfields_bytes(
                    data, partial=partial,
                    offset=None if offset is None else offset + cls.struct_size)
            return self

        def variable_fields_size(self) -> int:
            """Return the total byte size of all variable fields."""
            result = 0
            for vf in type(self).struct_variable:
                result += vf.get_total_size(self)
            return result

        def skip_varfields_file(self, file: BinaryIO):
            """Skip the variable part without parsing it in a file."""
            old_pos = file.tell()
            for vf in type(self).struct_variable:
                bufsize = vf.get_total_size(self)
                pos = file.seek(bufsize, os.SEEK_CUR)
                if pos - old_pos != bufsize:
                    raise struct.error('unpack requires a buffer of {} bytes'.format(bufsize))
                self.consumed_size += pos - old_pos
                old_pos = pos

        def skip_varfields_bytes(self, data: bytes,
                                 partial: bool=False, offset: int=None):
            """Skip the variable part without parsing it from a buffer.
            Validates the buffer size and accounts for the skipped bytes
            in consumed_size, mirroring skip_varfields_file."""
            # BUG FIX: the original referenced the undefined names `file`
            # and `buf`; operate on `data` instead.
            if offset is None:
                custom_offset = False
                offset = self.consumed_size
            else:
                custom_offset = True
            varfield_size = self.variable_fields_size()
            available = len(data) - offset
            if partial or custom_offset:
                if available < varfield_size:
                    raise struct.error('unpack requires a buffer of {} bytes'
                                            .format(varfield_size))
            else:
                if available != varfield_size:
                    raise struct.error('unpack requires a buffer of {} bytes'
                                            .format(varfield_size))
            self.consumed_size += varfield_size

        def parse_varfields_file(self, file: BinaryIO):
            """Parse the variable part manually from a file."""
            for vf in type(self).struct_variable:
                bufsize = vf.get_total_size(self)
                buf = file.read(bufsize)
                _parse_variable_field(vf, self, buf, bufsize)

        def parse_varfields_bytes(self, data: bytes,
                                        partial: bool=False, offset: int=None):
            """Parse the variable part manually from bytes."""
            if offset is None:
                custom_offset = False
                offset = self.consumed_size
            else:
                custom_offset = True
            variable_fields = type(self).struct_variable
            last_field = len(variable_fields) - 1
            for i, vf in enumerate(variable_fields):
                bufsize = vf.get_total_size(self)
                # BUG FIX: the original sliced an undefined `buf` and never
                # advanced the offset between fields; slice `data` and step
                # the offset forward for each field instead.
                if i != last_field or partial or custom_offset:
                    buf = data[offset:offset + bufsize]
                else:
                    # Last field, exact parse: take the remainder so that
                    # _parse_variable_field rejects trailing garbage.
                    buf = data[offset:]
                _parse_variable_field(vf, self, buf, bufsize)
                offset += bufsize

        cls.parse_file = parse_file
        cls.parse_bytes = parse_bytes
        cls.variable_fields_size = variable_fields_size
        cls.parse_varfields_file = parse_varfields_file
        cls.parse_varfields_bytes = parse_varfields_bytes
        cls.skip_varfields_file = skip_varfields_file
        cls.skip_varfields_bytes = skip_varfields_bytes
        return cls
    if cls is not None:
        return decorator(cls)
    return decorator
Filename: None. Size: 8kb. View raw, hex, or download this file.