aboutsummaryrefslogtreecommitdiff
path: root/contrib/libmpq/bindings/python/mpq.py
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/libmpq/bindings/python/mpq.py')
-rw-r--r--contrib/libmpq/bindings/python/mpq.py322
1 files changed, 0 insertions, 322 deletions
diff --git a/contrib/libmpq/bindings/python/mpq.py b/contrib/libmpq/bindings/python/mpq.py
deleted file mode 100644
index cf6ecaae800..00000000000
--- a/contrib/libmpq/bindings/python/mpq.py
+++ /dev/null
@@ -1,322 +0,0 @@
-"""wrapper for libmpq"""
-
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-
-import ctypes
-import ctypes.util
-import os
-
-libmpq = ctypes.CDLL(ctypes.util.find_library("mpq"))
-
-class Error(Exception):
- pass
-
-errors = {
- -1: (IOError, "open"),
- -2: (IOError, "close"),
- -3: (IOError, "seek"),
- -4: (IOError, "read"),
- -5: (IOError, "write"),
- -6: (MemoryError,),
- -7: (Error, "file is not an mpq or is corrupted"),
- -8: (AssertionError, "not initialized"),
- -9: (AssertionError, "buffer size too small"),
- -10: (IndexError, "file not in archive"),
- -11: (AssertionError, "decrypt"),
- -12: (AssertionError, "unpack"),
-}
-
-def check_error(result, func, arguments, errors=errors):
- try:
- error = errors[result]
- except KeyError:
- return result
- else:
- raise error[0](*error[1:])
-
-libmpq.libmpq__version.restype = ctypes.c_char_p
-
-libmpq.libmpq__archive_open.errcheck = check_error
-libmpq.libmpq__archive_close.errcheck = check_error
-libmpq.libmpq__archive_packed_size.errcheck = check_error
-libmpq.libmpq__archive_unpacked_size.errcheck = check_error
-libmpq.libmpq__archive_offset.errcheck = check_error
-libmpq.libmpq__archive_version.errcheck = check_error
-libmpq.libmpq__archive_files.errcheck = check_error
-
-libmpq.libmpq__file_packed_size.errcheck = check_error
-libmpq.libmpq__file_unpacked_size.errcheck = check_error
-libmpq.libmpq__file_offset.errcheck = check_error
-libmpq.libmpq__file_blocks.errcheck = check_error
-libmpq.libmpq__file_encrypted.errcheck = check_error
-libmpq.libmpq__file_compressed.errcheck = check_error
-libmpq.libmpq__file_imploded.errcheck = check_error
-libmpq.libmpq__file_number.errcheck = check_error
-libmpq.libmpq__file_read.errcheck = check_error
-
-libmpq.libmpq__block_open_offset.errcheck = check_error
-libmpq.libmpq__block_close_offset.errcheck = check_error
-libmpq.libmpq__block_unpacked_size.errcheck = check_error
-libmpq.libmpq__block_read.errcheck = check_error
-
-__version__ = libmpq.libmpq__version()
-
-
-class Reader(object):
- def __init__(self, file, libmpq=libmpq):
- self._file = file
- self._pos = 0
- self._buf = []
- self._cur_block = 0
- libmpq.libmpq__block_open_offset(self._file._archive._mpq,
- self._file.number)
-
- def __iter__(self):
- return self
-
- def __repr__(self):
- return "iter(%r)" % self._file
-
- def seek(self, offset, whence=os.SEEK_SET, os=os):
- if whence == os.SEEK_SET:
- pass
- elif whence == os.SEEK_CUR:
- offset += self._pos
- elif whence == os.SEEK_END:
- offset += self._file.unpacked_size
- else:
- raise ValueError, "invalid whence"
-
- if offset >= self._pos:
- self.read(offset - self._pos)
- else:
- self._pos = 0
- self._buf = []
- self._cur_block = 0
- self.read(offset)
-
- def tell(self):
- return self._pos
-
- def _read_block(self, ctypes=ctypes, libmpq=libmpq):
- block_size = ctypes.c_uint64()
- libmpq.libmpq__block_unpacked_size(self._file._archive._mpq,
- self._file.number, self._cur_block, ctypes.byref(block_size))
- block_data = ctypes.create_string_buffer(block_size.value)
- libmpq.libmpq__block_read(self._file._archive._mpq,
- self._file.number, self._cur_block,
- block_data, ctypes.c_uint64(len(block_data)), None)
- self._buf.append(block_data.raw)
- self._cur_block += 1
-
- def read(self, size=-1):
- while size < 0 or sum(map(len, self._buf)) < size:
- if self._cur_block == self._file.blocks:
- break
- self._read_block()
- buf = "".join(self._buf)
- if size < 0:
- ret = buf
- self._buf = []
- else:
- ret = buf[:size]
- self._buf = [buf[size:]]
- self._pos += len(ret)
- return ret
-
- def readline(self, os=os):
- line = []
- while True:
- char = self.read(1)
- if char == "":
- break
- if char not in '\r\n' and line and line[-1] in '\r\n':
- self.seek(-1, os.SEEK_CUR)
- break
- line.append(char)
- return ''.join(line)
-
- def next(self):
- line = self.readline()
- if not line:
- raise StopIteration
- return line
-
- def readlines(self, sizehint=-1):
- res = []
- while sizehint < 0 or sum(map(len, res)) < sizehint:
- line = self.readline()
- if not line:
- break
- res.append(line)
- return res
-
- xreadlines = __iter__
-
- def __del__(self, libmpq=libmpq):
- libmpq.libmpq__block_close_offset(self._file._archive._mpq,
- self._file.number)
-
-
-class File(object):
- def __init__(self, archive, number, ctypes=ctypes, libmpq=libmpq):
- self._archive = archive
- self.number = number
-
- for name, atype in [
- ("packed_size", ctypes.c_uint64),
- ("unpacked_size", ctypes.c_uint64),
- ("offset", ctypes.c_uint64),
- ("blocks", ctypes.c_uint32),
- ("encrypted", ctypes.c_uint32),
- ("compressed", ctypes.c_uint32),
- ("imploded", ctypes.c_uint32),
- ]:
- data = atype()
- func = getattr(libmpq, "libmpq__file_"+name)
- func(self._archive._mpq, self.number, ctypes.byref(data))
- setattr(self, name, data.value)
-
- def __str__(self, ctypes=ctypes, libmpq=libmpq):
- data = ctypes.create_string_buffer(self.unpacked_size)
- libmpq.libmpq__file_read(self._archive._mpq, self.number,
- data, ctypes.c_uint64(len(data)), None)
- return data.raw
-
- def __repr__(self):
- return "%r[%i]" % (self._archive, self.number)
-
- def __iter__(self, Reader=Reader):
- return Reader(self)
-
-
-class Archive(object):
- def __init__(self, source, ctypes=ctypes, File=File, libmpq=libmpq):
- self._source = source
- if isinstance(source, File):
- assert not source.encrypted
- assert not source.compressed
- assert not source.imploded
- self.filename = source._archive.filename
- offset = source._archive.offset + source.offset
- else:
- self.filename = source
- offset = -1
-
- self._mpq = ctypes.c_void_p()
- libmpq.libmpq__archive_open(ctypes.byref(self._mpq), self.filename,
- ctypes.c_uint64(offset))
- self._opened = True
-
- for field_name, field_type in [
- ("packed_size", ctypes.c_uint64),
- ("unpacked_size", ctypes.c_uint64),
- ("offset", ctypes.c_uint64),
- ("version", ctypes.c_uint32),
- ("files", ctypes.c_uint32),
- ]:
- func = getattr(libmpq, "libmpq__archive_" + field_name)
- data = field_type()
- func(self._mpq, ctypes.byref(data))
- setattr(self, field_name, data.value)
-
- def __del__(self, libmpq=libmpq):
- if getattr(self, "_opened", False):
- libmpq.libmpq__archive_close(self._mpq)
-
- def __len__(self):
- return self.files
-
- def __contains__(self, item, ctypes=ctypes, libmpq=libmpq):
- if isinstance(item, str):
- data = ctypes.c_uint32()
- try:
- libmpq.libmpq__file_number(self._mpq, ctypes.c_char_p(item),
- ctypes.byref(data))
- except IndexError:
- return False
- return True
- return 0 <= item < self.files
-
- def __getitem__(self, item, ctypes=ctypes, File=File, libmpq=libmpq):
- if isinstance(item, str):
- data = ctypes.c_int()
- libmpq.libmpq__file_number(self._mpq, ctypes.c_char_p(item),
- ctypes.byref(data))
- item = data.value
- else:
- if not 0 <= item < self.files:
- raise IndexError, "file not in archive"
- return File(self, item)
-
- def __repr__(self):
- return "mpq.Archive(%r)" % self._source
-
-# Remove clutter - everything except Error and Archive.
-del os, check_error, ctypes, errors, File, libmpq, Reader
-
-if __name__ == "__main__":
- import sys, random
- archive = Archive(sys.argv[1])
- print repr(archive)
- for k, v in archive.__dict__.iteritems():
- #if k[0] == '_': continue
- print " " * (4 - 1), k, v
- assert '(listfile)' in archive
- assert 0 in archive
- assert len(archive) == archive.files
- files = [x.strip() for x in archive['(listfile)']]
- files.extend(xrange(archive.files))
- for key in files: #sys.argv[2:] if sys.argv[2:] else xrange(archive.files):
- file = archive[key]
- print
- print " " * (4 - 1), repr(file)
- for k, v in file.__dict__.iteritems():
- #if k[0] == '_': continue
- print " " * (8 - 1), k, v
-
- a = str(file)
-
- b = iter(file).read()
-
- reader = iter(file)
- c = []
- while True:
- l = random.randrange(1, 10)
- d = reader.read(l)
- if not d: break
- assert len(d) <= l
- c.append(d)
- c = "".join(c)
-
- d = []
- reader.seek(0)
- for line in reader:
- d.append(line)
- d = "".join(d)
-
- assert a == b == c == d, map(hash, [a,b,c,d])
- assert len(a) == file.unpacked_size
-
- repr(iter(file))
-
-
- reader.seek(0)
- a = reader.readlines()
-
- reader.seek(0)
- b = list(reader)
-
- assert a == b