Multithreaded compression/decompression library with python bindings?

Thomas Nyberg tomuxiong at gmx.com
Thu Oct 5 04:38:33 EDT 2017


On 10/04/2017 05:08 PM, Steve D'Aprano wrote:
> pbip2? Never heard of it, and googling comes up with nothing relevant.
> 
> Got a link?

Sorry, it was a typo, as Paul Moore said. pbzip2 is a parallelized
implementation of bzip2:

	http://compression.ca/pbzip2/

> Why obviously?

Sorry again. I certainly shouldn't have said obviously. In fact, I may
be wrong. However, since the compression algorithms are almost entirely
CPU-bound, I would be impressed if someone managed to parallelize them
within Python itself, given the GIL etc. But I don't want to discount
the possibility. In my experience many people throw their hands up too
early and decide something is impossible due to the GIL even when it's
not. (As apparently I did...)

Basically here's the gist of the problem (at least as I understand it).
The way that bzip2 works is that it breaks data into blocks of a certain
size and then compresses those blocks individually. This effectively
makes the operation embarrassingly parallel on these blocks. The pbzip2
program takes advantage of this and does the compression and
decompression in parallel on these blocks leading to an almost linear
speedup in the number of CPUs available. Both the inputs and outputs of
bzip2 and pbzip2 are compatible, though to gain the speedup the files
must be compressed with pbzip2. (I.e. for some reason pbzip2 cannot
decompress a file in parallel if it's been compressed with bzip2. There
is apparently some freedom in the file format. I'm not totally sure
why this is.)
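
For what it's worth, here is a minimal sketch of the block idea in pure
Python. It is not how pbzip2 is implemented, just an illustration: each
chunk is compressed as its own bzip2 stream and the streams are
concatenated. It assumes (I have not measured this) that CPython's bz2
module releases the GIL while compressing, so the threads can overlap.
The names compress_chunks, chunk_size and workers are made up for the
example.

```python
import bz2
from concurrent.futures import ThreadPoolExecutor


def compress_chunks(data, chunk_size=900_000, workers=4):
    """Compress each chunk as its own bzip2 stream and join the streams."""
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() preserves input order, so the streams are joined in order.
        streams = list(pool.map(bz2.compress, chunks))
    return b"".join(streams)


payload = b"some fairly repetitive example data\n" * 100_000
compressed = compress_chunks(payload)
# bz2.decompress accepts a concatenation of several bzip2 streams.
restored = bz2.decompress(compressed)
```

The result is still readable by the ordinary bz2 module, which matches
the compatibility described above. Whether the threads give a real
speedup would have to be timed.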

I did some work on this yesterday, so I'll include the code I have so
far, which mostly works (I have an issue with waiting on a subprocess,
as explained below). Also, I'm running Debian and relying on it heavily,
so I presume nothing here works on Windows (sorry :( ).

In my case what I essentially wanted was just a very simple "compressed
archiver". I.e. a bzip2-compressed tar archive that I could fill up in
one pass and read from in one pass. I already had one using Python's bz2
module, but I wanted a parallel version. The version I came up with is
the following (which is essentially what Stephen Houben was proposing):


pbz2.py
--------------------------------------------
import io
import tarfile
from subprocess import Popen, PIPE


class Archive:
    def __init__(self, filepath, mode="r"):
        assert mode in {"r", "w"}
        if mode == "r":
            self._p = Popen(["pbzip2", "-dc", filepath], stdout=PIPE)
            self._tar = tarfile.open(filepath, mode="r|",
                                     fileobj=self._p.stdout)
            # Seems like an odd way to do this, but works for now.
            def tar_yielder():
                for tarinfo in self._tar:
                    file_name = tarinfo.name
                    file_bytes = self._tar.extractfile(tarinfo).read()
                    file_contents = file_bytes.decode('utf-8')
                    yield file_name, file_contents
            self._tar_yielder = tar_yielder()
        else:
            # Open the output in binary mode; pbzip2 writes raw bytes to it.
            self._p = Popen(["pbzip2", "-zc"], stdin=PIPE,
                            stdout=open(filepath, "wb"))
            self._tar = tarfile.open(filepath, mode="w|",
                                     fileobj=self._p.stdin)
    def __enter__(self):
        return self
    def __exit__(self, *args):
        self.close()
    def __iter__(self):
        return self
    def __next__(self):
        return next(self._tar_yielder)
    def write(self, file_name, file_contents):
        file_contents = file_contents.encode('utf-8')
        # The tar archiver requires file objects.
        bytesstring = io.BytesIO(file_contents)
        info = tarfile.TarInfo(name=file_name)
        info.size=len(bytesstring.getbuffer())
        self._tar.addfile(tarinfo=info, fileobj=bytesstring)
    def close(self):
        self._tar.close()
--------------------------------------------

You could use it as follows:

--------------------------------------------
with Archive("archive.tar.bz2", mode="w") as outfile:
    outfile.write("file1", "file 1 contents")
    outfile.write("file2", "file 2 contents")
    outfile.write("file3", "file 3 contents")
--------------------------------------------

And then can iterate this way:

--------------------------------------------
with Archive("archive.tar.bz2") as infile:
    for file_name, file_contents in infile:
        print(file_name)
        print(file_contents)
--------------------------------------------

One problem, though, is that I have to run those two parts as two
_separate_ scripts. Otherwise I get errors from pbzip2. I'm pretty sure
it's a race condition, i.e. when the first archive finishes, it does not
wait on the subprocess, and so the second archive may try to iterate
before the compressed file has been completely written. Naively, the
right thing seems to be to add a wait to the close method as follows:

--------------------------------------------
    def close(self):
        self._tar.close()
        self._p.wait()
--------------------------------------------

However, if I do that, the (parent) process simply hangs. Without the
wait, everything seems to work fine as long as I let the parent process
itself finish. Does anyone here know the best way to wait on the
subprocess?
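
A possible explanation for the hang, offered as an assumption and not
tested against pbzip2 itself: tarfile does not close a file object that
was handed to it via fileobj, so after self._tar.close() the pipe to the
child is still open, the child never sees end-of-file on its stdin, and
wait() blocks. The sketch below closes the pipe before waiting. To stay
runnable without pbzip2 it uses a stand-in child (a Python one-liner
that bzip2-compresses its stdin). CHILD_CMD and write_archive are
hypothetical names, and swapping in ["pbzip2", "-zc"] is the untested
part.

```python
import io
import sys
import tarfile
from subprocess import Popen, PIPE

# Stand-in for ["pbzip2", "-zc"] (an assumption so the sketch runs anywhere):
# a child Python process that bzip2-compresses stdin to stdout.
CHILD_CMD = [
    sys.executable, "-c",
    "import bz2, sys; "
    "sys.stdout.buffer.write(bz2.compress(sys.stdin.buffer.read()))",
]


def write_archive(filepath, members, cmd=CHILD_CMD):
    """Write (name, text) pairs to a compressed tar and wait for the child."""
    with open(filepath, "wb") as out:
        p = Popen(cmd, stdin=PIPE, stdout=out)
        tar = tarfile.open(mode="w|", fileobj=p.stdin)
        for name, text in members:
            data = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(tarinfo=info, fileobj=io.BytesIO(data))
        tar.close()      # writes the tar trailer but leaves p.stdin open
        p.stdin.close()  # the child now sees end-of-file on its stdin
        return p.wait()  # returns once the child has finished writing
```

The same ordering would presumably apply to Archive.close() in write
mode: self._tar.close(), then self._p.stdin.close(), then
self._p.wait().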

Thanks for any help!

Cheers,
Thomas


